# Source code for automeal.test_runner

import grpc
from .test_runner_protoc import test_runner_pb2 as pb
from .test_runner_protoc import test_runner_pb2_grpc as pbgrpc


[docs]class TestRunner: def __init__(self, port): self._channel = grpc.insecure_channel(f'localhost:{port}') self._stub = pbgrpc.TestRunnerServiceStub(self._channel) return def __del__(self): self._channel.close()
[docs] def start_test(self, folder_name:str): self._stub.StartTest(pb.StartTestRequest(folder_name=folder_name))
[docs] def stop_test(self): self._stub.StopTest(pb.Empty())
[docs] def send_serial(self,port_number,serial_data): self._stub.SendSerial(pb.SendSerialRequest(port_number=port_number, serial_data=serial_data))
[docs] def send_serial_command(self,port_number,serial_command): self._stub.SendSerialCommand(pb.SendSerialCommandRequest(port_number=port_number, serial_command=serial_command))
[docs] def get_latest_serial(self,port_number,direction): res = self._stub.GetLatestSerialData(pb.GetLatestSerialDataRequest(port_number=port_number,direction=direction)) return res.serialData
[docs] def wait_for_next_serial_data(self, port_number, timeout_ms): if timeout_ms is None: timeout_ms = -1 elif timeout_ms < 0: raise ValueError("Cannot set up the 'timeout' 0 or less.") elif timeout_ms == 0: # TODO:ノンブロッキングモード実装 raise ValueError("Cannot set up the 'timeout' 0 or less.") res = self._stub.WaitForNextSerialData(pb.WaitForNextSerialDataRequest(port_number=port_number, timeout=timeout_ms)) return res.serialData
[docs] def write_logic_rpigp10(self, board_id: str, channel_number: int, state: bool): self._stub.WriteLogicRPiGP10(pb.WriteLogicRPiGP10Request(board=pb.BoardRequest(board_id=board_id, channel_number=channel_number), state=state))
[docs] def write_logic_rpigp10_multi(self, board_id: str, states: list): values = [-1 if b is None else int(b) for b in states] self._stub.WriteLogicRPiGP10Multi(pb.WriteLogicRPiGP10MultiRequest(board_id=board_id, states=values))
[docs] def write_logic_rpigp10_bits(self, board_id: str, state: int): self._stub.WriteLogicRPiGP10Bits(pb.WriteLogicRPiGP10BitsRequest(board_id=board_id, state=state))
[docs] def read_logic_rpigp10(self, board_id: str, channel_number: int): res = self._stub.ReadLogicRPiGP10(pb.BoardRequest(board_id=board_id, channel_number=channel_number)) return res.state
[docs] def write_analog_amao(self, board_id: str, channel_number: int, voltage_mv: int): self._stub.WriteAnalogAMAO(pb.WriteAnalogAMAORequest(board=pb.BoardRequest(board_id=board_id, channel_number=channel_number), voltage=voltage_mv))
[docs] def read_analog_rpigp40(self, board_id: str, channel_number: int): res = self._stub.ReadAnalogRPiGP40(pb.BoardRequest(board_id=board_id, channel_number=channel_number)) return res.voltage
[docs] def write_pwm_ampio(self, board_id: str, channel_number: int, frequency_hz: int, duty_percent: float): self._stub.WritePwmAMPIO(pb.WritePwmAMPIORequest(board=pb.BoardRequest(board_id=board_id, channel_number=channel_number), frequency=frequency_hz, duty=duty_percent))
[docs] def read_pwm_ampio(self, board_id: str, channel_number: int): res = self._stub.ReadPwmAMPIO(pb.BoardRequest(board_id=board_id, channel_number=channel_number)) return res.frequency, res.duty
[docs] def read_analog_cpiai1208li(self, board_id: str, channel_number: int): res = self._stub.ReadAnalogCPIAI1208LI(pb.BoardRequest(board_id=board_id, channel_number=channel_number)) return res.voltage
[docs] def write_logic_cpirry16(self, board_id: str, channel_number: int, state: bool): self._stub.WriteLogicCpiRry16(pb.WriteLogicCpiRry16Request(board=pb.BoardRequest(board_id=board_id, channel_number=channel_number), state=state))
[docs] def write_logic_cpirry16_multi(self, board_id: str, states: list): values = [-1 if b is None else int(b) for b in states] self._stub.WriteLogicCpiRry16Multi(pb.WriteLogicCpiRry16MultiRequest(board_id=board_id, states=values))
[docs] def write_logic_cpirry16_bits(self, board_id: str, state: int): self._stub.WriteLogicCpiRry16Bits(pb.WriteLogicCpiRry16BitsRequest(board_id=board_id, state=state))
[docs] def write_logic_cpidio0808l(self, board_id: str, channel_number: int, state: bool): self._stub.WriteLogicCpiDio0808L(pb.WriteLogicCpiDio0808LRequest(board=pb.BoardRequest(board_id=board_id, channel_number=channel_number), state=state))
[docs] def write_logic_cpidio0808l_multi(self, board_id: str, states: list): values = [-1 if b is None else int(b) for b in states] self._stub.WriteLogicCpiDio0808LMulti(pb.WriteLogicCpiDio0808LMultiRequest(board_id=board_id, states=values))
[docs] def write_logic_cpidio0808l_bits(self, board_id: str, state: int): self._stub.WriteLogicCpiDio0808LBits(pb.WriteLogicCpiDio0808LBitsRequest(board_id=board_id, state=state))
[docs] def read_logic_cpidio0808l(self, board_id: str, channel_number: int): res = self._stub.ReadLogicCpiDio0808L(pb.BoardRequest(board_id=board_id, channel_number=channel_number)) return res.state
[docs] def send_can(self,user_defined_name, channel, can_id, can_data): self._stub.SendCan(pb.SendCanRequest(user_defined_name=user_defined_name, channel=channel, can_id=can_id, can_data=can_data))
[docs] def send_can_message(self, user_defined_name, channel, can_message_name): self._stub.SendCanMessage(pb.SendCanMessageRequest(user_defined_name=user_defined_name, channel=channel, can_message_name=can_message_name))
[docs] def get_latest_can(self,user_defined_name, channel, can_id, direction): res = self._stub.GetLatestCanMessage(pb.GetLatestCanMessageRequest(user_defined_name=user_defined_name, channel=channel, can_id=can_id, direction=direction)) return res.can_data
[docs] def wait_for_next_can_message(self,user_defined_name, channel, can_id, timeout_ms): if timeout_ms is None: timeout_ms = -1 elif timeout_ms < 0: raise ValueError("Cannot set up the 'timeout' 0 or less.") elif timeout_ms == 0: # TODO:ノンブロッキングモード実装 raise ValueError("Cannot set up the 'timeout' 0 or less.") res = self._stub.WaitForNextCanMessage(pb.WaitForNextCanMessageRequest(user_defined_name=user_defined_name, channel=channel, can_id=can_id, timeout=timeout_ms)) return res.can_data
[docs] def start_can_periodic(self,user_defined_name, channel, can_id, can_data, period): self._stub.StartCanPeriodic(pb.StartCanPeriodicRequest(user_defined_name=user_defined_name, channel=channel, can_id=can_id, can_data=can_data, period=period))
[docs] def stop_can_periodic(self,user_defined_name, channel, can_id): if can_id is None: can_id = -1 elif can_id < 0: raise ValueError("Cannot set up the 'can_id' less than 0.") self._stub.StopCanPeriodic(pb.StopCanPeriodicRequest(user_defined_name=user_defined_name, channel=channel, can_id=can_id))
[docs] def start_can_message_periodic(self, user_defined_name, can_message_name): self._stub.StartCanMessagePeriodic(pb.StartCanMessagePeriodicRequest(user_defined_name=user_defined_name, can_message_name=can_message_name))
[docs] def stop_can_message_periodic(self, user_defined_name, can_message_name): self._stub.StopCanMessagePeriodic(pb.StopCanMessagePeriodicRequest(user_defined_name=user_defined_name, can_message_name=can_message_name))