diff --git a/Common.py b/Common.py new file mode 100644 index 0000000..2aafd5b --- /dev/null +++ b/Common.py @@ -0,0 +1,4 @@ +import time + +def time_to_str(t: float) -> str: + return time.strftime("%d-%m-%Y %H:%M:%S", time.gmtime(t)) \ No newline at end of file diff --git a/GUI.py b/GUI.py index dbe95b0..003b062 100644 --- a/GUI.py +++ b/GUI.py @@ -4,6 +4,7 @@ import sys import threading import time from tkinter import Tk, font, ttk +from typing import Literal import numpy import serial import serial.tools.list_ports @@ -16,11 +17,15 @@ from matplotlib.backend_bases import key_press_handler from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg from matplotlib.figure import Figure + +import Common from DeviceInterface import DeviceInterface from FlexPtpController import FlexPtpController from LinuxPtpObserver import LinuxPtpObserver +from TestController import TestController from gui.LoggingTab import LoggingTab +from gui.ResultsTab import ResultsEntry from gui.SetupTab import SetupTab class GUI: @@ -66,20 +71,8 @@ class GUI: Initialize "test results" tab. """ - self.__results_tabs = ttk.Frame(self.__tabs) - self.__tabs.add(self.__results_tabs, text="Results") - - fig = Figure(figsize=(5, 4), dpi=100) - t = numpy.arange(0, 3, .01) - ax = fig.add_subplot() - line, = ax.plot(t, 2 * numpy.sin(2 * numpy.pi * t)) - ax.set_xlabel("time [s]") - ax.set_ylabel("f(t)") - - canvas = FigureCanvasTkAgg(fig, master=self.__results_tabs) # A tk.DrawingArea. - canvas.draw() - - canvas.get_tk_widget().pack(side=tk.TOP, fill=tk.BOTH, expand=True) + self.__results_tab = ttk.Frame(self.__tabs) + self.__tabs.add(self.__results_tab, text="Results") def __init_tabs(self) -> None: """ @@ -141,12 +134,12 @@ class GUI: self.__win.mainloop() - def __gather_tests(self) -> None: + def __gather_tests(self) -> list: """ Gather user-selected test cases. """ - self.__tests = [] + tests = [] for name, case in self.__setup_tab.get_test_cases().items(): ptype = case["type"] @@ -162,16 +155,18 @@ class GUI: master_mode = dict(template) master_mode["mode"] = "master" master_mode["name"] += " (master)" - self.__tests.append(master_mode) + tests.append(master_mode) if case["slave"].get(): slave_mode = dict(template) slave_mode["mode"] = "slave" slave_mode["name"] += " (slave)" - self.__tests.append(slave_mode) + tests.append(slave_mode) + + return tests - def __populate_tests_treeview(self) -> None: + def __populate_tests_treeview(self, tests: list) -> None: """ Fill the test treeview. """ @@ -182,11 +177,28 @@ class GUI: tw.delete(*items) seqnum = 1 - n = len(self.__tests) - for test in self.__tests: - test["entry"] = tw.insert("", tk.END, text=str(seqnum) + "/" + str(n), values=(test["name"], "?", "Pending", "-", "-"), tags=("pending")) + n = len(tests) + for test in tests: + test["entry"] = tw.insert("", tk.END, text=str(seqnum) + "/" + str(n), values=(test["name"], "?", "Pending", "-", "-", "-"), tags=("pending")) seqnum += 1 + def __populate_results_tab(self, tests: list, params: dict) -> None: + """ + Fill the results panel + """ + + # clear previous results + restab = self.__results_tab + items = restab.winfo_children() + for item in items: + item.destroy() + + # create the new ones + for test in tests: + resentry = ResultsEntry(restab, test, params) + test["results_entry"] = resentry + + resentry.pack(fill="x", expand=True) def __open_flexPtp_interface(self) -> None: """ @@ -228,196 +240,36 @@ class GUI: ctrl.enable_log("logid", True) def test_routine(self) -> None: - LINUXPTP_INIT_TIMEOUT = 5 - BMCA_TIMEOUT = 150 - SYNC_TIMEOUT = 4 - SYNC_MAX_CYCLES = 100 - TARGET_RANGE = 50**2 - WINDOW_SIZE = 10 - - def flexPtp_echo(data: str) -> None: - self.__flexPtp_log_queue.put(str(data)) - - def linuxptp_echo(data: str) -> None: - self.__linuxptp_log_queue.put(str(data)) - - def both_echo(data: str) -> None: - flexPtp_echo(data) - linuxptp_echo(data) - - self.__flexPtp_controller.register_log_callback(flexPtp_echo) - self.__linuxptp_observer.register_observer_callback(linuxptp_echo) - - def time_to_str(t: float) -> str: - return time.strftime("%d-%m-%Y %H:%M:%S", time.gmtime(t)) - - ctrl = self.__flexPtp_controller - observer = self.__linuxptp_observer - - tests_errored = False - - tsn = 1 - for test in self.__tests: - # identify correct mode or profile - id = test["id"] - mode = test["mode"] - - # set status - self.set_status("[{:d}/{:d}] Test in progress: {:s}".format(tsn, len(self.__tests), test["name"])) - - # setup info variables for the test - test["start"] = time.time() - str_start_time = time_to_str(test["start"]) - values = (test["name"], "In-progress", str_start_time, "-", "-") - self.__setup_tab.get_test_treeview().item(test["entry"], values=values, tags=("in_progress")) - - # announce the start of a new testcase - both_echo("\n\n----------\nSTARTING test: {:s} ({:s})\n start time: {:s}\n----------\n\n".format(id, mode, str_start_time)) - - # set mode or profile on the flexPTP device - ctrl.start_by_id(id) - - # adjust device priority to force required working mode - priority1 = 128 - if mode == "master": - priority1 = 100 - - ctrl.set_priority(priority1, 255) - - # start linuxptp with the matching mode or profile - observer.start_linuxptp(id) - - n = 0 - var = 1E+09 - dts = [] - try: - # wait for the LISTENING state clearly indicating the startup of the linuxptp - event = observer.wait_for_event("bmca-statechange", LINUXPTP_INIT_TIMEOUT, { "to": "LISTENING" }) - if event.get_name() != "bmca-statechange": - raise Exception("linuxptp init failed") - - # turn on BMCA and default logging on the flexPTP device - ctrl.enable_log("bmca", True) - ctrl.enable_log("def", True) - - # assign flexPTP and linuxptp sides - if mode == "slave": - master_side = observer - slave_side = ctrl - observer_bmca_role = "MASTER" - ctrl_bmca_role = "SLAVE" - else: - master_side = ctrl - slave_side = observer - observer_bmca_role = "SLAVE" - ctrl_bmca_role = "MASTER" + pass - # wait for the correct BMCA states to settle in - error = False - error_cause = "none" + def __flexPtp_echo(self, data: str) -> None: + self.__flexPtp_log_queue.put(str(data)) - event = ctrl.wait_for_event("bmca-statechange", BMCA_TIMEOUT, { "to": ctrl_bmca_role }) # flexPTP - if event.get_name() != "bmca-statechange": - raise Exception(event.get_name()) - - event = observer.wait_for_event("bmca-statechange", BMCA_TIMEOUT, { "to": observer_bmca_role }) # linuxptp - if event.get_name() != "bmca-statechange": - raise Exception(event.get_name()) + def __linuxptp_echo(self, data: str) -> None: + self.__linuxptp_log_queue.put(str(data)) - # wait for the synchronization to settle - while n < SYNC_MAX_CYCLES and var > TARGET_RANGE: - event = slave_side.get_event(SYNC_TIMEOUT) - - # a state change with the BMCA indicates a clear error - if event.get_name() == "synclog-slave": - dts.append(event.get_args()["Dt"]) - elif event.get_name() in ("bmca-statechange", "exit"): - raise Exception(event.get_name()) - - if n >= WINDOW_SIZE: - var = numpy.var(dts[n-10:n]) - - n += 1 - - # check variance - error = not var < TARGET_RANGE - - except Exception as e: - error = True - error_cause = e.args[0] - - # display linuxptp errors - linuxptp_errors = observer.get_errors() - if linuxptp_errors != "": - linuxptp_errors = "ERROR:\n" + linuxptp_errors - linuxptp_echo(linuxptp_errors) - - # stop linuxptp - observer.stop_linuxptp() - - # exit if some of the subordinate layers signaled an exit - if error and error_cause in ("exit", "linuxptp init failed"): - self.set_status("Tests stopped") - tests_errored = True - - # save results, conclusion time and calculate duration - test["dts"] = dts - test["cycles"] = n - - passed = not error - if passed: - test["result"] = "passed" - else: - if tests_errored: - test["result"] = "errored" - else: - test["result"] = "failed" - - # save timestamps - test["end"] = time.time() - test["duration"] = test["end"] - test["start"] - - str_end_time = time_to_str(test["end"]) - str_duration = time.strftime("%H:%M:%S", time.gmtime(test["duration"])) - - self.__setup_tab.get_test_treeview().item(test["entry"], values=( - test["name"], test["result"].upper(), - str_start_time, - str_end_time, - str_duration - ), tags=(test["result"])) - - # announce the end of the test - both_echo("\n\n----------\nCONCLUDING test: {:s} ({:s})\n result: {:s}\n start time: {:s}\n stop time: {:s}\n duration: {:s}\n----------\n\n".format( - id, mode, test["result"].upper(), str_start_time, str_end_time, str_duration)) - - if tests_errored: - break - - tsn += 1 - - if not self.__user_stop: - self.stop_tests() - - if not tests_errored: - self.set_status("Tests concluded") - - self.__user_stop = False + def __echo(self, target: Literal["flexPTP", "linuxptp", "both"], data: str) -> None: + if target in ("flexPTP", "both"): + self.__flexPtp_echo(data) + if target in ("linuxptp", "both"): + self.__linuxptp_echo(data) def start_tests(self) -> bool: """ Start selected tests. """ - self.__gather_tests() + tests = self.__gather_tests() + params = self.__setup_tab.get_parameters() - if len(self.__tests) == 0: + if len(tests) == 0: self.set_status("No tests have been selected") return False - self.__populate_tests_treeview() + self.__populate_tests_treeview(tests) + self.__populate_results_tab(tests, params) self.set_status("Preparing flexPTP device") @@ -428,28 +280,108 @@ class GUI: self.__init_linuxptp_observer() + # ---- + + self.__flexPtp_controller.register_log_callback(self.__flexPtp_echo) + self.__linuxptp_observer.register_observer_callback(self.__linuxptp_echo) + # ---- self.set_status("Starting tests") - self.__test_thread = threading.Thread(target=self.test_routine) - self.__test_thread.start() + # construct the test controller + self.__test_controller = TestController(self.__flexPtp_controller, self.__linuxptp_observer) + + # event callback + def ecb(edata: dict) -> None: + test = edata.get("test", {}) + + # match the event by type + match edata["type"]: + case "TEST_COMMENCED": # the test is just about to start + # set status bar message + self.set_status("[{:d}/{:d}] Test in progress: {:s}".format(edata["tsi"] + 1, edata["ntests"], test["name"])) + + # set treeview entry contents + str_start_time = Common.time_to_str(test["start"]) + values = (test["name"], "In-progress", str_start_time, "-", "-") + self.__setup_tab.get_test_treeview().item(test["entry"], values=values, tags=("in_progress")) + + # insert log lines + self.__echo("both", "\n\n----------\nSTARTING test: {:s} ({:s})\n start time: {:s}\n----------\n\n".format(test["id"], test["mode"], str_start_time)) + + # refresh entry + test["results_entry"].refresh() + + case "TEST_CONCLUDED": # the test has just finished + # time to human-readable conversions + str_start_time = Common.time_to_str(test["start"]) + str_end_time = Common.time_to_str(test["end"]) + str_duration = time.strftime("%H:%M:%S", time.gmtime(test["duration"])) + + # set treeview entry contents + self.__setup_tab.get_test_treeview().item(test["entry"], values=( + test["name"], test["result"].upper(), + str_start_time, + str_end_time, + str_duration, + test["comments"] + ), tags=(test["result"])) + + # insert log line + self.__echo("both", "\n\n----------\nCONCLUDING test: {:s} ({:s})\n result: {:s}\n start time: {:s}\n stop time: {:s}\n duration: {:s}\n----------\n\n".format(test["id"], test["mode"], test["result"].upper(), str_start_time, str_end_time, str_duration)) + + # refresh entry for the last time + test["results_entry"].refresh() + + # finalize plots + test["results_entry"].finalize() + + + case "LINUXPTP_ERRORS": # print linuxptp errors + self.__echo("linuxptp", "ERROR: {:s}\n".format(edata["msg"])) + + case "SYNC_CYCLE": # sync cycle notification + test["results_entry"].refresh() + + case "TESTS_EXITED_EARLY": # testing process has been interrupted + pass + + case "TESTS_FINISHED": # the whole testing process has stopped + # handle user/error induced stops + if not self.__user_stop: + self.stop_tests(user_stop=False) + else: + self.__user_stop = False + + # print message accordingly + if not edata["early_exit"]: + self.set_status("Tests concluded") + else: + self.set_status("Tests stopped") + + self.__test_controller.set_event_callback(ecb) + + self.__test_controller.start_tests(tests, params) + + # ------------------- self.__setup_tab.set_test_state("running") return True - def stop_tests(self) -> bool: + def stop_tests(self, user_stop: bool = True) -> bool: """ Stop tests in progress. """ - self.__user_stop = True + self.__user_stop = user_stop self.__linuxptp_observer.stop_linuxptp() self.__device_interface.close() self.__setup_tab.set_test_state("stopped") + self.__tests_running = False return True \ No newline at end of file diff --git a/TestController.py b/TestController.py index 3a301c7..379260d 100644 --- a/TestController.py +++ b/TestController.py @@ -1,40 +1,257 @@ -from DeviceInterface import DeviceInterface +import threading +import time +from typing import Callable, Literal + +import numpy + from LinuxPtpObserver import LinuxPtpObserver from FlexPtpController import FlexPtpController + class TestController: - def __prepare_device(self) -> None: - self.__fptp_controller.disable_all_logging() - self.__fptp_controller.set_priority(128, 255) - self.__fptp_controller.set_domain(0) - self.__fptp_controller.reset_flexptp() + def __test_routine(self) -> None: + # controller and observer objects + ctrl = self.__flexPtp_controller + observer = self.__linuxptp_observer - def __init__(self, di: DeviceInterface, lptp_observer: LinuxPtpObserver) -> None: - self.__di = di - self.__fptp_controller = FlexPtpController(di) - self.__lptp_observer = lptp_observer - pass - - def start_test_e2e_udp(self) -> None: - self.__fptp_controller.start_e2e_udp() - self.__lptp_observer.start_linuxptp("E2E_UDP") + test_exited_early = False # a test has encountered an unrecoverable error - def start_test_e2e_l2(self) -> None: - self.__fptp_controller.start_e2e_l2() - self.__lptp_observer.start_linuxptp("E2E_L2") + ntests = len(self.__tests) # number of tests + tsi = 0 # test index - def start_test_p2p_udp(self) -> None: - self.__fptp_controller.start_p2p_udp() - self.__lptp_observer.start_linuxptp("P2P_UDP") + # iterate over all test cases + for test in self.__tests: + # store start time + test["start"] = time.time() - def start_test_p2p_l2(self) -> None: - self.__fptp_controller.start_p2p_l2() - self.__lptp_observer.start_linuxptp("P2P_L2") + # set result to "in-progress" + test["result"] = "in-progress" - def start_test_gPTP(self) -> None: - self.__fptp_controller.start_gPTP() - self.__lptp_observer.start_linuxptp("gPTP") + # test data arrays + dts = [] + vars = [] + means = [] - def stop_test(self) -> None: - self.__lptp_observer.stop_linuxptp() + # assign to test data + test["data"] = { + "skip": self.__params["averaging_window_size"], + "dt": dts, + "var": vars, + "mean": means, + "min": { + "global": 1E+09, + "local": 1E+09 + }, + "max": { + "global": -1E+09, + "local": -1E+09 + }, + "cycles": 0, + "rawtype": "", + "raw": [] + } + # send the TEST_COMMENCED event announcing the processing of a new testcase + self.__dispatch_event({ "type": "TEST_COMMENCED", "tsi": tsi, "ntests": ntests, "test": test }) + + # identify correct mode or profile + id = test["id"] + mode = test["mode"] + + # set mode or profile on the flexPTP device + ctrl.start_by_id(id) + + # adjust device priority to force required working mode + priority1 = 128 + if mode == "master": + priority1 = 100 + + ctrl.set_priority(priority1, 255) + + # start linuxptp with the matching mode or profile + observer.start_linuxptp(id) + + n = 0 # sync cycle sequence number + var = 1E+09 # variance + mean = 1E-09 # mean + + # error handling + error_indication = False + error_cause = "none" + + # the testing process + try: + # (1) wait for the LISTENING state clearly indicating the startup of the linuxptp + event = observer.wait_for_event("bmca-statechange", self.__params["linuxptp_init_timeout"], { "to": "LISTENING" }) + if event.get_name() != "bmca-statechange": + if event.get_name() == "exit": + cause = "exit" + else: + cause = "linuxptp init failed" + + raise Exception(cause) + + # (2) turn on BMCA and default logging on the flexPTP device + ctrl.enable_log("bmca", True) + ctrl.enable_log("def", True) + + # (3) assign flexPTP and linuxptp sides + if mode == "slave": + master_side = observer + slave_side = ctrl + observer_bmca_role = "MASTER" + ctrl_bmca_role = "SLAVE" + rawtype = "flexPTP" + else: + master_side = ctrl + slave_side = observer + observer_bmca_role = "SLAVE" + ctrl_bmca_role = "MASTER" + rawtype = "linuxptp" + + # set raw data type + test["data"]["rawtype"] = rawtype + + + # (4) wait for the correct BMCA states to settle in + event = ctrl.wait_for_event("bmca-statechange", self.__params["bmca_timeout"], { "to": ctrl_bmca_role }) # flexPTP + if event.get_name() != "bmca-statechange": + if event.get_name() == "exit": + cause = "exit" + else: + cause = "flexPTP BMCA lock-in failed" + + raise Exception(cause) + + + event = observer.wait_for_event("bmca-statechange", self.__params["bmca_timeout"], { "to": observer_bmca_role }) # linuxptp + if event.get_name() != "bmca-statechange": + if event.get_name() == "exit": + cause = "exit" + else: + cause = "linuxptp BMCA lock-in failed" + + raise Exception(cause) + + # (5) wait for the synchronization to settle + while n < self.__params["sync_min_cycles"] or (n < self.__params["sync_max_cycles"] and var > self.__params["target_variance"]): + event = slave_side.get_event(self.__params["sync_timeout"]) + + # a state change with the BMCA indicates a clear error + match event.get_name(): + case "synclog-slave": + eargs = event.get_args() + dt = dts.append(eargs["Dt"]) + + test["data"]["raw"].append(list(eargs.values())) + + case "bmca-statechage": + raise Exception("BMCA has switched state mid-test") + case "exit": + raise Exception("exit") + + # recalculate variance + if n >= self.__params["averaging_window_size"]: + window = dts[n-self.__params["averaging_window_size"]:n] + var = numpy.var(window) + mean = numpy.mean(window) + test["data"]["min"]["local"] = numpy.min(window) + test["data"]["max"]["local"] = numpy.max(window) + + # search min/max + if dts[-1] < test["data"]["min"]["global"]: + test["data"]["min"]["global"] = dts[-1] + + if dts[-1] > test["data"]["max"]["global"]: + test["data"]["max"]["global"] = dts[-1] + + # always store variance and mean, even if not meaningful + vars.append(var) + means.append(mean) + + # increment index + n += 1 + + # store cycle count + test["data"]["cycles"] = n + + # dispatch SYNC_CYCLE event + self.__dispatch_event({"type": "SYNC_CYCLE", "test": test }) + + # check variance + if var > self.__params["target_variance"]: + error_indication = True + error_cause = "too high variance" + + # handle exceptions + except Exception as e: + error_indication = True + error_cause = e.args[0] + + # display linuxptp errors + linuxptp_errors = observer.get_errors() + if linuxptp_errors != "": + self.__dispatch_event({"type": "LINUXPTP_ERRORS", "msg": linuxptp_errors, "test": test}) + + # stop linuxptp + observer.stop_linuxptp() + + # exit if some of the subordinate layers signaled an exit + if error_indication and error_cause in ("exit", "linuxptp init failed"): + self.__dispatch_event({"type": "TESTS_EXITED_EARLY", "cause": error_cause, "test": test}) + test_exited_early = True + + # save results, conclusion time and calculate duration + test["dts"] = dts + test["cycles"] = n + test["comments"] = "" + + # check pass/fail conditions + passed = not error_indication + if passed: + test["result"] = "passed" + else: + test["comments"] = error_cause + if test_exited_early: + test["result"] = "errored" + else: + test["result"] = "failed" + + # save timestamps + test["end"] = time.time() + test["duration"] = test["end"] - test["start"] + + # emit the TEST_CONCLUDED event + self.__dispatch_event({"type": "TEST_CONCLUDED", "test": test}) + + # if the tests has errored, then + if test_exited_early: + break + + # increase test index + tsi += 1 + + # send CLEANUP_TESTS event + self.__dispatch_event({ "type": "TESTS_FINISHED", "early_exit": test_exited_early }) + + + def __init__( + self, flexPtp_controller: FlexPtpController, lptp_observer: LinuxPtpObserver + ) -> None: + self.__flexPtp_controller = flexPtp_controller + self.__linuxptp_observer = lptp_observer + self.__event_callback: Callable[[dict], None] | None = None + + def set_event_callback(self, cb: Callable[[dict], None]) -> None: + self.__event_callback = cb + + def __dispatch_event(self, data: dict) -> None: + if self.__event_callback is not None: + self.__event_callback(data) + + def start_tests(self, tests: list, parameters: dict) -> None: + self.__tests = tests + self.__params = parameters + + self.__test_thread = threading.Thread(target=self.__test_routine) + self.__test_thread.start() \ No newline at end of file diff --git a/gui/ResultsTab.py b/gui/ResultsTab.py new file mode 100644 index 0000000..35b07e2 --- /dev/null +++ b/gui/ResultsTab.py @@ -0,0 +1,378 @@ +from collections.abc import Callable +import time +import tkinter as tk +from tkinter import ttk +from typing import Callable, Literal + +from matplotlib.axes import Axes +from matplotlib.figure import Figure +import matplotlib.pyplot as plt +import numpy +from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg +from matplotlib.ticker import MaxNLocator +import tkinter.filedialog + +import Common +import gui.GuiCommon as gcm + +import xlsxwriter + +# initialize plot style +#plt.rcParams["font.family"] = "" +plt.rcParams["font.size"] = 8 +plt.rcParams["lines.antialiased"] = True +plt.rcParams["lines.linewidth"] = 0.75 +plt.rcParams["grid.color"] = "#262626" +plt.rcParams["grid.alpha"] = 0.15 +plt.rcParams["grid.linewidth"] = 0.5 +plt.rcParams["legend.loc"] = "upper right" + +class ResultsInfoPanel(ttk.LabelFrame): + def __init_style(self) -> None: + style = ttk.Style() + style.configure("MeasInfo.TLabel", font=gcm.TERMINAL_FONT) + style.configure("MeasTitle.TLabel", font=("Ubuntu", 10, "bold")) + + style.configure("MeasStatPENDING.TFrame", background="Gray") + style.configure("MeasStatIN-PROGRESS.TFrame", background="Gold") + style.configure("MeasStatFAILED.TFrame", background="Salmon") + style.configure("MeasStatERRORED.TFrame", background="firebrick4") + style.configure("MeasStatPASSED.TFrame", background="LawnGreen") + style.configure("MeasStatSKIPPED.TFrame", background="Gray") + + style.configure("MeasStatPENDING.TLabel", background="Gray") + style.configure("MeasStatIN-PROGRESS.TLabel", background="Gold") + style.configure("MeasStatFAILED.TLabel", background="Salmon") + style.configure("MeasStatERRORED.TLabel", background="firebrick4") + style.configure("MeasStatPASSED.TLabel", background="LawnGreen") + style.configure("MeasStatSKIPPED.TLabel", background="Gray") + + def __init_layout(self) -> None: + self.columnconfigure([0, 1], weight=1) + + def __init_widgets(self) -> None: + # Start time + start_label = ttk.Label(self, text="Start:") + start_label.grid(row=0, column=0) + + # End time + end_label = ttk.Label(self, text="End:") + end_label.grid(row=1, column=0) + + # Duration + duration_label = ttk.Label(self, text="Duration:") + duration_label.grid(row=2, column=0) + + # separator + separator1 = ttk.Separator(self) + separator1.grid(row=3, column=0, columnspan=2, sticky=tk.EW) + + # Cycles label + cycles_label = ttk.Label(self, text="Cycles:") + cycles_label.grid(row=4, column=0) + + # Mean label + mean_label = ttk.Label(self, text="Mean:") + mean_label.grid(row=5, column=0) + + # Variance label + variance_label = ttk.Label(self, text="Variance:") + variance_label.grid(row=6, column=0) + + # Global maximum label + gmax_label = ttk.Label(self, text="Max. [global]:") + gmax_label.grid(row=7, column=0) + + # Global minimum label + gmin_label = ttk.Label(self, text="Min. [global]:") + gmin_label.grid(row=8, column=0) + + # Local maximum label + lmax_label = ttk.Label(self, text="Max. [local]:") + lmax_label.grid(row=9, column=0) + + # Local minimum label + lmin_label = ttk.Label(self, text="Min. [local]:") + lmin_label.grid(row=10, column=0) + + # separator + separator2 = ttk.Separator(self) + separator2.grid(row=11, column=0, columnspan=2, sticky=tk.EW) + + # ----------------- + + # Start time display + self.__start_display = ttk.Label(self, text="-") + self.__start_display.grid(row=0, column=1) + + # End time display + self.__end_display = ttk.Label(self, text="-") + self.__end_display.grid(row=1, column=1) + + # Duration display + self.__duration_display = ttk.Label(self, text="-") + self.__duration_display.grid(row=2, column=1) + + # Cycles display + self.__cycles_display = ttk.Label(self, text="-") + self.__cycles_display.grid(row=4, column=1) + + # Mean display + self.__mean_display = ttk.Label(self, text="-") + self.__mean_display.grid(row=5, column=1) + + # Variance display + self.__variance_display = ttk.Label(self, text="-") + self.__variance_display.grid(row=6, column=1) + + # Global maximum display + self.__gmax_display = ttk.Label(self, text="-") + self.__gmax_display.grid(row=7, column=1) + + # Global minimum display + self.__gmin_display = ttk.Label(self, text="-") + self.__gmin_display.grid(row=8, column=1) + + # Local maximum display + self.__lmax_display = ttk.Label(self, text="-") + self.__lmax_display.grid(row=9, column=1) + + # Local minimum display + self.__lmin_display = ttk.Label(self, text="-") + self.__lmin_display.grid(row=10, column=1) + + # ---------------- + + # apply styling + for widget in self.winfo_children(): + match widget: + case ttk.Label(): + widget.configure(style="MeasInfo.TLabel") + widget.grid(sticky=tk.E) + + # ----------------- + + self.__status_frame = ttk.Frame(self) + self.__status_frame.grid(row=12, column=0, columnspan=2, pady=8, sticky=tk.EW) + + self.__status_display = ttk.Label(self.__status_frame, text="-", font=("Ubuntu", 12, "bold")) + self.__status_display.pack() + + # ------------------ + + self.__save_data_btn = ttk.Button(self, command=self.save_results, text="Save", state="disabled") + self.__save_data_btn.grid(row=13, column=0, columnspan=2, pady=16, sticky=tk.EW + tk.S) + + def refresh(self) -> None: + result = self.__test.get("result", "pending").upper() + match result: + case "IN-PROGRESS" | "PASSED" | "FAILED": + cycles = self.__test["data"]["cycles"] + self.__cycles_display["text"] = "{:d}".format(self.__test["data"]["cycles"]) + if cycles > 0: + self.__gmax_display["text"] = "{:d} ns".format(self.__test["data"]["max"]["global"]) + self.__gmin_display["text"] = "{:d} ns".format(self.__test["data"]["min"]["global"]) + if cycles > self.__test["data"]["skip"]: + self.__variance_display["text"] = "{:.0f} ns^2".format(self.__test["data"]["var"][-1]) + self.__mean_display["text"] = "{:.0f} ns".format(self.__test["data"]["mean"][-1]) + self.__lmax_display["text"] = "{:d} ns".format(self.__test["data"]["max"]["local"]) + self.__lmin_display["text"] = "{:d} ns".format(self.__test["data"]["min"]["local"]) + + match result: + case "PENDING": + pass + case "IN-PROGRESS": + self.__start_display["text"] = Common.time_to_str(self.__test.get("start", "0")) + case "PASSED" | "FAILED": + self.__start_display["text"] = Common.time_to_str(self.__test.get("start", "0")) + self.__end_display["text"] = Common.time_to_str(self.__test.get("end", "0")) + self.__duration_display["text"] = time.strftime("%H:%M:%S", time.gmtime(self.__test["duration"])) + + self.__status_display["text"] = result + self.__status_display.configure(style="MeasStat{:s}.TLabel".format(result)) + self.__status_frame.configure(style="MeasStat{:s}.TFrame".format(result)) + + def finalize(self) -> None: + self.__save_data_btn.configure(state="normal") + + def __init__(self, master: tk.Widget, test: dict) -> None: + super().__init__(master) + + self.__test = test + + self.__init_style() + self.__init_layout() + self.__init_widgets() + + self.configure(labelwidget=ttk.Label(self, text="Measurement info", style="MeasTitle.TLabel"), width=240) + + self.refresh() + + def save_results(self) -> None: + # pick a place to save to + fn = tkinter.filedialog.asksaveasfilename( + title="Save measurement data", + filetypes=[("XLSX", "*.xlsx")] + ) + + if fn == (): + return + + if not fn.endswith(".xlsx"): + fn += ".xlsx" + + # -------------- + + # create the workbook + workbook = xlsxwriter.Workbook(fn) + + # save the summary + summary_sheet = workbook.add_worksheet("summary") + summary = [ + [ "Start:", Common.time_to_str(self.__test["start"])], + [ "End:", Common.time_to_str(self.__test["end"]) ], + [ "Duration:", time.strftime("%H:%M:%S", time.gmtime(self.__test["duration"])) ], + [ "Cycles:", self.__test["data"]["cycles"] ], + [ "Mean [ns]:", self.__test["data"]["mean"][-1] ], + [ "Variance [ns^2]:", self.__test["data"]["var"][-1] ], + [ "Max. (global) [ns]:", self.__test["data"]["max"]["global"] ], + [ "Min. (global) [ns]:", self.__test["data"]["min"]["global"] ], + [ "Max. (local) [ns]:", self.__test["data"]["max"]["local"] ], + [ "Min. (local) [ns]:", self.__test["data"]["min"]["local"] ] + ] + rowi = 0 + for summary_line in summary: + summary_sheet.write_row(rowi, 0, summary_line) + rowi += 1 + + # create the raw dump sheet + dump_sheet = workbook.add_worksheet("raw_data") + + # write the header + if self.__test["data"]["rawtype"] == "flexPTP": + header = [ "T1", "T4", "Dt", "corr_ppb", "mpd_ns", "sync_period_ns" ] + else: + header = [ "Dt", "mpd_ns" ] + + dump_sheet.write_row(0, 0, header) + + # save actual sync cycle data + rowi = 1 + for record in self.__test["data"]["raw"]: + dump_sheet.write_row(rowi, 0, record) + rowi += 1 + + workbook.close() + + +class PlotPanel(ttk.Frame): + def __init_widgets(self, figsize: tuple) -> None: + self.__fig, self.__ax = plt.subplots(figsize=figsize) + self.__full_plot, = self.__ax.plot([], []) + + self.__ax.set_ylabel("Time error [ns]") + self.__ax.set_xlabel("Cycles") + self.__ax.grid(True) + self.__ax.xaxis.set_major_locator(MaxNLocator(integer=True)) + self.__ax.yaxis.set_major_locator(MaxNLocator(integer=True)) + + self.__full_plot_canvas = FigureCanvasTkAgg(self.__fig, master=self) # A tk.DrawingArea. + + style = ttk.Style() + bg = style.lookup("TFrame", "background") + self.__full_plot_canvas.get_tk_widget().config(bg=bg) + + self.__full_plot_canvas.get_tk_widget().pack(expand=True, fill="both") + + self.__fig.tight_layout() + self.__fig.patch.set_alpha(0) + + def plot(self, start: int, stop: int, data: list[int]) -> None: + self.__full_plot.set_xdata(range(start, stop)) + self.__full_plot.set_ydata(data[start:stop]) + self.__ax.relim() + self.__ax.autoscale_view() + + self.draw() + + def draw(self) -> None: + self.__full_plot_canvas.draw() + self.__full_plot_canvas.flush_events() + + def get_ax(self) -> Axes: + return self.__ax + + def get_fig(self) -> Figure: + return self.__fig + + def __init__(self, master: tk.Widget, figsize: tuple = (8, 2)) -> None: + super().__init__(master) + + self.__init_widgets(figsize) + + + +class ResultsEntry(ttk.LabelFrame): + MIN_MAINPLOT_XRANGE = 100 + MIN_LOOKIN_WINDOW_LENGTH = 5 + + def __init_layout(self) -> None: + self.columnconfigure(0, weight=6) + self.columnconfigure([1, 2], weight=2) + + def __init_widgets(self) -> None: + # full measurement figure + self.__main_figure = PlotPanel(self, figsize=(8, 3)) + self.__main_figure.get_ax().set_xlim(0, self.MIN_MAINPLOT_XRANGE) + self.__main_figure.grid(row=0, column=0, sticky=tk.NSEW) + + # look-in window + self.__lookin_figure = PlotPanel(self, figsize=(2, 3)) + self.__lookin_figure.get_ax().set_xlim(0, self.__lookin_window_size - 1) + self.__lookin_figure.get_ax().set_ylabel("") + self.__lookin_figure.get_ax().yaxis.tick_right() + self.__lookin_figure.get_fig().subplots_adjust(left=0.05, right=0.78) + self.__lookin_figure.grid(row=0, column=1, sticky=tk.NSEW) + + # info panel + self.__info_panel = ResultsInfoPanel(self, self.__test) + self.__info_panel.grid(row=0, column=2, sticky=tk.NSEW, ipadx=4, padx=4, pady=4) + self.__info_panel.grid_propagate(False) + + def refresh(self) -> None: + self.__info_panel.refresh() + + data = self.__test["data"] + n = data["cycles"] + + self.__main_figure.get_ax().set_xlim(0, numpy.max([self.MIN_MAINPLOT_XRANGE, n, self.__params["sync_min_cycles"]])) + self.__main_figure.plot(0, n, data["dt"]) + + if n > self.__lookin_window_size: + n0 = n - self.__lookin_window_size + self.__lookin_figure.get_ax().set_xlim(n0, n - 1) + self.__lookin_figure.plot(n0, n, data["dt"]) + + def finalize(self) -> None: + self.__main_figure.get_ax().set_xlim(0, self.__test["data"]["cycles"] - 1) + self.__main_figure.draw() + + self.__info_panel.finalize() + + def __init__(self, master: tk.Widget, test: dict, params: dict) -> None: + super().__init__(master) + self.configure(labelwidget=ttk.Label(self, text=test["name"], font=gcm.TITLE_FONT)) + + self.__test = test + self.__params = params + + self.__lookin_window_size = numpy.max([self.__params["averaging_window_size"], self.MIN_LOOKIN_WINDOW_LENGTH]) + + self.__init_layout() + self.__init_widgets() + + +class ResultsTab(ttk.Frame): + def __init__(self, master: tk.Misc | None = None) -> None: + super().__init__(master) + diff --git a/gui/SetupTab.py b/gui/SetupTab.py index 0fe4e35..ba05bc5 100644 --- a/gui/SetupTab.py +++ b/gui/SetupTab.py @@ -1,6 +1,6 @@ from collections.abc import Callable import tkinter as tk -from tkinter import ttk +from tkinter import IntVar, StringVar, ttk from typing import Callable, Literal import serial.tools.list_ports @@ -216,6 +216,7 @@ class BasicTestCasesPanel(ttk.LabelFrame): "type": "general", "delmech": name[0:3].upper(), "layer": name[4:6].upper(), + "logSyncInterval": 0, "master": m, "slave": s } @@ -233,7 +234,11 @@ class BasicTestCasesPanel(ttk.LabelFrame): class DefinedTestCasesPanel(ttk.LabelFrame): - DEFINED_PROFILES = [ "gPTP" ] + DEFINED_PROFILES = { + "gPTP": { + "logSyncInterval": -3 + } + } PROFILE_FONT = ("Ubuntu", 12, "italic") def __init_styles(self) -> None: @@ -243,9 +248,12 @@ class DefinedTestCasesPanel(ttk.LabelFrame): def __init_widgets(self) -> None: self.__test_cases = {} ri = 0 - for dprof in self.DEFINED_PROFILES: + for dprof_key in self.DEFINED_PROFILES.keys(): + # fetch profile + dprof = self.DEFINED_PROFILES[dprof_key] + # test case label - label = ttk.Label(self, text=dprof + ":", font=self.PROFILE_FONT) + label = ttk.Label(self, text=dprof_key + ":", font=self.PROFILE_FONT) label.grid(row=ri, column=0, padx=6) # "Master" checkbox @@ -259,8 +267,9 @@ class DefinedTestCasesPanel(ttk.LabelFrame): s_chk.grid(row=ri, column=2) # store test case - self.__test_cases[dprof] = { + self.__test_cases[dprof_key] = { "type": "defined", + "logSyncInterval": dprof["logSyncInterval"], "master": m, "slave": s } @@ -298,6 +307,153 @@ class TestCasePanel(ttk.LabelFrame): return dict(self.__basic_test_panel.get_test_cases()) | dict(self.__defined_test_panel.get_test_cases()) +class TestParametersPanel(ttk.LabelFrame): + def __init_styles(self) -> None: + style = ttk.Style() + style.configure("Parameters.TLabel", font=gcm.TERMINAL_FONT) + + def __init_layout(self) -> None: + self.columnconfigure([0, 1], weight=1) + + def __init_widgets(self) -> None: + # linuxptp initialization timeout + linuxptp_init_timeout_label = ttk.Label(self, text="linuxptp init timeout") + linuxptp_init_timeout_label.grid(row=0, column=0) + + # BMCA settle timeout + bmca_timeout_label = ttk.Label(self, text="BMCA timeout") + bmca_timeout_label.grid(row=1, column=0) + + # sync timeout + sync_timeout_label = ttk.Label(self, text="Sync timeout") + sync_timeout_label.grid(row=2, column=0) + + # minimum number of sync cycles + min_sync_cycles_label = ttk.Label(self, text="Min. sync cycles") + min_sync_cycles_label.grid(row=3, column=0) + + # max number of sync cycles + max_sync_cycles_label = ttk.Label(self, text="Max. sync cycles") + max_sync_cycles_label.grid(row=4, column=0) + + # target variance + target_variance_label = ttk.Label(self, text="Target variance") + target_variance_label.grid(row=5, column=0) + + # averaging window size + averaging_window_size_label = ttk.Label(self, text="Averaging window size") + averaging_window_size_label.grid(row=6, column=0) + + # ---------- + + # linuxptp init timeout + self.__linuxptp_init_timeout_str = StringVar() + linuxptp_init_timeout_spinner = ttk.Spinbox(self, format="%.0f s", textvariable=self.__linuxptp_init_timeout_str) + linuxptp_init_timeout_spinner["from"] = 2 + linuxptp_init_timeout_spinner["to"] = 60 + linuxptp_init_timeout_spinner.set("5 s") + linuxptp_init_timeout_spinner.grid(row=0, column=1) + + # BMCA timeout + self.__bmca_timeout_str = StringVar() + bmca_timeout_spinner = ttk.Spinbox(self, format="%.0f s", textvariable=self.__bmca_timeout_str) + bmca_timeout_spinner["from"] = 2 + bmca_timeout_spinner["to"] = 300 + bmca_timeout_spinner.set("60 s") + bmca_timeout_spinner.grid(row=1, column=1) + + # Sync timeout + self.__sync_timeout_str = StringVar() + sync_timeout_spinner = ttk.Spinbox(self, format="%.0f cycles", textvariable=self.__sync_timeout_str) + sync_timeout_spinner["from"] = 2 + sync_timeout_spinner["to"] = 300 + sync_timeout_spinner.set("4 cycles") + sync_timeout_spinner.grid(row=2, column=1) + + def crosscheck_min_max_cycles(correct_this: Literal["min", "max"]) -> None: + min_cycles = int(self.__min_sync_cycles_str.get()[:-6]) + max_cycles = int(self.__max_sync_cycles_str.get()[:-6]) + + match correct_this: + case "min": + if max_cycles <= min_cycles: + min_cycles = max_cycles - 1 + case "max": + if min_cycles >= max_cycles: + max_cycles = min_cycles + 1 + + self.__min_sync_cycles_str.set("{:d} cycles".format(min_cycles)) + self.__max_sync_cycles_str.set("{:d} cycles".format(max_cycles)) + + def crosscheck_based_on_min() -> None: + crosscheck_min_max_cycles("max") + + def crosscheck_based_on_max() -> None: + crosscheck_min_max_cycles("min") + + # Min Sync cycles + self.__min_sync_cycles_str = StringVar() + max_sync_cycles_spinner = ttk.Spinbox(self, format="%.0f cycles", textvariable=self.__min_sync_cycles_str, command=crosscheck_based_on_min) + max_sync_cycles_spinner["from"] = 0 + max_sync_cycles_spinner["to"] = 10000000 - 1 + max_sync_cycles_spinner.set("0 cycles") + max_sync_cycles_spinner.grid(row=3, column=1) + + # Max Sync cycles + self.__max_sync_cycles_str = StringVar() + max_sync_cycles_spinner = ttk.Spinbox(self, format="%.0f cycles", textvariable=self.__max_sync_cycles_str, command=crosscheck_based_on_max) + max_sync_cycles_spinner["from"] = 2 + max_sync_cycles_spinner["to"] = 10000000 + max_sync_cycles_spinner.set("100 cycles") + max_sync_cycles_spinner.grid(row=4, column=1) + + # Target variance + self.__target_variance_str = StringVar() + target_variance_spinner = ttk.Spinbox(self, format="%.0f ns^2", textvariable=self.__target_variance_str) + target_variance_spinner["from"] = 10 + target_variance_spinner["to"] = 10**18 + target_variance_spinner["increment"] = 10 + target_variance_spinner.set("2500 ns^2") + target_variance_spinner.grid(row=5, column=1) + + # Averaging window size + self.__averaging_window_size_str = StringVar() + averaging_window_size_spinner = ttk.Spinbox(self, format="%.0f samples", textvariable=self.__averaging_window_size_str) + averaging_window_size_spinner["from"] = 1 + averaging_window_size_spinner["to"] = 10000 + averaging_window_size_spinner["increment"] = 1 + averaging_window_size_spinner.set("10 samples") + averaging_window_size_spinner.grid(row=6, column=1) + + # ---------- + + # apply styling + for widget in self.winfo_children(): + match widget: + case ttk.Label(): + widget.configure(style="Parameters.TLabel") + widget.grid(sticky=tk.E) + + def __init__(self, master: tk.Misc | None = None) -> None: + super().__init__(master) + + self.__init_styles() + self.__init_layout() + self.__init_widgets() + + self.configure(labelwidget=ttk.Label(self, text="Parameters", font=gcm.TITLE_FONT)) + + def get_parameters(self) -> dict: + return { + "linuxptp_init_timeout": int(self.__linuxptp_init_timeout_str.get()[:-1]), + "bmca_timeout": int(self.__bmca_timeout_str.get()[:-1]), + "sync_timeout": int(self.__sync_timeout_str.get()[:-6]), + "sync_min_cycles": int(self.__min_sync_cycles_str.get()[:-6]), + "sync_max_cycles": int(self.__max_sync_cycles_str.get()[:-6]), + "target_variance": int(self.__target_variance_str.get()[:-4]), + "averaging_window_size": int(self.__averaging_window_size_str.get()[:-7]), + } + class TestManagerMonitorPanel(ttk.LabelFrame): def __init_layout(self) -> None: pass @@ -306,13 +462,14 @@ class TestManagerMonitorPanel(ttk.LabelFrame): self.__start_stop_btn = ttk.Button(self, text="START", command=self.__start_stop_tests) self.__start_stop_btn.pack() - tw = ttk.Treeview(self, columns=("name", "result", "start", "end", "duration"), selectmode="none") + tw = ttk.Treeview(self, columns=("name", "result", "start", "end", "duration", "comments"), selectmode="none") tw.heading("#0", text="#") tw.heading("name", text="Name") tw.heading("result", text="Result") tw.heading("start", text="Start") tw.heading("end", text="End") tw.heading("duration", text="Duration") + tw.heading("comments", text="Comments") tw.tag_configure("passed", background="LawnGreen") tw.tag_configure("failed", background="Salmon") @@ -379,6 +536,9 @@ class SetupTab(ttk.Frame): self.__test_case_panel = TestCasePanel(self) self.__test_case_panel.grid(row=0, rowspan=2, column=1, sticky=tk.NSEW, ipady=4, padx=4) + self.__test_params_panel = TestParametersPanel(self) + self.__test_params_panel.grid(row=0, rowspan=3, column=2, sticky=tk.NSEW, ipadx=4, ipady=4) + self.__test_mgr_monitor_panel = TestManagerMonitorPanel(self) self.__test_mgr_monitor_panel.grid(row=2, column=0, columnspan=2, sticky=tk.NSEW, ipady=4, padx=4) @@ -413,9 +573,12 @@ class SetupTab(ttk.Frame): } return settings + def get_parameters(self) -> dict: + return self.__test_params_panel.get_parameters() + def set_option_widgets_state(self, state: Literal["normal", "disabled"]) -> None: # collect frames to deal with - frames = [ self.__flexPtp_options_panel, self.__linuxptp_options_panel, self.__test_case_panel ] + frames = [ self.__flexPtp_options_panel, self.__linuxptp_options_panel, self.__test_case_panel, self.__test_params_panel ] # create a recursive state configuration function that traverses widgets def set_widget_state(parent: tk.Widget | tk.Toplevel) -> None: diff --git a/test_main.py b/test_main.py index 8f2d80c..b9270f1 100644 --- a/test_main.py +++ b/test_main.py @@ -50,17 +50,4 @@ else: lptp_observer = LinuxPtpObserver(args[1]) lptp_observer.register_observer_callback(echo) -tc = TestController(di, lptp_observer) - -tc.start_test_gPTP() - -time.sleep(15) - -tc.stop_test() -tc.start_test_e2e_udp() - -time.sleep(15) - -tc.stop_test() - di.close() \ No newline at end of file