commit df1d6a9f1c96fc2dae8e56e82c3402f05937e27a Author: AndrĂ¡s Wiesner Date: Thu Apr 23 10:34:28 2026 +0200 - initial diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..35806d7 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,20 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + + { + "name": "Debug", + "type": "debugpy", + "request": "launch", + "program": "${workspaceFolder}/test_main.py", + "python": ".venv/bin/python", + "console": "integratedTerminal", + "args": [ + "/dev/ttyACM0", + ] + } + ] +} \ No newline at end of file diff --git a/DeviceInterface.py b/DeviceInterface.py new file mode 100644 index 0000000..bee8aca --- /dev/null +++ b/DeviceInterface.py @@ -0,0 +1,84 @@ +import serial +import re + +class DeviceInterface: + """ + Helper class for interfacing the device running flexPTP. + """ + + def __init__(self, url: str, options: dict = {}) -> None: + """ + Initialize the interface. + + :param str url: url of the device (tty, socket etc.) + :param dict options: collection of several options (baudrate, bytesize, parity, stopbits etc.) + :rtype: None + """ + + self.device = serial.serial_for_url(url, timeout=0.05) # open remote device + + # set serial port options if applicable + if "baudrate" in options: + self.device.baudrate = options["baudrate"] + if "bytesize" in options: + self.device.bytesize = options["bytesize"] + if "parity" in options: + self.device.parity = options["parity"] + if "stopbits" in options: + self.device.stopbits = options["stopbits"] + + def read_until(self, expected: bytes = serial.LF) -> bytes: + """ + Read from the device until a specific sequence is found. + + :param bytes expected: delimiter sequence + :return: bytes read + :rtype: bytes + """ + + return self.device.read_until(expected=expected) + + + def write(self, data: bytes) -> None: + """ + Write data onto the device + + :param bytes data: data to transfer + """ + + self.device.write(data) + + + def execute_command(self, cmd: str, expect_results: bool = True, separate_results: bool = False) -> str | dict[str, str] | None: + """ + Execute command on the device + + :param str cmd: command to launch + :param bool expect_results: is there any return values expected and awaited? + :param bool separate_results: if True a key-value separation attempted on the results + :return: results (if requested) + :rtype: str | None + """ + + self.device.write((cmd.strip("\r\n") + "\r\n").encode()) # sanitize commands + self.device.read_until(cmd.encode()) # flush echo + if expect_results: # store results if required + timeout = False + results = "" + while not timeout: # store continuous chunks + data = self.device.read(32) + if len(data) > 0: + results += data.decode() + else: + timeout = True + + if separate_results: # separation requested + records = re.findall("^[ ]*([^:]+)[ ]*:[ ]*(.+)[ ]*$", results.strip(), flags=re.MULTILINE) + results = dict[str, str]() + for rec in records: + results[rec[0]] = rec[1].strip() + return results + else: + return results + else: + return None \ No newline at end of file diff --git a/LinuxPtpController.py b/LinuxPtpController.py new file mode 100644 index 0000000..b87f5af --- /dev/null +++ b/LinuxPtpController.py @@ -0,0 +1,4 @@ + +class LinuxPtpObserver: + def __init__(self) -> None: + pass \ No newline at end of file diff --git a/TestController.py b/TestController.py new file mode 100644 index 0000000..2fc35e2 --- /dev/null +++ b/TestController.py @@ -0,0 +1,45 @@ +from DeviceInterface import DeviceInterface +from LinuxPtpController import LinuxPtpObserver + +class TestController: + def __reset_flexptp(self) -> None: + self.__di.execute_command("ptp reset") + + def __start_e2e_l4(self) -> None: + self.__di.execute_command("ptp profile preset default") + + def __start_p2p_l4(self) -> None: + self.__di.execute_command("ptp profile preset defp2p") + + def __start_e2e_l2(self) -> None: + self.__di.execute_command("ptp profile preset default") + self.__di.execute_command("ptp transport 802.3") + self.__reset_flexptp() + + def __start_p2p_l2(self) -> None: + self.__di.execute_command("ptp profile preset defp2p") + self.__di.execute_command("ptp transport 802.3") + self.__reset_flexptp() + + def __start_gPTP(self) -> None: + self.__di.execute_command("ptp profile preset gPTP") + + def __set_priority(self, priority1: int, priority2: int) -> None: + self.__di.execute_command("ptp priority {:d} {:d}".format(priority1, priority2)) + + def __set_domain(self, domain: int) -> None: + self.__di.execute_command("ptp domain {:d}".format(domain)) + + def __disable_all_logging(self) -> None: + logging_types = [ "def", "corr", "ts", "info", "locked", "bmca" ] + for lt in logging_types: + self.__di.execute_command("ptp log " + lt + " off", expect_results=False) + + def __set_servo_offset(self, offset: int) -> None: + self.__di.execute_command("ptp servo offset {:d}".format(offset)) + + + def __init__(self, di: DeviceInterface, lptp_observer: LinuxPtpObserver) -> None: + self.__di = di + self.__lptp_observer = lptp_observer + pass \ No newline at end of file diff --git a/__pycache__/DeviceInterface.cpython-313.pyc b/__pycache__/DeviceInterface.cpython-313.pyc new file mode 100644 index 0000000..c4354e3 Binary files /dev/null and b/__pycache__/DeviceInterface.cpython-313.pyc differ diff --git a/linuxptp_configs/E2E_L2.cfg b/linuxptp_configs/E2E_L2.cfg new file mode 100644 index 0000000..9effaa1 --- /dev/null +++ b/linuxptp_configs/E2E_L2.cfg @@ -0,0 +1,6 @@ +[global] +logAnnounceInterval 0 +logSyncInterval 0 +syncReceiptTimeout 3 +network_transport L2 +delay_mechanism E2E \ No newline at end of file diff --git a/linuxptp_configs/E2E_UDP.cfg b/linuxptp_configs/E2E_UDP.cfg new file mode 100644 index 0000000..8d88222 --- /dev/null +++ b/linuxptp_configs/E2E_UDP.cfg @@ -0,0 +1,6 @@ +[global] +logAnnounceInterval 0 +logSyncInterval 0 +syncReceiptTimeout 3 +network_transport UDPv4 +delay_mechanism E2E \ No newline at end of file diff --git a/linuxptp_configs/P2P_L2.cfg b/linuxptp_configs/P2P_L2.cfg new file mode 100644 index 0000000..8f8eaa2 --- /dev/null +++ b/linuxptp_configs/P2P_L2.cfg @@ -0,0 +1,6 @@ +[global] +logAnnounceInterval 0 +logSyncInterval 0 +syncReceiptTimeout 3 +network_transport L2 +delay_mechanism P2P \ No newline at end of file diff --git a/linuxptp_configs/P2P_UDP.cfg b/linuxptp_configs/P2P_UDP.cfg new file mode 100644 index 0000000..9d7979c --- /dev/null +++ b/linuxptp_configs/P2P_UDP.cfg @@ -0,0 +1,6 @@ +[global] +logAnnounceInterval 0 +logSyncInterval 0 +syncReceiptTimeout 3 +network_transport UDPv4 +delay_mechanism P2P \ No newline at end of file diff --git a/linuxptp_configs/gPTP.cfg b/linuxptp_configs/gPTP.cfg new file mode 100644 index 0000000..23b2c44 --- /dev/null +++ b/linuxptp_configs/gPTP.cfg @@ -0,0 +1,10 @@ +[global] +logAnnounceInterval 0 +logSyncInterval -3 +syncReceiptTimeout 3 +path_trace_enabled 1 +follow_up_info 1 +transportSpecific 0x1 +ptp_dst_mac 01:80:C2:00:00:0E +network_transport L2 +delay_mechanism P2P diff --git a/start_linuxptp.sh b/start_linuxptp.sh new file mode 100755 index 0000000..a6fb8d4 --- /dev/null +++ b/start_linuxptp.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +COMMON_FLAGS=--priority1=127 --priority2=255 --gmCapable=1 --neighborPropDelayThresh=100000 --min_neighbor_prop_delay=-20000000 --assume_two_step=1 --ptp_minor_version=0 + +ptp4l -i "$1" -f "linuxptp_configs/$2.cfg" -m -l 6 $COMMON_FLAGS \ No newline at end of file diff --git a/test_main.py b/test_main.py new file mode 100644 index 0000000..0b687c8 --- /dev/null +++ b/test_main.py @@ -0,0 +1,32 @@ +#!/usr/bin/python3 + +import optparse + +import DeviceInterface + +# ---------------------------------- + +usage = " [-s ]" + +parser = optparse.OptionParser(usage=usage) +parser.add_option("-s", dest="baudrate", default=115200) +opts, args = parser.parse_args() + +missing = ( + len(args) < 1 +) +if missing: + print("Something is missing! Usage:", usage) + exit(0) + +di = DeviceInterface.DeviceInterface(url=args[0], options={"baudrate": opts.baudrate }) +#results = di.execute_command("osinfo") +#print(results) + +# get device clock identity +OWN_CLOCK_ID_KEY = "Own clock ID" +clock_id = di.execute_command("ptp info", separate_results=True) +if type(clock_id) == dict[str, str] and OWN_CLOCK_ID_KEY in clock_id: + print("Own clock ID:", clock_id[OWN_CLOCK_ID_KEY]) +else: + print("Could not retrieve device clock ID!") \ No newline at end of file