import os
import re
import select
import sys
from threading import Lock, Thread
from typing import Callable

import serial

# ANSI colorization (SGR) escape sequences that are removed from device output.
_ANSI_COLOR_RE = re.compile(r"\x1b\[[0-9;]*m")

# "key : value" lines recognized when command results are split into a dict.
_KEY_VALUE_RE = re.compile("^[ ]*([^:]+)[ ]*:[ ]*(.+)[ ]*$", flags=re.MULTILINE)


class DeviceInterface:
    """
    Helper class for interfacing the device running flexPTP.

    A background thread reads the device line-by-line, strips colorization
    escape sequences and forwards the text both into an internal OS pipe
    (consumed by :meth:`readline` and :meth:`execute_command`) and to an
    optional user callback.
    """

    # Options copied from the ``options`` dict onto the opened port.
    _PORT_OPTIONS = ("baudrate", "bytesize", "parity", "stopbits")

    # Results of a command are considered complete once the pipe has been
    # silent for this many milliseconds.
    _RESULT_IDLE_TIMEOUT_MS = 100

    # Maximum number of bytes fetched from the pipe per read while collecting
    # command results.
    _READ_CHUNK_SIZE = 4096

    def reader_routine(self) -> None:
        """
        Thread routine for receiving all incoming communication from the device.

        Runs until the device reports itself closed or any step of the
        read/forward cycle fails (which is how closing the device is noticed).

        :rtype: None
        """
        while not self.device.closed:
            try:
                data = self.device.readline()  # read serial port line-by-line
                if isinstance(data, bytes):
                    # Undecodable bytes (line noise) are replaced instead of
                    # raising, so a single bad byte cannot stop the reader.
                    data = _ANSI_COLOR_RE.sub("", data.decode(errors="replace"))
                if len(data) > 0:
                    # write (remaining) data to the pipe
                    os.write(self.__outpipe_rw_fd[1], data.encode())
                    if self.__out_cb is not None:  # invoke callback if provided
                        self.__out_cb(data)
            except Exception:
                break  # handle closing the device

    def __init__(self, url: str, options: dict | None = None) -> None:
        """
        Initialize the interface.

        Opens the device, creates the internal pipe and starts the reader
        thread.

        :param str url: url of the device (tty, socket etc.)
        :param dict options: collection of several options (baudrate, bytesize,
            parity, stopbits etc.); ``None`` means no options
        :rtype: None
        """
        if options is None:
            options = {}

        self.device = serial.serial_for_url(url, timeout=None)  # open remote device

        # set serial port options if applicable
        for name in self._PORT_OPTIONS:
            if name in options:
                setattr(self.device, name, options[name])

        self.__out_cb: Callable | None = None

        read_fd, write_fd = os.pipe()
        self.__outpipe_rw_fd = [read_fd, write_fd]
        # The read end is unbuffered on purpose: execute_command() polls the
        # raw descriptor, and a buffered reader would pull pending bytes out of
        # the pipe into its own buffer where poll() cannot see them.
        self.__outpipe_rw = [
            os.fdopen(read_fd, "rb", buffering=0),
            os.fdopen(write_fd, "wb"),
        ]

        self.__reader_thread = Thread(target=self.reader_routine)
        self.__reader_thread.start()

    def close(self) -> None:
        """
        Close the device interface.

        Closes the device, waits for the reader thread to finish and then
        closes both ends of the internal pipe.

        :rtype: None
        """
        self.device.close()
        self.__reader_thread.join()
        self.__outpipe_rw[0].close()
        self.__outpipe_rw[1].close()

    def readline(self) -> bytes:
        """
        Read a line from the device.

        The line comes from the internal pipe, i.e. it has already been
        stripped of colorization escape sequences by the reader thread.

        :return: bytes read
        :rtype: bytes
        """
        # The pipe is opened in binary mode, so the line is already bytes.
        return self.__outpipe_rw[0].readline()

    def write(self, data: bytes) -> None:
        """
        Write data onto the device

        :param bytes data: data to transfer
        :rtype: None
        """
        self.device.write(data)

    @staticmethod
    def _separate_results(results: str) -> dict[str, str]:
        """
        Split ``key : value`` lines of a command output into a dictionary.

        :param str results: raw command output
        :return: mapping of keys to values stripped of surrounding whitespace;
            a later occurrence of a key overrides an earlier one
        :rtype: dict[str, str]
        """
        return {
            key: value.strip()
            for key, value in _KEY_VALUE_RE.findall(results.strip())
        }

    def execute_command(self, cmd: str, expect_results: bool = True, separate_results: bool = False) -> str | dict[str, str] | None:
        """
        Execute command on the device

        The command is sent terminated by CR LF, then the output is consumed
        until the echo of the command shows up. If results are expected,
        everything arriving afterwards is collected until the output stays
        silent for the idle timeout.

        :param str cmd: command to launch
        :param bool expect_results: is there any return values expected and awaited?
        :param bool separate_results: if True a key-value separation attempted on the results
        :return: results (if requested)
        :rtype: str | dict[str, str] | None
        :raises EOFError: if the internal pipe reaches end-of-file while the
            echo is awaited
        """
        pure_final_cmd = cmd.strip("\r\n")
        final_cmd = (pure_final_cmd + "\r\n").encode()  # sanitize commands
        self.write(final_cmd)  # send command

        reader = self.__outpipe_rw[0]

        # Wait for the echo to arrive. Received lines are stripped of all
        # surrounding whitespace, so the expected echo must be stripped the
        # same way; otherwise a command with leading/trailing blanks would
        # never match.
        expected_echo = pure_final_cmd.strip()
        line = ""
        while line != expected_echo:
            raw_line = reader.readline()
            if not raw_line:
                # End-of-file: nothing more will ever arrive, do not spin.
                raise EOFError("device output ended while waiting for the command echo")
            line = raw_line.decode().strip()

        if not expect_results:
            return None

        # Store continuous chunks until the output goes silent.
        poller = select.poll()
        poller.register(self.__outpipe_rw_fd[0], select.POLLIN)
        chunks = []
        while True:
            events = poller.poll(self._RESULT_IDLE_TIMEOUT_MS)
            if len(events) == 0 or events[0][1] != select.POLLIN:
                break  # silence (or a non-data event) ends the results
            data = reader.read(self._READ_CHUNK_SIZE)
            if not data:
                break
            chunks.append(data)

        # Decode once over the whole payload so multi-byte characters that
        # were split between reads are handled correctly.
        results = b"".join(chunks).decode(errors="replace")

        if separate_results:  # separation requested
            return self._separate_results(results)
        return results

    def register_out_callback(self, cb: Callable) -> None:
        """
        Register callback for receiving all printed messages from the device.

        The callback is invoked from the reader thread with the received text
        as its only argument.

        :param Callable cb: callback invoked at each reception
        :rtype: None
        """
        self.__out_cb = cb