import grpc
from .test_runner_protoc import test_runner_pb2 as pb
from .test_runner_protoc import test_runner_pb2_grpc as pbgrpc
[docs]class TestRunner:
def __init__(self, port):
self._channel = grpc.insecure_channel(f'localhost:{port}')
self._stub = pbgrpc.TestRunnerServiceStub(self._channel)
return
def __del__(self):
self._channel.close()
[docs] def start_test(self, folder_name:str):
self._stub.StartTest(pb.StartTestRequest(folder_name=folder_name))
[docs] def stop_test(self):
self._stub.StopTest(pb.Empty())
[docs] def send_serial(self,port_number,serial_data):
self._stub.SendSerial(pb.SendSerialRequest(port_number=port_number, serial_data=serial_data))
[docs] def send_serial_command(self,port_number,serial_command):
self._stub.SendSerialCommand(pb.SendSerialCommandRequest(port_number=port_number, serial_command=serial_command))
[docs] def get_latest_serial(self,port_number,direction):
res = self._stub.GetLatestSerialData(pb.GetLatestSerialDataRequest(port_number=port_number,direction=direction))
return res.serialData
[docs] def wait_for_next_serial_data(self, port_number, timeout_ms):
if timeout_ms is None:
timeout_ms = -1
elif timeout_ms < 0:
raise ValueError("Cannot set up the 'timeout' 0 or less.")
elif timeout_ms == 0:
# TODO:ノンブロッキングモード実装
raise ValueError("Cannot set up the 'timeout' 0 or less.")
res = self._stub.WaitForNextSerialData(pb.WaitForNextSerialDataRequest(port_number=port_number, timeout=timeout_ms))
return res.serialData
[docs] def write_logic_rpigp10(self, board_id: str, channel_number: int, state: bool):
self._stub.WriteLogicRPiGP10(pb.WriteLogicRPiGP10Request(board=pb.BoardRequest(board_id=board_id, channel_number=channel_number), state=state))
[docs] def write_logic_rpigp10_multi(self, board_id: str, states: list):
values = [-1 if b is None else int(b) for b in states]
self._stub.WriteLogicRPiGP10Multi(pb.WriteLogicRPiGP10MultiRequest(board_id=board_id, states=values))
[docs] def write_logic_rpigp10_bits(self, board_id: str, state: int):
self._stub.WriteLogicRPiGP10Bits(pb.WriteLogicRPiGP10BitsRequest(board_id=board_id, state=state))
[docs] def read_logic_rpigp10(self, board_id: str, channel_number: int):
res = self._stub.ReadLogicRPiGP10(pb.BoardRequest(board_id=board_id, channel_number=channel_number))
return res.state
[docs] def write_analog_amao(self, board_id: str, channel_number: int, voltage_mv: int):
self._stub.WriteAnalogAMAO(pb.WriteAnalogAMAORequest(board=pb.BoardRequest(board_id=board_id, channel_number=channel_number), voltage=voltage_mv))
[docs] def read_analog_rpigp40(self, board_id: str, channel_number: int):
res = self._stub.ReadAnalogRPiGP40(pb.BoardRequest(board_id=board_id, channel_number=channel_number))
return res.voltage
[docs] def write_pwm_ampio(self, board_id: str, channel_number: int, frequency_hz: int, duty_percent: float):
self._stub.WritePwmAMPIO(pb.WritePwmAMPIORequest(board=pb.BoardRequest(board_id=board_id, channel_number=channel_number), frequency=frequency_hz, duty=duty_percent))
[docs] def read_pwm_ampio(self, board_id: str, channel_number: int):
res = self._stub.ReadPwmAMPIO(pb.BoardRequest(board_id=board_id, channel_number=channel_number))
return res.frequency, res.duty
[docs] def read_analog_cpiai1208li(self, board_id: str, channel_number: int):
res = self._stub.ReadAnalogCPIAI1208LI(pb.BoardRequest(board_id=board_id, channel_number=channel_number))
return res.voltage
[docs] def write_logic_cpirry16(self, board_id: str, channel_number: int, state: bool):
self._stub.WriteLogicCpiRry16(pb.WriteLogicCpiRry16Request(board=pb.BoardRequest(board_id=board_id, channel_number=channel_number), state=state))
[docs] def write_logic_cpirry16_multi(self, board_id: str, states: list):
values = [-1 if b is None else int(b) for b in states]
self._stub.WriteLogicCpiRry16Multi(pb.WriteLogicCpiRry16MultiRequest(board_id=board_id, states=values))
[docs] def write_logic_cpirry16_bits(self, board_id: str, state: int):
self._stub.WriteLogicCpiRry16Bits(pb.WriteLogicCpiRry16BitsRequest(board_id=board_id, state=state))
[docs] def write_logic_cpidio0808l(self, board_id: str, channel_number: int, state: bool):
self._stub.WriteLogicCpiDio0808L(pb.WriteLogicCpiDio0808LRequest(board=pb.BoardRequest(board_id=board_id, channel_number=channel_number), state=state))
[docs] def write_logic_cpidio0808l_multi(self, board_id: str, states: list):
values = [-1 if b is None else int(b) for b in states]
self._stub.WriteLogicCpiDio0808LMulti(pb.WriteLogicCpiDio0808LMultiRequest(board_id=board_id, states=values))
[docs] def write_logic_cpidio0808l_bits(self, board_id: str, state: int):
self._stub.WriteLogicCpiDio0808LBits(pb.WriteLogicCpiDio0808LBitsRequest(board_id=board_id, state=state))
[docs] def read_logic_cpidio0808l(self, board_id: str, channel_number: int):
res = self._stub.ReadLogicCpiDio0808L(pb.BoardRequest(board_id=board_id, channel_number=channel_number))
return res.state
[docs] def send_can(self,user_defined_name, channel, can_id, can_data):
self._stub.SendCan(pb.SendCanRequest(user_defined_name=user_defined_name, channel=channel, can_id=can_id, can_data=can_data))
[docs] def send_can_message(self, user_defined_name, channel, can_message_name):
self._stub.SendCanMessage(pb.SendCanMessageRequest(user_defined_name=user_defined_name, channel=channel, can_message_name=can_message_name))
[docs] def get_latest_can(self,user_defined_name, channel, can_id, direction):
res = self._stub.GetLatestCanMessage(pb.GetLatestCanMessageRequest(user_defined_name=user_defined_name, channel=channel, can_id=can_id, direction=direction))
return res.can_data
[docs] def wait_for_next_can_message(self,user_defined_name, channel, can_id, timeout_ms):
if timeout_ms is None:
timeout_ms = -1
elif timeout_ms < 0:
raise ValueError("Cannot set up the 'timeout' 0 or less.")
elif timeout_ms == 0:
# TODO:ノンブロッキングモード実装
raise ValueError("Cannot set up the 'timeout' 0 or less.")
res = self._stub.WaitForNextCanMessage(pb.WaitForNextCanMessageRequest(user_defined_name=user_defined_name, channel=channel, can_id=can_id, timeout=timeout_ms))
return res.can_data
[docs] def start_can_periodic(self,user_defined_name, channel, can_id, can_data, period):
self._stub.StartCanPeriodic(pb.StartCanPeriodicRequest(user_defined_name=user_defined_name, channel=channel, can_id=can_id, can_data=can_data, period=period))
[docs] def stop_can_periodic(self,user_defined_name, channel, can_id):
if can_id is None:
can_id = -1
elif can_id < 0:
raise ValueError("Cannot set up the 'can_id' less than 0.")
self._stub.StopCanPeriodic(pb.StopCanPeriodicRequest(user_defined_name=user_defined_name, channel=channel, can_id=can_id))
[docs] def start_can_message_periodic(self, user_defined_name, can_message_name):
self._stub.StartCanMessagePeriodic(pb.StartCanMessagePeriodicRequest(user_defined_name=user_defined_name, can_message_name=can_message_name))
[docs] def stop_can_message_periodic(self, user_defined_name, can_message_name):
self._stub.StopCanMessagePeriodic(pb.StopCanMessagePeriodicRequest(user_defined_name=user_defined_name, can_message_name=can_message_name))