flexPTP-test/DeviceInterface.py
András Wiesner df1d6a9f1c - initial
2026-04-23 10:34:28 +02:00

84 lines
2.7 KiB
Python

import serial
import re
class DeviceInterface:
"""
Helper class for interfacing the device running flexPTP.
"""
def __init__(self, url: str, options: dict = {}) -> None:
"""
Initialize the interface.
:param str url: url of the device (tty, socket etc.)
:param dict options: collection of several options (baudrate, bytesize, parity, stopbits etc.)
:rtype: None
"""
self.device = serial.serial_for_url(url, timeout=0.05) # open remote device
# set serial port options if applicable
if "baudrate" in options:
self.device.baudrate = options["baudrate"]
if "bytesize" in options:
self.device.bytesize = options["bytesize"]
if "parity" in options:
self.device.parity = options["parity"]
if "stopbits" in options:
self.device.stopbits = options["stopbits"]
def read_until(self, expected: bytes = serial.LF) -> bytes:
"""
Read from the device until a specific sequence is found.
:param bytes expected: delimiter sequence
:return: bytes read
:rtype: bytes
"""
return self.device.read_until(expected=expected)
def write(self, data: bytes) -> None:
"""
Write data onto the device
:param bytes data: data to transfer
"""
self.device.write(data)
def execute_command(self, cmd: str, expect_results: bool = True, separate_results: bool = False) -> str | dict[str, str] | None:
"""
Execute command on the device
:param str cmd: command to launch
:param bool expect_results: is there any return values expected and awaited?
:param bool separate_results: if True a key-value separation attempted on the results
:return: results (if requested)
:rtype: str | None
"""
self.device.write((cmd.strip("\r\n") + "\r\n").encode()) # sanitize commands
self.device.read_until(cmd.encode()) # flush echo
if expect_results: # store results if required
timeout = False
results = ""
while not timeout: # store continuous chunks
data = self.device.read(32)
if len(data) > 0:
results += data.decode()
else:
timeout = True
if separate_results: # separation requested
records = re.findall("^[ ]*([^:]+)[ ]*:[ ]*(.+)[ ]*$", results.strip(), flags=re.MULTILINE)
results = dict[str, str]()
for rec in records:
results[rec[0]] = rec[1].strip()
return results
else:
return results
else:
return None