import signal import subprocess import select from threading import Thread from typing import Callable class LinuxPtpObserver: def __init__(self, ni: str) -> None: self.__ni = ni self.__process: subprocess.Popen self.__observer_thread: Thread self.__observer_cb: Callable | None = None pass def observer_routine(self) -> None: if self.__process.stdout is not None: pfd = select.poll() pfd.register(self.__process.stdout.fileno(), select.POLLIN) else: return while self.__process.poll() is None: res = pfd.poll() if len(res) > 0 and res[0][1] & select.POLLIN: data = self.__process.stdout.readline() if self.__observer_cb is not None: self.__observer_cb(data) def start_linuxptp(self, profile: str) -> None: cmd = ["sudo", "ptp4l", "--priority1=127", "--priority2=255", "--gmCapable=1", "--neighborPropDelayThresh=100000", "--min_neighbor_prop_delay=-20000000", "--assume_two_step=1", "--ptp_minor_version=0", "-i", self.__ni, "-f", "linuxptp_configs/{:s}.cfg".format(profile), "-m", "-l", "6"] self.__process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, text=True ) self.__observer_thread = Thread(target=self.observer_routine) self.__observer_thread.start() def stop_linuxptp(self) -> None: self.__process.send_signal(signal.SIGKILL) self.__observer_thread.join() def register_observer_callback(self, cb : Callable) -> None: self.__observer_cb = cb