flexPTP-test/GUI.py
2026-05-07 10:42:36 +02:00

455 lines
14 KiB
Python

import queue
import select
import sys
import threading
import time
from tkinter import Tk, font, ttk
import numpy
import serial
import serial.tools.list_ports
import ttk_text as ttkt
from ttkthemes import ThemedTk
import tkinter as tk
import netifaces
from matplotlib.backend_bases import key_press_handler
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
from DeviceInterface import DeviceInterface
from FlexPtpController import FlexPtpController
from LinuxPtpObserver import LinuxPtpObserver
from gui.LoggingTab import LoggingTab
from gui.SetupTab import SetupTab
class GUI:
    """
    Main window of the flexPTP test suite.

    Builds the tabbed Tk user interface, gathers the test cases selected on
    the setup tab and executes them on a worker thread against a flexPTP
    device (through FlexPtpController) and a linuxptp instance (through
    LinuxPtpObserver).
    """

    def __start_stop_tests(self) -> str:
        """
        Toggle test execution; registered on the setup tab as start/stop callback.

        Returns:
            "running" if a test run has just been started, "stopped" otherwise.
        """
        if self.__tests_running:
            self.stop_tests()
            return "stopped"
        # start_tests() raises the running flag itself on success
        return "running" if self.start_tests() else "stopped"

    def __init_setup_tab(self) -> None:
        """
        Initialize "setup" tab widgets.
        """
        self.__setup_tab = SetupTab(self.__tabs)
        self.__setup_tab.register_start_stop_tests_cb(self.__start_stop_tests)
        self.__tabs.add(self.__setup_tab, text="Setup")

    def __init_logtab(self) -> None:
        """
        Initialize log tab.
        """
        # initialize tab and its widgets
        self.__logtab = LoggingTab(self.__tabs)

        # initialize messaging queues; filled by the test thread and drained
        # by the periodic poll installed in __init_print_polls()
        self.__flexPtp_log_queue = queue.Queue()
        self.__linuxptp_log_queue = queue.Queue()

    def __init_results_tab(self) -> None:
        """
        Initialize "test results" tab.
        """
        self.__results_tabs = ttk.Frame(self.__tabs)
        self.__tabs.add(self.__results_tabs, text="Results")

        # the figure currently shows a fixed sine curve, not test data
        fig = Figure(figsize=(5, 4), dpi=100)
        t = numpy.arange(0, 3, .01)
        ax = fig.add_subplot()
        ax.plot(t, 2 * numpy.sin(2 * numpy.pi * t))
        ax.set_xlabel("time [s]")
        ax.set_ylabel("f(t)")

        canvas = FigureCanvasTkAgg(fig, master=self.__results_tabs)  # A tk.DrawingArea.
        canvas.draw()
        canvas.get_tk_widget().pack(side=tk.TOP, fill=tk.BOTH, expand=True)

    def __init_tabs(self) -> None:
        """
        Initialize tabs.
        """
        self.__init_setup_tab()
        self.__init_logtab()
        self.__init_results_tab()

    def __init_statusbar(self) -> None:
        """
        Initialize the status bar at the bottom of the window.
        """
        self.__status = tk.StringVar(value="Idle")
        statusbar = ttk.Label(self.__win, textvariable=self.__status)
        statusbar.pack(anchor=tk.S, side=tk.BOTTOM, fill=tk.X)

    def set_status(self, status: str) -> None:
        """
        Display a message in the status bar.
        """
        self.__status.set(status)

    def __init_print_polls(self) -> None:
        """
        Initialize polled log printing.
        """
        def drain(source: str, log_queue: queue.Queue) -> None:
            # move everything currently queued to the log tab without blocking
            while True:
                try:
                    message = log_queue.get_nowait()
                except queue.Empty:
                    return
                self.__logtab.add_log(source, message)

        def print_logs() -> None:
            drain("flexPTP", self.__flexPtp_log_queue)
            drain("linuxptp", self.__linuxptp_log_queue)
            # re-arm the poll
            self.__win.after(50, print_logs)

        self.__win.after(50, print_logs)

    def __init__(self) -> None:
        """
        Create the main window with its tabs, status bar and log polling.
        """
        # state flags are set first so they exist before any callback can fire
        self.__tests_running = False
        self.__user_stop = False

        self.__win = ThemedTk(theme="arc")
        self.__win.minsize(1000, 700)
        self.__win.title("flexPTP test suite")
        self.__win.iconphoto(False, tk.PhotoImage(file="media/flexPTP_test.png"))

        self.__tabs = ttk.Notebook(self.__win)
        self.__init_tabs()
        self.__tabs.pack(expand=True, anchor=tk.S, side=tk.TOP, fill=tk.BOTH)

        self.__init_statusbar()
        self.__init_print_polls()

    def mainloop(self) -> None:
        """
        Run the Tk event loop of the main window.
        """
        self.__win.mainloop()

    def __gather_tests(self) -> None:
        """
        Gather user-selected test cases.

        Every case returned by the setup tab yields up to two entries in
        self.__tests, one per ticked mode ("master" and/or "slave"). Cases
        whose "type" is neither "general" nor "defined" are skipped.
        """
        self.__tests = []
        for case_name, case in self.__setup_tab.get_test_cases().items():
            ptype = case["type"]
            if ptype == "general":
                delmech = case["delmech"]
                layer = case["layer"]
                template = {
                    "type": ptype,
                    "name": delmech + " " + layer,
                    "id": delmech + "_" + layer,
                    "delmech": delmech,
                    "layer": layer,
                }
            elif ptype == "defined":
                template = {"type": ptype, "name": case_name, "id": case_name}
            else:
                # no template can be built for an unknown type
                continue

            for mode in ("master", "slave"):
                if case[mode].get():
                    entry = dict(template)
                    entry["mode"] = mode
                    entry["name"] += " (" + mode + ")"
                    self.__tests.append(entry)

    def __populate_tests_treeview(self) -> None:
        """
        Fill the test treeview.

        Existing rows are removed, then one row per gathered test is inserted;
        the treeview item id is stored in the test under "entry".
        """
        tw = self.__setup_tab.get_test_treeview()
        items = tw.get_children()
        if items:
            tw.delete(*items)

        total = len(self.__tests)
        for seqnum, test in enumerate(self.__tests, start=1):
            test["entry"] = tw.insert(
                "", tk.END,
                text=str(seqnum) + "/" + str(total),
                values=(test["name"], "?", "Pending", "-", "-"),
                tags=("pending",))

    def __open_flexPtp_interface(self) -> None:
        """
        Open the flexPTP device interface.
        """
        flexPtp_settings = self.__setup_tab.get_settings()["flexPTP"]
        device = flexPtp_settings["device"]
        opts = {
            "baudrate": flexPtp_settings["baudrate"],
            "parity": flexPtp_settings["parity"],
            "stopbits": flexPtp_settings["stopbits"],
        }
        self.__device_interface = DeviceInterface(device, opts)
        self.__flexPtp_controller = FlexPtpController(self.__device_interface)

    def __init_linuxptp_observer(self) -> None:
        """
        Initialize the linuxptp observer.
        """
        linuxptp_settings = self.__setup_tab.get_settings()["linuxptp"]
        self.__linuxptp_observer = LinuxPtpObserver(linuxptp_settings["interface"])

    def __init_flexPtp(self) -> None:
        """
        Initialize flexPTP.
        """
        ctrl = self.__flexPtp_controller
        ctrl.disable_all_logging()
        ctrl.set_domain(0)
        ctrl.reset_flexptp()
        ctrl.enable_log("logid", True)

    def test_routine(self) -> None:
        """
        Run all gathered tests in sequence; thread target used by start_tests().

        For every test the flexPTP device and linuxptp are started with the
        matching profile, both sides are awaited to reach the expected BMCA
        state, then "Dt" samples reported by the slave side are collected
        until the variance of the last WINDOW_SIZE samples drops to
        TARGET_RANGE or SYNC_MAX_CYCLES events have been processed. Results
        and timestamps are stored in the test dictionary and shown in the
        setup tab's treeview.

        NOTE(review): this method runs on a worker thread yet updates Tk
        widgets (status bar, treeview) directly; confirm this is safe with
        the Tk build in use.
        """
        # timeouts are handed to the controller/observer as-is
        LINUXPTP_INIT_TIMEOUT = 5
        BMCA_TIMEOUT = 150
        SYNC_TIMEOUT = 4
        SYNC_MAX_CYCLES = 100
        TARGET_RANGE = 50**2  # limit for the variance of the Dt samples
        WINDOW_SIZE = 10      # number of most recent Dt samples evaluated

        def flexPtp_echo(data: str) -> None:
            self.__flexPtp_log_queue.put(str(data))

        def linuxptp_echo(data: str) -> None:
            self.__linuxptp_log_queue.put(str(data))

        def both_echo(data: str) -> None:
            flexPtp_echo(data)
            linuxptp_echo(data)

        def time_to_str(t: float) -> str:
            # timestamps are rendered in UTC
            return time.strftime("%d-%m-%Y %H:%M:%S", time.gmtime(t))

        self.__flexPtp_controller.register_log_callback(flexPtp_echo)
        self.__linuxptp_observer.register_observer_callback(linuxptp_echo)

        ctrl = self.__flexPtp_controller
        observer = self.__linuxptp_observer
        treeview = self.__setup_tab.get_test_treeview()

        tests_errored = False
        total = len(self.__tests)
        for tsn, test in enumerate(self.__tests, start=1):
            # identify correct mode or profile
            test_id = test["id"]
            mode = test["mode"]

            # set status
            self.set_status("[{:d}/{:d}] Test in progress: {:s}".format(tsn, total, test["name"]))

            # setup info variables for the test
            test["start"] = time.time()
            str_start_time = time_to_str(test["start"])
            values = (test["name"], "In-progress", str_start_time, "-", "-")
            treeview.item(test["entry"], values=values, tags=("in_progress",))

            # announce the start of a new testcase
            both_echo("\n\n----------\nSTARTING test: {:s} ({:s})\n start time: {:s}\n----------\n\n".format(test_id, mode, str_start_time))

            # set mode or profile on the flexPTP device
            ctrl.start_by_id(test_id)

            # adjust device priority to force required working mode
            priority1 = 100 if mode == "master" else 128
            ctrl.set_priority(priority1, 255)

            # start linuxptp with the matching mode or profile
            observer.start_linuxptp(test_id)

            n = 0          # processed slave-side events
            var = 1E+09    # starts well above TARGET_RANGE so the loop is entered
            dts = []       # collected Dt samples
            error = False
            error_cause = "none"

            try:
                # wait for the LISTENING state clearly indicating the startup of the linuxptp
                event = observer.wait_for_event("bmca-statechange", LINUXPTP_INIT_TIMEOUT, {"to": "LISTENING"})
                if event.get_name() != "bmca-statechange":
                    raise Exception("linuxptp init failed")

                # turn on BMCA and default logging on the flexPTP device
                ctrl.enable_log("bmca", True)
                ctrl.enable_log("def", True)

                # assign flexPTP and linuxptp sides
                if mode == "slave":
                    slave_side = ctrl
                    observer_bmca_role = "MASTER"
                    ctrl_bmca_role = "SLAVE"
                else:
                    slave_side = observer
                    observer_bmca_role = "SLAVE"
                    ctrl_bmca_role = "MASTER"

                # wait for the correct BMCA states to settle in
                event = ctrl.wait_for_event("bmca-statechange", BMCA_TIMEOUT, {"to": ctrl_bmca_role})  # flexPTP
                if event.get_name() != "bmca-statechange":
                    raise Exception(event.get_name())
                event = observer.wait_for_event("bmca-statechange", BMCA_TIMEOUT, {"to": observer_bmca_role})  # linuxptp
                if event.get_name() != "bmca-statechange":
                    raise Exception(event.get_name())

                # wait for the synchronization to settle
                while n < SYNC_MAX_CYCLES and var > TARGET_RANGE:
                    event = slave_side.get_event(SYNC_TIMEOUT)
                    event_name = event.get_name()
                    if event_name == "synclog-slave":
                        dts.append(event.get_args()["Dt"])
                    elif event_name in ("bmca-statechange", "exit"):
                        # a state change with the BMCA indicates a clear error
                        raise Exception(event_name)

                    # evaluate the newest WINDOW_SIZE samples; counting samples
                    # instead of cycles keeps the window valid when an event
                    # carried no Dt value
                    if len(dts) >= WINDOW_SIZE:
                        var = numpy.var(dts[-WINDOW_SIZE:])
                    n += 1

                # check variance
                error = not var < TARGET_RANGE
            except Exception as e:
                error = True
                # exceptions raised without arguments have an empty args tuple
                error_cause = e.args[0] if e.args else type(e).__name__

            # display linuxptp errors
            linuxptp_errors = observer.get_errors()
            if linuxptp_errors:
                linuxptp_echo("ERROR:\n" + linuxptp_errors)

            # stop linuxptp
            observer.stop_linuxptp()

            # exit if some of the subordinate layers signaled an exit
            if error and error_cause in ("exit", "linuxptp init failed"):
                self.set_status("Tests stopped")
                tests_errored = True

            # save results, conclusion time and calculate duration
            test["dts"] = dts
            test["cycles"] = n
            if not error:
                test["result"] = "passed"
            elif tests_errored:
                test["result"] = "errored"
            else:
                test["result"] = "failed"

            # save timestamps
            test["end"] = time.time()
            test["duration"] = test["end"] - test["start"]
            str_end_time = time_to_str(test["end"])
            str_duration = time.strftime("%H:%M:%S", time.gmtime(test["duration"]))

            treeview.item(test["entry"], values=(
                test["name"], test["result"].upper(),
                str_start_time,
                str_end_time,
                str_duration
            ), tags=(test["result"],))

            # announce the end of the test
            both_echo("\n\n----------\nCONCLUDING test: {:s} ({:s})\n result: {:s}\n start time: {:s}\n stop time: {:s}\n duration: {:s}\n----------\n\n".format(
                test_id, mode, test["result"].upper(), str_start_time, str_end_time, str_duration))

            if tests_errored:
                break

        # release the devices unless the user has already done so
        if not self.__user_stop:
            self.stop_tests()

        if not tests_errored:
            self.set_status("Tests concluded")

        self.__user_stop = False

    def start_tests(self) -> bool:
        """
        Start selected tests.

        Returns:
            True if the test thread has been started, False if no test is
            selected.
        """
        self.__gather_tests()
        if not self.__tests:
            self.set_status("No tests have been selected")
            return False

        self.__populate_tests_treeview()

        self.set_status("Preparing flexPTP device")
        self.__open_flexPtp_interface()
        self.__init_flexPtp()

        self.set_status("Starting linuxptp observer")
        self.__init_linuxptp_observer()

        # ----

        self.set_status("Starting tests")
        # flag and button state are updated before the thread starts so that
        # a run finishing right away cannot be overwritten with "running"
        self.__tests_running = True
        self.__setup_tab.set_test_state("running")
        self.__test_thread = threading.Thread(target=self.test_routine)
        self.__test_thread.start()
        return True

    def stop_tests(self) -> bool:
        """
        Stop tests in progress.

        Called both from the start/stop callback and by the test thread once
        it has finished, hence the running flag is cleared here: otherwise a
        run that ended on its own would leave the flag set and the next start
        request would be treated as a stop.

        Returns:
            Always True.
        """
        self.__user_stop = True
        self.__tests_running = False
        self.__linuxptp_observer.stop_linuxptp()
        self.__device_interface.close()
        self.__setup_tab.set_test_state("stopped")
        return True