From 3b4976155c995cdb5123b65dfb9cc60ff338be88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A1s=20Wiesner?= Date: Thu, 7 May 2026 10:42:36 +0200 Subject: [PATCH] GUI class separation --- DeviceInterface.py | 7 +- FlexPtpController.py | 132 +++++++- GUI.py | 696 ++++++++++++++++++++--------------------- LinuxPtpObserver.py | 120 ++++++- gui/GuiCommon.py | 3 + gui/LoggingTab.py | 41 +++ gui/SetupTab.py | 446 ++++++++++++++++++++++++++ gui/TerminalDisplay.py | 31 ++ 8 files changed, 1102 insertions(+), 374 deletions(-) create mode 100644 gui/GuiCommon.py create mode 100644 gui/LoggingTab.py create mode 100644 gui/SetupTab.py create mode 100644 gui/TerminalDisplay.py diff --git a/DeviceInterface.py b/DeviceInterface.py index 72e0d29..06e8ce6 100644 --- a/DeviceInterface.py +++ b/DeviceInterface.py @@ -29,10 +29,13 @@ class DeviceInterface: os.write(self.__outpipe_rw_fd[1], data.encode()) if self.__out_cb is not None: # invoke callback if provided - self.__out_cb(data) + self.__out_cb(data, "") except: break # handle closing the device + + if self.__out_cb is not None: # invoke callback if provided + self.__out_cb("", "exit") def __init__(self, url: str, options: dict = {}) -> None: @@ -141,7 +144,7 @@ class DeviceInterface: timeout = True if separate_results: # separation requested - records = re.findall("^[ ]*([^:]+)[ ]*:[ ]*(.+)[ ]*$", results.strip(), flags=re.MULTILINE) + records = re.findall(r"^[ ]*([^:]+)[ ]*:[ ]*(.+)[ ]*$", results.strip(), flags=re.MULTILINE) results = dict[str, str]() for rec in records: results[rec[0]] = rec[1].strip() diff --git a/FlexPtpController.py b/FlexPtpController.py index d90cc90..4213878 100644 --- a/FlexPtpController.py +++ b/FlexPtpController.py @@ -1,8 +1,88 @@ +import os +import queue +import re +import time +from typing import Callable + from DeviceInterface import DeviceInterface +class FlexPtpEvent: + def __init__(self, name: str, args: dict) -> None: + self.__name = name + self.__args = args + + def get_name(self) -> str: + return self.__name + + def get_args(self) -> dict: + return self.__args + + class FlexPtpController: + def __log_processor(self, data: str, spec_event: str) -> None: + if spec_event == "exit": + event = FlexPtpEvent("exit", {}) + self.__event_queue.put(event) + return + + # ------- + + stripped_data = data.strip() + line = stripped_data # strip accidental whitespaces from the start and the end of the line + match = re.search(r"^\[LOG-([A-Z:]+)\]", line) # serach for the LOGID + if match is not None and len(match.groups()) == 1: # check if LOGID was present + log_id = match.group(1) # extract LOGID + rest = line[len(match.group(0)):].strip() + + if log_id == "BMCA": # BMCA event has occurred + match = re.match(r"([A-Z_]+) -> ([A-Z_]+)", rest) + if match is not None and len(match.groups()) == 2: + event = FlexPtpEvent("bmca-statechange", { "from": match.group(1), "to": match.group(2) }) + self.__event_queue.put(event) + + elif log_id == "DEF:S:A" or log_id == "DEF:S:H": # DEFAULT SLAVE log event has occurred + if_type = log_id[-1] + if if_type == "H": # HLT interface columns + cn = 10 + else: # Addend interface columns + cn = 11 + + match = re.match(r"^" + r"([-.0-9]+)[ ]+" * (cn - 1) + r"([-.0-9]+)", rest) + + if match is not None and len(match.groups()) == cn: + args = { + "T1": float(match.group(1) + "." + match.group(2)), + "T4": float(match.group(3) + "." + match.group(4)), + "Dt": int(match.group(5)) * 1000000000 + int(match.group(6)), + "corr_ppb": float(match.group(cn - 2)), + "mpd_ns": int(match.group(cn - 1)), + "sync_period_ns": int(match.group(cn)) + } + + self.__event_queue.put(FlexPtpEvent("synclog-slave", args=args)) + + elif log_id == "DEF:M:M": # DEFAULT master : mean path delay log event has occurred + self.__event_queue.put(FlexPtpEvent("mpd-master", { "mpd_ns": int(rest) })) + + elif log_id == "DEF:M:S": # DEFAULT master : P2P slave state change log event has occurred + match = re.match(r"([A-Z_]+) -> ([A-Z_]+)", rest) + if match is not None and len(match.groups()) == 2: + event = FlexPtpEvent("p2p-slave-statechange", { "from": match.group(1), "to": match.group(2) }) + self.__event_queue.put(event) + + else: + # unhandled event + pass + + if self.__log_cb is not None: + self.__log_cb(stripped_data + "\n") + def __init__(self, di: DeviceInterface) -> None: self.__di = di + self.__di.register_out_callback(self.__log_processor) + + self.__log_cb: Callable | None = None + self.__event_queue = queue.Queue() pass def reset_flexptp(self) -> None: @@ -47,9 +127,57 @@ class FlexPtpController: self.__di.execute_command("ptp domain {:d}".format(domain)) def disable_all_logging(self) -> None: - logging_types = [ "def", "corr", "ts", "info", "locked", "bmca" ] + logging_types = [ "def", "corr", "ts", "info", "locked", "bmca", "logid" ] for lt in logging_types: self.__di.execute_command("ptp log " + lt + " off", expect_results=False) + def enable_log(self, id: str, en: bool) -> None: + if en: + onoff = "on" + else: + onoff = "off" + + self.__di.execute_command("ptp log {:s} {:s}".format(id, onoff), expect_results=False) + def set_servo_offset(self, offset: int) -> None: - self.__di.execute_command("ptp servo offset {:d}".format(offset)) \ No newline at end of file + self.__di.execute_command("ptp servo offset {:d}".format(offset)) + + def register_log_callback(self, cb: Callable | None) -> None: + self.__log_cb = cb + + def get_event(self, timeout: float) -> FlexPtpEvent: + try: + return self.__event_queue.get(timeout=timeout) + except: + return FlexPtpEvent("none", {}) + + def wait_for_event(self, expected_name: str, timeout: float, argcrits = {}) -> FlexPtpEvent: + timeout_left = timeout + start = time.time_ns() + + event = FlexPtpEvent("none", {}) + critera_met = False + while (event.get_name() != expected_name or not critera_met) and timeout_left > 0: + try: + event = self.__event_queue.get(timeout=timeout_left) # type: FlexPtpEvent + + # propagate exit event + if event.get_name() == "exit": + return event + + critera_met = True + for c_name, c_value in argcrits.items(): + args = event.get_args() + if c_name in args: + if args[c_name] != c_value: + critera_met = False + break + except: + pass + + now = time.time_ns() + timeout_left = timeout - ((now - start) / 1E+09) + + return event + + \ No newline at end of file diff --git a/GUI.py b/GUI.py index f5b36de..dbe95b0 100644 --- a/GUI.py +++ b/GUI.py @@ -1,5 +1,10 @@ import queue +import select +import sys +import threading +import time from tkinter import Tk, font, ttk +import numpy import serial import serial.tools.list_ports import ttk_text as ttkt @@ -7,344 +12,104 @@ from ttkthemes import ThemedTk import tkinter as tk import netifaces +from matplotlib.backend_bases import key_press_handler +from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg +from matplotlib.figure import Figure + from DeviceInterface import DeviceInterface from FlexPtpController import FlexPtpController from LinuxPtpObserver import LinuxPtpObserver +from gui.LoggingTab import LoggingTab +from gui.SetupTab import SetupTab + class GUI: - def __init_flexPtp_options(self) -> None: - title = ttk.Label(self.__setup_tab, text="flexPTP options", font=self.__title_font) - frame = ttk.LabelFrame(self.__setup_tab, labelwidget=title) - frame.grid(row=0, column=0, sticky=tk.NSEW, ipady=4, padx=4) - #frame.pack(anchor="nw", fill="x", padx=12, ipady=4, side=tk.LEFT, expand=True) - - frame.rowconfigure([0, 1, 2, 3 ], weight=1) - frame.columnconfigure(0, weight=1) - frame.columnconfigure(1, weight=3) - - self.__flexPtp_options_frame = frame - - device_label = ttk.Label(frame, text="Device:") - device_label.grid(column=0, row=0, sticky=tk.E) - - baudrate_label = ttk.Label(frame, text="Baudrate:") - baudrate_label.grid(column=0, row=1, sticky=tk.E) - - baudrate_label = ttk.Label(frame, text="Parity:") - baudrate_label.grid(column=0, row=2, sticky=tk.E) - - baudrate_label = ttk.Label(frame, text="Stopbits:") - baudrate_label.grid(column=0, row=3, sticky=tk.E) - - self.__sel_device = tk.StringVar() - device_combobox = ttk.Combobox(frame, textvariable=self.__sel_device) - device_combobox["values"] = list(map(lambda p: p.device, filter(lambda p: p.subsystem == "usb", serial.tools.list_ports.comports()))) - device_combobox.current(0) - device_combobox.grid(column=1, row=0, sticky=tk.EW, padx=4) - - #device_entry = ttk.Entry(frame, font=self.__console_font, textvariable=self.__sel_device) - #device_entry.grid(column=1, row=0, sticky=tk.EW, padx=4) - - self.__sel_baudrate = tk.IntVar() - baudrate_combobox = ttk.Combobox(frame, textvariable=self.__sel_baudrate) - baudrate_combobox["values"] = (115200, 57600, 38400, 19200, 9600, 4800) - baudrate_combobox.current(0) - baudrate_combobox.grid(column=1, row=1, sticky=tk.EW, padx=4) - - self.__sel_parity = tk.StringVar() - self.__sel_parity.set(serial.PARITY_NONE) - parity_frame = tk.Frame(frame) - parity_frame.grid(column=1, row=2, sticky=tk.W) - - parity_none = ttk.Radiobutton(parity_frame, text="None", variable=self.__sel_parity, value=serial.PARITY_NONE) - parity_none.pack(anchor=tk.W, side=tk.LEFT) - - parity_even = ttk.Radiobutton(parity_frame, text="Even", variable=self.__sel_parity, value=serial.PARITY_EVEN) - parity_even.pack(anchor=tk.W, side=tk.LEFT) - - parity_odd = ttk.Radiobutton(parity_frame, text="Odd", variable=self.__sel_parity, value=serial.PARITY_ODD) - parity_odd.pack(anchor=tk.W, side=tk.LEFT) - - self.__sel_stopbits = tk.IntVar() - self.__sel_stopbits.set(1) - stopbits_frame = tk.Frame(frame) - stopbits_frame.grid(column=1, row=3, sticky=tk.W) - - stopbits_one = ttk.Radiobutton(stopbits_frame, text="1", variable=self.__sel_stopbits, value=1) - stopbits_one.pack(anchor=tk.W, side=tk.LEFT) - - stopbits_two = ttk.Radiobutton(stopbits_frame, text="2", variable=self.__sel_stopbits, value=2) - stopbits_two.pack(anchor=tk.W, side=tk.LEFT) - - - def __init_linuxptp_options(self) -> None: - title = ttk.Label(self.__setup_tab, text="linuxptp options", font=self.__title_font) - frame = ttk.LabelFrame(self.__setup_tab, labelwidget=title) - frame.grid(row=1, rowspan=1, column=0, sticky=tk.NSEW, ipady=4, padx=4) - # frame.pack(anchor="nw", fill="x", ipady=4, padx=12, side=tk.LEFT, expand=True) - - frame.rowconfigure([0, 1, 2], weight=1) - # frame.rowconfigure(2, weight=100) - frame.columnconfigure(0, weight=1) - frame.columnconfigure(1, weight=3) - - self.__linuxptp_options_frame = frame - - path_label = ttk.Label(frame, text="Path:") - path_label.grid(column=0, row=0, sticky=tk.E) - - if_label = ttk.Label(frame, text="Interface:") - if_label.grid(column=0, row=1, sticky=tk.E) - - arg_label = ttk.Label(frame, text="Arguments:") - arg_label.grid(column=0, row=2, sticky=tk.E) - - self.__linuxptp_path = tk.StringVar() - self.__linuxptp_path.set("/usr/sbin/ptp4l") - path_entry = ttk.Entry(frame, font=self.__console_font, textvariable=self.__linuxptp_path) - path_entry.grid(column=1, row=0, sticky=tk.EW, padx=4) - - self.__linuxptp_interface = tk.StringVar() - baudrate_combobox = ttk.Combobox(frame, textvariable=self.__linuxptp_interface) - baudrate_combobox["values"] = netifaces.interfaces() - baudrate_combobox.current(0) - baudrate_combobox.grid(column=1, row=1, sticky=tk.EW, padx=4) - - self.__linuxptp_args = tk.StringVar() - self.__linuxptp_args.set("") - args_entry = ttk.Entry(frame, font=self.__console_font, textvariable=self.__linuxptp_args) - args_entry.grid(column=1, row=2, sticky=tk.EW, padx=4) - - - def __init_test_cases(self) -> None: - title = ttk.Label(self.__setup_tab, text="Test cases", font=self.__title_font) - frame = ttk.LabelFrame(self.__setup_tab, labelwidget=title) - frame.grid(row=0, rowspan=2, column=1, sticky=tk.NSEW, ipady=4, padx=4) - # frame.pack(anchor="nw", fill="x", ipady=4, padx=12, side=tk.LEFT, expand=True) - - self.__test_cases_frame = frame - - basic_modes_frame = ttk.LabelFrame(frame, text="Basic modes") - basic_modes_frame.rowconfigure([0], weight=2) - basic_modes_frame.rowconfigure([1, 2], weight=8) - basic_modes_frame.columnconfigure([0], weight=2) - basic_modes_frame.columnconfigure([1, 2], weight=8) - basic_modes_frame.pack(expand=False, fill="both", anchor=tk.W, side=tk.LEFT, padx=4, pady=4) - - e2e_label = ttk.Label(basic_modes_frame, text="E2E", style="ProfileLabel.TLabel", padding=6) - e2e_label.grid(row=0, column=1) - - p2p_label = ttk.Label(basic_modes_frame, text="P2P", style="ProfileLabel.TLabel") - p2p_label.grid(row=0, column=2) - - l4_label = ttk.Label(basic_modes_frame, text="L4", style="ProfileLabel.TLabel", padding=10) - l4_label.grid(row=1, column=0) - - l2_label = ttk.Label(basic_modes_frame, text="L2", style="ProfileLabel.TLabel") - l2_label.grid(row=2, column=0) - - modes = { - "e2e_l4": { - "row": 1, - "col": 1, - }, - "e2e_l2": { - "row": 2, - "col": 1, - }, - "p2p_l4": { - "row": 1, - "col": 2, - }, - "p2p_l2": { - "row": 2, - "col": 2, - }, - } - - self.__test_cases = {} - - for name, pos in modes.items(): - oframe = ttk.Frame(basic_modes_frame, width=100, height=100, style="ProfileSquare.TFrame") - oframe.grid(row=pos["row"], column=pos["col"], sticky=tk.NSEW) - oframe.pack_propagate(False) - - iframe = tk.Frame(oframe) - iframe.place(relx=0.5, rely=0.5, anchor=tk.CENTER) - - m = tk.BooleanVar() - m_chk = ttk.Checkbutton(iframe, text="Master", variable=m, style="ProfileChkBox.TCheckbutton") - m_chk.pack(anchor=tk.CENTER, side=tk.TOP) - - s = tk.BooleanVar() - s_chk = ttk.Checkbutton(iframe, text="Slave ", variable=s, style="ProfileChkBox.TCheckbutton") - s_chk.pack(anchor=tk.CENTER, side=tk.BOTTOM) - - self.__test_cases[name] = { - "type": "general", - "delmech": name[0:3].upper(), - "layer": name[4:6].upper(), - "master": m, - "slave": s - } - - defined_profiles_frame = ttk.LabelFrame(frame, text="Defined profiles") - defined_profiles_frame.pack(expand=True, fill="both", anchor=tk.W, side=tk.LEFT, padx=4, pady=4) - - defined_profiles = [ "gPTP" ] - - ri = 0 - for dprof in defined_profiles: - label = ttk.Label(defined_profiles_frame, text=dprof + ":", font=self.__profile_font) - label.grid(row=ri, column=0, padx=6) - - m = tk.BooleanVar() - m_chk = ttk.Checkbutton(defined_profiles_frame, text="Master", variable=m, style="ProfileChkBox.TCheckbutton") - m_chk.grid(row=ri, column=1) - - s = tk.BooleanVar() - s_chk = ttk.Checkbutton(defined_profiles_frame, text="Slave ", variable=s, style="ProfileChkBox.TCheckbutton") - s_chk.grid(row=ri, column=2) - - self.__test_cases[dprof] = { - "type": "defined", - "master": m, - "slave": s - } - - - def __init_test_controller(self) -> None: - title = ttk.Label(self.__setup_tab, text="Test controls", font=self.__title_font) - frame = ttk.LabelFrame(self.__setup_tab, labelwidget=title) - frame.grid(row=2, column=0, columnspan=2, sticky=tk.NSEW, ipady=4, padx=4) - - self.__tests_running = False - - def start_stop_tests() -> None: - self.__start_stop_btn.configure(state="disabled") - - if not self.__tests_running: - self.start_tests() + def __start_stop_tests(self) -> str: + if not self.__tests_running: + if self.start_tests(): + self.__tests_running = True + return "running" else: - self.stop_tests() - - self.__tests_running = not self.__tests_running - - if self.__tests_running: - self.__start_stop_btn.configure(text="STOP") - state = "disabled" - else: - self.__start_stop_btn.configure(text="START") - state = "normal" - - frames = [ self.__flexPtp_options_frame, self.__linuxptp_options_frame, self.__test_cases_frame ] - - def set_widget_state(parent: tk.Widget | tk.Toplevel) -> None: - for widget in parent.winfo_children(): - if len(widget.winfo_children()) == 0 and widget.widgetName != "frame": - widget.configure(state=state) # type: ignore - else: - set_widget_state(widget) - - for frame in frames: - set_widget_state(frame) - - self.__start_stop_btn.configure(state="enabled") - - self.__start_stop_btn = ttk.Button(frame, text="START", command=start_stop_tests) - self.__start_stop_btn.pack() - - tw = ttk.Treeview(frame, columns=("name", "result", "start", "end", "duration"), selectmode="none") - tw.heading("#0", text="#") - tw.heading("name", text="Name") - tw.heading("result", text="Result") - tw.heading("start", text="Start") - tw.heading("end", text="End") - tw.heading("duration", text="Duration") - - tw.tag_configure("passed", background="LawnGreen") - tw.tag_configure("failed", background="Salmon") - tw.tag_configure("in_progress", background="Gold") - tw.tag_configure("pending", font=("TkDefaultFont", 9, "italic"), foreground="Gray") - - tw.pack() - - self.__test_tw = tw + return "stopped" + else: + self.stop_tests() + self.__tests_running = False + return "stopped" def __init_setup_tab(self) -> None: - self.__setup_tab = ttk.Frame(self.__tabs) - self.__setup_tab.columnconfigure(0, weight=1) - self.__setup_tab.columnconfigure(1, weight=4) - self.__setup_tab.rowconfigure([0, 1], weight=1) - self.__setup_tab.rowconfigure(2, weight=100) - self.__tabs.add(self.__setup_tab, text="Setup") + """ + Initialize "setup" tab widgets. + """ - self.__init_flexPtp_options() - self.__init_linuxptp_options() - self.__init_test_cases() - self.__init_test_controller() + self.__setup_tab = SetupTab(self.__tabs) + self.__setup_tab.register_start_stop_tests_cb(self.__start_stop_tests) + + self.__tabs.add(self.__setup_tab, text="Setup") def __init_logtab(self) -> None: - self.__logtab = ttk.Frame(self.__tabs) - self.__tabs.add(self.__logtab, text="Logs") + """ + Initialize log tab. + """ - self.__logtab.rowconfigure(0, weight=8) - self.__logtab.rowconfigure(1, weight=2) - - self.__logtab.columnconfigure(0, weight=1) - self.__logtab.columnconfigure(1, weight=1) - - terminal_settings = { "wrap": tk.WORD, "background": "#3D3D3D", "borderwidth": 0, "foreground": "white", "blockcursor": True, "insertbackground": "white"} - self.__flexPtp_log = tk.Text(self.__logtab, **terminal_settings) - self.__flexPtp_log.grid(column=0, row=0, sticky=tk.NSEW) - self.__flexPtp_log.configure(font=self.__console_font, state="disabled") - - self.__linuxptp_log = tk.Text(self.__logtab, **terminal_settings) - self.__linuxptp_log.grid(column=1, row=0, sticky=tk.NSEW, pady=0) - self.__linuxptp_log.configure(font=self.__console_font, state="disabled") + # initialize tab and its widgets + self.__logtab = LoggingTab(self.__tabs) + # initialize messaging queues self.__flexPtp_log_queue = queue.Queue() self.__linuxptp_log_queue = queue.Queue() - def __init_test_results(self) -> None: + def __init_results_tab(self) -> None: + """ + Initialize "test results" tab. + """ + self.__results_tabs = ttk.Frame(self.__tabs) self.__tabs.add(self.__results_tabs, text="Results") + fig = Figure(figsize=(5, 4), dpi=100) + t = numpy.arange(0, 3, .01) + ax = fig.add_subplot() + line, = ax.plot(t, 2 * numpy.sin(2 * numpy.pi * t)) + ax.set_xlabel("time [s]") + ax.set_ylabel("f(t)") + + canvas = FigureCanvasTkAgg(fig, master=self.__results_tabs) # A tk.DrawingArea. + canvas.draw() + + canvas.get_tk_widget().pack(side=tk.TOP, fill=tk.BOTH, expand=True) def __init_tabs(self) -> None: + """ + Initialize tabs. + """ + self.__init_setup_tab() self.__init_logtab() - self.__init_test_results() + self.__init_results_tab() + def __init_statusbar(self) -> None: + self.__status = tk.StringVar(value="Idle") + statusbar = ttk.Label(self.__win, textvariable=self.__status) + statusbar.pack(anchor=tk.S, side=tk.BOTTOM, fill=tk.X) - def __init_fonts(self) -> None: - self.__console_font = font.Font(family="Ubuntu Mono", size=9) - self.__title_font=font.Font(family="Ubuntu", size=14) - self.__mode_font=font.Font(family="Ubuntu", size=12, weight="bold") - self.__profile_font=font.Font(family="Ubuntu", size=12, slant="italic") - - - def __init_styles(self) -> None: - style = ttk.Style() - style.configure("ProfileChkBox.TCheckbutton", font=self.__console_font) - style.configure("ProfileSquare.TFrame", bordercolor="#003039", borderwidth=1, relief="solid") - style.configure("ProfileLabel.TLabel", font=self.__mode_font) + def set_status(self, status: str) -> None: + self.__status.set(status) def __init_print_polls(self) -> None: + """ + Initialize polled log printing. + """ + def print_logs() -> None: while not self.__flexPtp_log_queue.empty(): - self.__flexPtp_log.configure(state="normal") - self.__flexPtp_log.insert(tk.END, self.__flexPtp_log_queue.get()) - self.__flexPtp_log.configure(state="disabled") + self.__logtab.add_log("flexPTP", self.__flexPtp_log_queue.get()) while not self.__linuxptp_log_queue.empty(): - self.__linuxptp_log.configure(state="normal") - self.__linuxptp_log.insert(tk.END, self.__linuxptp_log_queue.get()) - self.__linuxptp_log.configure(state="disabled") + self.__logtab.add_log("linuxptp", self.__linuxptp_log_queue.get()) self.__win.after(50, print_logs) @@ -353,54 +118,65 @@ class GUI: def __init__(self) -> None: self.__win = ThemedTk(theme="arc") + self.__win.minsize(1000, 700) #self.__win.configure() self.__win.title("flexPTP test suite") self.__win.iconphoto(False, tk.PhotoImage(file="media/flexPTP_test.png")) - self.__init_fonts() - self.__init_styles() - self.__tabs = ttk.Notebook(self.__win) - self.__tabs.configure(width=1000, height=600) + self.__tabs.configure() self.__init_tabs() + self.__tabs.pack(expand=True, anchor=tk.S, side=tk.TOP, fill=tk.BOTH) - self.__tabs.pack(expand=True, fill="both") + self.__init_statusbar() self.__init_print_polls() + self.__tests_running = False + self.__user_stop = False + def mainloop(self) -> None: self.__win.mainloop() def __gather_tests(self) -> None: - # gather test cases - self.__tests = [] - for name, case in self.__test_cases.items(): - ptype = case["type"] + """ + Gather user-selected test cases. + """ + + self.__tests = [] + for name, case in self.__setup_tab.get_test_cases().items(): + ptype = case["type"] - template = {} - if ptype == "general": - template = { "type": ptype, "name": case["delmech"] + " " + case["layer"], "delmech": case["delmech"], "layer": case["layer"] } - elif ptype == "defined": - template = { "type": ptype, "name": name } + template = {} + if ptype == "general": + id = case["delmech"] + "_" + case["layer"] + name = case["delmech"] + " " + case["layer"] + template = { "type": ptype, "name": name, "id": id, "delmech": case["delmech"], "layer": case["layer"] } + elif ptype == "defined": + template = { "type": ptype, "name": name, "id": name } - if case["master"].get(): - master_mode = dict(template) - master_mode["mode"] = "master" - master_mode["name"] += " (master)" - self.__tests.append(master_mode) - - if case["slave"].get(): - slave_mode = dict(template) - slave_mode["mode"] = "slave" - slave_mode["name"] += " (slave)" - self.__tests.append(slave_mode) + if case["master"].get(): + master_mode = dict(template) + master_mode["mode"] = "master" + master_mode["name"] += " (master)" + self.__tests.append(master_mode) + + if case["slave"].get(): + slave_mode = dict(template) + slave_mode["mode"] = "slave" + slave_mode["name"] += " (slave)" + self.__tests.append(slave_mode) def __populate_tests_treeview(self) -> None: - tw = self.__test_tw + """ + Fill the test treeview. + """ + + tw = self.__setup_tab.get_test_treeview() items = tw.get_children() if items != (): tw.delete(*items) @@ -413,71 +189,267 @@ class GUI: def __open_flexPtp_interface(self) -> None: - url = self.__sel_device.get() + """ + Open the flexPTP device interface. + """ + + flexPtp_settings = self.__setup_tab.get_settings()["flexPTP"] + + device = flexPtp_settings["device"] opts = { - "baudrate": self.__sel_baudrate.get(), - "parity": self.__sel_parity.get(), - "stopbits": self.__sel_stopbits.get() + "baudrate": flexPtp_settings["baudrate"], + "parity": flexPtp_settings["parity"], + "stopbits": flexPtp_settings["stopbits"] } - def echo(data: str) -> None: - self.__flexPtp_log_queue.put(data.replace("\r", "")) - - self.__device_interface = DeviceInterface(url, opts) - self.__device_interface.register_out_callback(echo) + self.__device_interface = DeviceInterface(device, opts) self.__flexPtp_controller = FlexPtpController(self.__device_interface) def __init_linuxptp_observer(self) -> None: - self.__linuxptp_observer = LinuxPtpObserver(self.__linuxptp_interface.get()) - - def echo(data) -> None: - self.__linuxptp_log_queue.put(str(data)) - - self.__linuxptp_observer.register_observer_callback(echo) + """ + Initialize the linuxptp observer. + """ + + linuxptp_settings = self.__setup_tab.get_settings()["linuxptp"] + self.__linuxptp_observer = LinuxPtpObserver(linuxptp_settings["interface"]) def __init_flexPtp(self) -> None: + """ + Initialize flexPTP. + """ + ctrl = self.__flexPtp_controller ctrl.disable_all_logging() ctrl.set_domain(0) ctrl.reset_flexptp() + ctrl.enable_log("logid", True) + def test_routine(self) -> None: + LINUXPTP_INIT_TIMEOUT = 5 + BMCA_TIMEOUT = 150 + SYNC_TIMEOUT = 4 + SYNC_MAX_CYCLES = 100 + TARGET_RANGE = 50**2 + WINDOW_SIZE = 10 - def start_tests(self) -> None: - self.__gather_tests() - self.__populate_tests_treeview() - self.__open_flexPtp_interface() - self.__init_flexPtp() - self.__init_linuxptp_observer() + def flexPtp_echo(data: str) -> None: + self.__flexPtp_log_queue.put(str(data)) - # ---- + def linuxptp_echo(data: str) -> None: + self.__linuxptp_log_queue.put(str(data)) + + def both_echo(data: str) -> None: + flexPtp_echo(data) + linuxptp_echo(data) + + self.__flexPtp_controller.register_log_callback(flexPtp_echo) + self.__linuxptp_observer.register_observer_callback(linuxptp_echo) + + def time_to_str(t: float) -> str: + return time.strftime("%d-%m-%Y %H:%M:%S", time.gmtime(t)) ctrl = self.__flexPtp_controller observer = self.__linuxptp_observer - for test in self.__tests: - if test["type"] == "general": - id = test["delmech"] + "_" + test["layer"] - else: - id = test["name"] + tests_errored = False + + tsn = 1 + for test in self.__tests: + # identify correct mode or profile + id = test["id"] + mode = test["mode"] + + # set status + self.set_status("[{:d}/{:d}] Test in progress: {:s}".format(tsn, len(self.__tests), test["name"])) + + # setup info variables for the test + test["start"] = time.time() + str_start_time = time_to_str(test["start"]) + values = (test["name"], "In-progress", str_start_time, "-", "-") + self.__setup_tab.get_test_treeview().item(test["entry"], values=values, tags=("in_progress")) + + # announce the start of a new testcase + both_echo("\n\n----------\nSTARTING test: {:s} ({:s})\n start time: {:s}\n----------\n\n".format(id, mode, str_start_time)) + + # set mode or profile on the flexPTP device ctrl.start_by_id(id) + # adjust device priority to force required working mode priority1 = 128 - if test["mode"] == "master": + if mode == "master": priority1 = 100 ctrl.set_priority(priority1, 255) + # start linuxptp with the matching mode or profile observer.start_linuxptp(id) - + n = 0 + var = 1E+09 + dts = [] + try: + # wait for the LISTENING state clearly indicating the startup of the linuxptp + event = observer.wait_for_event("bmca-statechange", LINUXPTP_INIT_TIMEOUT, { "to": "LISTENING" }) + if event.get_name() != "bmca-statechange": + raise Exception("linuxptp init failed") - for test in self.__tests: - pass + # turn on BMCA and default logging on the flexPTP device + ctrl.enable_log("bmca", True) + ctrl.enable_log("def", True) + + # assign flexPTP and linuxptp sides + if mode == "slave": + master_side = observer + slave_side = ctrl + observer_bmca_role = "MASTER" + ctrl_bmca_role = "SLAVE" + else: + master_side = ctrl + slave_side = observer + observer_bmca_role = "SLAVE" + ctrl_bmca_role = "MASTER" + + + # wait for the correct BMCA states to settle in + error = False + error_cause = "none" + + event = ctrl.wait_for_event("bmca-statechange", BMCA_TIMEOUT, { "to": ctrl_bmca_role }) # flexPTP + if event.get_name() != "bmca-statechange": + raise Exception(event.get_name()) + + event = observer.wait_for_event("bmca-statechange", BMCA_TIMEOUT, { "to": observer_bmca_role }) # linuxptp + if event.get_name() != "bmca-statechange": + raise Exception(event.get_name()) + + # wait for the synchronization to settle + while n < SYNC_MAX_CYCLES and var > TARGET_RANGE: + event = slave_side.get_event(SYNC_TIMEOUT) + + # a state change with the BMCA indicates a clear error + if event.get_name() == "synclog-slave": + dts.append(event.get_args()["Dt"]) + elif event.get_name() in ("bmca-statechange", "exit"): + raise Exception(event.get_name()) + + if n >= WINDOW_SIZE: + var = numpy.var(dts[n-10:n]) + + n += 1 + + # check variance + error = not var < TARGET_RANGE + + except Exception as e: + error = True + error_cause = e.args[0] + + # display linuxptp errors + linuxptp_errors = observer.get_errors() + if linuxptp_errors != "": + linuxptp_errors = "ERROR:\n" + linuxptp_errors + linuxptp_echo(linuxptp_errors) + + # stop linuxptp + observer.stop_linuxptp() + + # exit if some of the subordinate layers signaled an exit + if error and error_cause in ("exit", "linuxptp init failed"): + self.set_status("Tests stopped") + tests_errored = True + + # save results, conclusion time and calculate duration + test["dts"] = dts + test["cycles"] = n + + passed = not error + if passed: + test["result"] = "passed" + else: + if tests_errored: + test["result"] = "errored" + else: + test["result"] = "failed" + + # save timestamps + test["end"] = time.time() + test["duration"] = test["end"] - test["start"] + + str_end_time = time_to_str(test["end"]) + str_duration = time.strftime("%H:%M:%S", time.gmtime(test["duration"])) + + self.__setup_tab.get_test_treeview().item(test["entry"], values=( + test["name"], test["result"].upper(), + str_start_time, + str_end_time, + str_duration + ), tags=(test["result"])) + + # announce the end of the test + both_echo("\n\n----------\nCONCLUDING test: {:s} ({:s})\n result: {:s}\n start time: {:s}\n stop time: {:s}\n duration: {:s}\n----------\n\n".format( + id, mode, test["result"].upper(), str_start_time, str_end_time, str_duration)) + + if tests_errored: + break + + tsn += 1 + + if not self.__user_stop: + self.stop_tests() + + if not tests_errored: + self.set_status("Tests concluded") + + self.__user_stop = False + + + def start_tests(self) -> bool: + """ + Start selected tests. + """ + + self.__gather_tests() + + if len(self.__tests) == 0: + self.set_status("No tests have been selected") + return False + + self.__populate_tests_treeview() + + self.set_status("Preparing flexPTP device") + + self.__open_flexPtp_interface() + self.__init_flexPtp() + + self.set_status("Starting linuxptp observer") + + self.__init_linuxptp_observer() + + # ---- + + self.set_status("Starting tests") + + self.__test_thread = threading.Thread(target=self.test_routine) + self.__test_thread.start() + + self.__setup_tab.set_test_state("running") + + return True - def stop_tests(self) -> None: + def stop_tests(self) -> bool: + """ + Stop tests in progress. + """ + + self.__user_stop = True self.__linuxptp_observer.stop_linuxptp() - self.__device_interface.close() \ No newline at end of file + self.__device_interface.close() + + self.__setup_tab.set_test_state("stopped") + + return True + \ No newline at end of file diff --git a/LinuxPtpObserver.py b/LinuxPtpObserver.py index 585b2a6..749d6c6 100644 --- a/LinuxPtpObserver.py +++ b/LinuxPtpObserver.py @@ -1,42 +1,101 @@ +import queue import signal import subprocess import select from threading import Thread +import time from typing import Callable +import re + +class LinuxPtpEvent: + def __init__(self, name: str, args: dict) -> None: + self.__name = name + self.__args = args + + def get_name(self) -> str: + return self.__name + + def get_args(self) -> dict: + return self.__args class LinuxPtpObserver: + OFFSET_REGEX = r"ptp4l\[[0-9.]+\]: master offset[ ]+([-\d]+) s\d freq[ ]+([-\d]+) path delay[ ]+([\d]+)" + BMCA_REGEX = r"ptp4l\[[0-9.]+\]: port \d+ \([a-z0-9]+\): ([A-Z_]+) to ([A-Z_]+) on [A-Z_]+" + def __init__(self, ni: str) -> None: self.__ni = ni self.__process: subprocess.Popen self.__observer_thread: Thread self.__observer_cb: Callable | None = None + self.__event_queue = queue.Queue() + self.__error_queue = queue.Queue() pass def observer_routine(self) -> None: + pfd = select.poll() if self.__process.stdout is not None: - pfd = select.poll() pfd.register(self.__process.stdout.fileno(), select.POLLIN) else: return - + + if self.__process.stderr is not None: + pfd.register(self.__process.stderr.fileno(), select.POLLIN) + else: + return + while self.__process.poll() is None: res = pfd.poll() - if len(res) > 0 and res[0][1] & select.POLLIN: - data = self.__process.stdout.readline() - if self.__observer_cb is not None: - self.__observer_cb(data) + for fde in res: + if fde[0] == self.__process.stdout.fileno() and fde[1] & select.POLLIN: # STDOUT + orig_data = self.__process.stdout.readline() + data = orig_data.strip() + + # SYNCLOG lines + match = re.match(LinuxPtpObserver.OFFSET_REGEX, data) + if match is not None: + args = { + "Dt": int(match.group(1)), + "mpd_ns": int(match.group(3)) + } + event = LinuxPtpEvent("synclog-slave", args) + self.__event_queue.put(event) + + # BMCA state change lines + match = re.match(LinuxPtpObserver.BMCA_REGEX, data) + if match is not None: + args = { + "from": match.group(1), + "to": match.group(2) + } + event = LinuxPtpEvent("bmca-statechange", args) + self.__event_queue.put(event) + + + if self.__observer_cb is not None: + self.__observer_cb(orig_data) + + elif fde[0] == self.__process.stderr.fileno() and fde[1] & select.POLLIN: # STDERR + text = self.__process.stderr.read() + self.__error_queue.put(text) + event = LinuxPtpEvent("error", { "text": text }) + self.__event_queue.put(event) + + # thread has returned + event = LinuxPtpEvent("exit", {}) + self.__event_queue.put(event) def start_linuxptp(self, profile: str) -> None: cmd = ["sudo", "ptp4l", "--priority1=127", "--priority2=255", "--gmCapable=1", "--neighborPropDelayThresh=100000", - "--min_neighbor_prop_delay=-20000000", "--assume_two_step=1", "--ptp_minor_version=0", + "--min_neighbor_prop_delay=-20000000", "--assume_two_step=1", "--ptp_minor_version=0", "--summary_interval=-5", "-i", self.__ni, "-f", "linuxptp_configs/{:s}.cfg".format(profile), "-m", "-l", "6"] self.__process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, + stderr=subprocess.PIPE, text=True ) @@ -46,6 +105,51 @@ class LinuxPtpObserver: def stop_linuxptp(self) -> None: self.__process.send_signal(signal.SIGKILL) self.__observer_thread.join() + self.__event_queue = queue.Queue() def register_observer_callback(self, cb : Callable) -> None: - self.__observer_cb = cb \ No newline at end of file + self.__observer_cb = cb + + def get_event(self, timeout: float) -> LinuxPtpEvent: + try: + return self.__event_queue.get(timeout=timeout) + except: + return LinuxPtpEvent("none", {}) + + def wait_for_event(self, expected_name: str, timeout: float, argcrits = {}) -> LinuxPtpEvent: + timeout_left = timeout + start = time.time_ns() + + event = LinuxPtpEvent("none", {}) + critera_met = False + while (event.get_name() != expected_name or not critera_met) and timeout_left > 0: + try: + event = self.__event_queue.get(timeout=timeout_left) # type: LinuxPtpEvent + + # propagate exit event + if event.get_name() in ("exit", "error"): + return event + + critera_met = True + for c_name, c_value in argcrits.items(): + args = event.get_args() + if c_name in args: + if args[c_name] != c_value: + critera_met = False + break + except: + pass + + now = time.time_ns() + timeout_left = timeout - ((now - start) / 1E+09) + + return event + + def get_errors(self) -> str: + error = "" + while not self.__error_queue.empty(): + error += self.__error_queue.get() + + return error + + \ No newline at end of file diff --git a/gui/GuiCommon.py b/gui/GuiCommon.py new file mode 100644 index 0000000..ade2679 --- /dev/null +++ b/gui/GuiCommon.py @@ -0,0 +1,3 @@ +TITLE_FONT = ("Ubuntu", 14) # frame title font +TERMINAL_FONT = ("Ubuntu Mono", 9) # terminal font +MODE_FONT = ("Ubuntu", 12, "bold") # mode font diff --git a/gui/LoggingTab.py b/gui/LoggingTab.py new file mode 100644 index 0000000..bdacd68 --- /dev/null +++ b/gui/LoggingTab.py @@ -0,0 +1,41 @@ +import tkinter as tk +from tkinter import ttk +from typing import Literal + +from gui.TerminalDisplay import TerminalDisplay + +class LoggingTab(ttk.Frame): + def __init_layout(self) -> None: + self.rowconfigure(0, weight=8) + self.rowconfigure(1, weight=2) + + self.columnconfigure(0, weight=1) + self.columnconfigure(1, weight=1) + + def __init_widgets(self) -> None: + self.__flexPtp_log = TerminalDisplay(self) + self.__flexPtp_log.grid(column=0, row=0, sticky=tk.NSEW) + + self.__linuxptp_log = TerminalDisplay(self) + self.__linuxptp_log.grid(column=1, row=0, sticky=tk.NSEW) + + def __init__(self, notebook: ttk.Notebook) -> None: + super().__init__(master=notebook) + notebook.add(self, text="Logs") + + self.__init_layout() # initialize widget layout + self.__init_widgets() # initialize widgets + + def clear_log(self, which: Literal["flexPTP", "linuxptp", "both"]) -> None: + if which in ("flexPTP", "both"): + self.__flexPtp_log.delete(1, tk.END) + + if which in ("linuxptp", "both"): + self.__linuxptp_log.delete(1, tk.END) + + def add_log(self, which: Literal["flexPTP", "linuxptp", "both"], what: str) -> None: + if which in ("flexPTP", "both"): + self.__flexPtp_log.insert(tk.END, what) + + if which in ("linuxptp", "both"): + self.__linuxptp_log.insert(tk.END, what) \ No newline at end of file diff --git a/gui/SetupTab.py b/gui/SetupTab.py new file mode 100644 index 0000000..0fe4e35 --- /dev/null +++ b/gui/SetupTab.py @@ -0,0 +1,446 @@ +from collections.abc import Callable +import tkinter as tk +from tkinter import ttk +from typing import Callable, Literal + +import serial.tools.list_ports +import netifaces + +import gui.GuiCommon as gcm + +class FlexPtpOptionsPanel(ttk.LabelFrame): + def __init_layout(self) -> None: + self.rowconfigure([0, 1, 2, 3 ], weight=1) + self.columnconfigure(0, weight=1) + self.columnconfigure(1, weight=3) + + def __init_widgets(self) -> None: + # initialize labels + device_label = ttk.Label(self, text="Device:") # Serial device entry + device_label.grid(column=0, row=0, sticky=tk.E) + + baudrate_label = ttk.Label(self, text="Baudrate:") # Baudrate entry + baudrate_label.grid(column=0, row=1, sticky=tk.E) + + baudrate_label = ttk.Label(self, text="Parity:") # Parity selector line + baudrate_label.grid(column=0, row=2, sticky=tk.E) + + baudrate_label = ttk.Label(self, text="Stopbits:") # Number of stopbits + baudrate_label.grid(column=0, row=3, sticky=tk.E) + + # initialize value selectors + + # Serial device + self.__sel_device = tk.StringVar() # device var + device_combobox = ttk.Combobox(self, textvariable=self.__sel_device) + device_combobox["values"] = list(map(lambda p: p.device, filter(lambda p: p.subsystem == "usb", serial.tools.list_ports.comports()))) # list of possible devices + device_combobox.current(0) + device_combobox.grid(column=1, row=0, sticky=tk.EW, padx=4) + + # Baudrate (default: 115200) + self.__sel_baudrate = tk.IntVar() # baudrate var + baudrate_combobox = ttk.Combobox(self, textvariable=self.__sel_baudrate) + baudrate_combobox["values"] = (115200, 57600, 38400, 19200, 9600, 4800) # possible baudrates + baudrate_combobox.current(0) + baudrate_combobox.grid(column=1, row=1, sticky=tk.EW, padx=4) + + # Parity (default: NONE) + self.__sel_parity = tk.StringVar() # parity var + self.__sel_parity.set(serial.PARITY_NONE) + parity_frame = tk.Frame(self) + parity_frame.grid(column=1, row=2, sticky=tk.W) + + parity_none = ttk.Radiobutton(parity_frame, text="None", variable=self.__sel_parity, value=serial.PARITY_NONE) # no parity + parity_none.pack(anchor=tk.W, side=tk.LEFT) + + parity_even = ttk.Radiobutton(parity_frame, text="Even", variable=self.__sel_parity, value=serial.PARITY_EVEN) # even parity + parity_even.pack(anchor=tk.W, side=tk.LEFT) + + parity_odd = ttk.Radiobutton(parity_frame, text="Odd", variable=self.__sel_parity, value=serial.PARITY_ODD) # odd parity + parity_odd.pack(anchor=tk.W, side=tk.LEFT) + + # Stopbits (default: 1) + self.__sel_stopbits = tk.IntVar() + self.__sel_stopbits.set(1) + stopbits_frame = tk.Frame(self) + stopbits_frame.grid(column=1, row=3, sticky=tk.W) + + stopbits_one = ttk.Radiobutton(stopbits_frame, text="1", variable=self.__sel_stopbits, value=1) + stopbits_one.pack(anchor=tk.W, side=tk.LEFT) + + stopbits_two = ttk.Radiobutton(stopbits_frame, text="2", variable=self.__sel_stopbits, value=2) + stopbits_two.pack(anchor=tk.W, side=tk.LEFT) + + + def __init__(self, master: tk.Misc | None = None) -> None: + super().__init__(master) + self.configure(labelwidget=ttk.Label(self, text="flexPTP options", font=gcm.TITLE_FONT)) + + self.__init_layout() # initialize panel's inner layout + self.__init_widgets() # initialize widgets + + def get_settings(self) -> dict: + return { + "device": self.__sel_device.get(), + "baudrate": self.__sel_baudrate.get(), + "parity": self.__sel_parity.get(), + "stopbits": self.__sel_stopbits.get() + } + + +class LinuxPtpOptionsPanel(ttk.LabelFrame): + def __init_layout(self) -> None: + self.rowconfigure([0, 1, 2], weight=1) + # self.rowconfigure(2, weight=100) + self.columnconfigure(0, weight=1) + self.columnconfigure(1, weight=3) + + def __init_widgets(self) -> None: + # initialize labels + path_label = ttk.Label(self, text="Path:") # linuxptp binary path + path_label.grid(column=0, row=0, sticky=tk.E) + + if_label = ttk.Label(self, text="Interface:") # network interface + if_label.grid(column=0, row=1, sticky=tk.E) + + arg_label = ttk.Label(self, text="Arguments:") # additional arguments passed to the linuxptp + arg_label.grid(column=0, row=2, sticky=tk.E) + + # initialize entry widgets + + # linuxptp binary path entry (default: /usr/sbin/ptp4l) + self.__linuxptp_path = tk.StringVar() # path var + self.__linuxptp_path.set("/usr/sbin/ptp4l") + path_entry = ttk.Entry(self, font=gcm.TERMINAL_FONT, textvariable=self.__linuxptp_path) + path_entry.grid(column=1, row=0, sticky=tk.EW, padx=4) + + # linuxptp interface selector (default: first interface in list) + self.__linuxptp_interface = tk.StringVar() # interface var + baudrate_combobox = ttk.Combobox(self, textvariable=self.__linuxptp_interface) + baudrate_combobox["values"] = netifaces.interfaces() # list of available interfaces + baudrate_combobox.current(0) + baudrate_combobox.grid(column=1, row=1, sticky=tk.EW, padx=4) + + # linuxptp additional argument entry (default: ) + self.__linuxptp_args = tk.StringVar() # argument var + self.__linuxptp_args.set("") + args_entry = ttk.Entry(self, font=gcm.TERMINAL_FONT, textvariable=self.__linuxptp_args) + args_entry.grid(column=1, row=2, sticky=tk.EW, padx=4) + + def __init__(self, master: tk.Misc | None = None) -> None: + super().__init__(master) + self.configure(labelwidget=ttk.Label(self, text="linuxptp options", font=gcm.TITLE_FONT)) + + self.__init_layout() # initialize layout + self.__init_widgets() # initialize widgets + + def get_settings(self) -> dict: + return { + "path": self.__linuxptp_path.get(), + "interface": self.__linuxptp_interface.get(), + "arguments": self.__linuxptp_args.get() + } + + +class BasicTestCasesPanel(ttk.LabelFrame): + # test modes and grid positions + MODES = { + "e2e_l4": { + "row": 1, + "col": 1, + }, + "e2e_l2": { + "row": 2, + "col": 1, + }, + "p2p_l4": { + "row": 1, + "col": 2, + }, + "p2p_l2": { + "row": 2, + "col": 2, + }, + } + + def __init_styles(self) -> None: + style = ttk.Style() + style.configure("BasicTestChkBox.TCheckbutton", font=gcm.TERMINAL_FONT) + style.configure("BasicTestSquare.TFrame", bordercolor="#003039", borderwidth=1, relief="solid") + style.configure("BasicTestLabel.TLabel", font=gcm.MODE_FONT) + + def __init_layout(self) -> None: + self.rowconfigure([0], weight=2) + self.rowconfigure([1, 2], weight=8) + self.columnconfigure([0], weight=2) + self.columnconfigure([1, 2], weight=8) + + def __init_testcase_grid(self) -> None: + # initialize labels + e2e_label = ttk.Label(self, text="E2E", style="BasicTestLabel.TLabel", padding=6) + e2e_label.grid(row=0, column=1) + + p2p_label = ttk.Label(self, text="P2P", style="BasicTestLabel.TLabel") + p2p_label.grid(row=0, column=2) + + l4_label = ttk.Label(self, text="L4", style="BasicTestLabel.TLabel", padding=10) + l4_label.grid(row=1, column=0) + + l2_label = ttk.Label(self, text="L2", style="BasicTestLabel.TLabel") + l2_label.grid(row=2, column=0) + + # initialize mode selectors for all test cases + self.__test_cases = {} + for name, pos in self.MODES.items(): + # cell frame containing all checkboxes + oframe = ttk.Frame(self, width=100, height=100, style="BasicTestSquare.TFrame") + oframe.grid(row=pos["row"], column=pos["col"], sticky=tk.NSEW) # place to their designated grid position + oframe.pack_propagate(False) + + # internal frame + iframe = tk.Frame(oframe) + iframe.place(relx=0.5, rely=0.5, anchor=tk.CENTER) + + # "Master" checkbox + m = tk.BooleanVar() + m_chk = ttk.Checkbutton(iframe, text="Master", variable=m, style="BasicTestChkBox.TCheckbutton") + m_chk.pack(anchor=tk.CENTER, side=tk.TOP) + + # "Slave" checkbox + s = tk.BooleanVar() + s_chk = ttk.Checkbutton(iframe, text="Slave ", variable=s, style="BasicTestChkBox.TCheckbutton") + s_chk.pack(anchor=tk.CENTER, side=tk.BOTTOM) + + # store test case + self.__test_cases[name] = { + "type": "general", + "delmech": name[0:3].upper(), + "layer": name[4:6].upper(), + "master": m, + "slave": s + } + + + def __init__(self, master: tk.Misc | None = None) -> None: + super().__init__(master, text="Basic modes") + + self.__init_styles() # initialize widget styles + self.__init_layout() # initialize the layout + self.__init_testcase_grid() # initialize the grid widgets + + def get_test_cases(self) -> dict: + return self.__test_cases + + +class DefinedTestCasesPanel(ttk.LabelFrame): + DEFINED_PROFILES = [ "gPTP" ] + PROFILE_FONT = ("Ubuntu", 12, "italic") + + def __init_styles(self) -> None: + style = ttk.Style() + style.configure("DefinedTestChkBox.TCheckbutton", font=gcm.TERMINAL_FONT) + + def __init_widgets(self) -> None: + self.__test_cases = {} + ri = 0 + for dprof in self.DEFINED_PROFILES: + # test case label + label = ttk.Label(self, text=dprof + ":", font=self.PROFILE_FONT) + label.grid(row=ri, column=0, padx=6) + + # "Master" checkbox + m = tk.BooleanVar() + m_chk = ttk.Checkbutton(self, text="Master", variable=m, style="DefinedTestChkBox.TCheckbutton") + m_chk.grid(row=ri, column=1) + + # "Slave" checkbox + s = tk.BooleanVar() + s_chk = ttk.Checkbutton(self, text="Slave ", variable=s, style="DefinedTestChkBox.TCheckbutton") + s_chk.grid(row=ri, column=2) + + # store test case + self.__test_cases[dprof] = { + "type": "defined", + "master": m, + "slave": s + } + + def __init__(self, master: tk.Misc | None = None) -> None: + super().__init__(master, text="Defined profiles") + + self.__init_styles() + self.__init_widgets() + + def get_test_cases(self) -> dict: + return self.__test_cases + + +class TestCasePanel(ttk.LabelFrame): + def __init_layout(self) -> None: + pass + + def __init_widgets(self) -> None: + self.__basic_test_panel = BasicTestCasesPanel(self) + self.__basic_test_panel.pack(expand=False, fill=tk.BOTH, anchor=tk.W, side=tk.LEFT, padx=4, pady=4) + + self.__defined_test_panel = DefinedTestCasesPanel(self) + self.__defined_test_panel.pack(expand=True, fill="both", anchor=tk.W, side=tk.LEFT, padx=4, pady=4) + + + def __init__(self, master: tk.Misc | None = None) -> None: + super().__init__(master) + self.configure(labelwidget= ttk.Label(self, text="Test cases", font=gcm.TITLE_FONT)) + + self.__init_layout() + self.__init_widgets() + + def get_test_cases(self) -> dict: + return dict(self.__basic_test_panel.get_test_cases()) | dict(self.__defined_test_panel.get_test_cases()) + + +class TestManagerMonitorPanel(ttk.LabelFrame): + def __init_layout(self) -> None: + pass + + def __init_widgets(self) -> None: + self.__start_stop_btn = ttk.Button(self, text="START", command=self.__start_stop_tests) + self.__start_stop_btn.pack() + + tw = ttk.Treeview(self, columns=("name", "result", "start", "end", "duration"), selectmode="none") + tw.heading("#0", text="#") + tw.heading("name", text="Name") + tw.heading("result", text="Result") + tw.heading("start", text="Start") + tw.heading("end", text="End") + tw.heading("duration", text="Duration") + + tw.tag_configure("passed", background="LawnGreen") + tw.tag_configure("failed", background="Salmon") + tw.tag_configure("errored", background="firebrick4", foreground="white") + tw.tag_configure("in_progress", background="Gold") + tw.tag_configure("pending", font=("TkDefaultFont", 9, "italic"), foreground="Gray") + + tw.pack(expand=True, fill="x") + + self.__test_tw = tw + + def __init__(self, master: tk.Misc) -> None: + super().__init__(master) + self.configure(labelwidget=ttk.Label(self, text="Test controls", font=gcm.TITLE_FONT)) + + self.__start_stop_tests_cb: Callable[..., Literal["stopped", "running"]] | None = None + + self.__init_layout() + self.__init_widgets() + + def __start_stop_tests(self) -> None: + # momentarily disable the start/stop button + self.__start_stop_btn.configure(state="disabled") + + # invoke start/stop callback + test_state: Literal["stopped", "running"] = "stopped" + if self.__start_stop_tests_cb is not None: + test_state = self.__start_stop_tests_cb() + + # set button state according to the test state (running/stopped) + self.set_test_state(test_state) + + # re-enable start/stop button + self.__start_stop_btn.configure(state="enabled") + + def register_start_stop_tests_cb(self, start_stop_tests_cb: Callable) -> None: + self.__start_stop_tests_cb = start_stop_tests_cb + + def get_test_treeview(self) -> ttk.Treeview: + return self.__test_tw + + def set_test_state(self, state: Literal["stopped", "running"]) -> None: + # set button state according to the test state (running/stopped) + if state == "running": + self.__start_stop_btn.configure(text="STOP") + else: + self.__start_stop_btn.configure(text="START") + + +class SetupTab(ttk.Frame): + def __init_layout(self) -> None: + self.columnconfigure(0, weight=1) + self.columnconfigure(1, weight=4) + self.rowconfigure([0, 1], weight=1) + self.rowconfigure(2, weight=100) + + def __init_widgets(self) -> None: + self.__flexPtp_options_panel = FlexPtpOptionsPanel(self) + self.__flexPtp_options_panel.grid(row=0, column=0, sticky=tk.NSEW, ipady=4, padx=4) + + self.__linuxptp_options_panel = LinuxPtpOptionsPanel(self) + self.__linuxptp_options_panel.grid(row=1, rowspan=1, column=0, sticky=tk.NSEW, ipady=4, padx=4) + + self.__test_case_panel = TestCasePanel(self) + self.__test_case_panel.grid(row=0, rowspan=2, column=1, sticky=tk.NSEW, ipady=4, padx=4) + + self.__test_mgr_monitor_panel = TestManagerMonitorPanel(self) + self.__test_mgr_monitor_panel.grid(row=2, column=0, columnspan=2, sticky=tk.NSEW, ipady=4, padx=4) + + def __init__(self, master: tk.Misc | None = None) -> None: + super().__init__(master) + + self.__init_layout() + self.__init_widgets() + + self.__start_stop_tests_cb : Callable[..., Literal["stopped", "running"]] | None = None + self.__test_mgr_monitor_panel.register_start_stop_tests_cb(self.__start_stop_tests) + + def __start_stop_tests(self) -> str: + test_state: Literal["stopped", "running"] = "stopped" + if self.__start_stop_tests_cb is not None: # invoke callback + test_state = self.__start_stop_tests_cb() + + # set widget state by test state + self.set_test_state(test_state) + + # return with current test state + return test_state + + def register_start_stop_tests_cb(self, start_stop_tests_cb) -> None: + self.__start_stop_tests_cb = start_stop_tests_cb + + + def get_settings(self) -> dict: + settings = { + "flexPTP": self.__flexPtp_options_panel.get_settings(), + "linuxptp": self.__linuxptp_options_panel.get_settings() + } + return settings + + def set_option_widgets_state(self, state: Literal["normal", "disabled"]) -> None: + # collect frames to deal with + frames = [ self.__flexPtp_options_panel, self.__linuxptp_options_panel, self.__test_case_panel ] + + # create a recursive state configuration function that traverses widgets + def set_widget_state(parent: tk.Widget | tk.Toplevel) -> None: + for widget in parent.winfo_children(): + if len(widget.winfo_children()) == 0 and widget.widgetName != "frame": + widget.configure(state=state) # type: ignore + else: + set_widget_state(widget) + + # apply required state to all specified frames + for frame in frames: + set_widget_state(frame) + + def get_test_cases(self) -> dict: + return self.__test_case_panel.get_test_cases() + + def get_test_treeview(self) -> ttk.Treeview: + return self.__test_mgr_monitor_panel.get_test_treeview() + + def set_test_state(self, state: Literal["stopped", "running"]) -> None: + self.__test_mgr_monitor_panel.set_test_state(state) + + if state == "running": + self.set_option_widgets_state("disabled") + else: + self.set_option_widgets_state("normal") + + diff --git a/gui/TerminalDisplay.py b/gui/TerminalDisplay.py new file mode 100644 index 0000000..b4d72ca --- /dev/null +++ b/gui/TerminalDisplay.py @@ -0,0 +1,31 @@ +from _tkinter import Tcl_Obj +import tkinter as tk + +import gui.GuiCommon as gcm + +class TerminalDisplay(tk.Text): + # global terminal-style outlook + TERMINAL_STYLE = { + "wrap": tk.WORD, + "background": "#3D3D3D", + "borderwidth": 0, + "foreground": "white", + "blockcursor": True, + "insertbackground": "white" + } + + def __init__(self, master: tk.Widget) -> None: + super().__init__(master=master, **self.TERMINAL_STYLE, font=gcm.TERMINAL_FONT, state=tk.DISABLED) + + def insert(self, index: str | float | Tcl_Obj | tk.Widget, chars: str, *args: str | list[str] | tuple[str, ...]) -> None: + self.configure(state=tk.NORMAL) + ret = super().insert(index, chars, *args) + self.yview(tk.END) + self.configure(state=tk.DISABLED) + return ret + + def delete(self, index1: str | float | Tcl_Obj | tk.Widget, index2: str | float | Tcl_Obj | tk.Widget | None = None) -> None: + self.configure(state=tk.NORMAL) + ret = super().delete(index1, index2) + self.configure(state=tk.DISABLED) + return ret \ No newline at end of file