From cdbec108cbd57f36c904de5fc7af917c0a648d37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A1s=20Wiesner?= Date: Mon, 27 Apr 2026 21:47:57 +0200 Subject: [PATCH] GUI... --- .gitignore | 1 + .vscode/launch.json | 2 + DeviceInterface.py | 107 +++- FlexPtpController.py | 55 +++ GUI.py | 483 +++++++++++++++++++ LinuxPtpController.py | 4 - LinuxPtpObserver.py | 51 ++ TestController.py | 71 ++- __pycache__/DeviceInterface.cpython-313.pyc | Bin 3794 -> 0 bytes linuxptp_configs/{E2E_UDP.cfg => E2E_L4.cfg} | 0 linuxptp_configs/{P2P_UDP.cfg => P2P_L4.cfg} | 0 media/flexPTP_test.png | Bin 0 -> 5441 bytes media/flexPTP_test.svg | 122 +++++ requirements.txt | 9 + start_linuxptp.sh | 4 +- test_main.py | 40 +- 16 files changed, 888 insertions(+), 61 deletions(-) create mode 100644 .gitignore create mode 100644 FlexPtpController.py create mode 100644 GUI.py delete mode 100644 LinuxPtpController.py create mode 100644 LinuxPtpObserver.py delete mode 100644 __pycache__/DeviceInterface.cpython-313.pyc rename linuxptp_configs/{E2E_UDP.cfg => E2E_L4.cfg} (100%) rename linuxptp_configs/{P2P_UDP.cfg => P2P_L4.cfg} (100%) create mode 100644 media/flexPTP_test.png create mode 100644 media/flexPTP_test.svg create mode 100644 requirements.txt diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ba0430d --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +__pycache__/ \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json index 35806d7..7c57f9f 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -12,8 +12,10 @@ "program": "${workspaceFolder}/test_main.py", "python": ".venv/bin/python", "console": "integratedTerminal", + "justMyCode": false, "args": [ "/dev/ttyACM0", + "enp52s0f0" ] } ] diff --git a/DeviceInterface.py b/DeviceInterface.py index bee8aca..72e0d29 100644 --- a/DeviceInterface.py +++ b/DeviceInterface.py @@ -1,11 +1,40 @@ +import sys, os +import select +from typing import Callable + import serial import re +from threading import Thread, Lock class DeviceInterface: """ Helper class for interfacing the device running flexPTP. """ + def reader_routine(self) -> None: + """ + Thread routine for receiving all incoming communication from + the device. + """ + + while not self.device.closed: + try: + data = self.device.readline() # read serial port line-by-line + + if type(data) is bytes: + # remove colorization escape sequences + data = re.sub(r"\x1b\[[0-9;]*m", "", data.decode()) + + if len(data) > 0: # write (remaining) data to the pipe + os.write(self.__outpipe_rw_fd[1], data.encode()) + + if self.__out_cb is not None: # invoke callback if provided + self.__out_cb(data) + + except: + break # handle closing the device + + def __init__(self, url: str, options: dict = {}) -> None: """ Initialize the interface. @@ -15,7 +44,7 @@ class DeviceInterface: :rtype: None """ - self.device = serial.serial_for_url(url, timeout=0.05) # open remote device + self.device = serial.serial_for_url(url, timeout=None) # open remote device # set serial port options if applicable if "baudrate" in options: @@ -27,17 +56,39 @@ class DeviceInterface: if "stopbits" in options: self.device.stopbits = options["stopbits"] - def read_until(self, expected: bytes = serial.LF) -> bytes: - """ - Read from the device until a specific sequence is found. + # ---- + + self.__out_cb: Callable | None = None + + r, w = os.pipe() + self.__outpipe_rw_fd = [ r, w ] + self.__outpipe_rw = [ os.fdopen(r, "rb"), os.fdopen(w, "wb") ] + + self.__reader_thread = Thread(target=self.reader_routine) + self.__reader_thread.start() + + def close(self) -> None: + """ + Close the device interface. + + :rtype: None + """ + + self.device.close() + self.__reader_thread.join() + self.__outpipe_rw[0].close() + self.__outpipe_rw[1].close() + + def readline(self) -> bytes: + """ + Read a line from the device. - :param bytes expected: delimiter sequence :return: bytes read :rtype: bytes """ - return self.device.read_until(expected=expected) - + return self.__outpipe_rw[0].readline().encode() + def write(self, data: bytes) -> None: """ @@ -47,7 +98,7 @@ class DeviceInterface: """ self.device.write(data) - + def execute_command(self, cmd: str, expect_results: bool = True, separate_results: bool = False) -> str | dict[str, str] | None: """ @@ -60,15 +111,32 @@ class DeviceInterface: :rtype: str | None """ - self.device.write((cmd.strip("\r\n") + "\r\n").encode()) # sanitize commands - self.device.read_until(cmd.encode()) # flush echo + pure_final_cmd = cmd.strip("\r\n") + final_cmd = (pure_final_cmd + "\r\n").encode() # sanitize commands + #self.skip(len(final_cmd)) # flush echo + self.write(final_cmd) # send command + + # wait for the echo to arrive + line = "" + while line != pure_final_cmd: + line = self.__outpipe_rw[0].readline().decode().strip() + if expect_results: # store results if required timeout = False results = "" + + poller = select.poll() + poller.register(self.__outpipe_rw_fd[0], select.POLLIN) + while not timeout: # store continuous chunks - data = self.device.read(32) - if len(data) > 0: - results += data.decode() + res = poller.poll(100) + + if len(res) > 0 and res[0][1] == select.POLLIN: + data = self.__outpipe_rw[0].read(1) + if len(data) > 0: + results += data.decode() + else: + timeout = True else: timeout = True @@ -81,4 +149,15 @@ class DeviceInterface: else: return results else: - return None \ No newline at end of file + return None + + def register_out_callback(self, cb: Callable) -> None: + """ + Register callback for receiving all printed messages from the + device. + + :param Callable cb: callback invoked at each reception + :rtype: None + """ + + self.__out_cb = cb \ No newline at end of file diff --git a/FlexPtpController.py b/FlexPtpController.py new file mode 100644 index 0000000..d90cc90 --- /dev/null +++ b/FlexPtpController.py @@ -0,0 +1,55 @@ +from DeviceInterface import DeviceInterface + +class FlexPtpController: + def __init__(self, di: DeviceInterface) -> None: + self.__di = di + pass + + def reset_flexptp(self) -> None: + self.__di.execute_command("ptp reset") + + def start_e2e_udp(self) -> None: + self.__di.execute_command("ptp profile preset default") + + def start_p2p_udp(self) -> None: + self.__di.execute_command("ptp profile preset defp2p") + + def start_e2e_l2(self) -> None: + self.__di.execute_command("ptp profile preset default") + self.__di.execute_command("ptp transport 802.3") + self.reset_flexptp() + + def start_p2p_l2(self) -> None: + self.__di.execute_command("ptp profile preset defp2p") + self.__di.execute_command("ptp transport 802.3") + self.reset_flexptp() + + def start_gPTP(self) -> None: + self.__di.execute_command("ptp profile preset gPTP") + + def start_by_id(self, id: str) -> None: + id = id.replace("_", "").upper() + if id == "E2EL4": + self.start_e2e_udp() + elif id == "E2EL2": + self.start_e2e_l2() + elif id == "P2PL4": + self.start_p2p_udp() + elif id == "P2PL2": + self.start_p2p_l2() + elif id == "GPTP": + self.start_gPTP() + + def set_priority(self, priority1: int, priority2: int) -> None: + self.__di.execute_command("ptp priority {:d} {:d}".format(priority1, priority2)) + + def set_domain(self, domain: int) -> None: + self.__di.execute_command("ptp domain {:d}".format(domain)) + + def disable_all_logging(self) -> None: + logging_types = [ "def", "corr", "ts", "info", "locked", "bmca" ] + for lt in logging_types: + self.__di.execute_command("ptp log " + lt + " off", expect_results=False) + + def set_servo_offset(self, offset: int) -> None: + self.__di.execute_command("ptp servo offset {:d}".format(offset)) \ No newline at end of file diff --git a/GUI.py b/GUI.py new file mode 100644 index 0000000..f5b36de --- /dev/null +++ b/GUI.py @@ -0,0 +1,483 @@ +import queue +from tkinter import Tk, font, ttk +import serial +import serial.tools.list_ports +import ttk_text as ttkt +from ttkthemes import ThemedTk +import tkinter as tk +import netifaces + +from DeviceInterface import DeviceInterface +from FlexPtpController import FlexPtpController +from LinuxPtpObserver import LinuxPtpObserver + +class GUI: + def __init_flexPtp_options(self) -> None: + title = ttk.Label(self.__setup_tab, text="flexPTP options", font=self.__title_font) + frame = ttk.LabelFrame(self.__setup_tab, labelwidget=title) + frame.grid(row=0, column=0, sticky=tk.NSEW, ipady=4, padx=4) + #frame.pack(anchor="nw", fill="x", padx=12, ipady=4, side=tk.LEFT, expand=True) + + frame.rowconfigure([0, 1, 2, 3 ], weight=1) + frame.columnconfigure(0, weight=1) + frame.columnconfigure(1, weight=3) + + self.__flexPtp_options_frame = frame + + device_label = ttk.Label(frame, text="Device:") + device_label.grid(column=0, row=0, sticky=tk.E) + + baudrate_label = ttk.Label(frame, text="Baudrate:") + baudrate_label.grid(column=0, row=1, sticky=tk.E) + + baudrate_label = ttk.Label(frame, text="Parity:") + baudrate_label.grid(column=0, row=2, sticky=tk.E) + + baudrate_label = ttk.Label(frame, text="Stopbits:") + baudrate_label.grid(column=0, row=3, sticky=tk.E) + + self.__sel_device = tk.StringVar() + device_combobox = ttk.Combobox(frame, textvariable=self.__sel_device) + device_combobox["values"] = list(map(lambda p: p.device, filter(lambda p: p.subsystem == "usb", serial.tools.list_ports.comports()))) + device_combobox.current(0) + device_combobox.grid(column=1, row=0, sticky=tk.EW, padx=4) + + #device_entry = ttk.Entry(frame, font=self.__console_font, textvariable=self.__sel_device) + #device_entry.grid(column=1, row=0, sticky=tk.EW, padx=4) + + self.__sel_baudrate = tk.IntVar() + baudrate_combobox = ttk.Combobox(frame, textvariable=self.__sel_baudrate) + baudrate_combobox["values"] = (115200, 57600, 38400, 19200, 9600, 4800) + baudrate_combobox.current(0) + baudrate_combobox.grid(column=1, row=1, sticky=tk.EW, padx=4) + + self.__sel_parity = tk.StringVar() + self.__sel_parity.set(serial.PARITY_NONE) + parity_frame = tk.Frame(frame) + parity_frame.grid(column=1, row=2, sticky=tk.W) + + parity_none = ttk.Radiobutton(parity_frame, text="None", variable=self.__sel_parity, value=serial.PARITY_NONE) + parity_none.pack(anchor=tk.W, side=tk.LEFT) + + parity_even = ttk.Radiobutton(parity_frame, text="Even", variable=self.__sel_parity, value=serial.PARITY_EVEN) + parity_even.pack(anchor=tk.W, side=tk.LEFT) + + parity_odd = ttk.Radiobutton(parity_frame, text="Odd", variable=self.__sel_parity, value=serial.PARITY_ODD) + parity_odd.pack(anchor=tk.W, side=tk.LEFT) + + self.__sel_stopbits = tk.IntVar() + self.__sel_stopbits.set(1) + stopbits_frame = tk.Frame(frame) + stopbits_frame.grid(column=1, row=3, sticky=tk.W) + + stopbits_one = ttk.Radiobutton(stopbits_frame, text="1", variable=self.__sel_stopbits, value=1) + stopbits_one.pack(anchor=tk.W, side=tk.LEFT) + + stopbits_two = ttk.Radiobutton(stopbits_frame, text="2", variable=self.__sel_stopbits, value=2) + stopbits_two.pack(anchor=tk.W, side=tk.LEFT) + + + def __init_linuxptp_options(self) -> None: + title = ttk.Label(self.__setup_tab, text="linuxptp options", font=self.__title_font) + frame = ttk.LabelFrame(self.__setup_tab, labelwidget=title) + frame.grid(row=1, rowspan=1, column=0, sticky=tk.NSEW, ipady=4, padx=4) + # frame.pack(anchor="nw", fill="x", ipady=4, padx=12, side=tk.LEFT, expand=True) + + frame.rowconfigure([0, 1, 2], weight=1) + # frame.rowconfigure(2, weight=100) + frame.columnconfigure(0, weight=1) + frame.columnconfigure(1, weight=3) + + self.__linuxptp_options_frame = frame + + path_label = ttk.Label(frame, text="Path:") + path_label.grid(column=0, row=0, sticky=tk.E) + + if_label = ttk.Label(frame, text="Interface:") + if_label.grid(column=0, row=1, sticky=tk.E) + + arg_label = ttk.Label(frame, text="Arguments:") + arg_label.grid(column=0, row=2, sticky=tk.E) + + self.__linuxptp_path = tk.StringVar() + self.__linuxptp_path.set("/usr/sbin/ptp4l") + path_entry = ttk.Entry(frame, font=self.__console_font, textvariable=self.__linuxptp_path) + path_entry.grid(column=1, row=0, sticky=tk.EW, padx=4) + + self.__linuxptp_interface = tk.StringVar() + baudrate_combobox = ttk.Combobox(frame, textvariable=self.__linuxptp_interface) + baudrate_combobox["values"] = netifaces.interfaces() + baudrate_combobox.current(0) + baudrate_combobox.grid(column=1, row=1, sticky=tk.EW, padx=4) + + self.__linuxptp_args = tk.StringVar() + self.__linuxptp_args.set("") + args_entry = ttk.Entry(frame, font=self.__console_font, textvariable=self.__linuxptp_args) + args_entry.grid(column=1, row=2, sticky=tk.EW, padx=4) + + + def __init_test_cases(self) -> None: + title = ttk.Label(self.__setup_tab, text="Test cases", font=self.__title_font) + frame = ttk.LabelFrame(self.__setup_tab, labelwidget=title) + frame.grid(row=0, rowspan=2, column=1, sticky=tk.NSEW, ipady=4, padx=4) + # frame.pack(anchor="nw", fill="x", ipady=4, padx=12, side=tk.LEFT, expand=True) + + self.__test_cases_frame = frame + + basic_modes_frame = ttk.LabelFrame(frame, text="Basic modes") + basic_modes_frame.rowconfigure([0], weight=2) + basic_modes_frame.rowconfigure([1, 2], weight=8) + basic_modes_frame.columnconfigure([0], weight=2) + basic_modes_frame.columnconfigure([1, 2], weight=8) + basic_modes_frame.pack(expand=False, fill="both", anchor=tk.W, side=tk.LEFT, padx=4, pady=4) + + e2e_label = ttk.Label(basic_modes_frame, text="E2E", style="ProfileLabel.TLabel", padding=6) + e2e_label.grid(row=0, column=1) + + p2p_label = ttk.Label(basic_modes_frame, text="P2P", style="ProfileLabel.TLabel") + p2p_label.grid(row=0, column=2) + + l4_label = ttk.Label(basic_modes_frame, text="L4", style="ProfileLabel.TLabel", padding=10) + l4_label.grid(row=1, column=0) + + l2_label = ttk.Label(basic_modes_frame, text="L2", style="ProfileLabel.TLabel") + l2_label.grid(row=2, column=0) + + modes = { + "e2e_l4": { + "row": 1, + "col": 1, + }, + "e2e_l2": { + "row": 2, + "col": 1, + }, + "p2p_l4": { + "row": 1, + "col": 2, + }, + "p2p_l2": { + "row": 2, + "col": 2, + }, + } + + self.__test_cases = {} + + for name, pos in modes.items(): + oframe = ttk.Frame(basic_modes_frame, width=100, height=100, style="ProfileSquare.TFrame") + oframe.grid(row=pos["row"], column=pos["col"], sticky=tk.NSEW) + oframe.pack_propagate(False) + + iframe = tk.Frame(oframe) + iframe.place(relx=0.5, rely=0.5, anchor=tk.CENTER) + + m = tk.BooleanVar() + m_chk = ttk.Checkbutton(iframe, text="Master", variable=m, style="ProfileChkBox.TCheckbutton") + m_chk.pack(anchor=tk.CENTER, side=tk.TOP) + + s = tk.BooleanVar() + s_chk = ttk.Checkbutton(iframe, text="Slave ", variable=s, style="ProfileChkBox.TCheckbutton") + s_chk.pack(anchor=tk.CENTER, side=tk.BOTTOM) + + self.__test_cases[name] = { + "type": "general", + "delmech": name[0:3].upper(), + "layer": name[4:6].upper(), + "master": m, + "slave": s + } + + defined_profiles_frame = ttk.LabelFrame(frame, text="Defined profiles") + defined_profiles_frame.pack(expand=True, fill="both", anchor=tk.W, side=tk.LEFT, padx=4, pady=4) + + defined_profiles = [ "gPTP" ] + + ri = 0 + for dprof in defined_profiles: + label = ttk.Label(defined_profiles_frame, text=dprof + ":", font=self.__profile_font) + label.grid(row=ri, column=0, padx=6) + + m = tk.BooleanVar() + m_chk = ttk.Checkbutton(defined_profiles_frame, text="Master", variable=m, style="ProfileChkBox.TCheckbutton") + m_chk.grid(row=ri, column=1) + + s = tk.BooleanVar() + s_chk = ttk.Checkbutton(defined_profiles_frame, text="Slave ", variable=s, style="ProfileChkBox.TCheckbutton") + s_chk.grid(row=ri, column=2) + + self.__test_cases[dprof] = { + "type": "defined", + "master": m, + "slave": s + } + + + def __init_test_controller(self) -> None: + title = ttk.Label(self.__setup_tab, text="Test controls", font=self.__title_font) + frame = ttk.LabelFrame(self.__setup_tab, labelwidget=title) + frame.grid(row=2, column=0, columnspan=2, sticky=tk.NSEW, ipady=4, padx=4) + + self.__tests_running = False + + def start_stop_tests() -> None: + self.__start_stop_btn.configure(state="disabled") + + if not self.__tests_running: + self.start_tests() + else: + self.stop_tests() + + self.__tests_running = not self.__tests_running + + if self.__tests_running: + self.__start_stop_btn.configure(text="STOP") + state = "disabled" + else: + self.__start_stop_btn.configure(text="START") + state = "normal" + + frames = [ self.__flexPtp_options_frame, self.__linuxptp_options_frame, self.__test_cases_frame ] + + def set_widget_state(parent: tk.Widget | tk.Toplevel) -> None: + for widget in parent.winfo_children(): + if len(widget.winfo_children()) == 0 and widget.widgetName != "frame": + widget.configure(state=state) # type: ignore + else: + set_widget_state(widget) + + for frame in frames: + set_widget_state(frame) + + self.__start_stop_btn.configure(state="enabled") + + self.__start_stop_btn = ttk.Button(frame, text="START", command=start_stop_tests) + self.__start_stop_btn.pack() + + tw = ttk.Treeview(frame, columns=("name", "result", "start", "end", "duration"), selectmode="none") + tw.heading("#0", text="#") + tw.heading("name", text="Name") + tw.heading("result", text="Result") + tw.heading("start", text="Start") + tw.heading("end", text="End") + tw.heading("duration", text="Duration") + + tw.tag_configure("passed", background="LawnGreen") + tw.tag_configure("failed", background="Salmon") + tw.tag_configure("in_progress", background="Gold") + tw.tag_configure("pending", font=("TkDefaultFont", 9, "italic"), foreground="Gray") + + tw.pack() + + self.__test_tw = tw + + + def __init_setup_tab(self) -> None: + self.__setup_tab = ttk.Frame(self.__tabs) + self.__setup_tab.columnconfigure(0, weight=1) + self.__setup_tab.columnconfigure(1, weight=4) + self.__setup_tab.rowconfigure([0, 1], weight=1) + self.__setup_tab.rowconfigure(2, weight=100) + self.__tabs.add(self.__setup_tab, text="Setup") + + self.__init_flexPtp_options() + self.__init_linuxptp_options() + self.__init_test_cases() + self.__init_test_controller() + + + def __init_logtab(self) -> None: + self.__logtab = ttk.Frame(self.__tabs) + self.__tabs.add(self.__logtab, text="Logs") + + self.__logtab.rowconfigure(0, weight=8) + self.__logtab.rowconfigure(1, weight=2) + + self.__logtab.columnconfigure(0, weight=1) + self.__logtab.columnconfigure(1, weight=1) + + terminal_settings = { "wrap": tk.WORD, "background": "#3D3D3D", "borderwidth": 0, "foreground": "white", "blockcursor": True, "insertbackground": "white"} + self.__flexPtp_log = tk.Text(self.__logtab, **terminal_settings) + self.__flexPtp_log.grid(column=0, row=0, sticky=tk.NSEW) + self.__flexPtp_log.configure(font=self.__console_font, state="disabled") + + self.__linuxptp_log = tk.Text(self.__logtab, **terminal_settings) + self.__linuxptp_log.grid(column=1, row=0, sticky=tk.NSEW, pady=0) + self.__linuxptp_log.configure(font=self.__console_font, state="disabled") + + self.__flexPtp_log_queue = queue.Queue() + self.__linuxptp_log_queue = queue.Queue() + + + def __init_test_results(self) -> None: + self.__results_tabs = ttk.Frame(self.__tabs) + self.__tabs.add(self.__results_tabs, text="Results") + + + def __init_tabs(self) -> None: + self.__init_setup_tab() + self.__init_logtab() + self.__init_test_results() + + + def __init_fonts(self) -> None: + self.__console_font = font.Font(family="Ubuntu Mono", size=9) + self.__title_font=font.Font(family="Ubuntu", size=14) + self.__mode_font=font.Font(family="Ubuntu", size=12, weight="bold") + self.__profile_font=font.Font(family="Ubuntu", size=12, slant="italic") + + + def __init_styles(self) -> None: + style = ttk.Style() + style.configure("ProfileChkBox.TCheckbutton", font=self.__console_font) + style.configure("ProfileSquare.TFrame", bordercolor="#003039", borderwidth=1, relief="solid") + style.configure("ProfileLabel.TLabel", font=self.__mode_font) + + + def __init_print_polls(self) -> None: + def print_logs() -> None: + while not self.__flexPtp_log_queue.empty(): + self.__flexPtp_log.configure(state="normal") + self.__flexPtp_log.insert(tk.END, self.__flexPtp_log_queue.get()) + self.__flexPtp_log.configure(state="disabled") + + while not self.__linuxptp_log_queue.empty(): + self.__linuxptp_log.configure(state="normal") + self.__linuxptp_log.insert(tk.END, self.__linuxptp_log_queue.get()) + self.__linuxptp_log.configure(state="disabled") + + self.__win.after(50, print_logs) + + self.__win.after(50, print_logs) + + + def __init__(self) -> None: + self.__win = ThemedTk(theme="arc") + #self.__win.configure() + self.__win.title("flexPTP test suite") + self.__win.iconphoto(False, tk.PhotoImage(file="media/flexPTP_test.png")) + + self.__init_fonts() + self.__init_styles() + + self.__tabs = ttk.Notebook(self.__win) + self.__tabs.configure(width=1000, height=600) + + self.__init_tabs() + + self.__tabs.pack(expand=True, fill="both") + + self.__init_print_polls() + + + def mainloop(self) -> None: + self.__win.mainloop() + + + def __gather_tests(self) -> None: + # gather test cases + self.__tests = [] + for name, case in self.__test_cases.items(): + ptype = case["type"] + + template = {} + if ptype == "general": + template = { "type": ptype, "name": case["delmech"] + " " + case["layer"], "delmech": case["delmech"], "layer": case["layer"] } + elif ptype == "defined": + template = { "type": ptype, "name": name } + + if case["master"].get(): + master_mode = dict(template) + master_mode["mode"] = "master" + master_mode["name"] += " (master)" + self.__tests.append(master_mode) + + if case["slave"].get(): + slave_mode = dict(template) + slave_mode["mode"] = "slave" + slave_mode["name"] += " (slave)" + self.__tests.append(slave_mode) + + + def __populate_tests_treeview(self) -> None: + tw = self.__test_tw + items = tw.get_children() + if items != (): + tw.delete(*items) + + seqnum = 1 + n = len(self.__tests) + for test in self.__tests: + test["entry"] = tw.insert("", tk.END, text=str(seqnum) + "/" + str(n), values=(test["name"], "?", "Pending", "-", "-"), tags=("pending")) + seqnum += 1 + + + def __open_flexPtp_interface(self) -> None: + url = self.__sel_device.get() + opts = { + "baudrate": self.__sel_baudrate.get(), + "parity": self.__sel_parity.get(), + "stopbits": self.__sel_stopbits.get() + } + + def echo(data: str) -> None: + self.__flexPtp_log_queue.put(data.replace("\r", "")) + + self.__device_interface = DeviceInterface(url, opts) + self.__device_interface.register_out_callback(echo) + self.__flexPtp_controller = FlexPtpController(self.__device_interface) + + + def __init_linuxptp_observer(self) -> None: + self.__linuxptp_observer = LinuxPtpObserver(self.__linuxptp_interface.get()) + + def echo(data) -> None: + self.__linuxptp_log_queue.put(str(data)) + + self.__linuxptp_observer.register_observer_callback(echo) + + + def __init_flexPtp(self) -> None: + ctrl = self.__flexPtp_controller + + ctrl.disable_all_logging() + ctrl.set_domain(0) + ctrl.reset_flexptp() + + + def start_tests(self) -> None: + self.__gather_tests() + self.__populate_tests_treeview() + self.__open_flexPtp_interface() + self.__init_flexPtp() + self.__init_linuxptp_observer() + + # ---- + + ctrl = self.__flexPtp_controller + observer = self.__linuxptp_observer + for test in self.__tests: + if test["type"] == "general": + id = test["delmech"] + "_" + test["layer"] + else: + id = test["name"] + + ctrl.start_by_id(id) + + priority1 = 128 + if test["mode"] == "master": + priority1 = 100 + + ctrl.set_priority(priority1, 255) + + observer.start_linuxptp(id) + + + + for test in self.__tests: + pass + + + def stop_tests(self) -> None: + self.__linuxptp_observer.stop_linuxptp() + self.__device_interface.close() \ No newline at end of file diff --git a/LinuxPtpController.py b/LinuxPtpController.py deleted file mode 100644 index b87f5af..0000000 --- a/LinuxPtpController.py +++ /dev/null @@ -1,4 +0,0 @@ - -class LinuxPtpObserver: - def __init__(self) -> None: - pass \ No newline at end of file diff --git a/LinuxPtpObserver.py b/LinuxPtpObserver.py new file mode 100644 index 0000000..585b2a6 --- /dev/null +++ b/LinuxPtpObserver.py @@ -0,0 +1,51 @@ +import signal +import subprocess +import select +from threading import Thread +from typing import Callable + +class LinuxPtpObserver: + def __init__(self, ni: str) -> None: + self.__ni = ni + self.__process: subprocess.Popen + self.__observer_thread: Thread + self.__observer_cb: Callable | None = None + pass + + def observer_routine(self) -> None: + if self.__process.stdout is not None: + pfd = select.poll() + pfd.register(self.__process.stdout.fileno(), select.POLLIN) + else: + return + + while self.__process.poll() is None: + res = pfd.poll() + + if len(res) > 0 and res[0][1] & select.POLLIN: + data = self.__process.stdout.readline() + if self.__observer_cb is not None: + self.__observer_cb(data) + + + def start_linuxptp(self, profile: str) -> None: + cmd = ["sudo", "ptp4l", "--priority1=127", "--priority2=255", "--gmCapable=1", "--neighborPropDelayThresh=100000", + "--min_neighbor_prop_delay=-20000000", "--assume_two_step=1", "--ptp_minor_version=0", + "-i", self.__ni, "-f", "linuxptp_configs/{:s}.cfg".format(profile), "-m", "-l", "6"] + + self.__process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stdin=subprocess.PIPE, + text=True + ) + + self.__observer_thread = Thread(target=self.observer_routine) + self.__observer_thread.start() + + def stop_linuxptp(self) -> None: + self.__process.send_signal(signal.SIGKILL) + self.__observer_thread.join() + + def register_observer_callback(self, cb : Callable) -> None: + self.__observer_cb = cb \ No newline at end of file diff --git a/TestController.py b/TestController.py index 2fc35e2..3a301c7 100644 --- a/TestController.py +++ b/TestController.py @@ -1,45 +1,40 @@ from DeviceInterface import DeviceInterface -from LinuxPtpController import LinuxPtpObserver +from LinuxPtpObserver import LinuxPtpObserver +from FlexPtpController import FlexPtpController class TestController: - def __reset_flexptp(self) -> None: - self.__di.execute_command("ptp reset") - - def __start_e2e_l4(self) -> None: - self.__di.execute_command("ptp profile preset default") - - def __start_p2p_l4(self) -> None: - self.__di.execute_command("ptp profile preset defp2p") - - def __start_e2e_l2(self) -> None: - self.__di.execute_command("ptp profile preset default") - self.__di.execute_command("ptp transport 802.3") - self.__reset_flexptp() - - def __start_p2p_l2(self) -> None: - self.__di.execute_command("ptp profile preset defp2p") - self.__di.execute_command("ptp transport 802.3") - self.__reset_flexptp() - - def __start_gPTP(self) -> None: - self.__di.execute_command("ptp profile preset gPTP") - - def __set_priority(self, priority1: int, priority2: int) -> None: - self.__di.execute_command("ptp priority {:d} {:d}".format(priority1, priority2)) - - def __set_domain(self, domain: int) -> None: - self.__di.execute_command("ptp domain {:d}".format(domain)) - - def __disable_all_logging(self) -> None: - logging_types = [ "def", "corr", "ts", "info", "locked", "bmca" ] - for lt in logging_types: - self.__di.execute_command("ptp log " + lt + " off", expect_results=False) - - def __set_servo_offset(self, offset: int) -> None: - self.__di.execute_command("ptp servo offset {:d}".format(offset)) - + def __prepare_device(self) -> None: + self.__fptp_controller.disable_all_logging() + self.__fptp_controller.set_priority(128, 255) + self.__fptp_controller.set_domain(0) + self.__fptp_controller.reset_flexptp() def __init__(self, di: DeviceInterface, lptp_observer: LinuxPtpObserver) -> None: self.__di = di + self.__fptp_controller = FlexPtpController(di) self.__lptp_observer = lptp_observer - pass \ No newline at end of file + pass + + def start_test_e2e_udp(self) -> None: + self.__fptp_controller.start_e2e_udp() + self.__lptp_observer.start_linuxptp("E2E_UDP") + + def start_test_e2e_l2(self) -> None: + self.__fptp_controller.start_e2e_l2() + self.__lptp_observer.start_linuxptp("E2E_L2") + + def start_test_p2p_udp(self) -> None: + self.__fptp_controller.start_p2p_udp() + self.__lptp_observer.start_linuxptp("P2P_UDP") + + def start_test_p2p_l2(self) -> None: + self.__fptp_controller.start_p2p_l2() + self.__lptp_observer.start_linuxptp("P2P_L2") + + def start_test_gPTP(self) -> None: + self.__fptp_controller.start_gPTP() + self.__lptp_observer.start_linuxptp("gPTP") + + def stop_test(self) -> None: + self.__lptp_observer.stop_linuxptp() + diff --git a/__pycache__/DeviceInterface.cpython-313.pyc b/__pycache__/DeviceInterface.cpython-313.pyc deleted file mode 100644 index c4354e3ffe7a83305ddffacb86912ce2144ba15e..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 3794 zcmb7GU2GJ|5$>6t*&WaN@366r4W{`EUSsSNXOix$&zogYvc$1onG?16HB+_`T_Lf!)dh5pOw0ii< zWAa!W{&K%4gQC3&LbN#c{fgF5(NeE9@`mwvYJ=Y}#I9N>Bybg$~iirn@Sg$d6pIIbV(%qwzd z*o>_?AlilBFH`|tHM+VqQgQMs0Vdl?M$iOJ91t_aUR~E>8S(qDYBUp)5w#eq%#e}j zI@c($S9jK=bu!W$WLEWP@;Vv$4KlbCG%a3{*%;Use}hask{M5oCsNAvAJpTHqEtr>?N112#rXr!NWqP!7DQRjJ zY@3;QL#V)IR~a`zS9oW%>w;0ta>HXi^um zJPE$ym}R^UyL#wSF<)|-o#W_74lzz#ylqOU-`{t!kY{~t!Wie4+jn+Q8#;WpuM(zv z;Z$BAWXqvsYnCx8M)-~T&+O{u}=zP7z z=e4vPBemOJfEmPB*yIGcR6;ULNSVR9Ff1IFq$vYH&PRxsTLgqveY-{B#6C&tJkvS%#7J^05$-au#;Ld4eVH$A5w(=IB&8m=^ z4lyDfMjPd@CD6jS3dGfvz+1t$*53B$ZLT9k-;_)fvO-R^;jP2Ez_)4%7 z*4@p}eMHI|NMrNunOieU4ZBJWyXGes8g?x-yt|;h%Ue*3lr=nKoo~@=fp60=`F6hD z50HnA5~w5M^bge!Ag=&UVGCMtyuunkuV#GcLu3K}AtSPJ!*O$rtHY63MTeLmb zZ*(ES5Go>yiD4Ht%*wz`h}+-JO(Uyl8YzP^CPdEDguOr>!Gy1B`zzlOC;g_t$26{L z8R7rbDB@!QoAEcWN^~SG=ExZ-qGpc;wJ2m+*N-5ERW(hDY<1$Jh-^X2#3QOEr+*GR zK~SGNJJJxXzZ#zA|t=<2BN*z z$V2_DZkjR>a}uK%P(d4Q``=mkIN44jjo`mT6FCM^>nIsTtHJAklLhC_X+bUC4Sc5{ zw3Cof8F^>$A6HXJqYd&*Jv_5_ zLcy+JSQO}K#5#BgaYhYi3P1~=IrOSw7a7#X3LwK$mKxU#$jj`(b;_#q2Gr%~8D0c! z^fH^;8z=&Z46+OBwBdOyKLL88LsU}m6?#?(v%VQat7Luue~lb?OnH zJg4A1j-YvFy7T-f{jj@h^nCimJt>@~gYVw(VrO3gi=9R0F$`|Qz|Nz&RmJO1x_(nz1RZ+_5yzk6ZV;Xhvc>y1C%_{)!< zod40%`Aa4EwO^i1JZta#rTd$~xyk#zi|yUBiKiXgX9t(RlbYZ5S?+iCXZEk1*`p7I zdq?iPzY-&xcYFzo>%*?8HNbW#uUJ=8;+Z3f~o8LZBWGfutQ|v2w1*&@`x`7MJ$MpFl;1DDvo~lH|o0U z8J=b85aryu04eM0`Xdq;7nXA@D;zugyT5|&r({JGWu>f;ef=w>N&ep4U>S$`ox$)} z`WnB=-K1yV?*|?YtdJUc;Nku<4v#l}jYGLDA=9~CWdg&)p|5aQX{K`9!*|LAhR0Gc zEPOB;ju$U4IDEmqx-dDj;Lens>!pj=%Y+;fhr|^^EMZ7|iDChBXgS)NiUmI4JunXr uzz^aK!7&U~w6u>JBc#~&PlmxhHe z008Fr58#bPYW_oz)=$;k&&bosFYu+0BOov^P|(!_=4=1b%TdtN$NBw{EFAzqG4WhQ z(KsmocR`4s@qE{?LXTJmfIQP&^|h%R`bi?vL>1Y!X5J3nvuQ%m``1fSVLo#J6!`D;^9+Glp|KWEZ24at7_L31u{3R^U+Rw_X zFmkohV_^kqL`srMk^(m~4)0|SE^g&UCEU|bejeL5rd%FfPT;fobS;?NOW?pUH01Ok zl!ew|k|)+Z=qD)PdltEzc=6pPgfK~{Z=Jx<_XKENd6ycL7?E9a9t=vsVR_9d7WrJ_ zdrL0|?o-C9@HJt~bu4yI{PeO%#qDr3#VcplCc-^uqb;HsOi^>Bccn;vkrer~k})+< zXu#!`j`!3iGq`3p`lA^l9{=^=v8}&Zm30|rK2U9*)+9?gd=u6y$=+mBDiVS8uv*xd z+F6na*5pFS6Pn+G;8&U^y$3 zWy&lzcI|4IYqcp3RM|_}L@u1_rETU_y9iNgaXT?LvV6#mPA(_f6&$BEnK&Xou8$Hq%t9YV3x<%ZN|_W zef2(0bLfqvFqX5-=S#p9@>uIi@&*3Q2i0>IS);n1RGXM)ZbZeUP1<*|(GQhtlFwS4 zq|~{BB9cd`l4@6J>J2PA&btOy3a_xWH(l90X|^XVkKey_>5v1))muvSv7JP}Gk+_< zl~gO{peRbs41p~w@G~7NLoNrY!r3;vl>L*0&KR|6P=I#f+-?-D|3OU~-x~alpuyLlPBVQzDMT#vG-p%9^tdv8r?$SDm6{X|kH^_h9BBoSQG1V{fEHr**t0Cr72uy@e-K{K2+$ zPJoweV`l7z3Rp~^1ceB99`n4)iKUNL&I!hk-bBD9!t&iWL)z3FBcklHjsGxJFIG2j zxmcB0L@1@`_NI|lA{8DPbF)yWypu$qXR0&+O-Xg&qTfX{i zcvYWJ&X`qS0k0aHh?y7O8GG)pKaiY%1s8A^G< zzscvQS-N@Vd|rT((jyjU0XVGUu)49eq0GEQT_g_$FS(_gurM7@Q|-&)7KB~USAONV zVdSc;O$*s2ZmxTF*#92kh4*fzw|9uXmVap$jRQc8K?Mx)N=}b*kjZv)f&IV}&HnUj{LBtc-mI$++%8P)q2gymLsq#86*fy;q| zMolWRgY$-4U5bRAredWNOCm^yc<`qbE;CO;bjaZ0X0NsKa3l&0n1zY=CdJ9K0^sJ& zc$p?t7XR9@`O8Y!I3p^wJI{EccBrOsSZv-A{*cXbzqYjTcsuJUJq9C_v*G98c>!RF zx7`xN(LH!OcB|zmI0?R9X4WEF7ev4J5wr z@c;EUbu1D5B^=zt&VyA4{rf3D@KYSbi^x{oSsw6DadyYG4CddOg0B>Pmv8 zt0eV4v>Y(J;x`YnuC|gq)cd!lZ{snRhm$=;9!kQgREaXy;x{X=Zjk2&;J?8u^}3?r z_yZPRF3*|HlSIM01!*pD*Qtn4+~6&l*nFIOXC3s8j=9WiAeQOJ(xX!qHbBPdD{$@HHGB zc7xB9UQK$o+-ynSI6cMw;%56!YtAnmxe;^La)4}@+^@$fq^yUoLIb%B&%Dfxw?TUu z_5c~=&o;((+;%m>iZCMOo>WnWZBQwO$n5-O4s&FuDq{CcWQ~5+ZL1Gk3vnEv@ha7N z(H$#ty3Us=?gL@7qw05wCoj&29Z=&uKtC1qV%Kw?aK%)`>Kz;4jCV#I!4Pplcp!V@ z^Xjo9T&h;Hb&@L7MrJL4J1NWfZW#4eO8nNKG&(f`e!X@V-UF~XO8bBvt7rO+InSlX z0(b^+co@ILVPF^LC0-$s#!sF0QeCGL%9Yx9Bmo-oK%>QR=5Qx!Vs>24LLFX`zhD$@F7`SXO-0shxwahnNO_zh^MS3M0&jr z;=jiKT-c$Uhs6fzuN-Y{a9_eou3zSnj-HOk4Zze*0j>}}z8~N-vd<`W$HFYZgq7!I z&39)UC2cAkAL4)2r_*^_uiXBq1Lk{hY=!P=gSjnBZy~Zj1Es5+v4o;eJ{&+VEV&)8 zi49GbnSDIfpD{ZMBaix!R>%A(CDW``oX(WJ2z0XMIPXgf@e5h)dQvzES1%aHQ;3S)}^909^q zne&DQBIe`c{Q-<|=b#ZcmbC=dV#XJusqF7<@Rk?!=om;=7Z~x^bmy_Y*+l-S<8a&c zJ>{OhrqjLg0#68r$LNq#T}&ZHGAmzEw-S=sNK zLVr5MsK%#oEI9paObjKy!k3Brp&Xa1*0xapaQ@L$cv~<`p^SES7W5953%dO1(^=8^ z!mh7Y6vP4~8np~=xm1guu!t+!h%K<0t*Z)#_xTQZn#Lq|E>T%C&R~W5noTh}< z40yRQhu0)IA<+$S%($vS7c9L`@T&vJ{zS97K|?NBwWp<_nin$|anu#1S2%Us+U)i5 ziC+uAu-ku`jBo#U5B1a_)|3_gYe5~nW7fWo*}1@mVy$7oRIu09r)xDZ@lNzKrchm+ zG%|FoeHJ=d*@8zfvyC+}(>!_w6wR*%{ND&q<_C59MjG1UdpQs;djh*5OVOtiUwq>& zYFf}xJ)|P53t7HkKU{_OP7! z>Syd|6(qrNa&ZdNaayk>8iuSCuEJoB8N!T9#E1L(7q-Tq!67pHVt4+J_{3aGb-DCf z<=rG_zD_t{mrY$i`yxd)`hNacreSu7C|o+;dXb*fUA9q5Uz$SIq3J5jRK2&Q z&0=dcp8WfTDdu{}--#3v)QOH9;hrEmxOkJ;e8&|p-FN*QbIsPno`m8wL#Tal%IjaB z__=1Nzx?4x`6fO{jV(WQ?jUThsRZ)C!g@rim#8Z(0?oLKFlxvWr~_$L@&vtlt7$L;`MYm)Jd*dBKex&o_8_Ia2{vJVsoRr z@=nAsAkoYagt;)6dsCvz0O!8VP;_%|oJdR08idt+=>1KXGU8Zili25lF13~H)2r?lfY!4qU0xL;i(r_@=t!S7rv!|@rYB?B>S*^(KmK?(pWXcl)92cU)*{mz?e=2`nrGr$Kz}Psf75#jS_zI1Eth_DyG)53lD)$Q z2llGaYfsV7LpOu!N+>b~R4@`n&6p6=P=sMArdyEo=l46pOYJ0$n+;0@$R%rHf30yQCiIa0HwHW9_{Bqx>1Cy@|b$@TyBGE*yAiR3}AiW>Pw{+1$5& zd$q_gQWA|wNuVOqce2(?LCbd3*qNWIQ3-4eS5{yxQTg}W3 zkA}d5*28%7B?v{5A~P6T{|e^FueQrSsj)11*2{2;1ZFmCJk{-q$BL*Zd*!c$!F&4R zYu_C;yAPGi4S$32xqXhmrI}H-ehZ;T$*+%^j%5+1P literal 0 HcmV?d00001 diff --git a/media/flexPTP_test.svg b/media/flexPTP_test.svg new file mode 100644 index 0000000..2ae72c2 --- /dev/null +++ b/media/flexPTP_test.svg @@ -0,0 +1,122 @@ + + + + + + + + + + + PTP + + + + + + + + + + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..95f4677 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,9 @@ +future==1.0.0 +iso8601==2.1.0 +netifaces==0.11.0 +netinterfaces==0.1.0 +pillow==12.2.0 +pyserial==3.5 +PyYAML==6.0.3 +ttk-text==0.3.3 +ttkthemes==3.3.0 diff --git a/start_linuxptp.sh b/start_linuxptp.sh index a6fb8d4..6f7833f 100755 --- a/start_linuxptp.sh +++ b/start_linuxptp.sh @@ -1,5 +1,5 @@ #!/bin/bash -COMMON_FLAGS=--priority1=127 --priority2=255 --gmCapable=1 --neighborPropDelayThresh=100000 --min_neighbor_prop_delay=-20000000 --assume_two_step=1 --ptp_minor_version=0 +COMMON_FLAGS="--priority1=127 --priority2=255 --gmCapable=1 --neighborPropDelayThresh=100000 --min_neighbor_prop_delay=-20000000 --assume_two_step=1 --ptp_minor_version=0" -ptp4l -i "$1" -f "linuxptp_configs/$2.cfg" -m -l 6 $COMMON_FLAGS \ No newline at end of file +sudo ptp4l -i "$1" -f "linuxptp_configs/$2.cfg" -m -l 6 $COMMON_FLAGS \ No newline at end of file diff --git a/test_main.py b/test_main.py index 0b687c8..8f2d80c 100644 --- a/test_main.py +++ b/test_main.py @@ -1,12 +1,23 @@ #!/usr/bin/python3 import optparse +import os +import time import DeviceInterface +from LinuxPtpObserver import LinuxPtpObserver +from TestController import TestController +from GUI import GUI # ---------------------------------- -usage = " [-s ]" +gui = GUI() + +gui.mainloop() + +exit(0) + +usage = " [-s ]" parser = optparse.OptionParser(usage=usage) parser.add_option("-s", dest="baudrate", default=115200) @@ -23,10 +34,33 @@ di = DeviceInterface.DeviceInterface(url=args[0], options={"baudrate": opts.baud #results = di.execute_command("osinfo") #print(results) +def echo(str: str) -> None: + print(str, end="") + +di.register_out_callback(echo) + # get device clock identity OWN_CLOCK_ID_KEY = "Own clock ID" clock_id = di.execute_command("ptp info", separate_results=True) -if type(clock_id) == dict[str, str] and OWN_CLOCK_ID_KEY in clock_id: +if type(clock_id) == dict and OWN_CLOCK_ID_KEY in clock_id: print("Own clock ID:", clock_id[OWN_CLOCK_ID_KEY]) else: - print("Could not retrieve device clock ID!") \ No newline at end of file + print("Could not retrieve device clock ID!") + +lptp_observer = LinuxPtpObserver(args[1]) +lptp_observer.register_observer_callback(echo) + +tc = TestController(di, lptp_observer) + +tc.start_test_gPTP() + +time.sleep(15) + +tc.stop_test() +tc.start_test_e2e_udp() + +time.sleep(15) + +tc.stop_test() + +di.close() \ No newline at end of file