#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import time
import grpc
import sys
import io
from . import test_runner
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
port = os.getenv('AM_GRPC_PORT')
if port:
runner = test_runner.TestRunner(port)
else:
runner = test_runner.TestRunner('3000') #デフォルト3000
[docs]def start_test(folder_name: str):
"""レポートを保存するフォルダを作成し、テストを開始します。
作成されるフォルダの名前は "TestResult_<folder_name>" の形式となります。
引数 folder_name に指定する文字列はディレクトリパスではありません。
先頭にスラッシュ "/" や末尾に拡張子 ".*" を含む文字列を指定しないでください。
フォルダはプロジェクトルート (プロジェクトファイルのあるフォルダ) の、
Reports/TestRun_yyyymmdd_hhmmss_<8 桁のランダム文字列>/TestCase_<id>_yyyymmdd_hhmmss 以下に作成されます。
この関数はテストが開始されている状態で呼び出さないでください。
:param folder_name: 任意のフォルダ名
:type folder_name: str
"""
return runner.start_test(folder_name)
[docs]def stop_test():
"""テストを終了し、各種レポートやログファイルを start_test() 呼び出しにより
作成されたフォルダに保存します。
この関数を呼び出す前に、start_test() を呼び出している必要があります。
"""
return runner.stop_test()
[docs]def send_serial(port_id,serial_data):
"""指定したシリアルポートにデータを送信します。
:param port_id: ポートID
:type port_id: str
:param serial_data: 送信データ 文字列またはバイト配列で指定
:type serial_data: str or bytes
:example: automeal.send_serial("Port1", "Send Init") # "Send Init" の ASCII コードを送信
:example: automeal.send_serial("Port1", [0x01, 0x02, 0x03])
"""
if type(serial_data)==str :
return runner.send_serial(port_id,serial_data.encode('utf-8'))
else :
return runner.send_serial(port_id,bytes(serial_data))
[docs]def send_serial_command(port_id,serial_command):
"""指定したシリアルポートにコマンド名と一致するデータを送信します。
:param port_id: ポートID
:type port_id: str
:param serial_command: 送信するデータのコマンド名(シリアル送信パネルのID列)
:type serial_command: str
:example: automeal.send_serial_command("Port1", "Init") # 定義済みの Init コマンドを送信
"""
return runner.send_serial_command(port_id,serial_command)
[docs]def get_latest_serial(port_id,direction):
"""シリアルの受信または送信した最新のデータ1つを取得します。
:param port_id: ポートID
:type port_id: str
:param direction: 送受信方向 送信("S")、 受信("R") で指定
:type direction: str
:return: 指定した送受信方向の最新データ
:rtype: bytes
:example: last_recv = automeal.get_latest_serial("Port1", "R") # Port1 で最後に受信したデータ取得
"""
return runner.get_latest_serial(port_id,direction)
[docs]def wait_for_next_serial_data(port_id, timeout_ms = None):
"""指定したポートで次にデータを受信するまで待機し、受信したデータを取得します。
すでに待機状態のポートを指定した場合は例外になります。
待機中にテストを終了した等ポートが切断された場合は例外になります。
指定したタイムアウト秒数内でデータを受信しなかった場合は例外になります。
:param port_id: ポート番号
:type port_id: str
:param timeout_ms: タイムアウトまでの時間(ms)指定。1以上の整数で指定して下さい。指定なし/Noneの場合無限待ちを行います
:type direction: int
:return: 指定したポートの受信データ
:rtype: bytes
:raises ValueError: timeout_msに0または負の値が指定された場合
:example: last_recv = automeal.wait_for_next_serial_data("Port1", 5000) # Port1 で次に受信するまで待機。5秒受信なしの場合例外エラー。
"""
return runner.wait_for_next_serial_data(port_id, timeout_ms)
[docs]def write_logic_rpigp10(board_id: str, channel_number: int, state: bool):
"""RPi-GP10 の出力状態値 (Hi/Lo) を更新します。
:param board_id: ボード ID
:type board_id: str
:param channel_number: チャンネル番号 (1-8)
:type channel_number: int
:param state: チャンネルの状態 Hi(True)/Lo(False)
:example: automeal.write_logic_rpigp10("Board1", 1, True) # Board1 の ch1 を Hi に
"""
return runner.write_logic_rpigp10(board_id=board_id, channel_number=channel_number, state=state)
[docs]def write_logic_rpigp10_multi(board_id: str, states: list):
"""RPi-GP10 の出力状態値 (Hi/Lo) を更新します。
:param board_id: ボード ID
:type board_id: str
:param states: 8ch 分の出力状態値 Hi(True)/Lo(False)/更新なし(None)
:type state: list[bool]
:example: automeal.write_logic_rpigp10_multi("Board1", [True, True, None, None, False, False]) # Board1 の ch1,ch2 を Hi, ch5,ch6 を Lo, それ以外は現状維持
8ch未満の場合、未指定分は None として扱われます。
8chを超える指定の場合、超えた分は無効になります。
"""
return runner.write_logic_rpigp10_multi(board_id=board_id, states=states)
[docs]def write_logic_rpigp10_bits(board_id: str, state: int):
"""RPi-GP10 の出力状態値 (Hi/Lo) を更新します。
:param board_id: ボード ID
:type board_id: str
:param state: 8ch 分の出力状態値 (0x00-0xFF)(Hi=1/Lo=0)
:type state: int
:example: automeal.write_logic_rpigp10_bits("Board1", 0xFF) # Board1 8ch 全てを Hi に
"""
return runner.write_logic_rpigp10_bits(board_id=board_id, state=state)
[docs]def read_logic_rpigp10(board_id: str, channel_number: int):
"""RPi-GP10 の最新の計測値 (Hi/Lo) を取得します。
:param board_id: ボード ID
:type board_id: str
:param channel_number: チャンネル番号 (1-8)
:type channel_number: int
:return: 指定したチャンネルの状態 Hi(True)/Lo(False)
:rtype: bool
:example: state = automeal.read_logic_rpigp10("Board1", 1) # Board1 の ch1 の状態取得
"""
return runner.read_logic_rpigp10(board_id=board_id, channel_number=channel_number)
[docs]def write_analog_amao(board_id: str, channel_number: int, voltage_mv: int):
"""AM-AO の出力電圧値を更新します。
:param board_id: ボード ID
:type board_id: str
:param channel_number: チャンネル番号 (1-8)
:type channel_number: int
:param voltage_mv: 電圧値 [mV] (0-基準電圧)。 上限は TestUnit 設定で設定している基準電圧によって変化します。
:type voltage_mv: int
:example: automeal.write_analog_amao("Board1", 1, 5000) # Board1 の ch1 を 5000 mV に
"""
return runner.write_analog_amao(board_id=board_id, channel_number=channel_number, voltage_mv=voltage_mv)
[docs]def read_analog_rpigp40(board_id: str, channel_number: int):
"""RPi-GP40 の最新の計測電圧値 [mV] を取得します。
:param board_id: ボード ID
:type board_id: str
:param channel_number: チャンネル番号 (1-8)
:type channel_number: int
:return: 指定したチャンネルの電圧値 [mV]
:rtype: int
"""
return runner.read_analog_rpigp40(board_id=board_id, channel_number=channel_number)
[docs]def write_pwm_ampio(board_id: str, channel_number: int, frequency_hz: int, duty_percent: float):
"""AM-PIO の出力 PWM 信号の周波数とデューティー比を更新します。
`duty_percent` はプロジェクトの設定によって分解能以下は切り捨てられます。
:param board_id: ボード ID
:type board_id: str
:param channel_number: チャンネル番号 (1-4)
:type channel_number: int
:param frequency_hz: 周波数 [Hz] (0-1,000,000)。 上限は TestUnit 設定で設定している最大周波数によって変化します。
:type frequency_hz: int
:param duty_percent: デューティー比 [%] (0.0-99.9)。 TestUnit 設定で設定している最大周波数 (Duty 分解能) によって値が切り捨てられます。
:type duty_percent: float
:example: automeal.write_pwm_ampio("Board1", 1, 10000, 50) # Board1 の ch1 を 10,000 Hz, 50 % に
"""
return runner.write_pwm_ampio(board_id=board_id, channel_number=channel_number, frequency_hz=frequency_hz, duty_percent=duty_percent)
[docs]def read_pwm_ampio(board_id: str, channel_number: int):
"""AM-PIO の最新の計測 PWM 信号の周波数 [Hz] とデューティー比 [%] を取得します。
指定した CE ピン番号と接続された AM-PIO の指定チャンネルの PWM 信号 (周波数、デューティー比) を取得します。
:param board_id: ボード ID
:type board_id: str
:param channel_number: チャンネル番号 (1-16)
:type channel_number: int
:return: 指定したチャンネルの周波数 [Hz] とデューティー比 [%]
:rtype: (int, float)
:example: freq, duty = automeal.read_pwm_ampio("Board1", 1) # Board1 の ch1 の状態取得
"""
return runner.read_pwm_ampio(board_id=board_id, channel_number=channel_number)
[docs]def read_analog_cpiai1208li(board_id: str, channel_number: int):
"""CPI-AI-1208LI の最新の計測電圧値 [mV] を取得します。
:param board_id: ボード ID
:type board_id: str
:param channel_number: チャンネル番号 (1-8)
:type channel_number: int
:return: 指定したチャンネルの電圧値 [mV]
:rtype: int
:example: voltage = automeal.read_analog_cpiai1208li("Board1", 1) # Board1 の ch1 の状態取得
"""
return runner.read_analog_cpiai1208li(board_id=board_id, channel_number=channel_number)
[docs]def write_logic_cpirry16(board_id: str, channel_number: int, state: bool):
"""CPI-RRY-16 の出力状態値 (ON/OFF) を更新します。
:param board_id: ボード ID
:type board_id: str
:param channel_number: チャンネル番号 (1-16)
:type channel_number: int
:param state: チャンネルの状態 ON(True)/OFF(False)
:example: automeal.write_logic_cpirry16("Board1", 1, True) # Board1 の ch1 を ON に
"""
return runner.write_logic_cpirry16(board_id=board_id, channel_number=channel_number, state=state)
[docs]def write_logic_cpirry16_multi(board_id: str, states: list):
"""CPI-RRY-16 の出力状態値 (ON/OFF) を更新します。
:param board_id: ボード ID
:type board_id: str
:param states: 16ch 分の出力状態値 ON(True)/OFF(False)/更新なし(None)
:type state: list[bool]
:example: automeal.write_logic_cpirry16_multi("Board1", [True, True, None, None, False, False]) # Board1 の ch1,ch2 を ON, ch5,ch6 を OFF, それ以外は現状維持
16ch未満の場合、未指定分は None として扱われます。
16chを超える指定の場合、超えた分は無効になります。
"""
return runner.write_logic_cpirry16_multi(board_id=board_id, states=states)
[docs]def write_logic_cpirry16_bits(board_id: str, state: int):
"""CPI-RRY-16 の出力状態値 (ON/OFF) を更新します。
:param board_id: ボード ID
:type board_id: str
:param state: 16ch 分の出力状態値 (0x0000-0xFFFF)(ON=1/OFF=0)
:type state: int
:example: automeal.write_logic_cpirry16_bits("Board1", 0xFFFF) # Board1 16ch 全てを ON に
"""
return runner.write_logic_cpirry16_bits(board_id=board_id, state=state)
[docs]def write_logic_cpidio0808l(board_id: str, channel_number: int, state: bool):
"""CPI-DIO-0808L の出力状態値 (Hi/Lo) を更新します。
:param board_id: ボード ID
:type board_id: str
:param channel_number: チャンネル番号 (1-8)
:type channel_number: int
:param state: チャンネルの状態 Hi(True)/Lo(False)
:example: automeal.write_logic_cpidio0808l("Board1", 1, True) # Board1 の ch1 を Hi に
"""
return runner.write_logic_cpidio0808l(board_id=board_id, channel_number=channel_number, state=state)
[docs]def write_logic_cpidio0808l_multi(board_id: str, states: list):
"""CPI-DIO-0808L の出力状態値 (Hi/Lo) を更新します。
:param board_id: ボード ID
:type board_id: str
:param states: 8ch 分の出力状態値 Hi(True)/Lo(False)/更新なし(None)
:type state: list[bool]
:example: automeal.write_logic_cpidio0808l_multi("Board1", [True, True, None, None, False, False]) # Board1 の ch1,ch2 を Hi, ch5,ch6 を Lo, それ以外は現状維持
8ch未満の場合、未指定分は None として扱われます。
8chを超える指定の場合、超えた分は無効になります。
"""
return runner.write_logic_cpidio0808l_multi(board_id=board_id, states=states)
[docs]def write_logic_cpidio0808l_bits(board_id: str, state: int):
"""CPI-DIO-0808L の出力状態値 (Hi/Lo) を更新します。
:param board_id: ボード ID
:type board_id: str
:param state: 8ch 分の出力状態値 (0x00-0xFF)(Hi=1/Lo=0)
:type state: int
:example: automeal.write_logic_cpidio0808l_bits("Board1", 0xFF) # Board1 8ch 全てを Hi に
"""
return runner.write_logic_cpidio0808l_bits(board_id=board_id, state=state)
[docs]def read_logic_cpidio0808l(board_id: str, channel_number: int):
"""CPI-DIO-0808L の最新の計測値 (Hi/Lo) を取得します。
:param board_id: ボード ID
:type board_id: str
:param channel_number: チャンネル番号 (1-8)
:type channel_number: int
:return: 指定したチャンネルの状態 Hi(True)/Lo(False)
:rtype: bool
:example: state = automeal.read_logic_cpidio0808l("Board1", 1) # Board1 の ch1 の状態取得
"""
return runner.read_logic_cpidio0808l(board_id=board_id, channel_number=channel_number)
[docs]class SamplingLoopIterator(object):
def __init__(self, duration_time_ms: float, sampling_interval_ms: float):
self.sampling_interval = sampling_interval_ms / 1000.0 # [ms] -> [s]
self.duration_time = duration_time_ms / 1000.0 # [ms] -> [s]
self.current_time = time.perf_counter()
self.start_time = self.current_time
self.end_time = self.current_time + self.duration_time
self.frame_count = 0
self.last_force_sampled = False
def __iter__(self):
return self
def __next__(self):
if self.last_force_sampled:
raise StopIteration()
# Avoid overhead and wait.
if self.frame_count > 0:
elapsed_time = time.perf_counter() - self.current_time
wait_time = self.sampling_interval - elapsed_time
if wait_time > 0.0:
time.sleep(wait_time)
else:
pass # delayed
self.frame_count += 1
self.current_time = time.perf_counter()
if self.current_time < self.end_time:
# Interpolation sampling.
if self.frame_count == 1:
return [0.0, self.duration_time]
else:
return [self.current_time - self.start_time, self.duration_time]
else:
# Last sampling.
self.last_force_sampled = True
return [self.duration_time, self.duration_time]
[docs]def smart_clamp(value, range1, range2):
"""
ScriptGenerator の自動生成コードで使用するヘルパー関数です。
対象値をrange内に収まるように修正した値を返します。
:param value: 対象値
:param range1: 下限
:param range2: 上限
"""
actual_min = min(range1, range2)
actual_max = max(range1, range2)
return min(max(value, actual_min), actual_max)
[docs]def sampling_loop(duration_time_ms: float, sampling_interval_ms: float = 100):
"""
ScriptGenerator の自動生成コードで使用するヘルパー関数です。
指定した時間間隔で for 文を実行するためのイテレータを返します。
:param func: 定期的に呼び出される関数
:param sampling_interval_ms: 呼び出し間隔 [ms] (省略時は 100 ms)
例えば `for [t, d] in automeal.sampling_loop(5000, 100):` とすると、
5000 ms の間、100 ms ごとに for 文が実行されます。
この時 t は、ループ開始からの経過時間 [s] です。
d は duration_time_ms と同じ、呼び出しを続ける時間ですが、単位は [s] となります。
t と d は float 型の値です。
なお最初の for 文の実行時は必ず t=0 であり、最後は t==d となります。
"""
return SamplingLoopIterator(duration_time_ms, sampling_interval_ms)
[docs]def linear_sampling(start: float, last: float, time: float, duration: float):
"""
ScriptGenerator の自動生成コードで使用するヘルパー関数です。
start から last へ向かって線形的に変化する値を返します。
:param start: 開始値
:param last: 終了値
:param time: 現在の時間
:param duration: 終了値に遷移するまでの時間
"""
if (time >= duration): return last
value = start + (last - start) * (time / duration)
return int(smart_clamp(value, start, last))
[docs]def step_sampling(start: float, last: float, step: float, time: float, duration: float):
"""
ScriptGenerator の自動生成コードで使用するヘルパー関数です。
start から last へ向かって step ごとに変化する値を返します。
:param start: 開始値
:param last: 終了値
:param step: 増分値
:param time: 現在の時間
:param duration: 終了値に遷移するまでの時間
"""
if (time >= duration): return last
if (abs(step) >= abs(last - start)): return last
value = start + (last - start) * (time / duration) // step * step
return int(smart_clamp(value, start, last))
[docs]def sleep(wait: float):
"""
ScriptGenerator の自動生成コードで使用するヘルパー関数です。
エラーチェック後に wait 時間待機します。
:param wait: 待機時間(s)
"""
if (wait < 0):
print("テストステップ処理時間 が Time値 超えました。", flush=True)
else:
time.sleep(wait)
[docs]def send_can(user_defined_name, channel, can_id, can_data):
"""指定した識別名のデバイスの指定のチャンネルからCANメッセージを送信します。
:param user_defined_name: 識別名
:type user_defined_name: str
:param channel: チャンネル番号
:type channel: int
:param can_id: CANメッセージのID部
:type can_id: int
:param can_data: 送信メッセージのデータ部
:type can_data: bytes
:example: automeal.send_can("1.Vector_VN1630A", 1, 0x100, [0x01, 0x02, 0x03] )
"""
return runner.send_can(user_defined_name, channel, can_id, bytes(can_data))
[docs]def send_can_message(user_defined_name, channel, can_message):
"""指定した識別名のデバイスの指定のチャンネルからメッセージ名と一致するメッセージを送信します。
:param user_defined_name: 識別名
:type user_defined_name: str
:param channel: チャンネル番号
:type channel: int
:param can_message: 送信するデータのメッセージ名(CAN送信パネルのMessage列)
:type can_message: str
:example: automeal.send_can_message("1.Vector_VN1630A", 1, "Init") # 定義済みの Init コマンドを送信
"""
return runner.send_can_message(user_defined_name, channel, can_message)
[docs]def get_latest_can(user_defined_name, channel, can_id, direction):
"""指定した識別名のデバイスの指定のチャンネルでCANの受信または送信した最新のデータ1つを取得します。
:param user_defined_name: 識別名
:type user_defined_name: str
:param channel: チャンネル番号
:type channel: int
:param direction: 送受信方向 送信("S")、 受信("R") で指定
:type direction: str
:return: 指定した送受信方向の最新データ
:rtype: bytes
:example: last_recv = automeal.get_latest_can("1.Vector_VN1630A", 1, 0x100, "R") # チャンネル1 で最後に受信したデータ取得
"""
return runner.get_latest_can(user_defined_name, channel, can_id, direction)
[docs]def wait_for_next_can_message(user_defined_name, channel, can_id, timeout_ms = None):
"""指定したチャンネルの指定したIDで次にメッセージを受信するまで待機し、受信したデータを取得します。
すでに待機状態のチャンネルを指定した場合は例外になります。
待機中にテストを終了した等チャンネルが切断された場合は例外になります。
指定したタイムアウト秒数内でデータを受信しなかった場合は例外になります。
:param user_defined_name: 識別名
:type user_defined_name: str
:param channel: チャンネル
:type channel: int
:param can_id: メッセージのID部分
:type can_id: int
:param timeout_ms: タイムアウトまでの時間(ms)指定。1以上の整数で指定して下さい。指定なし/Noneの場合無限待ちを行います
:type direction: int
:return: 指定したチャンネルで受信したメッセージのデータ部分
:rtype: bytes
:raises ValueError: timeout_msに0または負の値が指定された場合
:example: last_recv = automeal.wait_for_next_can_message("1.Vector_VN1630A", 1, 0x300, 1000) # ID0x300 を次に受信するまで待機。1秒受信なしの場合例外エラー。
"""
return runner.wait_for_next_can_message(user_defined_name, channel, can_id, timeout_ms)
[docs]def start_can_periodic(user_defined_name, channel, can_id, can_data, period):
"""指定した識別名のデバイスの指定のチャンネルからCANメッセージを定期送信します。
:param user_defined_name: 識別名
:type user_defined_name: str
:param channel: チャンネル番号
:type channel: int
:param can_id: CANメッセージのID部
:type can_id: int
:param can_data: 送信メッセージのデータ部
:type can_data: bytes
:param period: 定期送信の間隔(ms)
:type period: int
:example: automeal.start_can_periodic("Vector_VN1630A-1", 1, 0x100, [0x01, 0x02, 0x03] , 10)
"""
return runner.start_can_periodic(user_defined_name, channel, can_id, bytes(can_data), period)
[docs]def stop_can_periodic(user_defined_name, channel, can_id = None):
"""指定した識別名のデバイスの指定のチャンネルからの指定のCANメッセージの定期送信を停止します。
:param user_defined_name: 識別名
:type user_defined_name: str
:param channel: チャンネル番号
:type channel: int
:param can_id: CANメッセージのID。指定なし/Noneの場合は指定チャンネルの定期送信をすべて停止します。
:type can_id: int
:example: automeal.stop_can_periodic("Vector_VN1630A-1", 1, 0x100)
"""
return runner.stop_can_periodic(user_defined_name, channel, can_id)
[docs]def start_can_message_periodic(user_defined_name, can_message):
"""指定した識別名のデバイスからメッセージ名と一致するメッセージの定期送信を開始します。
:param user_defined_name: 識別名
:type user_defined_name: str
:param can_message: 送信するデータのメッセージ名(CAN送信パネルのMessage列)
:type can_message: str
:example: automeal.start_can_message_periodic("1.Vector_VN1630A", "Init") # 定義済みの Init コマンドを送信
"""
return runner.start_can_message_periodic(user_defined_name, can_message)
[docs]def stop_can_message_periodic(user_defined_name, can_message):
"""指定した識別名のデバイスからメッセージ名と一致するメッセージの定期送信を停止します。
:param user_defined_name: 識別名
:type user_defined_name: str
:param can_message: 送信するデータのメッセージ名(CAN送信パネルのMessage列)
:type can_message: str
:example: automeal.stop_can_message_periodic("1.Vector_VN1630A", "Init") # 定義済みの Init コマンドを送信
"""
return runner.stop_can_message_periodic(user_defined_name, can_message)