import threading import time from typing import Callable, Literal import numpy from LinuxPtpObserver import LinuxPtpObserver from FlexPtpController import FlexPtpController class TestController: def __test_routine(self) -> None: # controller and observer objects ctrl = self.__flexPtp_controller observer = self.__linuxptp_observer test_exited_early = False # a test has encountered an unrecoverable error ntests = len(self.__tests) # number of tests tsi = 0 # test index # iterate over all test cases for test in self.__tests: # store start time test["start"] = time.time() # set result to "in-progress" test["result"] = "in-progress" # test data arrays dts = [] vars = [] means = [] # assign to test data test["data"] = { "skip": self.__params["averaging_window_size"], "dt": dts, "var": vars, "mean": means, "min": { "global": 1E+09, "local": 1E+09 }, "max": { "global": -1E+09, "local": -1E+09 }, "cycles": 0, "rawtype": "", "raw": [] } # send the TEST_COMMENCED event announcing the processing of a new testcase self.__dispatch_event({ "type": "TEST_COMMENCED", "tsi": tsi, "ntests": ntests, "test": test }) # identify correct mode or profile id = test["id"] mode = test["mode"] # set mode or profile on the flexPTP device ctrl.start_by_id(id) # adjust device priority to force required working mode priority1 = 128 if mode == "master": priority1 = 100 ctrl.set_priority(priority1, 255) # start linuxptp with the matching mode or profile observer.start_linuxptp(id) n = 0 # sync cycle sequence number var = 1E+09 # variance mean = 1E-09 # mean # error handling error_indication = False error_cause = "none" # the testing process try: # (1) wait for the LISTENING state clearly indicating the startup of the linuxptp event = observer.wait_for_event("bmca-statechange", self.__params["linuxptp_init_timeout"], { "to": "LISTENING" }) if event.get_name() != "bmca-statechange": if event.get_name() == "exit": cause = "exit" else: cause = "linuxptp init failed" raise Exception(cause) # (2) turn on BMCA and default logging on the flexPTP device ctrl.enable_log("bmca", True) ctrl.enable_log("def", True) # (3) assign flexPTP and linuxptp sides if mode == "slave": master_side = observer slave_side = ctrl observer_bmca_role = "MASTER" ctrl_bmca_role = "SLAVE" rawtype = "flexPTP" else: master_side = ctrl slave_side = observer observer_bmca_role = "SLAVE" ctrl_bmca_role = "MASTER" rawtype = "linuxptp" # set raw data type test["data"]["rawtype"] = rawtype # (4) wait for the correct BMCA states to settle in event = ctrl.wait_for_event("bmca-statechange", self.__params["bmca_timeout"], { "to": ctrl_bmca_role }) # flexPTP if event.get_name() != "bmca-statechange": if event.get_name() == "exit": cause = "exit" else: cause = "flexPTP BMCA lock-in failed" raise Exception(cause) event = observer.wait_for_event("bmca-statechange", self.__params["bmca_timeout"], { "to": observer_bmca_role }) # linuxptp if event.get_name() != "bmca-statechange": if event.get_name() == "exit": cause = "exit" else: cause = "linuxptp BMCA lock-in failed" raise Exception(cause) # (5) wait for the synchronization to settle while n < self.__params["sync_min_cycles"] or (n < self.__params["sync_max_cycles"] and var > self.__params["target_variance"]): event = slave_side.get_event(self.__params["sync_timeout"]) # a state change with the BMCA indicates a clear error match event.get_name(): case "synclog-slave": eargs = event.get_args() dt = dts.append(eargs["Dt"]) test["data"]["raw"].append(list(eargs.values())) case "bmca-statechage": raise Exception("BMCA has switched state mid-test") case "exit": raise Exception("exit") # recalculate variance if n >= self.__params["averaging_window_size"]: window = dts[n-self.__params["averaging_window_size"]:n] var = numpy.var(window) mean = numpy.mean(window) test["data"]["min"]["local"] = numpy.min(window) test["data"]["max"]["local"] = numpy.max(window) # search min/max if dts[-1] < test["data"]["min"]["global"]: test["data"]["min"]["global"] = dts[-1] if dts[-1] > test["data"]["max"]["global"]: test["data"]["max"]["global"] = dts[-1] # always store variance and mean, even if not meaningful vars.append(var) means.append(mean) # increment index n += 1 # store cycle count test["data"]["cycles"] = n # dispatch SYNC_CYCLE event self.__dispatch_event({"type": "SYNC_CYCLE", "test": test }) # check variance if var > self.__params["target_variance"]: error_indication = True error_cause = "too high variance" # handle exceptions except Exception as e: error_indication = True error_cause = e.args[0] # display linuxptp errors linuxptp_errors = observer.get_errors() if linuxptp_errors != "": self.__dispatch_event({"type": "LINUXPTP_ERRORS", "msg": linuxptp_errors, "test": test}) # stop linuxptp observer.stop_linuxptp() # exit if some of the subordinate layers signaled an exit if error_indication and error_cause in ("exit", "linuxptp init failed"): self.__dispatch_event({"type": "TESTS_EXITED_EARLY", "cause": error_cause, "test": test}) test_exited_early = True # save results, conclusion time and calculate duration test["dts"] = dts test["cycles"] = n test["comments"] = "" # check pass/fail conditions passed = not error_indication if passed: test["result"] = "passed" else: test["comments"] = error_cause if test_exited_early: test["result"] = "errored" else: test["result"] = "failed" # save timestamps test["end"] = time.time() test["duration"] = test["end"] - test["start"] # emit the TEST_CONCLUDED event self.__dispatch_event({"type": "TEST_CONCLUDED", "test": test}) # if the tests has errored, then if test_exited_early: break # increase test index tsi += 1 # send CLEANUP_TESTS event self.__dispatch_event({ "type": "TESTS_FINISHED", "early_exit": test_exited_early }) def __init__( self, flexPtp_controller: FlexPtpController, lptp_observer: LinuxPtpObserver ) -> None: self.__flexPtp_controller = flexPtp_controller self.__linuxptp_observer = lptp_observer self.__event_callback: Callable[[dict], None] | None = None def set_event_callback(self, cb: Callable[[dict], None]) -> None: self.__event_callback = cb def __dispatch_event(self, data: dict) -> None: if self.__event_callback is not None: self.__event_callback(data) def start_tests(self, tests: list, parameters: dict) -> None: self.__tests = tests self.__params = parameters self.__test_thread = threading.Thread(target=self.__test_routine) self.__test_thread.start()