diff --git a/controller/packagesrc/__init__.py b/controller/packagesrc/__init__.py new file mode 100644 index 0000000..423fd44 --- /dev/null +++ b/controller/packagesrc/__init__.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- + +# ██████╗ ██████╗ ██████╗ ██████╗ ███╗ ███╗ █████╗ ███████╗████████╗███████╗██████╗ ██████╗ ██╗ ██╗ +# ██╔══██╗██╔═══██╗██╔══██╗██╔═══██╗████╗ ████║██╔══██╗██╔════╝╚══██╔══╝██╔════╝██╔══██╗██╔══██╗╚██╗ ██╔╝ +# ██████╔╝██║ ██║██████╔╝██║ ██║██╔████╔██║███████║███████╗ ██║ █████╗ ██████╔╝██████╔╝ ╚████╔╝ +# ██╔══██╗██║ ██║██╔══██╗██║ ██║██║╚██╔╝██║██╔══██║╚════██║ ██║ ██╔══╝ ██╔══██╗██╔═══╝ ╚██╔╝ +# ██║ ██║╚██████╔╝██████╔╝╚██████╔╝██║ ╚═╝ ██║██║ ██║███████║ ██║ ███████╗██║ ██║██║ ██║ +# ╚═╝ ╚═╝ ╚═════╝ ╚═════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═╝╚══════╝ ╚═╝ ╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝ + +""" +RoboMasterPy: Python library and framework for RoboMaster EP +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + + +:copyright: (c) 2020 by LI Zhennan. +:license: MIT, see LICENSE for more details. +""" + +from . import framework +from . import measure +from .__version__ import ( + __title__, __description__, __url__, __version__, + __author__, __author_email__, __license__, __copyright__, +) +from .client import CTX, LOG_LEVEL +from .client import ( + ChassisSpeed, ChassisPosition, ChassisAttitude, ChassisStatus, + GimbalAttitude, + ArmorHitEvent, SoundApplauseEvent, +) +from .client import ( + VIDEO_PORT, AUDIO_PORT, CTRL_PORT, PUSH_PORT, EVENT_PORT, IP_PORT, + DEFAULT_BUF_SIZE, + SWITCH_ON, SWITCH_OFF, + MODE_CHASSIS_LEAD, MODE_GIMBAL_LEAD, MODE_FREE, + ARMOR_HIT, + SOUND_APPLAUSE, + LED_ALL, LED_TOP_ALL, LED_TOP_RIGHT, LED_TOP_LEFT, LED_BOTTOM_ALL, LED_BOTTOM_FRONT, LED_BOTTOM_BACK, LED_BOTTOM_LEFT, LED_BOTTOM_RIGHT, + LED_EFFECT_SOLID, LED_EFFECT_OFF, LED_EFFECT_PULSE, LED_EFFECT_BLINK, LED_EFFECT_SCROLLING, +) +from .client import get_broadcast_ip, Commander diff --git a/controller/packagesrc/__version__.py b/controller/packagesrc/__version__.py new file mode 100644 index 0000000..57b2088 --- /dev/null +++ b/controller/packagesrc/__version__.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- + + +# ██████╗ ██████╗ ██████╗ ██████╗ ███╗ ███╗ █████╗ ███████╗████████╗███████╗██████╗ ██████╗ ██╗ ██╗ +# ██╔══██╗██╔═══██╗██╔══██╗██╔═══██╗████╗ ████║██╔══██╗██╔════╝╚══██╔══╝██╔════╝██╔══██╗██╔══██╗╚██╗ ██╔╝ +# ██████╔╝██║ ██║██████╔╝██║ ██║██╔████╔██║███████║███████╗ ██║ █████╗ ██████╔╝██████╔╝ ╚████╔╝ +# ██╔══██╗██║ ██║██╔══██╗██║ ██║██║╚██╔╝██║██╔══██║╚════██║ ██║ ██╔══╝ ██╔══██╗██╔═══╝ ╚██╔╝ +# ██║ ██║╚██████╔╝██████╔╝╚██████╔╝██║ ╚═╝ ██║██║ ██║███████║ ██║ ███████╗██║ ██║██║ ██║ +# ╚═╝ ╚═╝ ╚═════╝ ╚═════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═╝╚══════╝ ╚═╝ ╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝ + + +__title__ = 'robomasterpy' +__description__ = 'RoboMaster Python SDK and framework.' +__url__ = 'https://github.com/nanmu42/robomasterpy' +__version__ = '0.1.1' +__author__ = 'LI Zhennan' +__author_email__ = 'i@nanmu.me' +__license__ = 'MIT' +__copyright__ = '2020 LI Zhennan' diff --git a/controller/packagesrc/client.py b/controller/packagesrc/client.py new file mode 100644 index 0000000..6799629 --- /dev/null +++ b/controller/packagesrc/client.py @@ -0,0 +1,775 @@ +# -*- coding: utf-8 -*- + +# ██████╗ ██████╗ ██████╗ ██████╗ ███╗ ███╗ █████╗ ███████╗████████╗███████╗██████╗ ██████╗ ██╗ ██╗ +# ██╔══██╗██╔═══██╗██╔══██╗██╔═══██╗████╗ ████║██╔══██╗██╔════╝╚══██╔══╝██╔════╝██╔══██╗██╔══██╗╚██╗ ██╔╝ +# ██████╔╝██║ ██║██████╔╝██║ ██║██╔████╔██║███████║███████╗ ██║ █████╗ ██████╔╝██████╔╝ ╚████╔╝ +# ██╔══██╗██║ ██║██╔══██╗██║ ██║██║╚██╔╝██║██╔══██║╚════██║ ██║ ██╔══╝ ██╔══██╗██╔═══╝ ╚██╔╝ +# ██║ ██║╚██████╔╝██████╔╝╚██████╔╝██║ ╚═╝ ██║██║ ██║███████║ ██║ ███████╗██║ ██║██║ ██║ +# ╚═╝ ╚═╝ ╚═════╝ ╚═════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═╝╚══════╝ ╚═╝ ╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝ + +import logging +import multiprocessing as mp +import socket +from typing import Optional + +from dataclasses import dataclass + +CTX = mp.get_context('spawn') +LOG_LEVEL = logging.DEBUG + +VIDEO_PORT: int = 40921 +AUDIO_PORT: int = 40922 +CTRL_PORT: int = 40923 +PUSH_PORT: int = 40924 +EVENT_PORT: int = 40925 +IP_PORT: int = 40926 + +DEFAULT_BUF_SIZE: int = 512 + +# switch_enum +SWITCH_ON: str = 'on' +SWITCH_OFF: str = 'off' + +# mode_enum +MODE_CHASSIS_LEAD: str = 'chassis_lead' +MODE_GIMBAL_LEAD: str = 'gimbal_lead' +MODE_FREE: str = 'free' +MODE_ENUMS = (MODE_CHASSIS_LEAD, MODE_GIMBAL_LEAD, MODE_FREE) + +# armor_event_attr_enum +ARMOR_HIT: str = 'hit' +ARMOR_ENUMS = (ARMOR_HIT,) + +# sound_event_attr_enum +SOUND_APPLAUSE: str = 'applause' +SOUND_ENUMS = (SOUND_APPLAUSE,) + +# led_comp_enum +LED_ALL = 'all' +LED_TOP_ALL = 'top_all' +LED_TOP_RIGHT = 'top_right' +LED_TOP_LEFT = 'top_left' +LED_BOTTOM_ALL = 'bottom_all' +LED_BOTTOM_FRONT = 'bottom_front' +LED_BOTTOM_BACK = 'bottom_back' +LED_BOTTOM_LEFT = 'bottom_left' +LED_BOTTOM_RIGHT = 'bottom_right' +LED_ENUMS = (LED_ALL, LED_TOP_ALL, LED_TOP_RIGHT, LED_TOP_LEFT, + LED_BOTTOM_ALL, LED_BOTTOM_FRONT, LED_BOTTOM_BACK, + LED_BOTTOM_LEFT, LED_BOTTOM_RIGHT) + +# led_effect_enum +LED_EFFECT_SOLID = 'solid' +LED_EFFECT_OFF = 'off' +LED_EFFECT_PULSE = 'pulse' +LED_EFFECT_BLINK = 'blink' +LED_EFFECT_SCROLLING = 'scrolling' +LED_EFFECT_ENUMS = (LED_EFFECT_SOLID, LED_EFFECT_OFF, + LED_EFFECT_PULSE, LED_EFFECT_BLINK, + LED_EFFECT_SCROLLING) + + +@dataclass +class ChassisSpeed: + x: float + y: float + z: float + w1: int + w2: int + w3: int + w4: int + + +@dataclass +class ChassisPosition: + x: float + y: float + z: Optional[float] + + +@dataclass +class ChassisAttitude: + pitch: float + roll: float + yaw: float + + +@dataclass +class ChassisStatus: + # 是否静止 + static: bool + # 是否上坡 + uphill: bool + # 是否下坡 + downhill: bool + # 是否溜坡 + on_slope: bool + # 是否被拿起 + pick_up: bool + # 是否滑行 + slip: bool + # x轴是否感应到撞击 + impact_x: bool + # y轴是否感应到撞击 + impact_y: bool + # z轴是否感应到撞击 + impact_z: bool + # 是否翻车 + roll_over: bool + # 是否在坡上静止 + hill_static: bool + + +@dataclass +class GimbalAttitude: + pitch: float + yaw: float + + +@dataclass +class ArmorHitEvent: + index: int + type: int + + +@dataclass +class SoundApplauseEvent: + count: int + + +def get_broadcast_ip(timeout: float = None) -> str: + """ + 接收广播以获取机甲IP + + Receive broadcasting IP of Robomaster. + + :param timeout: 等待超时(秒)。 timeout in second + :return: 机甲IP地址。IP of Robomaster. + """ + BROADCAST_INITIAL: str = 'robot ip ' + + conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + conn.bind(('', IP_PORT)) + conn.settimeout(timeout) + msg, ip, port = None, None, None + try: + msg, (ip, port) = conn.recvfrom(DEFAULT_BUF_SIZE) + finally: + conn.close() + msg = msg.decode() + assert len(msg) > len(BROADCAST_INITIAL), f'broken msg from {ip}:{port}: {msg}' + msg = msg[len(BROADCAST_INITIAL):] + assert msg == ip, f'unmatched source({ip}) and reported IP({msg})' + return msg + + +class Commander: + def __init__(self, ip: str = '', timeout: float = 30): + """ + 创建SDK实例并连接机甲,实例在创建后立即可用。 + + Create a new SDK instance and connect it to Robomaster. + Instance is available immediately after creation. + + :param ip: 可选,机甲IP,可在路由器模式下自动获取。 (Optional) IP of Robomaster, which can be detected automatically under router mode. + :param timeout: 可选,TCP通讯超时(秒)。 (Optional) TCP timeout in second. + """ + self._mu: mp.Lock = CTX.Lock() + with self._mu: + if ip == '': + ip = get_broadcast_ip(timeout) + self._ip: str = ip + self._closed: bool = False + self._conn: socket.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._timeout: float = timeout + self._conn.settimeout(self._timeout) + self._conn.connect((self._ip, CTRL_PORT)) + resp = self._do('command') + assert self._is_ok(resp) or resp == 'Already in SDK mode', f'entering SDK mode: {resp}' + + def close(self): + """ + 关闭实例,回收socket资源。注意这个命令并不会发送quit到机甲,避免打扰其他在线的Commander. + + Close instance, deallocate system socket resource. + Note this method will NOT send quit command to Robomaster, + because there may be other Commander still active. + """ + with self._mu: + self._conn.close() + self._closed = True + + def __enter__(self): + return self + + def __exit__(self): + self.close() + + @staticmethod + def _is_ok(resp: str) -> bool: + return resp == 'ok' + + def _do(self, *args) -> str: + assert len(args) > 0, 'empty arg not accepted' + assert not self._closed, 'connection is already closed' + cmd = ' '.join(map(str, args)) + ';' + self._conn.send(cmd.encode()) + buf = self._conn.recv(DEFAULT_BUF_SIZE) + # 返回值后面有时候会多一个迷之空格, + # 为了可能的向后兼容,额外剔除终止符。 + return buf.decode().strip(' ;') + + def arm_move(self, x: float, y: float) -> str: + return self._do(f'robotic_arm move x {x} y {y}') + + def arm_moveto(self, x: float, y: float) -> str: + return self._do(f'robotic_arm moveto x {x} y {y}') + + def arm_recenter(self) -> str: + return self._do('robotic_arm recenter') + + def arm_pos(self) -> int: + """ + x_pos: The x-axis coordinate in centimeters + y_pos: The y-axis coordinate in centimeters + """ + return self._do('robotic_arm position ?') + + def arm_stop(self) -> str: + return self._do('robotic_arm stop') + + def gripper_open(self, force: int = None) -> str: + assert force in [None, 1, 2, 3,4 ], "Force should be one of None, 1, 2, 3, 4" + return self._do(f'robotic_gripper open{f' leve {force}' if force else ''}') + + def gripper_close(self, force: int = None) -> str: + assert force in [None, 1, 2, 3,4 ], "Force should be one of None, 1, 2, 3, 4" + return self._do(f'robotic_gripper close{f' leve {force}' if force else ''}') + + def gripper_status(self) -> int: + """ + 0: The mechanical gripper is fully closed. + 1: The mechanical gripper is neither fully closed nor fully opened. + 2: The mechanical gripper is fully opened. + """ + return self._do('robotic_gripper status ?') + + def get_ip(self) -> str: + """ + 返回机甲IP。 + + get IP that the commander currently connects to. + + :return: 机甲IP。 IP that the commander currently connects to + """ + assert not self._closed, 'connection is already closed' + return self._ip + + def do(self, *args) -> str: + """ + 执行任意命令。 + + Execute any command. + + :param args: 命令内容。 command content. + :return: 命令返回。 the response of the command. + """ + with self._mu: + return self._do(*args) + + def version(self) -> str: + """ + 查询当前机甲的SDK版本。 + + query robomaster SDK version + + :return: SDK版本号。 SDK version string. + """ + return self.do('version') + + def robot_mode(self, mode: str) -> str: + """ + 更改机甲的运动模式。 + + Update Robomaster's movement mode. + + :param mode: 三种模式之一,见enum MODE_*。 Movement mode, refer to enum MODE_* + :return: ok,否则raise。 ok, or raise certain exception. + """ + assert mode in MODE_ENUMS, f'unknown mode {mode}' + resp = self.do('robot', 'mode', mode) + assert self._is_ok(resp), f'robot_mode: {resp}' + return resp + + def get_robot_mode(self) -> str: + """ + 查询当前机甲的运动模式。 + + Query for Robomaster's current movement mode. + + :return: 三种模式之一,见enum MODE_*。 Movement mode, refer to enum MODE_* + """ + resp = self.do('robot', 'mode', '?') + assert resp in MODE_ENUMS, f'unexpected robot mode result: {resp}' + return resp + + def chassis_speed(self, x: float = 0, y: float = 0, z: float = 0) -> str: + """ + 更改底盘运动速度。 + + Update chassis speed. + + :param x: x 轴向运动速度,单位 m/s speed in x axis, in m/s + :param y: y 轴向运动速度,单位 m/s speed in y axis, in m/s + :param z: z 轴向旋转速度,单位 °/s rotation speed in z axis, in °/s + :return: ok,否则raise。 ok, or raise certain exception. + """ + assert -3.5 <= x <= 3.5, f'x {x} is out of range' + assert -3.5 <= y <= 3.5, f'y {y} is out of range' + assert -600 <= z <= 600, f'z {z} is out of range' + resp = self.do('chassis', 'speed', 'x', x, 'y', y, 'z', z) + assert self._is_ok(resp), f'chassis_speed: {resp}' + return resp + + def get_chassis_speed(self) -> ChassisSpeed: + """ + 获取底盘速度。 + + Query for chassis speed. + + :return: x 轴向运动速度(m/s),y 轴向运动速度(m/s),z 轴向旋转速度(°/s),w1 右前麦轮速度(rpm),w2 左前麦轮速速(rpm),w3 右后麦轮速度(rpm),w4 左后麦轮速度(rpm)。 + speed in x axis(m/s), speed in y axis(m/s), rotation speed in z axis(°/s), w1(right front) wheel speed(rpm), w2(left front) wheel speed(rpm), w3(right back) wheel speed(rpm), w4(left back) wheel speed(rpm) + """ + resp = self.do('chassis', 'speed', '?') + ans = resp.split(' ') + assert len(ans) == 7, f'get_chassis_speed: {resp}' + return ChassisSpeed(x=float(ans[0]), y=float(ans[1]), z=float(ans[2]), w1=int(ans[3]), w2=int(ans[4]), w3=int(ans[5]), w4=int(ans[6])) + + def chassis_wheel(self, w1: int = 0, w2: int = 0, w3: int = 0, w4: int = 0) -> str: + """ + 更改底盘轮子速度。 + + Update chassis wheel rotation speed. + + :param w1: 右前麦轮速度,单位 rpm w1(right front) wheel speed(rpm) + :param w2: 左前麦轮速度,单位 rpm w2(left front) wheel speed(rpm) + :param w3: 右后麦轮速度,单位 rpm w3(right back) wheel speed(rpm) + :param w4: 左后麦轮速度,单位 rpm w4(left back) wheel speed(rpm) + :return ok: ok,否则raise。 ok, or raise certain exception. + """ + for i, v in enumerate((w1, w2, w3, w4)): + assert -1000 <= v <= 1000, f'w{i + 1} {v} is out of range' + resp = self.do('chassis', 'wheel', 'w1', w1, 'w2', w2, 'w3', w3, 'w4', w4) + assert self._is_ok(resp), f'chassis_wheel: {resp}' + return resp + + def chassis_move(self, x: float = 0, y: float = 0, z: float = 0, speed_xy: float = None, speed_z: float = None) -> str: + """ + 控制底盘运动当指定位置,坐标轴原点为当前位置。 + + Make chassis move to specified location. The origin is current location. + + :param x: x 轴向运动距离,单位 m movement in x axis, in meter + :param y: y 轴向运动距离,单位 m movement in y axis, in meter + :param z: z 轴向旋转角度,单位 ° movement in z axis, in degree + :param speed_xy: xy 轴向运动速度,单位 m/s speed in both x and y axis, in meter/second + :param speed_z: z 轴向旋转速度, 单位 °/s speed in z axis, in degree/second + :return ok: ok,否则raise。 ok, or raise certain exception. + """ + assert -5 <= x <= 5, f'x {x} is out of range' + assert -5 <= y <= 5, f'y {y} is out of range' + assert -1800 <= z <= 1800, f'z {z} is out of range' + assert speed_xy is None or 0 < speed_xy <= 3.5, f'speed_xy {speed_xy} is out of range' + assert speed_z is None or 0 < speed_z <= 600, f'speed_z {speed_z} is out of range' + cmd = ['chassis', 'move', 'x', x, 'y', y, 'z', z] + if speed_xy is not None: + cmd += ['vxy', speed_xy] + if speed_z is not None: + cmd += ['vz', speed_z] + resp = self.do(*cmd) + assert self._is_ok(resp), f'chassis_move: {resp}' + return resp + + def get_chassis_position(self) -> ChassisPosition: + """ + 获取底盘当前的位置(以上电时刻位置为原点)。 + + Query for chassis location. The origin is where Robomaster powers on. + + :return: x 轴位置(m),y 轴位置(m),偏航角度(°)。 location consisting of x, y, z, in meter, meter, degree. + """ + resp = self.do('chassis', 'position', '?') + ans = resp.split(' ') + assert len(ans) == 3, f'get_chassis_position: {resp}' + return ChassisPosition(float(ans[0]), float(ans[1]), float(ans[2])) + + def get_chassis_attitude(self) -> ChassisAttitude: + """ + 获取底盘姿态信息。 + + Query for chassis attitude. + + :return: pitch 轴角度(°),roll 轴角度(°),yaw 轴角度(°)。 pitch, roll, yaw in degree. + """ + resp = self.do('chassis', 'attitude', '?') + ans = resp.split(' ') + assert len(ans) == 3, f'get_chassis_attitude: {resp}' + return ChassisAttitude(float(ans[0]), float(ans[1]), float(ans[2])) + + def get_chassis_status(self) -> ChassisStatus: + """ + 获取底盘状态信息。 + + Query for chassis status. + + :return: 底盘状态,详见 ChassisStatus chassis status, see class ChassisStatus. + """ + resp = self.do('chassis', 'status', '?') + ans = resp.split(' ') + assert len(ans) == 11, f'get_chassis_status: {resp}' + return ChassisStatus(*map(lambda x: bool(int(x)), ans)) + + def chassis_push_on(self, position_freq: int = None, attitude_freq: int = None, status_freq: int = None, all_freq: int = None) -> str: + """ + 打开底盘中相应属性的信息推送,支持的频率 1, 5, 10, 20, 30, 50. + + Enable chassis push of specified attribution. Supported frequencies are 1, 5, 10, 20, 30, 50. + + :param position_freq: 位置推送频率,不设定则设为None. position push frequency, None for no-op. + :param attitude_freq: 姿态推送频率,不设定则设为None. attitude push frequency, None for no-op. + :param status_freq: 状态推送频率,不设定则设为None. status push frequency, None for no-op. + :param all_freq: 统一设置所有推送频率,设置则开启所有推送。 update all push frequency, this affects all attribution. + :return: ok,否则raise。 ok, or raise certain exception. + """ + valid_frequencies = (1, 5, 10, 20, 30, 50) + cmd = ['chassis', 'push'] + if all_freq is not None: + assert all_freq in valid_frequencies, f'all_freq {all_freq} is not valid' + cmd += ['freq', all_freq] + else: + if position_freq is not None: + assert position_freq in valid_frequencies, f'position_freq {position_freq} is not valid' + cmd += ['position', SWITCH_ON, 'pfreq', position_freq] + if attitude_freq is not None: + assert attitude_freq in valid_frequencies, f'attitude_freq {attitude_freq} is not valid' + cmd += ['attitude', SWITCH_ON, 'afreq', attitude_freq] + if status_freq is not None: + assert status_freq in valid_frequencies, f'status_freq {status_freq} is not valid' + cmd += ['status', SWITCH_ON, 'sfreq', status_freq] + assert len(cmd) > 2, 'at least one argument should not be None' + resp = self.do(*cmd) + assert self._is_ok(resp), f'chassis_push_on: {resp}' + return resp + + def chassis_push_off(self, position: bool = False, attitude: bool = False, status: bool = False, all: bool = False) -> str: + """ + 关闭底盘中相应属性的信息推送。 + + Disable chassis push of specified attribution. + + :param position: 是否关闭位置推送。 whether disable position push. + :param attitude: 是否关闭姿态推送。 whether disable attitude push. + :param status: 是否关闭状态推送。 whether disable status push. + :param all: 关闭所有推送。 whether disable all pushes. + :return: ok,否则raise。 ok, or raise certain exception. + """ + cmd = ['chassis', 'push'] + if all or position: + cmd += ['position', SWITCH_OFF] + if all or attitude: + cmd += ['attitude', SWITCH_OFF] + if all or status: + cmd += ['status', SWITCH_OFF] + + assert len(cmd) > 2, 'at least one argument should be True' + resp = self.do(*cmd) + assert self._is_ok(resp), f'chassis_push_off: {resp}' + return resp + + def gimbal_speed(self, pitch: float, yaw: float) -> str: + """ + 控制云台运动速度。 + + Update gimbal speed. + + :param pitch: pitch 轴速度,单位 °/s Pitch speed in °/s + :param yaw: yaw 轴速度,单位 °/s yaw speed in °/s + :return: ok,否则raise。 ok, or raise certain exception. + """ + assert -450 <= pitch <= 450, f'pitch {pitch} is out of range' + assert -450 <= yaw <= 450, f'yaw {yaw} is out of range' + resp = self.do('gimbal', 'speed', 'p', pitch, 'y', yaw) + assert self._is_ok(resp), f'gimbal_speed: {resp}' + return resp + + def gimbal_move(self, pitch: float = 0, yaw: float = 0, pitch_speed: float = None, yaw_speed: float = None) -> str: + """ + 控制云台运动到指定位置,坐标轴原点为当前位置。 + + Make gimbal move to specified location. The origin is current location. + + :param pitch: pitch 轴角度, 单位 ° pitch delta in degree + :param yaw: yaw 轴角度, 单位 ° yaw delta in degree + :param pitch_speed: pitch 轴运动速速,单位 °/s pitch speed in °/s + :param yaw_speed: yaw 轴运动速速,单位 °/s yaw speed in °/s + :return: ok,否则raise。 ok, or raise certain exception. + """ + assert -55 <= pitch <= 55, f'pitch {pitch} is out of range' + assert -55 <= yaw <= 55, f'yaw {yaw} is out of range' + assert pitch_speed is None or 0 < pitch_speed <= 540, f'pitch_speed {pitch_speed} is out of range' + assert yaw_speed is None or 0 < yaw_speed <= 540, f'yaw_speed {yaw_speed} is out of range' + cmd = ['gimbal', 'move', 'p', pitch, 'y', yaw] + if pitch_speed is not None: + cmd += ['vp', pitch_speed] + if yaw_speed is not None: + cmd += ['vy', yaw_speed] + resp = self.do(*cmd) + assert self._is_ok(resp), f'gimbal_move: {resp}' + return resp + + def gimbal_moveto(self, pitch: float = 0, yaw: float = 0, pitch_speed: float = None, yaw_speed: float = None) -> str: + """ + 控制云台运动到指定位置,坐标轴原点为上电位置。 + + Make gimbal move to specified location. The origin is gimbal center. + + :param pitch: pitch 轴角度, 单位 ° pitch delta in degree + :param yaw: yaw 轴角度, 单位 ° yaw delta in degree + :param pitch_speed: pitch 轴运动速速,单位 °/s pitch speed in °/s + :param yaw_speed: yaw 轴运动速速,单位 °/s yaw speed in °/s + :return: ok,否则raise。 ok, or raise certain exception. + """ + assert -25 <= pitch <= 30, f'pitch {pitch} is out of range' + assert -250 <= yaw <= 250, f'yaw {yaw} is out of range' + assert pitch_speed is None or 0 < pitch_speed <= 540, f'pitch_speed {pitch_speed} is out of range' + assert yaw_speed is None or 0 < yaw_speed <= 540, f'yaw_speed {yaw_speed} is out of range' + cmd = ['gimbal', 'moveto', 'p', pitch, 'y', yaw] + if pitch_speed is not None: + cmd += ['vp', pitch_speed] + if yaw_speed is not None: + cmd += ['vy', yaw_speed] + resp = self.do(*cmd) + assert self._is_ok(resp), f'gimbal_moveto: {resp}' + return resp + + def gimbal_suspend(self): + """ + 使云台进入休眠状态。 + + Suspend(sleep) the gimbal. + + :return: ok,否则raise。 ok, or raise certain exception. + """ + resp = self.do('gimbal', 'suspend') + assert self._is_ok(resp), f'gimbal_suspend: {resp}' + return resp + + def gimbal_resume(self): + """ + 控制云台从休眠状态中恢复 + + awake the gimbal if it is suspended(sleeping). + + :return: ok,否则raise。 ok, or raise certain exception. + """ + resp = self.do('gimbal', 'resume') + assert self._is_ok(resp), f'gimbal_resume: {resp}' + return resp + + def gimbal_recenter(self): + """ + 控制云台回中。 + + Recenter the gimbal. + + :return: ok,否则raise。 ok, or raise certain exception. + """ + resp = self.do('gimbal', 'recenter') + assert self._is_ok(resp), f'gimbal_recenter: {resp}' + return resp + + def get_gimbal_attitude(self) -> GimbalAttitude: + """ + 获取云台姿态信息。 + + Query for gimbal attitude. + + :return: pitch 轴角度(°),yaw 轴角度(°) pitch, yaw in degree + """ + resp = self.do('gimbal', 'attitude', '?') + ans = resp.split(' ') + assert len(ans) == 2, f'get_gimbal_attitude: {resp}' + return GimbalAttitude(pitch=float(ans[0]), yaw=float(ans[1])) + + def gimbal_push_on(self, attitude_freq: int = 5) -> str: + """ + 打开云台中相应属性的信息推送,支持的频率 1, 5, 10, 20, 30, 50. + + Enable gimbal attribution push. Supported frequencies are 1, 5, 10, 20, 30, 50. + + :param attitude_freq: 姿态推送频率. attitude push frequency. + :return: ok,否则raise。 ok, or raise certain exception. + """ + valid_frequencies = (1, 5, 10, 20, 30, 50) + assert attitude_freq in valid_frequencies, f'invalid attitude_freq {attitude_freq}' + resp = self.do('gimbal', 'push', 'attitude', SWITCH_ON, 'afreq', attitude_freq) + assert self._is_ok(resp), f'gimbal_push_on: {resp}' + return resp + + def gimbal_push_off(self, attitude: bool = True) -> str: + """ + 关闭云台中相应属性的信息推送。 + + Disable gimbal push of specified attribution. + + :param attitude: 关闭姿态推送。 whether disable attitude push. + :return: ok,否则raise。 ok, or raise certain exception. + """ + assert attitude, 'at least one augment should be True' + resp = self.do('gimbal', 'push', 'attitude', SWITCH_OFF) + assert self._is_ok(resp), f'gimbal_push_off: {resp}' + return resp + + def armor_sensitivity(self, value: int) -> str: + """ + 设置装甲板打击检测灵敏度。 + + Update armor sensitivity. + + :param value: 装甲板灵敏度,数值越大,越容易检测到打击。默认灵敏度值为 5. + armor sensitivity, the bigger, the more sensitive. Default to 5. + :return: ok,否则raise。 ok, or raise certain exception. + """ + assert 1 <= value <= 10, f'value {value} is out of range' + resp = self.do('armor', 'sensitivity', value) + assert self._is_ok(resp), f'armor_sensitivity: {resp}' + return resp + + def get_armor_sensitivity(self) -> int: + """ + 获取装甲板打击检测灵敏度。 + + Query for armor sensitivity. + + :return: 装甲板灵敏度 armor sensitivity. + """ + resp = self.do('armor', 'sensitivity', '?') + return int(resp) + + def armor_event(self, attr: str, switch: bool) -> str: + """ + 控制装甲板检测事件上报。 + + Enable or disable specified armor event. + + :param attr: 事件属性名称,范围见 ARMOR_ENUMS. armor event name, see ARMOR_ENUMS. + :param switch: 是否开启上报 on/off + :return: ok,否则raise。 ok, or raise certain exception. + """ + assert attr in ARMOR_ENUMS, f'unexpected armor event attr {attr}' + resp = self.do('armor', 'event', attr, SWITCH_ON if switch else SWITCH_OFF) + assert self._is_ok(resp), f'armor_event: {resp}' + return resp + + def sound_event(self, attr: str, switch: bool) -> str: + """ + 控制声音识别事件上报。 + + Enable or disable specified sound event. + + :param attr: 事件属性名称,范围见 SOUND_ENUMS. sound event name, see ARMOR_ENUMS. + :param switch: 是否开启上报 on/off + :return: ok,否则raise。 ok, or raise certain exception. + """ + assert attr in SOUND_ENUMS, f'unexpected armor event attr {attr}' + resp = self.do('sound', 'event', attr, SWITCH_ON if switch else SWITCH_OFF) + assert self._is_ok(resp), f'armor_event: {resp}' + return resp + + def led_control(self, comp: str, effect: str, r: int, g: int, b: int) -> str: + """ + 控制 LED 灯效。跑马灯效果仅可作用于云台两侧 LED。 + + Update LED effects. Note scrolling effect works only on gimbal LEDs. + + :param comp: LED 编号,见 LED_ENUMS LED composition, see LED_ENUMS + :param effect: 灯效类型,见 LED_EFFECT_ENUMS effect type, see LED_EFFECT_ENUMS + :param r: RGB 红色分量值 RGB red value + :param g: RGB 绿色分量值 RGB green value + :param b: RGB 蓝色分量值 RGB blue value + :return: ok,否则raise。 ok, or raise certain exception. + """ + assert comp in LED_ENUMS, f'unknown comp {comp}' + assert effect in LED_EFFECT_ENUMS, f'unknown effect {effect}' + assert 0 <= r <= 255, f'r {r} is out of scope' + assert 0 <= g <= 255, f'g {g} is out of scope' + assert 0 <= b <= 255, f'b {b} is out of scope' + if effect == LED_EFFECT_SCROLLING: + assert comp in (LED_TOP_ALL, LED_TOP_LEFT, LED_TOP_RIGHT), 'scrolling effect works only on gimbal LEDs' + resp = self.do('led', 'control', 'comp', comp, 'r', r, 'g', g, 'b', b, 'effect', effect) + assert self._is_ok(resp), f'led_control: {resp}' + return resp + + def ir_sensor_measure(self, switch: bool = True): + """ + 打开/关闭所有红外传感器开关。 + + Enable or disable all IR sensor. + + :param switch: 打开/关闭 on/off + :return: ok,否则raise。 ok, or raise certain exception. + """ + resp = self.do('ir_distance_sensor', 'measure', SWITCH_ON if switch else SWITCH_OFF) + assert self._is_ok(resp), f'ir_sensor_measure: {resp}' + return resp + + def get_ir_sensor_distance(self, id: int) -> float: + """ + 获取指定 ID 的红外深度传感器距离。 + + Query for distance reported by specified IR sensor. + + :param id: 红外传感器的 ID ID of IR sensor + :return: 指定 ID 的红外传感器测得的距离值,单位 mm distance in mm + """ + assert 1 <= id <= 4, f'invalid IR sensor id {id}' + resp = self.do('ir_distance_sensor', 'distance', id, '?') + return float(resp) + + def stream(self, switch: bool) -> str: + """ + 视频流开关控制。 + + Enable or disable video stream. + + :param switch: 打开/关闭 on/off + :return: ok,否则raise。 ok, or raise certain exception. + """ + resp = self.do('stream', SWITCH_ON if switch else SWITCH_OFF) + assert self._is_ok(resp), f'stream: {resp}' + return resp + + def audio(self, switch: bool) -> str: + """ + 音频流开关控制。 + + Enable or disable audio stream. + + :param switch: 打开/关闭 on/off + :return: ok,否则raise。 ok, or raise certain exception. + """ + resp = self.do('audio', SWITCH_ON if switch else SWITCH_OFF) + assert self._is_ok(resp), f'audio: {resp}' + return resp + + def blaster_fire(self) -> str: + """ + 控制水弹枪发射一次。 + + Fire once. + + :return: ok,否则raise。 ok, or raise certain exception. + """ + resp = self.do('blaster', 'fire') + assert self._is_ok(resp), f'blaster_fire: {resp}' + return resp diff --git a/controller/packagesrc/framework.py b/controller/packagesrc/framework.py new file mode 100644 index 0000000..3e5979c --- /dev/null +++ b/controller/packagesrc/framework.py @@ -0,0 +1,600 @@ +# -*- coding: utf-8 -*- + +# ██████╗ ██████╗ ██████╗ ██████╗ ███╗ ███╗ █████╗ ███████╗████████╗███████╗██████╗ ██████╗ ██╗ ██╗ +# ██╔══██╗██╔═══██╗██╔══██╗██╔═══██╗████╗ ████║██╔══██╗██╔════╝╚══██╔══╝██╔════╝██╔══██╗██╔══██╗╚██╗ ██╔╝ +# ██████╔╝██║ ██║██████╔╝██║ ██║██╔████╔██║███████║███████╗ ██║ █████╗ ██████╔╝██████╔╝ ╚████╔╝ +# ██╔══██╗██║ ██║██╔══██╗██║ ██║██║╚██╔╝██║██╔══██║╚════██║ ██║ ██╔══╝ ██╔══██╗██╔═══╝ ╚██╔╝ +# ██║ ██║╚██████╔╝██████╔╝╚██████╔╝██║ ╚═╝ ██║██║ ██║███████║ ██║ ███████╗██║ ██║██║ ██║ +# ╚═╝ ╚═╝ ╚═════╝ ╚═════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═╝╚══════╝ ╚═╝ ╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝ + +import logging +import multiprocessing as mp +import queue +import signal +import socket +import sys +import time +from typing import List, Callable, Tuple, Optional, Iterator + +import cv2 as cv + +from .client import CTX, LOG_LEVEL, PUSH_PORT, GimbalAttitude, ChassisPosition, ChassisAttitude, ChassisStatus, DEFAULT_BUF_SIZE, EVENT_PORT, ARMOR_HIT, ArmorHitEvent, SOUND_APPLAUSE, SoundApplauseEvent, VIDEO_PORT, Commander + + +class Worker: + """ + 用户逻辑的载体,继承这个类然后将你的逻辑写到 ``work()`` 方法中即可。 + 你的逻辑可以是有状态的或无状态的,如果需要,你可以在继承的新类中使用自定义的任意属性保存你的状态。 + 如果你需要打印日志,使用 ``Worker.logger`` 属性。 + 创建你的新类的实例后使用 ``Hub.worker()`` 注册到Hub实例供其调用。 + + 一个Worker子类一般只做一件事情,多个Worker子类各司其职,相互协作,通过 ``multiprocessing.Queue`` 进行单向通讯, + 最终,负责传感器的Worker的数据会汇聚到负责控制的Worker中,负责控制的Worker再使用 ``Commander`` 向机甲下令。 + + RoboMasterPy.framework中提供了多个定制化,开箱即用的Worker以满足 + 接收视频流(Vision)、接收事件(EventListener)和推送(PushListener)、汇聚信息控制机甲(Mind)等常见需求, + 请参阅API文档中的“预置Worker”部分。 + + Worker takes user's business logic. Inherit this class and write logic code in ``work()`` method. + A worker can be stateful or stateless, at your choice. You may use some user-defined attributes to store your state if the need arises. + Use ``Worker.logger`` attribute if some logs need to be printed. + Register your well-defined new worker to hub using ``Hub.worker()`` so that hub schedule and calls your logic. + + One Worker subclass nearly always does only one business. Multiple Worker subclasses do their own jobs and cooperate, + communicate through ``multiprocessing.Queue`` in one-way fashion. + Data from those subclasses in charge of sensor flows into subclass in charge of controlling, who command your Robomaster by ``Commander``. + + RoboMasterPy.framework provides many customized and out-of-box Worker to cover common usage like + receiving video stream(vision), receiving events(EventListener) & pushes(PushListener), gathering info and controlling the Robomaster(Mind), etc. + Please refer to "Sugared Workers" section in API doc. + """ + + QUEUE_TIMEOUT: float = 0.05 + + def __init__(self, name: str, out: Optional[mp.Queue], protocol: Optional[str], address: Tuple[str, int], timeout: Optional[float], loop: bool = True): + """ + 初始化Worker,这个方法一般在子类的__init__()方法中调用,不会直接使用。 + + Initialize Worker, called in its subclasses's __init__(), seldom used directly. + + :param name: worker实例的名称,这个名称也会作为进程的名称,你应该使用一个有利于调试的,描述性好的名字。 + name of Worker instance, which is also the name of the process. Choose wisely to benefit when debugging. + :param out: (可选)用于输出产物的multiprocessing.Queue,定义后Worker._outlet()方法可用。 + (Optional) a multiprocessing.Queue to put worker's product. Worker._outlet() is available after this parameter is defined. + :param protocol: 连接机甲的协议名称,从tcp, udp和None中选择,在tcp和udp选项下Worker._intake()方法可用。 + Protocol to use to connect your Robomaster, choose one in tcp, udp, None. Worker._intake() is available under tcp or udp. + :param address: 机甲的IP地址和端口号,可从Commander.get_ip()中获得机甲IP,端口号和业务有关,见framework.*_PORT. + IP and port to Robomaster. IP is available using Commander.get_ip(), and refer to framework.*_PORT for port number. + :param timeout: tcp或udp的连接超时。 timeout of tcp or udp. + :param loop: 是否在Worker的生命周期中循环调用work()方法,常见True,在方法提供自己的生命周期管理的时候可选False. + Whether call work() method in a loop for Worker's lifetime, usually True. Set to False when work() has its own lifecycle management. + """ + assert name is not None and name != '', 'choose a good name to make life easier' + + self._mu = CTX.Lock() + with self._mu: + signal.signal(signal.SIGINT, self._handle_close_signal) + signal.signal(signal.SIGTERM, self._handle_close_signal) + self._name: str = name + self._closed: bool = False + self._address: Tuple[str, int] = address + self._out: Optional[mp.Queue] = out + self._logger: logging.Logger = logging.getLogger(name) + self._logger.setLevel(LOG_LEVEL) + handler = logging.StreamHandler(sys.stdout) + formatter = logging.Formatter('%(asctime)s %(name)-12s : %(levelname)-8s %(message)s') + handler.setFormatter(formatter) + self._logger.addHandler(handler) + self._loop: bool = loop + + if protocol == 'tcp': + self._conn: socket.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._conn.settimeout(timeout) + self._conn.connect(self._address) + elif protocol == 'udp': + self._conn: socket.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self._conn.settimeout(timeout) + self._conn.bind(self._address) + elif protocol is None: + self._conn: Optional[socket.socket] = None + pass + else: + raise ValueError(f'unknown protocol {protocol}') + + def _handle_close_signal(self, sig, stacks): + self.close() + + @property + def closed(self) -> bool: + """ + 当前Worker是否已经永久停止。 + + Whether the worker has stopped working eventually. + + :return: True for stopped. + """ + return self._closed + + def close(self): + """ + 让Worker停止工作,本方法一般由Hub调用。 + + Let worker stop. Nearly always called by Hub. + """ + with self._mu: + if self.closed: + return + + self._closed = True + self.logger.info('signal received, closing...') + if self._conn is not None: + self._conn.close() + if self._out is not None and type(self._out) == mp.Queue: + self._out.close() + + @property + def name(self) -> str: + """ + 当前Worker的名字。 + + The worker's name. + """ + return self._name + + def work(self) -> None: + """ + 在本方法中实现你的业务逻辑,你可能需要在这里使用下列方法和属性: + + * 使用 ``self._intake()`` 方法从tcp或udp中获取数据; + * 使用 ``self._outlet()`` 方法将产物放到out中,注意,如果out被没有即时消费的产物填满,self._outlet()会丢弃最新的产物; + * 使用 ``self.logger`` 属性打印日志。 + + Implement your business logic in this method. These methods and attributes may be useful: + + * use ``self._intake()`` method to intake data from tcp or udp connection; + * use ``self._outlet()`` to put product to ``out``. Keep in mind if ``out`` is filled up with unconsumed product, self._outlet() discards the latest products. + * use ``self.logger`` for log printing. + """ + raise NotImplementedError('implement me') + + def __call__(self) -> None: + try: + if self._loop: + while not self.closed: + self.work() + else: + self.work() + except EOFError: + if not self.closed: + raise + finally: + self.close() + + @property + def logger(self) -> logging.Logger: + """ + 使用本属性打印日志。 + + Use this attribute for logging. + """ + return self._logger + + def _assert_ready(self): + assert not self.closed, 'Worker is already closed' + + def _intake(self, buf_size: int): + self._assert_ready() + return self._conn.recv(buf_size) + + def _outlet(self, payload): + self._assert_ready() + while not self.closed: + try: + self._out.put(payload, block=True, timeout=self.QUEUE_TIMEOUT) + except queue.Full: + continue + break + + def get_address(self) -> Tuple[str, int]: + """ + 获取Worker连接的IP和port. + + get IP and port which the worker currently connects to. + + :return: (IP, port) + """ + return self._address + + def __enter__(self): + return self + + def __exit__(self): + self.close() + + +class Hub: + """ + 程序中枢。 + + * 使用 ``self.worker()`` 注册你的Worker; + * 使用 ``self.run()`` 开始运行; + * 使用 ``Ctrl + C`` 停止程序。 + + Hub is the orchestrator. + + * Use ``self.worker()`` to register your worker; + * Use ``self.run()`` to run; + * Use ``Ctrl + C`` for exiting. + """ + + TERMINATION_TIMEOUT = 10 + + def __init__(self): + """ + 创建Hub不需要提供参数。 + + Hub does not need parameters to initialize. + """ + self._mu = CTX.Lock() + with self._mu: + self._closed: bool = False + self._workers: List = [] + + def close(self): + """ + 让Hub以及Hub名下的所有Worker停止工作。这个方法不需要用户来调用。 + + Stop hub and workers registered under hub. This method does not need to be called by user. + """ + with self._mu: + if self._closed: + return + self._closed = True + + end_time = time.time() + self.TERMINATION_TIMEOUT + for worker in self._workers: + remain_time = max(0.0, end_time - time.time()) + worker.join(remain_time) + for worker in self._workers: + if worker.is_alive(): + worker.terminate() + + def _assert_ready(self): + assert not self._closed, 'Hub is closed' + + def __enter__(self): + return self + + def __exit__(self): + self.close() + + def worker(self, worker_class, name: str, args: Tuple = (), kwargs=None): + """ + 将worker注册到hub. + 所有的worker都在独立的进程中运行。 + + Register worker to hub. + All workers run in their own operating system process. + + :param worker_class: worker的类,注意不是worker实例。 + class of worker to be registered, note provide the class, instead of an instance. + :param name: worker的名字,选个好名字可以让调试更容易。 + worker's name. A good name makes debugging less painful. + :param args: 创建worker需要使用的参数。 + args to initialize the worker. + :param kwargs: 创建worker需要使用的kwargs参数。 + kwargs to initialize the worker. + """ + if kwargs is None: + kwargs = {} + process = CTX.Process(name=name, target=self._build_worker_and_run, args=(worker_class, name, *args), kwargs=kwargs) + self._workers.append(process) + + @staticmethod + def _build_worker_and_run(*args, **kwargs): + worker_class = args[0] + worker = worker_class(*args[1:], **kwargs) + worker() + + def run(self): + """ + 按注册顺序启动所有worker,阻塞主进程。 + Hub在接收到 ``SIGTERM`` 或者 ``SIGINT`` 时会尝试安全退出。 + + Start workers and block the main process. + Hub tries to shutdown itself gracefully when receiving + ``SIGTERM`` or ``SIGINT``. + """ + with self._mu: + self._assert_ready() + assert len(self._workers) > 0, 'no worker registered' + for worker in self._workers: + worker.start() + + signal.sigwait((signal.SIGINT, signal.SIGTERM)) + self.close() + for worker in self._workers: + try: + # close method does not exist until Python 3.7 and better + if getattr(worker, 'close', False): + worker.close() + except Exception as e: + logging.error('[resource leak warning] failed to close process "%s": %s', worker._name, e) + + +class PushListener(Worker): + """ + 监听并解析机甲大师的推送,输出强类型的推送内容。 + + Listen and parse pushes from Robomaster, product parsed pushes in strong typed manner. + """ + PUSH_TYPE_CHASSIS: str = 'chassis' + PUSH_TYPE_GIMBAL: str = 'gimbal' + PUSH_TYPES: Tuple[str] = (PUSH_TYPE_CHASSIS, PUSH_TYPE_GIMBAL) + + def __init__(self, name: str, out: mp.Queue): + """ + 初始化自身。 + + Initialize self. + + :param name: worker名称 name of worker + :param out: PushListener会将产物放入其中以供下游消费。 + PushListener puts product into ``out`` for downstream consuming. + """ + super().__init__(name, out, 'udp', ('', PUSH_PORT), None) + + def _parse(self, msg: str) -> List: + payloads: Iterator[str] = map(lambda x: x.strip(), msg.strip(' ;').split(';')) + current_push_type: Optional[str] = None + has_type_prefix: bool = False + parsed: List = [] + for index, payload in enumerate(payloads): + words = payload.split(' ') + assert len(words) > 1, f'unexpected payload at index {index}, context: {msg}' + if words[0] in self.PUSH_TYPES: + current_push_type = words[0] + has_type_prefix = True + else: + has_type_prefix = False + assert current_push_type is not None, f'can not decide push type of payload at index {index}, context: {msg}' + + if current_push_type == self.PUSH_TYPE_CHASSIS: + parsed.append(self._parse_chassis_push(words, has_type_prefix)) + elif current_push_type == self.PUSH_TYPE_GIMBAL: + parsed.append(self._parse_gimbal_push(words, has_type_prefix)) + else: + raise ValueError(f'unknown push type {current_push_type} at index {index}, context: {msg}') + return parsed + + @staticmethod + def _parse_gimbal_push(words: List[str], has_type_prefix: bool): + subtype: str = '' + if has_type_prefix: + assert len(words) > 3, f'invalid gimbal push payload, words: {words}' + subtype = words[2] + else: + assert len(words) > 1, f'invalid gimbal push payload, words: {words}' + subtype = words[0] + + if subtype == 'attitude': + return GimbalAttitude(float(words[-2]), float(words[-1])) + else: + raise ValueError(f'unknown gimbal push subtype {subtype}, context: {words}') + + @staticmethod + def _parse_chassis_push(words: List[str], has_type_prefix: bool): + subtype: str = '' + if has_type_prefix: + assert len(words) > 3, f'invalid chassis push payload, words: {words}' + subtype = words[2] + else: + assert len(words) > 1, f'invalid chassis push payload, words: {words}' + subtype = words[0] + + if subtype == 'position': + return ChassisPosition(float(words[-2]), float(words[-1]), None) + elif subtype == 'attitude': + return ChassisAttitude(float(words[-3]), float(words[-2]), float(words[-1])) + elif subtype == 'status': + ans = words[-11:] + assert len(ans) == 11, f'invalid chassis status payload, words: {words}' + return ChassisStatus(*map(lambda x: bool(int(x)), ans)) + else: + raise ValueError(f'unknown chassis push subtype {subtype}, context: {words}') + + def work(self) -> None: + try: + msg = self._intake(DEFAULT_BUF_SIZE).decode() + except OSError: + if self.closed: + return + else: + raise + payloads = self._parse(msg) + for payload in payloads: + self._outlet(payload) + + +class EventListener(Worker): + """ + 监听并解析机甲大师的事件,输出强类型的推送内容。 + + Listen and parse events from Robomaster, product parsed events in strong typed manner. + """ + + EVENT_TYPE_ARMOR: str = 'armor' + EVENT_TYPE_SOUND: str = 'sound' + EVENT_TYPES: Tuple[str] = (EVENT_TYPE_ARMOR, EVENT_TYPE_SOUND) + + def __init__(self, name: str, out: mp.Queue, ip: str): + """ + 初始化自身。 + + Initialize self. + + :param name: worker名称 name of worker + :param out: PushListener会将产物放入其中以供下游消费。 + PushListener puts product into ``out`` for downstream consuming. + :param ip: 机甲的IP,可从Commander.get_ip()取得。 + IP of your Robomaster, can be obtained from Commander.get_ip() + """ + super().__init__(name, out, 'tcp', (ip, EVENT_PORT), None) + + def _parse(self, msg: str) -> List: + payloads: Iterator[str] = map(lambda x: x.strip(), msg.strip(' ;').split(';')) + current_event_type: Optional[str] = None + has_type_prefix: bool = False + parsed: List = [] + for index, payload in enumerate(payloads): + words = payload.split(' ') + assert len(words) > 1, f'unexpected payload at index {index}, context: {msg}' + if words[0] in self.EVENT_TYPES: + current_event_type = words[0] + has_type_prefix = True + else: + has_type_prefix = False + assert current_event_type is not None, f'can not decide event type of payload at index {index}, context: {msg}' + + if current_event_type == self.EVENT_TYPE_ARMOR: + parsed.append(self._parse_armor_event(words, has_type_prefix)) + elif current_event_type == self.EVENT_TYPE_SOUND: + parsed.append(self._parse_sound_event(words, has_type_prefix)) + else: + raise ValueError(f'unknown event type {current_event_type} at index {index}, context: {msg}') + return parsed + + @staticmethod + def _parse_armor_event(words: List[str], has_type_prefix: bool): + subtype: str = '' + if has_type_prefix: + assert len(words) > 3, f'invalid armor event payload, words: {words}' + subtype = words[2] + else: + assert len(words) > 1, f'invalid armor event payload, words: {words}' + subtype = words[0] + + if subtype == ARMOR_HIT: + return ArmorHitEvent(int(words[-2]), int(words[-1])) + else: + raise ValueError(f'unknown armor event subtype {subtype}, context: {words}') + + @staticmethod + def _parse_sound_event(words: List[str], has_type_prefix: bool): + subtype: str = '' + if has_type_prefix: + assert len(words) > 3, f'invalid sound event payload, words: {words}' + subtype = words[2] + else: + assert len(words) > 1, f'invalid sound event payload, words: {words}' + subtype = words[0] + + if subtype == SOUND_APPLAUSE: + return SoundApplauseEvent(int(words[-1])) + else: + raise ValueError(f'unknown sound event subtype {subtype}, context: {words}') + + def work(self) -> None: + try: + msg = self._intake(DEFAULT_BUF_SIZE).decode() + except OSError: + if self.closed: + return + else: + raise + payloads = self._parse(msg) + for payload in payloads: + self._outlet(payload) + + +class Vision(Worker): + """ + 拉取并解析机甲的视频流,回调函数会收到解析好的OpenCV视频帧,回调函数的返回值会被放置到 ``out`` 中。 + + Pull and parse Robomaster's video stream, call the callback with parsed OpenCV frame, + and put return value from callback into ``out``. + """ + + TIMEOUT: float = 5.0 + + def __init__(self, name: str, out: Optional[mp.Queue], ip: str, processing: Callable[..., None], none_is_valid=False): + """ + 初始化自身。 + + Initialize self. + + :param name: worker名称 name of worker + :param out: PushListener会将产物放入其中以供下游消费。 + PushListener puts product into ``out`` for downstream consuming. + :param ip: 机甲的IP,可从Commander.get_ip()取得。 + IP of your Robomaster, can be obtained from Commander.get_ip() + :param processing: 回调函数,每当有新的视频帧到来时,函数都会被Vision调用,形如 ``processing(frame=frame, logger=self.logger)`` , + 其中frame为cv2(OpenCV) frame,logger可用于日志打印。 + callback function, is called every time when a new frame comes, in form ``processing(frame=frame, logger=self.logger)``, + where frame is cv2(OpenCV) frame, and logger is for logging. + :param none_is_valid: 是否在回调函数返回None时将None放入 ``out`` ,默认为False. + Whether to put None returned from callback function into ``out``, default to False. + """ + super().__init__(name, out, None, (ip, VIDEO_PORT), self.TIMEOUT) + self._none_is_valid = none_is_valid + self._processing = processing + self._cap = cv.VideoCapture(f'tcp://{ip}:{VIDEO_PORT}') + assert self._cap.isOpened(), 'failed to connect to video stream' + self._cap.set(cv.CAP_PROP_BUFFERSIZE, 4) + + def close(self): + self._cap.release() + cv.destroyAllWindows() + super().close() + + def work(self) -> None: + ok, frame = self._cap.read() + if not ok: + if self.closed: + return + else: + raise ValueError('can not receive frame (stream end?)') + processed = self._processing(frame=frame, logger=self.logger) + if processed is not None or self._none_is_valid: + self._outlet(processed) + + +class Mind(Worker): + """ + 无状态的控制者,适用于简单的控制。 + 对于复杂的,有状态的控制需求,你需要自己继承Worker来实现。 + + Stateless controller, suits simple and naive controlling scenario. + For complicated, stateful controlling, inherit Worker to implement. + """ + def __init__(self, name: str, queues: Tuple[mp.Queue, ...], ip: str, processing: Callable[..., None], timeout: float = 30, loop: bool = True): + """ + 初始化自身。 + + Initialize self. + + :param name: worker名称 name of worker + :param queues: 队列元组,其中队列的内容由上游提供。 + Tuple of mp.Queue, where their contents are provided by upstream. + :param ip: 机甲的IP,可从Commander.get_ip()取得。 + IP of your Robomaster, can be obtained from Commander.get_ip() + :param processing: 回调函数,调用时参数形如 ``processing(cmd=self._cmd, queues=self._queues, logger=self.logger)`` , + 其中cmd为连接到机甲的Commander,queues为输入的队列元组,logger用于日志打印。 + callback function, is called in form ``processing(cmd=self._cmd, queues=self._queues, logger=self.logger)``, + where cmd is a connected Commander, queue is the input tuple of mp.Queue, logger is for logging. + :param timeout: Commander的连接超时。 + timeout for Commander. + :param loop: 是否循环调用回调函数processing + whether calls processing(callback) function in loop, default to True. + """ + super().__init__(name, None, None, (ip, 0), timeout, loop=loop) + self._queues = queues + self._processing = processing + self._cmd = Commander(ip, timeout) + + def close(self): + self._cmd.close() + super().close() + + def work(self) -> None: + self._processing(cmd=self._cmd, queues=self._queues, logger=self.logger) diff --git a/controller/packagesrc/measure.py b/controller/packagesrc/measure.py new file mode 100644 index 0000000..cc70aef --- /dev/null +++ b/controller/packagesrc/measure.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- + + +# ██████╗ ██████╗ ██████╗ ██████╗ ███╗ ███╗ █████╗ ███████╗████████╗███████╗██████╗ ██████╗ ██╗ ██╗ +# ██╔══██╗██╔═══██╗██╔══██╗██╔═══██╗████╗ ████║██╔══██╗██╔════╝╚══██╔══╝██╔════╝██╔══██╗██╔══██╗╚██╗ ██╔╝ +# ██████╔╝██║ ██║██████╔╝██║ ██║██╔████╔██║███████║███████╗ ██║ █████╗ ██████╔╝██████╔╝ ╚████╔╝ +# ██╔══██╗██║ ██║██╔══██╗██║ ██║██║╚██╔╝██║██╔══██║╚════██║ ██║ ██╔══╝ ██╔══██╗██╔═══╝ ╚██╔╝ +# ██║ ██║╚██████╔╝██████╔╝╚██████╔╝██║ ╚═╝ ██║██║ ██║███████║ ██║ ███████╗██║ ██║██║ ██║ +# ╚═╝ ╚═╝ ╚═════╝ ╚═════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═╝╚══════╝ ╚═╝ ╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝ + + +import math +from typing import Tuple + +FOCAL_LENGTH_HD: float = 710 +HORIZONTAL_DEGREES: float = 96 +HORIZONTAL_PIXELS: float = 1280 +VERTICAL_DEGREES: float = 54 +VERTICAL_PIXELS: float = 720 + +INFANTRY_LENGTH: float = 0.32 +INFANTRY_WIDTH: float = 0.24 +INFANTRY_HEIGHT: float = 0.27 + +ENGINEERING_LENGTH: float = 0.41 +ENGINEERING_WIDTH: float = 0.24 +ENGINEERING_HEIGHT: float = 0.33 + + +def pinhole_distance(actual_size: float, pixel_size: float, focal_length: float = FOCAL_LENGTH_HD) -> float: + """ + 使用针孔相机模型估测物体到相机的距离。 + + Estimate distance between camera and object using pinhole camera model. + + :param actual_size: 实际大小,单位米。 Actual object size in meters. + :param pixel_size: 像素大小,单位像素。 Object size in pixels. + :param focal_length: (可选)当前分辨率下的等效焦距,和分辨率相关。默认使用1280*720分辨率下的数值。Perceived focal length at specified image resolution, default to value under 1280*720. + :return: 物体到相机的距离,单位米。 Distance between camera and object, in meters. + """ + return focal_length * actual_size / pixel_size + + +def distance_decomposition(pixel_x: float, distance: float, horizontal_pixels: float = HORIZONTAL_PIXELS, horizontal_degrees: float = HORIZONTAL_DEGREES) -> Tuple[float, float, float]: + """ + 将距离分解为前进分量(前为正)和侧向分量(右为正)。本函数要求线段的两端在相同海拔高度。 + + Decomposition distance into forward vector(forward as positive) and lateral vector(right as positive). + This function requires that both ends of the line segment are at the same altitude. + + :param pixel_x: 物体在图像上的x坐标,单位像素。 the x coordinate of the object on the image, in pixels. + :param distance: 距离,单位米。 Distance in meter. + :param horizontal_pixels: 图像横向的像素数目,默认1280. The number of pixels in the horizontal direction of the image, the default is 1280. + :param horizontal_degrees: 图像横向的视角大小,默认96. The horizontal viewing angle of the image, the default is 96. + :return: 前进分量和侧向分量,单位米;水平偏转角度,单位度。 forward vector and lateral vector in meters; horizontal angle in degrees. + """ + horizontal_degree = horizontal_degrees * (pixel_x / horizontal_pixels - 0.5) + rad = horizontal_degree / 180 * math.pi + lateral = distance * math.sin(rad) + forward = distance * math.cos(rad) + return forward, lateral, horizontal_degree