flexPTP-test/LinuxPtpObserver.py
András Wiesner cdbec108cb GUI...
2026-04-27 21:47:57 +02:00

51 lines
1.7 KiB
Python

import os
import select
import signal
import subprocess
from threading import Thread
from typing import Callable
class LinuxPtpObserver:
    """Run ``ptp4l`` as a child process and forward its output line by line.

    ``start_linuxptp`` spawns ``sudo ptp4l ...`` on the configured network
    interface and starts a background thread that reads the child's stdout.
    Every line read is passed to the callback registered with
    ``register_observer_callback``. ``stop_linuxptp`` terminates the child and
    joins the reader thread.
    """

    # How long one poll() call may block before the child's liveness is
    # re-checked, in milliseconds.
    _POLL_TIMEOUT_MS = 200
    # Grace period granted to the child after SIGTERM before SIGKILL is sent.
    _STOP_TIMEOUT_S = 2.0
    # Maximum number of bytes fetched from the pipe per read.
    _READ_CHUNK_SIZE = 4096

    def __init__(self, ni: str) -> None:
        """Create an observer for the network interface named *ni*.

        No process is started here; call ``start_linuxptp`` for that.
        """
        self.__ni = ni
        # Both stay None until start_linuxptp() runs, so that calling
        # stop_linuxptp() first is harmless instead of an AttributeError.
        self.__process: subprocess.Popen | None = None
        self.__observer_thread: Thread | None = None
        self.__observer_cb: Callable | None = None

    def __notify(self, raw: bytes, encoding: str) -> None:
        """Decode *raw* and hand it to the registered callback, if any."""
        # The callback is looked up per line so that one registered after
        # start_linuxptp() still takes effect.
        callback = self.__observer_cb
        if callback is not None:
            callback(raw.decode(encoding, errors="replace"))

    def observer_routine(self) -> None:
        """Read the child's stdout and pass each line to the callback.

        Runs until the pipe reaches end-of-file, or until the child has
        exited and no more output is immediately available. Lines are
        delivered as ``str`` including their trailing newline; an unterminated
        final fragment is delivered as-is. Returns immediately when no process
        with a stdout pipe is present.
        """
        process = self.__process
        stdout = process.stdout if process is not None else None
        if stdout is None:
            return

        fd = stdout.fileno()
        encoding = getattr(stdout, "encoding", None) or "utf-8"
        poller = select.poll()
        poller.register(fd, select.POLLIN)
        readable = select.POLLIN | select.POLLHUP
        pending = b""

        while True:
            exited = process.poll() is not None
            # Once the child is gone only drain what is already in the pipe
            # (timeout 0); otherwise wake up periodically to notice its exit.
            events = poller.poll(0 if exited else self._POLL_TIMEOUT_MS)
            if not events:
                if exited:
                    break
                continue
            if not events[0][1] & readable:
                break  # POLLERR / POLLNVAL: the descriptor is unusable
            # Read straight from the descriptor: a buffered readline() may
            # swallow several lines at once, and poll() cannot see data that
            # sits in Python's buffer, which would delay their delivery.
            chunk = os.read(fd, self._READ_CHUNK_SIZE)
            if not chunk:
                break  # end-of-file: every writer closed the pipe
            *lines, pending = (pending + chunk).split(b"\n")
            for line in lines:
                self.__notify(line + b"\n", encoding)

        if pending:
            self.__notify(pending, encoding)

    def start_linuxptp(self, profile: str) -> None:
        """Launch ``ptp4l`` with the given profile and start observing it.

        *profile* selects the configuration file
        ``linuxptp_configs/<profile>.cfg``. A process started by an earlier
        call is stopped first so it is not leaked.
        """
        if self.__process is not None:
            self.stop_linuxptp()

        cmd = [
            "sudo", "ptp4l",
            "--priority1=127",
            "--priority2=255",
            "--gmCapable=1",
            "--neighborPropDelayThresh=100000",
            "--min_neighbor_prop_delay=-20000000",
            "--assume_two_step=1",
            "--ptp_minor_version=0",
            "-i", self.__ni,
            "-f", f"linuxptp_configs/{profile:s}.cfg",
            "-m",
            "-l", "6",
        ]
        self.__process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stdin=subprocess.PIPE,
            text=True,
        )
        self.__observer_thread = Thread(target=self.observer_routine)
        self.__observer_thread.start()

    def stop_linuxptp(self) -> None:
        """Terminate the child process and wait for the observer thread.

        Does nothing when no process has been started.
        """
        process = self.__process
        thread = self.__observer_thread

        if process is not None and process.poll() is None:
            # SIGTERM first: the direct child is sudo, which relays SIGTERM to
            # the command it runs but cannot relay SIGKILL, so killing it
            # outright could leave ptp4l running.
            process.terminate()
            try:
                process.wait(timeout=self._STOP_TIMEOUT_S)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()

        if thread is not None:
            thread.join()

        if process is not None:
            # Close the pipes only after the reader thread has finished.
            for pipe in (process.stdin, process.stdout):
                if pipe is not None:
                    pipe.close()

        self.__process = None
        self.__observer_thread = None

    def register_observer_callback(self, cb: Callable) -> None:
        """Set *cb* as the function called with every line of output."""
        self.__observer_cb = cb