flexPTP-test/GUI.py
András Wiesner 23e5005914 - Parameter configuration added
- Plotting added
- Statistic calculation added
- Save measurement data as XLSX feature added
- Test controller functionality extracted from the GUI class
2026-05-27 20:06:53 +02:00

387 lines
12 KiB
Python

import queue
import select
import sys
import threading
import time
from tkinter import Tk, font, ttk
from typing import Literal
import numpy
import serial
import serial.tools.list_ports
import ttk_text as ttkt
from ttkthemes import ThemedTk
import tkinter as tk
import netifaces
from matplotlib.backend_bases import key_press_handler
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
import Common
from DeviceInterface import DeviceInterface
from FlexPtpController import FlexPtpController
from LinuxPtpObserver import LinuxPtpObserver
from TestController import TestController
from gui.LoggingTab import LoggingTab
from gui.ResultsTab import ResultsEntry
from gui.SetupTab import SetupTab
class GUI:
def __start_stop_tests(self) -> str:
if not self.__tests_running:
if self.start_tests():
self.__tests_running = True
return "running"
else:
return "stopped"
else:
self.stop_tests()
self.__tests_running = False
return "stopped"
def __init_setup_tab(self) -> None:
"""
Initialize "setup" tab widgets.
"""
self.__setup_tab = SetupTab(self.__tabs)
self.__setup_tab.register_start_stop_tests_cb(self.__start_stop_tests)
self.__tabs.add(self.__setup_tab, text="Setup")
def __init_logtab(self) -> None:
"""
Initialize log tab.
"""
# initialize tab and its widgets
self.__logtab = LoggingTab(self.__tabs)
# initialize messaging queues
self.__flexPtp_log_queue = queue.Queue()
self.__linuxptp_log_queue = queue.Queue()
def __init_results_tab(self) -> None:
"""
Initialize "test results" tab.
"""
self.__results_tab = ttk.Frame(self.__tabs)
self.__tabs.add(self.__results_tab, text="Results")
def __init_tabs(self) -> None:
"""
Initialize tabs.
"""
self.__init_setup_tab()
self.__init_logtab()
self.__init_results_tab()
def __init_statusbar(self) -> None:
self.__status = tk.StringVar(value="Idle")
statusbar = ttk.Label(self.__win, textvariable=self.__status)
statusbar.pack(anchor=tk.S, side=tk.BOTTOM, fill=tk.X)
def set_status(self, status: str) -> None:
self.__status.set(status)
def __init_print_polls(self) -> None:
"""
Initialize polled log printing.
"""
def print_logs() -> None:
while not self.__flexPtp_log_queue.empty():
self.__logtab.add_log("flexPTP", self.__flexPtp_log_queue.get())
while not self.__linuxptp_log_queue.empty():
self.__logtab.add_log("linuxptp", self.__linuxptp_log_queue.get())
self.__win.after(50, print_logs)
self.__win.after(50, print_logs)
def __init__(self) -> None:
self.__win = ThemedTk(theme="arc")
self.__win.minsize(1000, 700)
#self.__win.configure()
self.__win.title("flexPTP test suite")
self.__win.iconphoto(False, tk.PhotoImage(file="media/flexPTP_test.png"))
self.__tabs = ttk.Notebook(self.__win)
self.__tabs.configure()
self.__init_tabs()
self.__tabs.pack(expand=True, anchor=tk.S, side=tk.TOP, fill=tk.BOTH)
self.__init_statusbar()
self.__init_print_polls()
self.__tests_running = False
self.__user_stop = False
def mainloop(self) -> None:
self.__win.mainloop()
def __gather_tests(self) -> list:
"""
Gather user-selected test cases.
"""
tests = []
for name, case in self.__setup_tab.get_test_cases().items():
ptype = case["type"]
template = {}
if ptype == "general":
id = case["delmech"] + "_" + case["layer"]
name = case["delmech"] + " " + case["layer"]
template = { "type": ptype, "name": name, "id": id, "delmech": case["delmech"], "layer": case["layer"] }
elif ptype == "defined":
template = { "type": ptype, "name": name, "id": name }
if case["master"].get():
master_mode = dict(template)
master_mode["mode"] = "master"
master_mode["name"] += " (master)"
tests.append(master_mode)
if case["slave"].get():
slave_mode = dict(template)
slave_mode["mode"] = "slave"
slave_mode["name"] += " (slave)"
tests.append(slave_mode)
return tests
def __populate_tests_treeview(self, tests: list) -> None:
"""
Fill the test treeview.
"""
tw = self.__setup_tab.get_test_treeview()
items = tw.get_children()
if items != ():
tw.delete(*items)
seqnum = 1
n = len(tests)
for test in tests:
test["entry"] = tw.insert("", tk.END, text=str(seqnum) + "/" + str(n), values=(test["name"], "?", "Pending", "-", "-", "-"), tags=("pending"))
seqnum += 1
def __populate_results_tab(self, tests: list, params: dict) -> None:
"""
Fill the results panel
"""
# clear previous results
restab = self.__results_tab
items = restab.winfo_children()
for item in items:
item.destroy()
# create the new ones
for test in tests:
resentry = ResultsEntry(restab, test, params)
test["results_entry"] = resentry
resentry.pack(fill="x", expand=True)
def __open_flexPtp_interface(self) -> None:
"""
Open the flexPTP device interface.
"""
flexPtp_settings = self.__setup_tab.get_settings()["flexPTP"]
device = flexPtp_settings["device"]
opts = {
"baudrate": flexPtp_settings["baudrate"],
"parity": flexPtp_settings["parity"],
"stopbits": flexPtp_settings["stopbits"]
}
self.__device_interface = DeviceInterface(device, opts)
self.__flexPtp_controller = FlexPtpController(self.__device_interface)
def __init_linuxptp_observer(self) -> None:
"""
Initialize the linuxptp observer.
"""
linuxptp_settings = self.__setup_tab.get_settings()["linuxptp"]
self.__linuxptp_observer = LinuxPtpObserver(linuxptp_settings["interface"])
def __init_flexPtp(self) -> None:
"""
Initialize flexPTP.
"""
ctrl = self.__flexPtp_controller
ctrl.disable_all_logging()
ctrl.set_domain(0)
ctrl.reset_flexptp()
ctrl.enable_log("logid", True)
def test_routine(self) -> None:
pass
def __flexPtp_echo(self, data: str) -> None:
self.__flexPtp_log_queue.put(str(data))
def __linuxptp_echo(self, data: str) -> None:
self.__linuxptp_log_queue.put(str(data))
def __echo(self, target: Literal["flexPTP", "linuxptp", "both"], data: str) -> None:
if target in ("flexPTP", "both"):
self.__flexPtp_echo(data)
if target in ("linuxptp", "both"):
self.__linuxptp_echo(data)
def start_tests(self) -> bool:
"""
Start selected tests.
"""
tests = self.__gather_tests()
params = self.__setup_tab.get_parameters()
if len(tests) == 0:
self.set_status("No tests have been selected")
return False
self.__populate_tests_treeview(tests)
self.__populate_results_tab(tests, params)
self.set_status("Preparing flexPTP device")
self.__open_flexPtp_interface()
self.__init_flexPtp()
self.set_status("Starting linuxptp observer")
self.__init_linuxptp_observer()
# ----
self.__flexPtp_controller.register_log_callback(self.__flexPtp_echo)
self.__linuxptp_observer.register_observer_callback(self.__linuxptp_echo)
# ----
self.set_status("Starting tests")
# construct the test controller
self.__test_controller = TestController(self.__flexPtp_controller, self.__linuxptp_observer)
# event callback
def ecb(edata: dict) -> None:
test = edata.get("test", {})
# match the event by type
match edata["type"]:
case "TEST_COMMENCED": # the test is just about to start
# set status bar message
self.set_status("[{:d}/{:d}] Test in progress: {:s}".format(edata["tsi"] + 1, edata["ntests"], test["name"]))
# set treeview entry contents
str_start_time = Common.time_to_str(test["start"])
values = (test["name"], "In-progress", str_start_time, "-", "-")
self.__setup_tab.get_test_treeview().item(test["entry"], values=values, tags=("in_progress"))
# insert log lines
self.__echo("both", "\n\n----------\nSTARTING test: {:s} ({:s})\n start time: {:s}\n----------\n\n".format(test["id"], test["mode"], str_start_time))
# refresh entry
test["results_entry"].refresh()
case "TEST_CONCLUDED": # the test has just finished
# time to human-readable conversions
str_start_time = Common.time_to_str(test["start"])
str_end_time = Common.time_to_str(test["end"])
str_duration = time.strftime("%H:%M:%S", time.gmtime(test["duration"]))
# set treeview entry contents
self.__setup_tab.get_test_treeview().item(test["entry"], values=(
test["name"], test["result"].upper(),
str_start_time,
str_end_time,
str_duration,
test["comments"]
), tags=(test["result"]))
# insert log line
self.__echo("both", "\n\n----------\nCONCLUDING test: {:s} ({:s})\n result: {:s}\n start time: {:s}\n stop time: {:s}\n duration: {:s}\n----------\n\n".format(test["id"], test["mode"], test["result"].upper(), str_start_time, str_end_time, str_duration))
# refresh entry for the last time
test["results_entry"].refresh()
# finalize plots
test["results_entry"].finalize()
case "LINUXPTP_ERRORS": # print linuxptp errors
self.__echo("linuxptp", "ERROR: {:s}\n".format(edata["msg"]))
case "SYNC_CYCLE": # sync cycle notification
test["results_entry"].refresh()
case "TESTS_EXITED_EARLY": # testing process has been interrupted
pass
case "TESTS_FINISHED": # the whole testing process has stopped
# handle user/error induced stops
if not self.__user_stop:
self.stop_tests(user_stop=False)
else:
self.__user_stop = False
# print message accordingly
if not edata["early_exit"]:
self.set_status("Tests concluded")
else:
self.set_status("Tests stopped")
self.__test_controller.set_event_callback(ecb)
self.__test_controller.start_tests(tests, params)
# -------------------
self.__setup_tab.set_test_state("running")
return True
def stop_tests(self, user_stop: bool = True) -> bool:
"""
Stop tests in progress.
"""
self.__user_stop = user_stop
self.__linuxptp_observer.stop_linuxptp()
self.__device_interface.close()
self.__setup_tab.set_test_state("stopped")
self.__tests_running = False
return True