#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import os
import time
import grpc
import sys
import io
from . import test_runner
from .test_runner import (
MG400State, SerialRecord, CANRecord, UDPRecord, TCPClientRecord,
TestRunnerEnvKeys, TestParameterNames,
SerialStream, CANStream, UDPStream, TCPClientStream
)
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
port = os.getenv('AM_GRPC_PORT')
if port:
runner = test_runner.TestRunner(port)
else:
runner = test_runner.TestRunner('3000') #デフォルト3000
[docs]
def start_test(folder_name: str) -> None:
"""レポートを保存するフォルダを作成し、テストを開始します。
作成されるフォルダの名前は "TestResult_<folder_name>" の形式となります。
引数 folder_name に指定する文字列はディレクトリパスではありません。
先頭にスラッシュ "/" や末尾に拡張子 ".*" を含む文字列を指定しないでください。
フォルダはプロジェクトルート (プロジェクトファイルのあるフォルダ) の、
Reports/TestRun_yyyymmdd_hhmmss_<8 桁のランダム文字列>/TestCase_<id>_yyyymmdd_hhmmss 以下に作成されます。
この関数はテストが開始されている状態で呼び出さないでください。
:param folder_name: 任意のフォルダ名
:type folder_name: str
:example: automeal.start_test("MyTest") # "TestResult_MyTest" フォルダを作成してテスト開始
"""
return runner.start_test(folder_name)
[docs]
def stop_test() -> None:
"""テストを終了し、各種レポートやログファイルを start_test() 呼び出しにより
作成されたフォルダに保存します。
この関数を呼び出す前に、start_test() を呼び出している必要があります。
:example: automeal.stop_test()
"""
return runner.stop_test()
[docs]
def send_serial(port_id: str, serial_data: str | bytes) -> None:
"""指定したシリアルポートにデータを送信します。
文字列を指定した場合は UTF-8 エンコードされて送信されます。
:param port_id: ポートID
:type port_id: str
:param serial_data: 送信データ 文字列またはバイト配列で指定
:type serial_data: str or bytes
:example: automeal.send_serial("Port1", "Send Init") # "Send Init" を UTF-8 エンコードして送信
:example: automeal.send_serial("Port1", bytes([0x01, 0x02, 0x03]))
"""
if type(serial_data)==str :
return runner.send_serial(port_id,serial_data.encode('utf-8'))
else :
return runner.send_serial(port_id,bytes(serial_data))
[docs]
def send_serial_command(port_id: str, serial_command: str) -> None:
"""指定したシリアルポートに登録済みのコマンド名と一致するデータを送信します。
:param port_id: ポートID
:type port_id: str
:param serial_command: 送信するデータのコマンド名(シリアル送信パネルのID列を指定してください)
:type serial_command: str
:example: automeal.send_serial_command("Port1", "Init") # 定義済みの Init コマンドを送信
"""
return runner.send_serial_command(port_id,serial_command)
[docs]
def get_latest_serial(port_id: str, direction: str) -> bytes:
"""シリアルの受信または送信した最新のデータ1つを取得します。
:param port_id: ポートID
:type port_id: str
:param direction: 送受信方向 送信("S")、 受信("R") で指定
:type direction: str
:return: 指定した送受信方向の最新データ
:rtype: bytes
:example: last_recv = automeal.get_latest_serial("Port1", "R") # Port1 で最後に受信したデータ取得
"""
return runner.get_latest_serial(port_id,direction)
[docs]
def wait_for_next_serial_data(port_id: str, timeout_ms: int | None = None) -> bytes:
"""指定したポートで次にデータを受信するまで待機し、受信したデータを取得します。
すでに待機状態のポートを指定した場合は例外になります。
待機中にテストを終了した等ポートが切断された場合は例外になります。
指定したタイムアウト時間内にデータを受信しなかった場合は例外になります。
:param port_id: ポートID
:type port_id: str
:param timeout_ms: タイムアウトまでの時間(ms)指定。1以上の整数で指定して下さい。指定なし/Noneの場合無限待ちを行います
:type timeout_ms: int | None
:return: 指定したポートの受信データ
:rtype: bytes
:raises ValueError: timeout_msに0または負の値が指定された場合
:example: last_recv = automeal.wait_for_next_serial_data("Port1", 5000) # Port1 で次に受信するまで待機。5秒受信なしの場合例外エラー。
"""
return runner.wait_for_next_serial_data(port_id, timeout_ms)
[docs]
def write_logic_rpigp10(board_id: str, channel_number: int, state: bool) -> None:
"""RPi-GP10 の指定した1chの出力状態値 (Hi/Lo) を更新します。
:param board_id: ボード ID
:type board_id: str
:param channel_number: チャンネル番号 (1-8)
:type channel_number: int
:param state: チャンネルの状態 Hi(True)/Lo(False)
:type state: bool
:example: automeal.write_logic_rpigp10("Board1", 1, True) # Board1 の ch1 を Hi に
"""
return runner.write_logic_rpigp10(board_id=board_id, channel_number=channel_number, state=state)
[docs]
def write_logic_rpigp10_multi(board_id: str, states: list) -> None:
"""RPi-GP10 の出力状態値 (Hi/Lo) を更新します。
:param board_id: ボード ID
:type board_id: str
:param states: 8ch 分の出力状態値 Hi(True)/Lo(False)/更新なし(None)
:type states: list[bool | None]
:example: automeal.write_logic_rpigp10_multi("Board1", [True, True, None, None, False, False]) # Board1 の ch1,ch2 を Hi, ch5,ch6 を Lo, それ以外は現状維持
8ch未満の場合、未指定分は None として扱われます。
8chを超える指定の場合、超えた分は無効になります。
"""
return runner.write_logic_rpigp10_multi(board_id=board_id, states=states)
[docs]
def write_logic_rpigp10_bits(board_id: str, state: int) -> None:
"""RPi-GP10 全chの出力状態値 (Hi/Lo) を更新します。
:param board_id: ボード ID
:type board_id: str
:param state: 8ch 分の出力状態値 (0x00-0xFF)(Hi=1/Lo=0)
:type state: int
:example: automeal.write_logic_rpigp10_bits("Board1", 0xFF) # Board1 8ch 全てを Hi に
"""
return runner.write_logic_rpigp10_bits(board_id=board_id, state=state)
[docs]
def read_logic_rpigp10(board_id: str, channel_number: int) -> bool:
"""RPi-GP10 の最新の計測値 (Hi/Lo) を取得します。
:param board_id: ボード ID
:type board_id: str
:param channel_number: チャンネル番号 (1-8)
:type channel_number: int
:return: 指定したチャンネルの状態 Hi(True)/Lo(False)
:rtype: bool
:example: state = automeal.read_logic_rpigp10("Board1", 1) # Board1 の ch1 の状態取得
"""
return runner.read_logic_rpigp10(board_id=board_id, channel_number=channel_number)
[docs]
def write_analog_amao(board_id: str, channel_number: int, voltage_mv: int) -> None:
"""AM-AO の出力電圧値を更新します。
:param board_id: ボード ID
:type board_id: str
:param channel_number: チャンネル番号 (1-8)
:type channel_number: int
:param voltage_mv: 電圧値 [mV] (0-基準電圧)。 上限は TestUnit 設定で設定している基準電圧によって変化します。
:type voltage_mv: int
:example: automeal.write_analog_amao("Board1", 1, 5000) # Board1 の ch1 を 5000 mV に
"""
return runner.write_analog_amao(board_id=board_id, channel_number=channel_number, voltage_mv=voltage_mv)
[docs]
def read_analog_rpigp40(board_id: str, channel_number: int) -> int:
"""RPi-GP40 の最新の計測電圧値 [mV] を取得します。
:param board_id: ボード ID
:type board_id: str
:param channel_number: チャンネル番号 (1-8)
:type channel_number: int
:return: 指定したチャンネルの電圧値 [mV]
:rtype: int
:example: voltage = automeal.read_analog_rpigp40("Board1", 1) # Board1 の ch1 の電圧値取得
"""
return runner.read_analog_rpigp40(board_id=board_id, channel_number=channel_number)
[docs]
def write_pwm_ampio(board_id: str, channel_number: int, frequency_hz: int, duty_percent: float) -> None:
"""AM-PIO の出力 PWM 信号の周波数とデューティー比を更新します。
`duty_percent` はプロジェクトの設定によって分解能以下は切り捨てられます。
:param board_id: ボード ID
:type board_id: str
:param channel_number: チャンネル番号 (1-4)
:type channel_number: int
:param frequency_hz: 周波数 [Hz] (0-1,000,000)。 上限は TestUnit 設定で設定している最大周波数によって変化します。
:type frequency_hz: int
:param duty_percent: デューティー比 [%] (0.0-99.9)。 TestUnit 設定で設定している最大周波数 (Duty 分解能) によって値が切り捨てられます。
:type duty_percent: float
:example: automeal.write_pwm_ampio("Board1", 1, 10000, 50) # Board1 の ch1 を 10,000 Hz, 50 % に
"""
return runner.write_pwm_ampio(board_id=board_id, channel_number=channel_number, frequency_hz=frequency_hz, duty_percent=duty_percent)
[docs]
def read_pwm_ampio(board_id: str, channel_number: int) -> tuple[int, float]:
"""AM-PIO の最新の計測 PWM 信号の周波数 [Hz] とデューティー比 [%] を取得します。
:param board_id: ボード ID
:type board_id: str
:param channel_number: チャンネル番号 (1-16)
:type channel_number: int
:return: 指定したチャンネルの周波数 [Hz] とデューティー比 [%]
:rtype: (int, float)
:example: freq, duty = automeal.read_pwm_ampio("Board1", 1) # Board1 の ch1 の状態取得
"""
return runner.read_pwm_ampio(board_id=board_id, channel_number=channel_number)
[docs]
def read_analog_cpiai1208li(board_id: str, channel_number: int) -> int:
"""CPI-AI-1208LI の最新の計測電圧値 [mV] を取得します。
:param board_id: ボード ID
:type board_id: str
:param channel_number: チャンネル番号 (1-8)
:type channel_number: int
:return: 指定したチャンネルの電圧値 [mV]
:rtype: int
:example: voltage = automeal.read_analog_cpiai1208li("Board1", 1) # Board1 の ch1 の電圧値取得
"""
return runner.read_analog_cpiai1208li(board_id=board_id, channel_number=channel_number)
[docs]
def write_logic_cpirry16(board_id: str, channel_number: int, state: bool) -> None:
"""CPI-RRY-16 の出力状態値 (ON/OFF) を更新します。
:param board_id: ボード ID
:type board_id: str
:param channel_number: チャンネル番号 (1-16)
:type channel_number: int
:param state: チャンネルの状態 ON(True)/OFF(False)
:type state: bool
:example: automeal.write_logic_cpirry16("Board1", 1, True) # Board1 の ch1 を ON に
"""
return runner.write_logic_cpirry16(board_id=board_id, channel_number=channel_number, state=state)
[docs]
def write_logic_cpirry16_multi(board_id: str, states: list) -> None:
"""CPI-RRY-16 の出力状態値 (ON/OFF) を更新します。
:param board_id: ボード ID
:type board_id: str
:param states: 16ch 分の出力状態値 ON(True)/OFF(False)/更新なし(None)
:type states: list[bool | None]
:example: automeal.write_logic_cpirry16_multi("Board1", [True, True, None, None, False, False]) # Board1 の ch1,ch2 を ON, ch5,ch6 を OFF, それ以外は現状維持
16ch未満の場合、未指定分は None として扱われます。
16chを超える指定の場合、超えた分は無効になります。
"""
return runner.write_logic_cpirry16_multi(board_id=board_id, states=states)
[docs]
def write_logic_cpirry16_bits(board_id: str, state: int) -> None:
"""CPI-RRY-16 の出力状態値 (ON/OFF) を更新します。
:param board_id: ボード ID
:type board_id: str
:param state: 16ch 分の出力状態値 (0x0000-0xFFFF)(ON=1/OFF=0)
:type state: int
:example: automeal.write_logic_cpirry16_bits("Board1", 0xFFFF) # Board1 16ch 全てを ON に
"""
return runner.write_logic_cpirry16_bits(board_id=board_id, state=state)
[docs]
def write_logic_cpidio0808l(board_id: str, channel_number: int, state: bool) -> None:
"""CPI-DIO-0808L の出力状態値 (Hi/Lo) を更新します。
:param board_id: ボード ID
:type board_id: str
:param channel_number: チャンネル番号 (1-8)
:type channel_number: int
:param state: チャンネルの状態 Hi(True)/Lo(False)
:type state: bool
:example: automeal.write_logic_cpidio0808l("Board1", 1, True) # Board1 の ch1 を Hi に
"""
return runner.write_logic_cpidio0808l(board_id=board_id, channel_number=channel_number, state=state)
[docs]
def write_logic_cpidio0808l_multi(board_id: str, states: list) -> None:
"""CPI-DIO-0808L の出力状態値 (Hi/Lo) を更新します。
:param board_id: ボード ID
:type board_id: str
:param states: 8ch 分の出力状態値 Hi(True)/Lo(False)/更新なし(None)
:type states: list[bool | None]
:example: automeal.write_logic_cpidio0808l_multi("Board1", [True, True, None, None, False, False]) # Board1 の ch1,ch2 を Hi, ch5,ch6 を Lo, それ以外は現状維持
8ch未満の場合、未指定分は None として扱われます。
8chを超える指定の場合、超えた分は無効になります。
"""
return runner.write_logic_cpidio0808l_multi(board_id=board_id, states=states)
[docs]
def write_logic_cpidio0808l_bits(board_id: str, state: int) -> None:
"""CPI-DIO-0808L の出力状態値 (Hi/Lo) を更新します。
:param board_id: ボード ID
:type board_id: str
:param state: 8ch 分の出力状態値 (0x00-0xFF)(Hi=1/Lo=0)
:type state: int
:example: automeal.write_logic_cpidio0808l_bits("Board1", 0xFF) # Board1 8ch 全てを Hi に
"""
return runner.write_logic_cpidio0808l_bits(board_id=board_id, state=state)
[docs]
def read_logic_cpidio0808l(board_id: str, channel_number: int) -> bool:
"""CPI-DIO-0808L の最新の計測値 (Hi/Lo) を取得します。
:param board_id: ボード ID
:type board_id: str
:param channel_number: チャンネル番号 (1-8)
:type channel_number: int
:return: 指定したチャンネルの状態 Hi(True)/Lo(False)
:rtype: bool
:example: state = automeal.read_logic_cpidio0808l("Board1", 1) # Board1 の ch1 の状態取得
"""
return runner.read_logic_cpidio0808l(board_id=board_id, channel_number=channel_number)
[docs]
class SamplingLoopIterator(object):
def __init__(self, duration_time_ms: float, sampling_interval_ms: float):
self.sampling_interval = sampling_interval_ms / 1000.0 # [ms] -> [s]
self.duration_time = duration_time_ms / 1000.0 # [ms] -> [s]
self.current_time = time.perf_counter()
self.start_time = self.current_time
self.end_time = self.current_time + self.duration_time
self.frame_count = 0
self.last_force_sampled = False
def __iter__(self):
return self
def __next__(self):
if self.last_force_sampled:
raise StopIteration()
# Avoid overhead and wait.
if self.frame_count > 0:
elapsed_time = time.perf_counter() - self.current_time
wait_time = self.sampling_interval - elapsed_time
if wait_time > 0.0:
time.sleep(wait_time)
else:
pass # delayed
self.frame_count += 1
self.current_time = time.perf_counter()
if self.current_time < self.end_time:
# Interpolation sampling.
if self.frame_count == 1:
return [0.0, self.duration_time]
else:
return [self.current_time - self.start_time, self.duration_time]
else:
# Last sampling.
self.last_force_sampled = True
return [self.duration_time, self.duration_time]
[docs]
def smart_clamp(value, range1, range2):
"""
ScriptGenerator の自動生成コードで使用するヘルパー関数です。
対象値をrange内に収まるように修正した値を返します。
:param value: 対象値
:param range1: 下限
:param range2: 上限
"""
actual_min = min(range1, range2)
actual_max = max(range1, range2)
return min(max(value, actual_min), actual_max)
[docs]
def sampling_loop(duration_time_ms: float, sampling_interval_ms: float = 100) -> SamplingLoopIterator:
"""
ScriptGenerator の自動生成コードで使用するヘルパー関数です。
指定した時間間隔で for 文を実行するためのイテレータを返します。
:example:
例えば `for [t, d] in automeal.sampling_loop(5000, 100):` とすると、
5000 ms の間、100 ms ごとに for 文が実行されます。
この時 t は、ループ開始からの経過時間 [s] です。
d は duration_time_ms と同じ、呼び出しを続ける時間ですが、単位は [s] となります。
t と d は float 型の値です。
なお最初の for 文の実行時は必ず t=0 であり、最後は t==d となります。
:param duration_time_ms: 呼び出しを続ける時間 [ms]
:type duration_time_ms: float
:param sampling_interval_ms: 呼び出し間隔 [ms] (省略時は 100 ms)
:type sampling_interval_ms: float
:return: 指定した時間間隔で for 文を実行するためのイテレータ
:rtype: SamplingLoopIterator
"""
return SamplingLoopIterator(duration_time_ms, sampling_interval_ms)
[docs]
def linear_sampling(start: float, last: float, time: float, duration: float) -> int:
"""
ScriptGenerator の自動生成コードで使用するヘルパー関数です。
start から last へ向かって線形的に変化する値を返します。
:param start: 開始値
:type start: float
:param last: 終了値
:type last: float
:param time: 現在の時間
:type time: float
:param duration: 終了値に遷移するまでの時間
:type duration: float
:return: 線形的に変化する値
:rtype: int
"""
if (time >= duration): return last
value = start + (last - start) * (time / duration)
return int(smart_clamp(value, start, last))
[docs]
def step_sampling(start: float, last: float, step: float, time: float, duration: float) -> int:
"""
ScriptGenerator の自動生成コードで使用するヘルパー関数です。
start から last へ向かって step ごとに変化する値を返します。
:param start: 開始値
:type start: float
:param last: 終了値
:type last: float
:param step: 増分値
:type step: float
:param time: 現在の時間
:type time: float
:param duration: 終了値に遷移するまでの時間
:type duration: float
:return: step ごとに変化する値
:rtype: int
"""
if (time >= duration): return last
if (abs(step) >= abs(last - start)): return last
value = start + (last - start) * (time / duration) // step * step
return int(smart_clamp(value, start, last))
[docs]
def sleep(wait: float):
"""
ScriptGenerator の自動生成コードで使用するヘルパー関数です。
エラーチェック後に wait 時間待機します。
:param wait: 待機時間(s)
:type wait: float
"""
if (wait < 0):
print("テストステップ処理時間 が Time値 を超えました。", flush=True)
else:
time.sleep(wait)
[docs]
def send_can(user_defined_name: str, channel: int, can_id: int, can_data: bytes) -> None:
"""指定した識別名のデバイスの指定のチャンネルからCANメッセージを送信します。
:param user_defined_name: 識別名
:type user_defined_name: str
:param channel: チャンネル番号
:type channel: int
:param can_id: CANメッセージのID部
:type can_id: int
:param can_data: 送信メッセージのデータ部
:type can_data: bytes
:example: automeal.send_can("Vector-VN1630A-1", 1, 0x100, bytes([0x01, 0x02, 0x03]))
"""
return runner.send_can(user_defined_name, channel, can_id, bytes(can_data))
[docs]
def send_can_message(user_defined_name: str, channel: int, can_message: str) -> None:
"""指定した識別名のデバイスの指定のチャンネルからメッセージ名と一致するメッセージを送信します。
:param user_defined_name: 識別名
:type user_defined_name: str
:param channel: チャンネル番号
:type channel: int
:param can_message: 送信するデータのメッセージ名(CAN送信パネルのMessage列)
:type can_message: str
:example: automeal.send_can_message("Vector-VN1630A-1", 1, "Init") # 定義済みの Init コマンドを送信
"""
return runner.send_can_message(user_defined_name, channel, can_message)
[docs]
def get_latest_can(user_defined_name: str, channel: int, can_id: int, direction: str) -> bytes:
"""指定した識別名のデバイスの指定のチャンネルでCANの受信または送信した最新のデータ1つを取得します。
:param user_defined_name: 識別名
:type user_defined_name: str
:param channel: チャンネル番号
:type channel: int
:param direction: 送受信方向 送信("S")、 受信("R") で指定
:type direction: str
:return: 指定した送受信方向の最新データ
:rtype: bytes
:example: last_recv = automeal.get_latest_can("Vector-VN1630A-1", 1, 0x100, "R") # チャンネル1 で最後に受信したデータ取得
"""
return runner.get_latest_can(user_defined_name, channel, can_id, direction)
[docs]
def wait_for_next_can_message(user_defined_name: str, channel: int, can_id: int, timeout_ms: int | None = None) -> bytes:
"""指定したチャンネルの指定したIDで次にメッセージを受信するまで待機し、受信したデータを取得します。
すでに待機状態のチャンネルを指定した場合は例外になります。
待機中にテストを終了した等チャンネルが切断された場合は例外になります。
指定したタイムアウト時間内にデータを受信しなかった場合は例外になります。
:param user_defined_name: 識別名
:type user_defined_name: str
:param channel: チャンネル
:type channel: int
:param can_id: メッセージのID部分
:type can_id: int
:param timeout_ms: タイムアウトまでの時間(ms)指定。1以上の整数で指定して下さい。指定なし/Noneの場合無限待ちを行います
:type timeout_ms: int | None
:return: 指定したチャンネルで受信したメッセージのデータ部分
:rtype: bytes
:raises ValueError: timeout_msに0または負の値が指定された場合
:example: last_recv = automeal.wait_for_next_can_message("Vector-VN1630A-1", 1, 0x300, 1000) # ID0x300 を次に受信するまで待機。1秒受信なしの場合例外エラー。
"""
return runner.wait_for_next_can_message(user_defined_name, channel, can_id, timeout_ms)
[docs]
def start_can_periodic(user_defined_name: str, channel: int, can_id: int, can_data: bytes, period: int) -> None:
"""指定した識別名のデバイスの指定のチャンネルからCANメッセージを定期送信します。
:param user_defined_name: 識別名
:type user_defined_name: str
:param channel: チャンネル番号
:type channel: int
:param can_id: CANメッセージのID部
:type can_id: int
:param can_data: 送信メッセージのデータ部
:type can_data: bytes
:param period: 定期送信の間隔(ms)
:type period: int
:example: automeal.start_can_periodic("Vector-VN1630A-1", 1, 0x100, bytes([0x01, 0x02, 0x03]), 10)
"""
return runner.start_can_periodic(user_defined_name, channel, can_id, bytes(can_data), period)
[docs]
def stop_can_periodic(user_defined_name: str, channel: int, can_id: int | None = None) -> None:
"""指定した識別名のデバイスの指定のチャンネルからの指定のCANメッセージの定期送信を停止します。
:param user_defined_name: 識別名
:type user_defined_name: str
:param channel: チャンネル番号
:type channel: int
:param can_id: CANメッセージのID。指定なし/Noneの場合は指定チャンネルの定期送信をすべて停止します。
:type can_id: int | None
:example: automeal.stop_can_periodic("Vector-VN1630A-1", 1, 0x100)
"""
return runner.stop_can_periodic(user_defined_name, channel, can_id)
[docs]
def start_can_message_periodic(user_defined_name: str, can_message: str) -> None:
"""指定した識別名のデバイスからメッセージ名と一致するメッセージの定期送信を開始します。
:param user_defined_name: 識別名
:type user_defined_name: str
:param can_message: 送信するデータのメッセージ名(CAN送信パネルのMessage列)
:type can_message: str
:example: automeal.start_can_message_periodic("Vector-VN1630A-1", "Init") # 定義済みの Init メッセージの定期送信を開始
"""
return runner.start_can_message_periodic(user_defined_name, can_message)
[docs]
def stop_can_message_periodic(user_defined_name: str, can_message: str) -> None:
"""指定した識別名のデバイスからメッセージ名と一致するメッセージの定期送信を停止します。
:param user_defined_name: 識別名
:type user_defined_name: str
:param can_message: 送信するデータのメッセージ名(CAN送信パネルのMessage列)
:type can_message: str
:example: automeal.stop_can_message_periodic("Vector-VN1630A-1", "Init") # 定義済みの Init メッセージの定期送信を停止
"""
return runner.stop_can_message_periodic(user_defined_name, can_message)
[docs]
def send_udp(endpoint_name: str, remote_ip: str, remote_port: int, udp_data: bytes) -> None:
"""指定した UDP エンドポイントからデータを送信します。
UDP 通信設定で設定したエンコーディングで送信されます。
:param endpoint_name: エンドポイント名 (UDP 通信設定で設定した名前)
:type endpoint_name: str
:param remote_ip: リモート IP アドレス
:type remote_ip: str
:param remote_port: リモートポート番号
:type remote_port: int
:param udp_data: 送信データ
:type udp_data: bytes
:example: automeal.send_udp("Endpoint1", "192.168.10.11", 8080, bytes([0x01, 0x02, 0x03]))
"""
return runner.send_udp(endpoint_name, remote_ip, remote_port, bytes(udp_data))
[docs]
def get_latest_udp(endpoint_name: str, direction: str) -> bytes:
"""指定した UDP エンドポイントで送信または受信した最新のデータ 1 つを取得します。
:param endpoint_name: エンドポイント名 (UDP 通信設定で設定した名前)
:type endpoint_name: str
:param direction: 送受信方向 送信("S")、 受信("R") で指定
:type direction: str
:return: 指定した送受信方向の最新データ
:rtype: bytes
:example: last_recv = automeal.get_latest_udp("Endpoint1", "R") # Endpoint1 で最後に受信したデータ取得
"""
return runner.get_latest_udp(endpoint_name, direction)
[docs]
def wait_for_next_udp_data(endpoint_name: str, timeout_ms: int | None = None) -> bytes:
"""指定した UDP エンドポイントで次にデータを受信するまで待機し、受信したデータを取得します。
すでに待機状態のエンドポイントを指定した場合は例外になります。
待機中にテストを終了した等接続が切断された場合は例外になります。
指定したタイムアウト時間内にデータを受信しなかった場合は例外になります。
:param endpoint_name: エンドポイント名 (UDP 通信設定で設定した名前)
:type endpoint_name: str
:param timeout_ms: タイムアウトまでの時間 (ms) 指定。1 以上の整数で指定して下さい。指定なし/None の場合無限待ちを行います
:type timeout_ms: int | None
:return: 指定したエンドポイントの受信データ
:rtype: bytes
:raises ValueError: timeout_ms に 0 または負の値が指定された場合
:example: last_recv = automeal.wait_for_next_udp_data("Endpoint1", 5000) # Endpoint1 で次に受信するまで待機。5秒受信なしの場合例外エラー。
"""
return runner.wait_for_next_udp_data(endpoint_name, timeout_ms)
[docs]
def start_mg400_project(mg400_name: str, project_name: str) -> MG400ProjectThread:
"""指定したMG400ロボットでプロジェクトをスレッドで開始します。
:param mg400_name: MG400ロボットの名前
:type mg400_name: str
:param project_name: 開始するプロジェクト名
:type project_name: str
:return: 実行完了を待機するためのスレッドオブジェクト
:rtype: MG400ProjectThread
:example:
| thread = automeal.start_mg400_project("MG400_1", "PickAndPlaceProject")
| thread.join(20000) # 最大20秒待機
"""
return runner.start_mg400_project(mg400_name, project_name)
[docs]
def stop_mg400_project(mg400_name: str) -> None:
"""指定したMG400ロボットで実行中のプロジェクトを停止します。
:param mg400_name: MG400ロボットの名前
:type mg400_name: str
:example: automeal.stop_mg400_project("MG400_1")
"""
return runner.stop_mg400_project(mg400_name)
[docs]
def get_mg400_state(mg400_name: str) -> MG400State:
"""指定したMG400ロボットのステータスを取得します。
:param mg400_name: MG400ロボットの名前
:type mg400_name: str
:return: MG400ロボットのステータス情報
:rtype: MG400State
:example: status = automeal.get_mg400_state("MG400_1")
"""
return runner.get_mg400_state(mg400_name)
[docs]
def start_vd_judge(config_file_path: str, input_media_path: str = None, output_path: str = None)-> VDThread:
"""Connected VD® の判定を開始します。
:param config_file_path: 設定ファイルのパス(相対パスの場合はプロジェクトルート基準)
:type config_file_path: str
:param input_media_path: 入力メディアのパス(省略時は設定ファイルに従う)
:type input_media_path: str | None
:param output_path: 出力のパス(省略時は設定ファイルに従う)
:type output_path: str | None
:return: 判定完了を待機するためのスレッドオブジェクト
:rtype: VDThread
:example:
| automeal.start_vd_judge(
| config_file_path="configs/trace_config.ini",
| input_media_path="C:/videos/test.mp4",
| output_path="C:/results"
| )
"""
runner.load_vd_config(config_file_path)
# 変更する設定を更新
kwargs = {}
if input_media_path is not None:
kwargs['input_media_path'] = input_media_path
if output_path is not None:
kwargs['output_path'] = output_path
# 変更する設定がある場合は更新
if kwargs:
runner.update_vd_config(**kwargs)
return runner.start_vd_judge()
[docs]
def stop_vd_judge() -> None:
"""Connected VD® の判定を停止します。
:example: automeal.stop_vd_judge()
"""
return runner.stop_vd_judge()
[docs]
def get_vd_log_list() -> list[dict]:
"""Connected VD® の判定ログを取得します。
:return:
| 以下のキーを持つ辞書(dict)のリスト
| - label_name: ラベル名 (str)
| - detect_time: 検知時間 (str)
| - detect_data: 検知データ(str)
| - detect_pattern: 検知パターン (str)
| - detect_info: 検知内容 (str)
| - error_level: エラーレベル (int)
:rtype: list[dict]
"""
return runner.get_vd_log_list()
[docs]
def open_serial_stream() -> Iterator[SerialRecord]:
"""シリアルストリームを開きます。
:return: シリアルデータのストリーミングイテレータ
:rtype: Iterator[SerialRecord]
:example:
| serial_stream = automeal.open_serial_stream()
| for data in serial_stream:
| print(data)
| break # 1データ受信したら終了
| automeal.close_serial_stream(serial_stream)
"""
return runner.open_serial_stream()
[docs]
def close_serial_stream(stream: Iterator[SerialRecord]) -> None:
"""シリアルストリームを閉じます。
:param stream: open_serial_streamで取得したストリーミングイテレータ
:type stream: Iterator[SerialRecord]
"""
return runner.close_serial_stream(stream)
[docs]
def create_serial_stream() -> SerialStream:
"""シリアルストリームオブジェクトを作成します。
with 文で使用することで、ブロック終了時に自動でストリームを閉じることができます。
:return: シリアルストリームオブジェクト
:rtype: SerialStream
:example:
| with automeal.create_serial_stream() as serial_stream:
| for data in serial_stream:
| print(data)
| break # 1データ受信したら終了
"""
return runner.create_serial_stream()
[docs]
def open_can_stream()-> Iterator[CANRecord]:
"""CANストリームの受信を開始します。
:return: CANデータのストリーミングイテレータ
:rtype: Iterator[CANRecord]
:example:
| can_stream = automeal.open_can_stream()
| for data in can_stream:
| print(data)
| break # 1データ受信したら終了
| automeal.close_can_stream(can_stream)
"""
return runner.open_can_stream()
[docs]
def close_can_stream(stream: Iterator[CANRecord]) -> None:
"""CANストリームの受信を終了します。
:param stream: open_can_streamで取得したストリーミングイテレータ
:type stream: Iterator[CANRecord]
"""
return runner.close_can_stream(stream)
[docs]
def create_can_stream() -> CANStream:
"""CANストリームオブジェクトを作成します。
with 文で使用することで、ブロック終了時に自動でストリームを閉じることができます。
:return: CANストリームオブジェクト
:rtype: CANStream
:example:
| with automeal.create_can_stream() as can_stream:
| for data in can_stream:
| print(data)
| break # 1データ受信したら終了
"""
return runner.create_can_stream()
[docs]
def open_udp_stream()-> Iterator[UDPRecord]:
"""UDP ストリームの受信を開始します。
:return: UDP データのストリーミングイテレータ
:rtype: Iterator[UDPRecord]
:example:
| udp_stream = automeal.open_udp_stream()
| for data in udp_stream:
| print(data)
| break # 1 データ受信したら終了
| automeal.close_udp_stream(udp_stream)
"""
return runner.open_udp_stream()
[docs]
def close_udp_stream(stream: Iterator[UDPRecord]) -> None:
"""UDP ストリームの受信を終了します。
:param stream: open_udp_stream で取得したストリーミングイテレータ
:type stream: Iterator[UDPRecord]
"""
return runner.close_udp_stream(stream)
[docs]
def create_udp_stream() -> UDPStream:
"""UDP ストリームオブジェクトを作成します。
with 文で使用することで、ブロック終了時に自動でストリームを閉じることができます。
:return: UDP ストリームオブジェクト
:rtype: UDPStream
:example:
| with automeal.create_udp_stream() as udp_stream:
| for data in udp_stream:
| print(data)
| break # 1データ受信したら終了
"""
return runner.create_udp_stream()
[docs]
def get_latest_tcp_client(client_name: str, direction: str) -> bytes:
"""指定した TCP クライアントで送信または受信した最新のデータ 1 つを取得します。
:param client_name: クライアント名
:type client_name: str
:param direction: 送受信方向 送信("S")、 受信("R") で指定
:type direction: str
:return: 指定した送受信方向の最新データ
:rtype: bytes
:example: last_recv = automeal.get_latest_tcp_client("Client1", "R") # Client1 で最後に受信したデータ取得
"""
return runner.get_latest_tcp_client(client_name, direction)
[docs]
def wait_for_next_tcp_client_data(client_name: str, timeout_ms: int | None = None) -> bytes:
"""指定した TCP クライアントで次にデータを受信するまで待機し、受信したデータを取得します。
すでに待機状態のクライアントを指定した場合は例外になります。
待機中にテストを終了した等接続が切断された場合は例外になります。
指定したタイムアウト時間内にデータを受信しなかった場合は例外になります。
:param client_name: クライアント名
:type client_name: str
:param timeout_ms: タイムアウトまでの時間 (ms) 指定。1 以上の整数で指定して下さい。指定なし/None の場合無限待ちを行います
:type timeout_ms: int | None
:return: 指定したクライアントの受信データ
:rtype: bytes
:raises ValueError: timeout_ms に 0 または負の値が指定された場合
:example: last_recv = automeal.wait_for_next_tcp_client_data("Client1", 5000) # Client1 で次に受信するまで待機。5 秒受信なしの場合例外エラー。
"""
return runner.wait_for_next_tcp_client_data(client_name, timeout_ms)
[docs]
def open_tcp_client_stream()-> Iterator[TCPClientRecord]:
"""TCP クライアントストリームの受信を開始します。
:return: TCP クライアントデータのストリーミングイテレータ
:rtype: Iterator[TCPClientRecord]
:example:
| tcp_client_stream = automeal.open_tcp_client_stream()
| for data in tcp_client_stream:
| print(data)
| break # 1 データ受信したら終了
| automeal.close_tcp_client_stream(tcp_client_stream)
"""
return runner.open_tcp_client_stream()
[docs]
def close_tcp_client_stream(stream: Iterator[TCPClientRecord]) -> None:
"""TCP クライアントストリームの受信を終了します。
:param stream: open_tcp_client_stream で取得したストリーミングイテレータ
:type stream: Iterator[TCPClientRecord]
"""
return runner.close_tcp_client_stream(stream)
[docs]
def create_tcp_client_stream() -> TCPClientStream:
"""TCP クライアントストリームオブジェクトを作成します。
with 文で使用することで、ブロック終了時に自動でストリームを閉じることができます。
:return: TCP クライアントストリームオブジェクト
:rtype: TCPClientStream
:example:
| with automeal.create_tcp_client_stream() as tcp_client_stream:
| for data in tcp_client_stream:
| print(data)
| break # 1 データ受信したら終了
"""
return runner.create_tcp_client_stream()
[docs]
def connect_tcp_client(client_name: str) -> None:
"""指定した TCP クライアント接続を確立します。
:param client_name: クライアント名 (TCP Client 通信設定で設定した名前)
:type client_name: str
:example: automeal.connect_tcp_client("Client1")
"""
runner.connect_tcp_client(client_name)
[docs]
def disconnect_tcp_client(client_name: str) -> None:
"""指定した TCP クライアント接続を切断します。
:param client_name: クライアント名 (TCP Client 通信設定で設定した名前)
:type client_name: str
:example: automeal.disconnect_tcp_client("Client1")
"""
runner.disconnect_tcp_client(client_name)
[docs]
def is_connected_tcp_client(client_name: str) -> bool:
"""指定した TCP クライアント接続が確立されているか確認します。
:param client_name: クライアント名 (TCP Client 通信設定で設定した名前)
:type client_name: str
:return: 接続されている場合 True、切断されている場合 False
:rtype: bool
:example: state = automeal.is_connected_tcp_client("Client1")
"""
return runner.is_connected_tcp_client(client_name)
[docs]
def send_tcp_client(client_name: str, tcp_data: bytes) -> None:
"""指定した TCP クライアント接続からデータを送信します。
:param client_name: クライアント名 (TCP Client 通信設定で設定した名前)
:type client_name: str
:param tcp_data: 送信データ
:type tcp_data: bytes
:example: automeal.send_tcp_client("Client1", b"data")
"""
runner.send_tcp_client(client_name, bytes(tcp_data))
[docs]
def get_test_parameters(key: str) -> str | None:
"""テスト実行に使用されているテストパラメータの値を取得します。
:param key: 取得するテストパラメータのキー
:type key: str
:return: 指定したキーに対応するテストパラメータの値。キーが存在しない場合は None を返します。
:rtype: str | None
:example: report_dir = automeal.get_test_parameters("report_save_directory")
:Note: 取得可能なキーの一覧は以下の通りです。
- report_save_directory: レポート保存ディレクトリのパス
"""
if key == TestParameterNames.REPORT_SAVE_DIRECTORY.value:
return runner._get_env_value(TestRunnerEnvKeys.REPORT_SAVE_DIR.value)
else:
return None