diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..ba0430d
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+__pycache__/
\ No newline at end of file
diff --git a/.vscode/launch.json b/.vscode/launch.json
index 35806d7..7c57f9f 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -12,8 +12,10 @@
"program": "${workspaceFolder}/test_main.py",
"python": ".venv/bin/python",
"console": "integratedTerminal",
+ "justMyCode": false,
"args": [
"/dev/ttyACM0",
+ "enp52s0f0"
]
}
]
diff --git a/DeviceInterface.py b/DeviceInterface.py
index bee8aca..72e0d29 100644
--- a/DeviceInterface.py
+++ b/DeviceInterface.py
@@ -1,11 +1,40 @@
+import sys, os
+import select
+from typing import Callable
+
import serial
import re
+from threading import Thread, Lock
class DeviceInterface:
"""
Helper class for interfacing the device running flexPTP.
"""
+ def reader_routine(self) -> None:
+ """
+ Thread routine for receiving all incoming communication from
+ the device.
+ """
+
+ while not self.device.closed:
+ try:
+ data = self.device.readline() # read serial port line-by-line
+
+ if type(data) is bytes:
+ # remove colorization escape sequences
+ data = re.sub(r"\x1b\[[0-9;]*m", "", data.decode())
+
+ if len(data) > 0: # write (remaining) data to the pipe
+ os.write(self.__outpipe_rw_fd[1], data.encode())
+
+ if self.__out_cb is not None: # invoke callback if provided
+ self.__out_cb(data)
+
+ except:
+ break # handle closing the device
+
+
def __init__(self, url: str, options: dict = {}) -> None:
"""
Initialize the interface.
@@ -15,7 +44,7 @@ class DeviceInterface:
:rtype: None
"""
- self.device = serial.serial_for_url(url, timeout=0.05) # open remote device
+ self.device = serial.serial_for_url(url, timeout=None) # open remote device
# set serial port options if applicable
if "baudrate" in options:
@@ -27,17 +56,39 @@ class DeviceInterface:
if "stopbits" in options:
self.device.stopbits = options["stopbits"]
- def read_until(self, expected: bytes = serial.LF) -> bytes:
- """
- Read from the device until a specific sequence is found.
+ # ----
+
+ self.__out_cb: Callable | None = None
+
+ r, w = os.pipe()
+ self.__outpipe_rw_fd = [ r, w ]
+ self.__outpipe_rw = [ os.fdopen(r, "rb"), os.fdopen(w, "wb") ]
+
+ self.__reader_thread = Thread(target=self.reader_routine)
+ self.__reader_thread.start()
+
+ def close(self) -> None:
+ """
+ Close the device interface.
+
+ :rtype: None
+ """
+
+ self.device.close()
+ self.__reader_thread.join()
+ self.__outpipe_rw[0].close()
+ self.__outpipe_rw[1].close()
+
+ def readline(self) -> bytes:
+ """
+ Read a line from the device.
- :param bytes expected: delimiter sequence
:return: bytes read
:rtype: bytes
"""
- return self.device.read_until(expected=expected)
-
+ return self.__outpipe_rw[0].readline().encode()
+
def write(self, data: bytes) -> None:
"""
@@ -47,7 +98,7 @@ class DeviceInterface:
"""
self.device.write(data)
-
+
def execute_command(self, cmd: str, expect_results: bool = True, separate_results: bool = False) -> str | dict[str, str] | None:
"""
@@ -60,15 +111,32 @@ class DeviceInterface:
:rtype: str | None
"""
- self.device.write((cmd.strip("\r\n") + "\r\n").encode()) # sanitize commands
- self.device.read_until(cmd.encode()) # flush echo
+ pure_final_cmd = cmd.strip("\r\n")
+ final_cmd = (pure_final_cmd + "\r\n").encode() # sanitize commands
+ #self.skip(len(final_cmd)) # flush echo
+ self.write(final_cmd) # send command
+
+ # wait for the echo to arrive
+ line = ""
+ while line != pure_final_cmd:
+ line = self.__outpipe_rw[0].readline().decode().strip()
+
if expect_results: # store results if required
timeout = False
results = ""
+
+ poller = select.poll()
+ poller.register(self.__outpipe_rw_fd[0], select.POLLIN)
+
while not timeout: # store continuous chunks
- data = self.device.read(32)
- if len(data) > 0:
- results += data.decode()
+ res = poller.poll(100)
+
+ if len(res) > 0 and res[0][1] == select.POLLIN:
+ data = self.__outpipe_rw[0].read(1)
+ if len(data) > 0:
+ results += data.decode()
+ else:
+ timeout = True
else:
timeout = True
@@ -81,4 +149,15 @@ class DeviceInterface:
else:
return results
else:
- return None
\ No newline at end of file
+ return None
+
+ def register_out_callback(self, cb: Callable) -> None:
+ """
+ Register callback for receiving all printed messages from the
+ device.
+
+ :param Callable cb: callback invoked at each reception
+ :rtype: None
+ """
+
+ self.__out_cb = cb
\ No newline at end of file
diff --git a/FlexPtpController.py b/FlexPtpController.py
new file mode 100644
index 0000000..d90cc90
--- /dev/null
+++ b/FlexPtpController.py
@@ -0,0 +1,55 @@
+from DeviceInterface import DeviceInterface
+
+class FlexPtpController:
+ def __init__(self, di: DeviceInterface) -> None:
+ self.__di = di
+ pass
+
+ def reset_flexptp(self) -> None:
+ self.__di.execute_command("ptp reset")
+
+ def start_e2e_udp(self) -> None:
+ self.__di.execute_command("ptp profile preset default")
+
+ def start_p2p_udp(self) -> None:
+ self.__di.execute_command("ptp profile preset defp2p")
+
+ def start_e2e_l2(self) -> None:
+ self.__di.execute_command("ptp profile preset default")
+ self.__di.execute_command("ptp transport 802.3")
+ self.reset_flexptp()
+
+ def start_p2p_l2(self) -> None:
+ self.__di.execute_command("ptp profile preset defp2p")
+ self.__di.execute_command("ptp transport 802.3")
+ self.reset_flexptp()
+
+ def start_gPTP(self) -> None:
+ self.__di.execute_command("ptp profile preset gPTP")
+
+ def start_by_id(self, id: str) -> None:
+ id = id.replace("_", "").upper()
+ if id == "E2EL4":
+ self.start_e2e_udp()
+ elif id == "E2EL2":
+ self.start_e2e_l2()
+ elif id == "P2PL4":
+ self.start_p2p_udp()
+ elif id == "P2PL2":
+ self.start_p2p_l2()
+ elif id == "GPTP":
+ self.start_gPTP()
+
+ def set_priority(self, priority1: int, priority2: int) -> None:
+ self.__di.execute_command("ptp priority {:d} {:d}".format(priority1, priority2))
+
+ def set_domain(self, domain: int) -> None:
+ self.__di.execute_command("ptp domain {:d}".format(domain))
+
+ def disable_all_logging(self) -> None:
+ logging_types = [ "def", "corr", "ts", "info", "locked", "bmca" ]
+ for lt in logging_types:
+ self.__di.execute_command("ptp log " + lt + " off", expect_results=False)
+
+ def set_servo_offset(self, offset: int) -> None:
+ self.__di.execute_command("ptp servo offset {:d}".format(offset))
\ No newline at end of file
diff --git a/GUI.py b/GUI.py
new file mode 100644
index 0000000..f5b36de
--- /dev/null
+++ b/GUI.py
@@ -0,0 +1,483 @@
+import queue
+from tkinter import Tk, font, ttk
+import serial
+import serial.tools.list_ports
+import ttk_text as ttkt
+from ttkthemes import ThemedTk
+import tkinter as tk
+import netifaces
+
+from DeviceInterface import DeviceInterface
+from FlexPtpController import FlexPtpController
+from LinuxPtpObserver import LinuxPtpObserver
+
+class GUI:
+ def __init_flexPtp_options(self) -> None:
+ title = ttk.Label(self.__setup_tab, text="flexPTP options", font=self.__title_font)
+ frame = ttk.LabelFrame(self.__setup_tab, labelwidget=title)
+ frame.grid(row=0, column=0, sticky=tk.NSEW, ipady=4, padx=4)
+ #frame.pack(anchor="nw", fill="x", padx=12, ipady=4, side=tk.LEFT, expand=True)
+
+ frame.rowconfigure([0, 1, 2, 3 ], weight=1)
+ frame.columnconfigure(0, weight=1)
+ frame.columnconfigure(1, weight=3)
+
+ self.__flexPtp_options_frame = frame
+
+ device_label = ttk.Label(frame, text="Device:")
+ device_label.grid(column=0, row=0, sticky=tk.E)
+
+ baudrate_label = ttk.Label(frame, text="Baudrate:")
+ baudrate_label.grid(column=0, row=1, sticky=tk.E)
+
+ baudrate_label = ttk.Label(frame, text="Parity:")
+ baudrate_label.grid(column=0, row=2, sticky=tk.E)
+
+ baudrate_label = ttk.Label(frame, text="Stopbits:")
+ baudrate_label.grid(column=0, row=3, sticky=tk.E)
+
+ self.__sel_device = tk.StringVar()
+ device_combobox = ttk.Combobox(frame, textvariable=self.__sel_device)
+ device_combobox["values"] = list(map(lambda p: p.device, filter(lambda p: p.subsystem == "usb", serial.tools.list_ports.comports())))
+ device_combobox.current(0)
+ device_combobox.grid(column=1, row=0, sticky=tk.EW, padx=4)
+
+ #device_entry = ttk.Entry(frame, font=self.__console_font, textvariable=self.__sel_device)
+ #device_entry.grid(column=1, row=0, sticky=tk.EW, padx=4)
+
+ self.__sel_baudrate = tk.IntVar()
+ baudrate_combobox = ttk.Combobox(frame, textvariable=self.__sel_baudrate)
+ baudrate_combobox["values"] = (115200, 57600, 38400, 19200, 9600, 4800)
+ baudrate_combobox.current(0)
+ baudrate_combobox.grid(column=1, row=1, sticky=tk.EW, padx=4)
+
+ self.__sel_parity = tk.StringVar()
+ self.__sel_parity.set(serial.PARITY_NONE)
+ parity_frame = tk.Frame(frame)
+ parity_frame.grid(column=1, row=2, sticky=tk.W)
+
+ parity_none = ttk.Radiobutton(parity_frame, text="None", variable=self.__sel_parity, value=serial.PARITY_NONE)
+ parity_none.pack(anchor=tk.W, side=tk.LEFT)
+
+ parity_even = ttk.Radiobutton(parity_frame, text="Even", variable=self.__sel_parity, value=serial.PARITY_EVEN)
+ parity_even.pack(anchor=tk.W, side=tk.LEFT)
+
+ parity_odd = ttk.Radiobutton(parity_frame, text="Odd", variable=self.__sel_parity, value=serial.PARITY_ODD)
+ parity_odd.pack(anchor=tk.W, side=tk.LEFT)
+
+ self.__sel_stopbits = tk.IntVar()
+ self.__sel_stopbits.set(1)
+ stopbits_frame = tk.Frame(frame)
+ stopbits_frame.grid(column=1, row=3, sticky=tk.W)
+
+ stopbits_one = ttk.Radiobutton(stopbits_frame, text="1", variable=self.__sel_stopbits, value=1)
+ stopbits_one.pack(anchor=tk.W, side=tk.LEFT)
+
+ stopbits_two = ttk.Radiobutton(stopbits_frame, text="2", variable=self.__sel_stopbits, value=2)
+ stopbits_two.pack(anchor=tk.W, side=tk.LEFT)
+
+
+ def __init_linuxptp_options(self) -> None:
+ title = ttk.Label(self.__setup_tab, text="linuxptp options", font=self.__title_font)
+ frame = ttk.LabelFrame(self.__setup_tab, labelwidget=title)
+ frame.grid(row=1, rowspan=1, column=0, sticky=tk.NSEW, ipady=4, padx=4)
+ # frame.pack(anchor="nw", fill="x", ipady=4, padx=12, side=tk.LEFT, expand=True)
+
+ frame.rowconfigure([0, 1, 2], weight=1)
+ # frame.rowconfigure(2, weight=100)
+ frame.columnconfigure(0, weight=1)
+ frame.columnconfigure(1, weight=3)
+
+ self.__linuxptp_options_frame = frame
+
+ path_label = ttk.Label(frame, text="Path:")
+ path_label.grid(column=0, row=0, sticky=tk.E)
+
+ if_label = ttk.Label(frame, text="Interface:")
+ if_label.grid(column=0, row=1, sticky=tk.E)
+
+ arg_label = ttk.Label(frame, text="Arguments:")
+ arg_label.grid(column=0, row=2, sticky=tk.E)
+
+ self.__linuxptp_path = tk.StringVar()
+ self.__linuxptp_path.set("/usr/sbin/ptp4l")
+ path_entry = ttk.Entry(frame, font=self.__console_font, textvariable=self.__linuxptp_path)
+ path_entry.grid(column=1, row=0, sticky=tk.EW, padx=4)
+
+ self.__linuxptp_interface = tk.StringVar()
+ baudrate_combobox = ttk.Combobox(frame, textvariable=self.__linuxptp_interface)
+ baudrate_combobox["values"] = netifaces.interfaces()
+ baudrate_combobox.current(0)
+ baudrate_combobox.grid(column=1, row=1, sticky=tk.EW, padx=4)
+
+ self.__linuxptp_args = tk.StringVar()
+ self.__linuxptp_args.set("")
+ args_entry = ttk.Entry(frame, font=self.__console_font, textvariable=self.__linuxptp_args)
+ args_entry.grid(column=1, row=2, sticky=tk.EW, padx=4)
+
+
+ def __init_test_cases(self) -> None:
+ title = ttk.Label(self.__setup_tab, text="Test cases", font=self.__title_font)
+ frame = ttk.LabelFrame(self.__setup_tab, labelwidget=title)
+ frame.grid(row=0, rowspan=2, column=1, sticky=tk.NSEW, ipady=4, padx=4)
+ # frame.pack(anchor="nw", fill="x", ipady=4, padx=12, side=tk.LEFT, expand=True)
+
+ self.__test_cases_frame = frame
+
+ basic_modes_frame = ttk.LabelFrame(frame, text="Basic modes")
+ basic_modes_frame.rowconfigure([0], weight=2)
+ basic_modes_frame.rowconfigure([1, 2], weight=8)
+ basic_modes_frame.columnconfigure([0], weight=2)
+ basic_modes_frame.columnconfigure([1, 2], weight=8)
+ basic_modes_frame.pack(expand=False, fill="both", anchor=tk.W, side=tk.LEFT, padx=4, pady=4)
+
+ e2e_label = ttk.Label(basic_modes_frame, text="E2E", style="ProfileLabel.TLabel", padding=6)
+ e2e_label.grid(row=0, column=1)
+
+ p2p_label = ttk.Label(basic_modes_frame, text="P2P", style="ProfileLabel.TLabel")
+ p2p_label.grid(row=0, column=2)
+
+ l4_label = ttk.Label(basic_modes_frame, text="L4", style="ProfileLabel.TLabel", padding=10)
+ l4_label.grid(row=1, column=0)
+
+ l2_label = ttk.Label(basic_modes_frame, text="L2", style="ProfileLabel.TLabel")
+ l2_label.grid(row=2, column=0)
+
+ modes = {
+ "e2e_l4": {
+ "row": 1,
+ "col": 1,
+ },
+ "e2e_l2": {
+ "row": 2,
+ "col": 1,
+ },
+ "p2p_l4": {
+ "row": 1,
+ "col": 2,
+ },
+ "p2p_l2": {
+ "row": 2,
+ "col": 2,
+ },
+ }
+
+ self.__test_cases = {}
+
+ for name, pos in modes.items():
+ oframe = ttk.Frame(basic_modes_frame, width=100, height=100, style="ProfileSquare.TFrame")
+ oframe.grid(row=pos["row"], column=pos["col"], sticky=tk.NSEW)
+ oframe.pack_propagate(False)
+
+ iframe = tk.Frame(oframe)
+ iframe.place(relx=0.5, rely=0.5, anchor=tk.CENTER)
+
+ m = tk.BooleanVar()
+ m_chk = ttk.Checkbutton(iframe, text="Master", variable=m, style="ProfileChkBox.TCheckbutton")
+ m_chk.pack(anchor=tk.CENTER, side=tk.TOP)
+
+ s = tk.BooleanVar()
+ s_chk = ttk.Checkbutton(iframe, text="Slave ", variable=s, style="ProfileChkBox.TCheckbutton")
+ s_chk.pack(anchor=tk.CENTER, side=tk.BOTTOM)
+
+ self.__test_cases[name] = {
+ "type": "general",
+ "delmech": name[0:3].upper(),
+ "layer": name[4:6].upper(),
+ "master": m,
+ "slave": s
+ }
+
+ defined_profiles_frame = ttk.LabelFrame(frame, text="Defined profiles")
+ defined_profiles_frame.pack(expand=True, fill="both", anchor=tk.W, side=tk.LEFT, padx=4, pady=4)
+
+ defined_profiles = [ "gPTP" ]
+
+ ri = 0
+ for dprof in defined_profiles:
+ label = ttk.Label(defined_profiles_frame, text=dprof + ":", font=self.__profile_font)
+ label.grid(row=ri, column=0, padx=6)
+
+ m = tk.BooleanVar()
+ m_chk = ttk.Checkbutton(defined_profiles_frame, text="Master", variable=m, style="ProfileChkBox.TCheckbutton")
+ m_chk.grid(row=ri, column=1)
+
+ s = tk.BooleanVar()
+ s_chk = ttk.Checkbutton(defined_profiles_frame, text="Slave ", variable=s, style="ProfileChkBox.TCheckbutton")
+ s_chk.grid(row=ri, column=2)
+
+ self.__test_cases[dprof] = {
+ "type": "defined",
+ "master": m,
+ "slave": s
+ }
+
+
+ def __init_test_controller(self) -> None:
+ title = ttk.Label(self.__setup_tab, text="Test controls", font=self.__title_font)
+ frame = ttk.LabelFrame(self.__setup_tab, labelwidget=title)
+ frame.grid(row=2, column=0, columnspan=2, sticky=tk.NSEW, ipady=4, padx=4)
+
+ self.__tests_running = False
+
+ def start_stop_tests() -> None:
+ self.__start_stop_btn.configure(state="disabled")
+
+ if not self.__tests_running:
+ self.start_tests()
+ else:
+ self.stop_tests()
+
+ self.__tests_running = not self.__tests_running
+
+ if self.__tests_running:
+ self.__start_stop_btn.configure(text="STOP")
+ state = "disabled"
+ else:
+ self.__start_stop_btn.configure(text="START")
+ state = "normal"
+
+ frames = [ self.__flexPtp_options_frame, self.__linuxptp_options_frame, self.__test_cases_frame ]
+
+ def set_widget_state(parent: tk.Widget | tk.Toplevel) -> None:
+ for widget in parent.winfo_children():
+ if len(widget.winfo_children()) == 0 and widget.widgetName != "frame":
+ widget.configure(state=state) # type: ignore
+ else:
+ set_widget_state(widget)
+
+ for frame in frames:
+ set_widget_state(frame)
+
+ self.__start_stop_btn.configure(state="enabled")
+
+ self.__start_stop_btn = ttk.Button(frame, text="START", command=start_stop_tests)
+ self.__start_stop_btn.pack()
+
+ tw = ttk.Treeview(frame, columns=("name", "result", "start", "end", "duration"), selectmode="none")
+ tw.heading("#0", text="#")
+ tw.heading("name", text="Name")
+ tw.heading("result", text="Result")
+ tw.heading("start", text="Start")
+ tw.heading("end", text="End")
+ tw.heading("duration", text="Duration")
+
+ tw.tag_configure("passed", background="LawnGreen")
+ tw.tag_configure("failed", background="Salmon")
+ tw.tag_configure("in_progress", background="Gold")
+ tw.tag_configure("pending", font=("TkDefaultFont", 9, "italic"), foreground="Gray")
+
+ tw.pack()
+
+ self.__test_tw = tw
+
+
+ def __init_setup_tab(self) -> None:
+ self.__setup_tab = ttk.Frame(self.__tabs)
+ self.__setup_tab.columnconfigure(0, weight=1)
+ self.__setup_tab.columnconfigure(1, weight=4)
+ self.__setup_tab.rowconfigure([0, 1], weight=1)
+ self.__setup_tab.rowconfigure(2, weight=100)
+ self.__tabs.add(self.__setup_tab, text="Setup")
+
+ self.__init_flexPtp_options()
+ self.__init_linuxptp_options()
+ self.__init_test_cases()
+ self.__init_test_controller()
+
+
+ def __init_logtab(self) -> None:
+ self.__logtab = ttk.Frame(self.__tabs)
+ self.__tabs.add(self.__logtab, text="Logs")
+
+ self.__logtab.rowconfigure(0, weight=8)
+ self.__logtab.rowconfigure(1, weight=2)
+
+ self.__logtab.columnconfigure(0, weight=1)
+ self.__logtab.columnconfigure(1, weight=1)
+
+ terminal_settings = { "wrap": tk.WORD, "background": "#3D3D3D", "borderwidth": 0, "foreground": "white", "blockcursor": True, "insertbackground": "white"}
+ self.__flexPtp_log = tk.Text(self.__logtab, **terminal_settings)
+ self.__flexPtp_log.grid(column=0, row=0, sticky=tk.NSEW)
+ self.__flexPtp_log.configure(font=self.__console_font, state="disabled")
+
+ self.__linuxptp_log = tk.Text(self.__logtab, **terminal_settings)
+ self.__linuxptp_log.grid(column=1, row=0, sticky=tk.NSEW, pady=0)
+ self.__linuxptp_log.configure(font=self.__console_font, state="disabled")
+
+ self.__flexPtp_log_queue = queue.Queue()
+ self.__linuxptp_log_queue = queue.Queue()
+
+
+ def __init_test_results(self) -> None:
+ self.__results_tabs = ttk.Frame(self.__tabs)
+ self.__tabs.add(self.__results_tabs, text="Results")
+
+
+ def __init_tabs(self) -> None:
+ self.__init_setup_tab()
+ self.__init_logtab()
+ self.__init_test_results()
+
+
+ def __init_fonts(self) -> None:
+ self.__console_font = font.Font(family="Ubuntu Mono", size=9)
+ self.__title_font=font.Font(family="Ubuntu", size=14)
+ self.__mode_font=font.Font(family="Ubuntu", size=12, weight="bold")
+ self.__profile_font=font.Font(family="Ubuntu", size=12, slant="italic")
+
+
+ def __init_styles(self) -> None:
+ style = ttk.Style()
+ style.configure("ProfileChkBox.TCheckbutton", font=self.__console_font)
+ style.configure("ProfileSquare.TFrame", bordercolor="#003039", borderwidth=1, relief="solid")
+ style.configure("ProfileLabel.TLabel", font=self.__mode_font)
+
+
+ def __init_print_polls(self) -> None:
+ def print_logs() -> None:
+ while not self.__flexPtp_log_queue.empty():
+ self.__flexPtp_log.configure(state="normal")
+ self.__flexPtp_log.insert(tk.END, self.__flexPtp_log_queue.get())
+ self.__flexPtp_log.configure(state="disabled")
+
+ while not self.__linuxptp_log_queue.empty():
+ self.__linuxptp_log.configure(state="normal")
+ self.__linuxptp_log.insert(tk.END, self.__linuxptp_log_queue.get())
+ self.__linuxptp_log.configure(state="disabled")
+
+ self.__win.after(50, print_logs)
+
+ self.__win.after(50, print_logs)
+
+
+ def __init__(self) -> None:
+ self.__win = ThemedTk(theme="arc")
+ #self.__win.configure()
+ self.__win.title("flexPTP test suite")
+ self.__win.iconphoto(False, tk.PhotoImage(file="media/flexPTP_test.png"))
+
+ self.__init_fonts()
+ self.__init_styles()
+
+ self.__tabs = ttk.Notebook(self.__win)
+ self.__tabs.configure(width=1000, height=600)
+
+ self.__init_tabs()
+
+ self.__tabs.pack(expand=True, fill="both")
+
+ self.__init_print_polls()
+
+
+ def mainloop(self) -> None:
+ self.__win.mainloop()
+
+
+ def __gather_tests(self) -> None:
+ # gather test cases
+ self.__tests = []
+ for name, case in self.__test_cases.items():
+ ptype = case["type"]
+
+ template = {}
+ if ptype == "general":
+ template = { "type": ptype, "name": case["delmech"] + " " + case["layer"], "delmech": case["delmech"], "layer": case["layer"] }
+ elif ptype == "defined":
+ template = { "type": ptype, "name": name }
+
+ if case["master"].get():
+ master_mode = dict(template)
+ master_mode["mode"] = "master"
+ master_mode["name"] += " (master)"
+ self.__tests.append(master_mode)
+
+ if case["slave"].get():
+ slave_mode = dict(template)
+ slave_mode["mode"] = "slave"
+ slave_mode["name"] += " (slave)"
+ self.__tests.append(slave_mode)
+
+
+ def __populate_tests_treeview(self) -> None:
+ tw = self.__test_tw
+ items = tw.get_children()
+ if items != ():
+ tw.delete(*items)
+
+ seqnum = 1
+ n = len(self.__tests)
+ for test in self.__tests:
+ test["entry"] = tw.insert("", tk.END, text=str(seqnum) + "/" + str(n), values=(test["name"], "?", "Pending", "-", "-"), tags=("pending"))
+ seqnum += 1
+
+
+ def __open_flexPtp_interface(self) -> None:
+ url = self.__sel_device.get()
+ opts = {
+ "baudrate": self.__sel_baudrate.get(),
+ "parity": self.__sel_parity.get(),
+ "stopbits": self.__sel_stopbits.get()
+ }
+
+ def echo(data: str) -> None:
+ self.__flexPtp_log_queue.put(data.replace("\r", ""))
+
+ self.__device_interface = DeviceInterface(url, opts)
+ self.__device_interface.register_out_callback(echo)
+ self.__flexPtp_controller = FlexPtpController(self.__device_interface)
+
+
+ def __init_linuxptp_observer(self) -> None:
+ self.__linuxptp_observer = LinuxPtpObserver(self.__linuxptp_interface.get())
+
+ def echo(data) -> None:
+ self.__linuxptp_log_queue.put(str(data))
+
+ self.__linuxptp_observer.register_observer_callback(echo)
+
+
+ def __init_flexPtp(self) -> None:
+ ctrl = self.__flexPtp_controller
+
+ ctrl.disable_all_logging()
+ ctrl.set_domain(0)
+ ctrl.reset_flexptp()
+
+
+ def start_tests(self) -> None:
+ self.__gather_tests()
+ self.__populate_tests_treeview()
+ self.__open_flexPtp_interface()
+ self.__init_flexPtp()
+ self.__init_linuxptp_observer()
+
+ # ----
+
+ ctrl = self.__flexPtp_controller
+ observer = self.__linuxptp_observer
+ for test in self.__tests:
+ if test["type"] == "general":
+ id = test["delmech"] + "_" + test["layer"]
+ else:
+ id = test["name"]
+
+ ctrl.start_by_id(id)
+
+ priority1 = 128
+ if test["mode"] == "master":
+ priority1 = 100
+
+ ctrl.set_priority(priority1, 255)
+
+ observer.start_linuxptp(id)
+
+
+
+ for test in self.__tests:
+ pass
+
+
+ def stop_tests(self) -> None:
+ self.__linuxptp_observer.stop_linuxptp()
+ self.__device_interface.close()
\ No newline at end of file
diff --git a/LinuxPtpController.py b/LinuxPtpController.py
deleted file mode 100644
index b87f5af..0000000
--- a/LinuxPtpController.py
+++ /dev/null
@@ -1,4 +0,0 @@
-
-class LinuxPtpObserver:
- def __init__(self) -> None:
- pass
\ No newline at end of file
diff --git a/LinuxPtpObserver.py b/LinuxPtpObserver.py
new file mode 100644
index 0000000..585b2a6
--- /dev/null
+++ b/LinuxPtpObserver.py
@@ -0,0 +1,51 @@
+import signal
+import subprocess
+import select
+from threading import Thread
+from typing import Callable
+
+class LinuxPtpObserver:
+ def __init__(self, ni: str) -> None:
+ self.__ni = ni
+ self.__process: subprocess.Popen
+ self.__observer_thread: Thread
+ self.__observer_cb: Callable | None = None
+ pass
+
+ def observer_routine(self) -> None:
+ if self.__process.stdout is not None:
+ pfd = select.poll()
+ pfd.register(self.__process.stdout.fileno(), select.POLLIN)
+ else:
+ return
+
+ while self.__process.poll() is None:
+ res = pfd.poll()
+
+ if len(res) > 0 and res[0][1] & select.POLLIN:
+ data = self.__process.stdout.readline()
+ if self.__observer_cb is not None:
+ self.__observer_cb(data)
+
+
+ def start_linuxptp(self, profile: str) -> None:
+ cmd = ["sudo", "ptp4l", "--priority1=127", "--priority2=255", "--gmCapable=1", "--neighborPropDelayThresh=100000",
+ "--min_neighbor_prop_delay=-20000000", "--assume_two_step=1", "--ptp_minor_version=0",
+ "-i", self.__ni, "-f", "linuxptp_configs/{:s}.cfg".format(profile), "-m", "-l", "6"]
+
+ self.__process = subprocess.Popen(
+ cmd,
+ stdout=subprocess.PIPE,
+ stdin=subprocess.PIPE,
+ text=True
+ )
+
+ self.__observer_thread = Thread(target=self.observer_routine)
+ self.__observer_thread.start()
+
+ def stop_linuxptp(self) -> None:
+ self.__process.send_signal(signal.SIGKILL)
+ self.__observer_thread.join()
+
+ def register_observer_callback(self, cb : Callable) -> None:
+ self.__observer_cb = cb
\ No newline at end of file
diff --git a/TestController.py b/TestController.py
index 2fc35e2..3a301c7 100644
--- a/TestController.py
+++ b/TestController.py
@@ -1,45 +1,40 @@
from DeviceInterface import DeviceInterface
-from LinuxPtpController import LinuxPtpObserver
+from LinuxPtpObserver import LinuxPtpObserver
+from FlexPtpController import FlexPtpController
class TestController:
- def __reset_flexptp(self) -> None:
- self.__di.execute_command("ptp reset")
-
- def __start_e2e_l4(self) -> None:
- self.__di.execute_command("ptp profile preset default")
-
- def __start_p2p_l4(self) -> None:
- self.__di.execute_command("ptp profile preset defp2p")
-
- def __start_e2e_l2(self) -> None:
- self.__di.execute_command("ptp profile preset default")
- self.__di.execute_command("ptp transport 802.3")
- self.__reset_flexptp()
-
- def __start_p2p_l2(self) -> None:
- self.__di.execute_command("ptp profile preset defp2p")
- self.__di.execute_command("ptp transport 802.3")
- self.__reset_flexptp()
-
- def __start_gPTP(self) -> None:
- self.__di.execute_command("ptp profile preset gPTP")
-
- def __set_priority(self, priority1: int, priority2: int) -> None:
- self.__di.execute_command("ptp priority {:d} {:d}".format(priority1, priority2))
-
- def __set_domain(self, domain: int) -> None:
- self.__di.execute_command("ptp domain {:d}".format(domain))
-
- def __disable_all_logging(self) -> None:
- logging_types = [ "def", "corr", "ts", "info", "locked", "bmca" ]
- for lt in logging_types:
- self.__di.execute_command("ptp log " + lt + " off", expect_results=False)
-
- def __set_servo_offset(self, offset: int) -> None:
- self.__di.execute_command("ptp servo offset {:d}".format(offset))
-
+ def __prepare_device(self) -> None:
+ self.__fptp_controller.disable_all_logging()
+ self.__fptp_controller.set_priority(128, 255)
+ self.__fptp_controller.set_domain(0)
+ self.__fptp_controller.reset_flexptp()
def __init__(self, di: DeviceInterface, lptp_observer: LinuxPtpObserver) -> None:
self.__di = di
+ self.__fptp_controller = FlexPtpController(di)
self.__lptp_observer = lptp_observer
- pass
\ No newline at end of file
+ pass
+
+ def start_test_e2e_udp(self) -> None:
+ self.__fptp_controller.start_e2e_udp()
+ self.__lptp_observer.start_linuxptp("E2E_UDP")
+
+ def start_test_e2e_l2(self) -> None:
+ self.__fptp_controller.start_e2e_l2()
+ self.__lptp_observer.start_linuxptp("E2E_L2")
+
+ def start_test_p2p_udp(self) -> None:
+ self.__fptp_controller.start_p2p_udp()
+ self.__lptp_observer.start_linuxptp("P2P_UDP")
+
+ def start_test_p2p_l2(self) -> None:
+ self.__fptp_controller.start_p2p_l2()
+ self.__lptp_observer.start_linuxptp("P2P_L2")
+
+ def start_test_gPTP(self) -> None:
+ self.__fptp_controller.start_gPTP()
+ self.__lptp_observer.start_linuxptp("gPTP")
+
+ def stop_test(self) -> None:
+ self.__lptp_observer.stop_linuxptp()
+
diff --git a/__pycache__/DeviceInterface.cpython-313.pyc b/__pycache__/DeviceInterface.cpython-313.pyc
deleted file mode 100644
index c4354e3..0000000
Binary files a/__pycache__/DeviceInterface.cpython-313.pyc and /dev/null differ
diff --git a/linuxptp_configs/E2E_UDP.cfg b/linuxptp_configs/E2E_L4.cfg
similarity index 100%
rename from linuxptp_configs/E2E_UDP.cfg
rename to linuxptp_configs/E2E_L4.cfg
diff --git a/linuxptp_configs/P2P_UDP.cfg b/linuxptp_configs/P2P_L4.cfg
similarity index 100%
rename from linuxptp_configs/P2P_UDP.cfg
rename to linuxptp_configs/P2P_L4.cfg
diff --git a/media/flexPTP_test.png b/media/flexPTP_test.png
new file mode 100644
index 0000000..1de16fc
Binary files /dev/null and b/media/flexPTP_test.png differ
diff --git a/media/flexPTP_test.svg b/media/flexPTP_test.svg
new file mode 100644
index 0000000..2ae72c2
--- /dev/null
+++ b/media/flexPTP_test.svg
@@ -0,0 +1,122 @@
+
+
+
+
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..95f4677
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,9 @@
+future==1.0.0
+iso8601==2.1.0
+netifaces==0.11.0
+netinterfaces==0.1.0
+pillow==12.2.0
+pyserial==3.5
+PyYAML==6.0.3
+ttk-text==0.3.3
+ttkthemes==3.3.0
diff --git a/start_linuxptp.sh b/start_linuxptp.sh
index a6fb8d4..6f7833f 100755
--- a/start_linuxptp.sh
+++ b/start_linuxptp.sh
@@ -1,5 +1,5 @@
#!/bin/bash
-COMMON_FLAGS=--priority1=127 --priority2=255 --gmCapable=1 --neighborPropDelayThresh=100000 --min_neighbor_prop_delay=-20000000 --assume_two_step=1 --ptp_minor_version=0
+COMMON_FLAGS="--priority1=127 --priority2=255 --gmCapable=1 --neighborPropDelayThresh=100000 --min_neighbor_prop_delay=-20000000 --assume_two_step=1 --ptp_minor_version=0"
-ptp4l -i "$1" -f "linuxptp_configs/$2.cfg" -m -l 6 $COMMON_FLAGS
\ No newline at end of file
+sudo ptp4l -i "$1" -f "linuxptp_configs/$2.cfg" -m -l 6 $COMMON_FLAGS
\ No newline at end of file
diff --git a/test_main.py b/test_main.py
index 0b687c8..8f2d80c 100644
--- a/test_main.py
+++ b/test_main.py
@@ -1,12 +1,23 @@
#!/usr/bin/python3
import optparse
+import os
+import time
import DeviceInterface
+from LinuxPtpObserver import LinuxPtpObserver
+from TestController import TestController
+from GUI import GUI
# ----------------------------------
-usage = " [-s ]"
+gui = GUI()
+
+gui.mainloop()
+
+exit(0)
+
+usage = " [-s ]"
parser = optparse.OptionParser(usage=usage)
parser.add_option("-s", dest="baudrate", default=115200)
@@ -23,10 +34,33 @@ di = DeviceInterface.DeviceInterface(url=args[0], options={"baudrate": opts.baud
#results = di.execute_command("osinfo")
#print(results)
+def echo(str: str) -> None:
+ print(str, end="")
+
+di.register_out_callback(echo)
+
# get device clock identity
OWN_CLOCK_ID_KEY = "Own clock ID"
clock_id = di.execute_command("ptp info", separate_results=True)
-if type(clock_id) == dict[str, str] and OWN_CLOCK_ID_KEY in clock_id:
+if type(clock_id) == dict and OWN_CLOCK_ID_KEY in clock_id:
print("Own clock ID:", clock_id[OWN_CLOCK_ID_KEY])
else:
- print("Could not retrieve device clock ID!")
\ No newline at end of file
+ print("Could not retrieve device clock ID!")
+
+lptp_observer = LinuxPtpObserver(args[1])
+lptp_observer.register_observer_callback(echo)
+
+tc = TestController(di, lptp_observer)
+
+tc.start_test_gPTP()
+
+time.sleep(15)
+
+tc.stop_test()
+tc.start_test_e2e_udp()
+
+time.sleep(15)
+
+tc.stop_test()
+
+di.close()
\ No newline at end of file