776 lines
30 KiB
Python
776 lines
30 KiB
Python
# -*- coding: utf-8 -*-
|
||
|
||
# ██████╗ ██████╗ ██████╗ ██████╗ ███╗ ███╗ █████╗ ███████╗████████╗███████╗██████╗ ██████╗ ██╗ ██╗
|
||
# ██╔══██╗██╔═══██╗██╔══██╗██╔═══██╗████╗ ████║██╔══██╗██╔════╝╚══██╔══╝██╔════╝██╔══██╗██╔══██╗╚██╗ ██╔╝
|
||
# ██████╔╝██║ ██║██████╔╝██║ ██║██╔████╔██║███████║███████╗ ██║ █████╗ ██████╔╝██████╔╝ ╚████╔╝
|
||
# ██╔══██╗██║ ██║██╔══██╗██║ ██║██║╚██╔╝██║██╔══██║╚════██║ ██║ ██╔══╝ ██╔══██╗██╔═══╝ ╚██╔╝
|
||
# ██║ ██║╚██████╔╝██████╔╝╚██████╔╝██║ ╚═╝ ██║██║ ██║███████║ ██║ ███████╗██║ ██║██║ ██║
|
||
# ╚═╝ ╚═╝ ╚═════╝ ╚═════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═╝╚══════╝ ╚═╝ ╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝
|
||
|
||
import logging
|
||
import multiprocessing as mp
|
||
import socket
|
||
from typing import Optional
|
||
|
||
from dataclasses import dataclass
|
||
|
||
CTX = mp.get_context('spawn')
|
||
LOG_LEVEL = logging.DEBUG
|
||
|
||
VIDEO_PORT: int = 40921
|
||
AUDIO_PORT: int = 40922
|
||
CTRL_PORT: int = 40923
|
||
PUSH_PORT: int = 40924
|
||
EVENT_PORT: int = 40925
|
||
IP_PORT: int = 40926
|
||
|
||
DEFAULT_BUF_SIZE: int = 512
|
||
|
||
# switch_enum
|
||
SWITCH_ON: str = 'on'
|
||
SWITCH_OFF: str = 'off'
|
||
|
||
# mode_enum
|
||
MODE_CHASSIS_LEAD: str = 'chassis_lead'
|
||
MODE_GIMBAL_LEAD: str = 'gimbal_lead'
|
||
MODE_FREE: str = 'free'
|
||
MODE_ENUMS = (MODE_CHASSIS_LEAD, MODE_GIMBAL_LEAD, MODE_FREE)
|
||
|
||
# armor_event_attr_enum
|
||
ARMOR_HIT: str = 'hit'
|
||
ARMOR_ENUMS = (ARMOR_HIT,)
|
||
|
||
# sound_event_attr_enum
|
||
SOUND_APPLAUSE: str = 'applause'
|
||
SOUND_ENUMS = (SOUND_APPLAUSE,)
|
||
|
||
# led_comp_enum
|
||
LED_ALL = 'all'
|
||
LED_TOP_ALL = 'top_all'
|
||
LED_TOP_RIGHT = 'top_right'
|
||
LED_TOP_LEFT = 'top_left'
|
||
LED_BOTTOM_ALL = 'bottom_all'
|
||
LED_BOTTOM_FRONT = 'bottom_front'
|
||
LED_BOTTOM_BACK = 'bottom_back'
|
||
LED_BOTTOM_LEFT = 'bottom_left'
|
||
LED_BOTTOM_RIGHT = 'bottom_right'
|
||
LED_ENUMS = (LED_ALL, LED_TOP_ALL, LED_TOP_RIGHT, LED_TOP_LEFT,
|
||
LED_BOTTOM_ALL, LED_BOTTOM_FRONT, LED_BOTTOM_BACK,
|
||
LED_BOTTOM_LEFT, LED_BOTTOM_RIGHT)
|
||
|
||
# led_effect_enum
|
||
LED_EFFECT_SOLID = 'solid'
|
||
LED_EFFECT_OFF = 'off'
|
||
LED_EFFECT_PULSE = 'pulse'
|
||
LED_EFFECT_BLINK = 'blink'
|
||
LED_EFFECT_SCROLLING = 'scrolling'
|
||
LED_EFFECT_ENUMS = (LED_EFFECT_SOLID, LED_EFFECT_OFF,
|
||
LED_EFFECT_PULSE, LED_EFFECT_BLINK,
|
||
LED_EFFECT_SCROLLING)
|
||
|
||
|
||
@dataclass
|
||
class ChassisSpeed:
|
||
x: float
|
||
y: float
|
||
z: float
|
||
w1: int
|
||
w2: int
|
||
w3: int
|
||
w4: int
|
||
|
||
|
||
@dataclass
|
||
class ChassisPosition:
|
||
x: float
|
||
y: float
|
||
z: Optional[float]
|
||
|
||
|
||
@dataclass
|
||
class ChassisAttitude:
|
||
pitch: float
|
||
roll: float
|
||
yaw: float
|
||
|
||
|
||
@dataclass
|
||
class ChassisStatus:
|
||
# 是否静止
|
||
static: bool
|
||
# 是否上坡
|
||
uphill: bool
|
||
# 是否下坡
|
||
downhill: bool
|
||
# 是否溜坡
|
||
on_slope: bool
|
||
# 是否被拿起
|
||
pick_up: bool
|
||
# 是否滑行
|
||
slip: bool
|
||
# x轴是否感应到撞击
|
||
impact_x: bool
|
||
# y轴是否感应到撞击
|
||
impact_y: bool
|
||
# z轴是否感应到撞击
|
||
impact_z: bool
|
||
# 是否翻车
|
||
roll_over: bool
|
||
# 是否在坡上静止
|
||
hill_static: bool
|
||
|
||
|
||
@dataclass
|
||
class GimbalAttitude:
|
||
pitch: float
|
||
yaw: float
|
||
|
||
|
||
@dataclass
|
||
class ArmorHitEvent:
|
||
index: int
|
||
type: int
|
||
|
||
|
||
@dataclass
|
||
class SoundApplauseEvent:
|
||
count: int
|
||
|
||
|
||
def get_broadcast_ip(timeout: float = None) -> str:
|
||
"""
|
||
接收广播以获取机甲IP
|
||
|
||
Receive broadcasting IP of Robomaster.
|
||
|
||
:param timeout: 等待超时(秒)。 timeout in second
|
||
:return: 机甲IP地址。IP of Robomaster.
|
||
"""
|
||
BROADCAST_INITIAL: str = 'robot ip '
|
||
|
||
conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
conn.bind(('', IP_PORT))
|
||
conn.settimeout(timeout)
|
||
msg, ip, port = None, None, None
|
||
try:
|
||
msg, (ip, port) = conn.recvfrom(DEFAULT_BUF_SIZE)
|
||
finally:
|
||
conn.close()
|
||
msg = msg.decode()
|
||
assert len(msg) > len(BROADCAST_INITIAL), f'broken msg from {ip}:{port}: {msg}'
|
||
msg = msg[len(BROADCAST_INITIAL):]
|
||
assert msg == ip, f'unmatched source({ip}) and reported IP({msg})'
|
||
return msg
|
||
|
||
|
||
class Commander:
|
||
def __init__(self, ip: str = '', timeout: float = 30):
|
||
"""
|
||
创建SDK实例并连接机甲,实例在创建后立即可用。
|
||
|
||
Create a new SDK instance and connect it to Robomaster.
|
||
Instance is available immediately after creation.
|
||
|
||
:param ip: 可选,机甲IP,可在路由器模式下自动获取。 (Optional) IP of Robomaster, which can be detected automatically under router mode.
|
||
:param timeout: 可选,TCP通讯超时(秒)。 (Optional) TCP timeout in second.
|
||
"""
|
||
self._mu: mp.Lock = CTX.Lock()
|
||
with self._mu:
|
||
if ip == '':
|
||
ip = get_broadcast_ip(timeout)
|
||
self._ip: str = ip
|
||
self._closed: bool = False
|
||
self._conn: socket.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
self._timeout: float = timeout
|
||
self._conn.settimeout(self._timeout)
|
||
self._conn.connect((self._ip, CTRL_PORT))
|
||
resp = self._do('command')
|
||
assert self._is_ok(resp) or resp == 'Already in SDK mode', f'entering SDK mode: {resp}'
|
||
|
||
def close(self):
|
||
"""
|
||
关闭实例,回收socket资源。注意这个命令并不会发送quit到机甲,避免打扰其他在线的Commander.
|
||
|
||
Close instance, deallocate system socket resource.
|
||
Note this method will NOT send quit command to Robomaster,
|
||
because there may be other Commander still active.
|
||
"""
|
||
with self._mu:
|
||
self._conn.close()
|
||
self._closed = True
|
||
|
||
def __enter__(self):
|
||
return self
|
||
|
||
def __exit__(self):
|
||
self.close()
|
||
|
||
@staticmethod
|
||
def _is_ok(resp: str) -> bool:
|
||
return resp == 'ok'
|
||
|
||
def _do(self, *args) -> str:
|
||
assert len(args) > 0, 'empty arg not accepted'
|
||
assert not self._closed, 'connection is already closed'
|
||
cmd = ' '.join(map(str, args)) + ';'
|
||
self._conn.send(cmd.encode())
|
||
buf = self._conn.recv(DEFAULT_BUF_SIZE)
|
||
# 返回值后面有时候会多一个迷之空格,
|
||
# 为了可能的向后兼容,额外剔除终止符。
|
||
return buf.decode().strip(' ;')
|
||
|
||
def arm_move(self, x: float, y: float) -> str:
|
||
return self._do(f'robotic_arm move x {x} y {y}')
|
||
|
||
def arm_moveto(self, x: float, y: float) -> str:
|
||
return self._do(f'robotic_arm moveto x {x} y {y}')
|
||
|
||
def arm_recenter(self) -> str:
|
||
return self._do('robotic_arm recenter')
|
||
|
||
def arm_pos(self) -> int:
|
||
"""
|
||
x_pos: The x-axis coordinate in centimeters
|
||
y_pos: The y-axis coordinate in centimeters
|
||
"""
|
||
return self._do('robotic_arm position ?')
|
||
|
||
def arm_stop(self) -> str:
|
||
return self._do('robotic_arm stop')
|
||
|
||
def gripper_open(self, force: int = None) -> str:
|
||
assert force in [None, 1, 2, 3,4 ], "Force should be one of None, 1, 2, 3, 4"
|
||
return self._do(f'robotic_gripper open{f' leve {force}' if force else ''}')
|
||
|
||
def gripper_close(self, force: int = None) -> str:
|
||
assert force in [None, 1, 2, 3,4 ], "Force should be one of None, 1, 2, 3, 4"
|
||
return self._do(f'robotic_gripper close{f' leve {force}' if force else ''}')
|
||
|
||
def gripper_status(self) -> int:
|
||
"""
|
||
0: The mechanical gripper is fully closed.
|
||
1: The mechanical gripper is neither fully closed nor fully opened.
|
||
2: The mechanical gripper is fully opened.
|
||
"""
|
||
return self._do('robotic_gripper status ?')
|
||
|
||
def get_ip(self) -> str:
|
||
"""
|
||
返回机甲IP。
|
||
|
||
get IP that the commander currently connects to.
|
||
|
||
:return: 机甲IP。 IP that the commander currently connects to
|
||
"""
|
||
assert not self._closed, 'connection is already closed'
|
||
return self._ip
|
||
|
||
def do(self, *args) -> str:
|
||
"""
|
||
执行任意命令。
|
||
|
||
Execute any command.
|
||
|
||
:param args: 命令内容。 command content.
|
||
:return: 命令返回。 the response of the command.
|
||
"""
|
||
with self._mu:
|
||
return self._do(*args)
|
||
|
||
def version(self) -> str:
|
||
"""
|
||
查询当前机甲的SDK版本。
|
||
|
||
query robomaster SDK version
|
||
|
||
:return: SDK版本号。 SDK version string.
|
||
"""
|
||
return self.do('version')
|
||
|
||
def robot_mode(self, mode: str) -> str:
|
||
"""
|
||
更改机甲的运动模式。
|
||
|
||
Update Robomaster's movement mode.
|
||
|
||
:param mode: 三种模式之一,见enum MODE_*。 Movement mode, refer to enum MODE_*
|
||
:return: ok,否则raise。 ok, or raise certain exception.
|
||
"""
|
||
assert mode in MODE_ENUMS, f'unknown mode {mode}'
|
||
resp = self.do('robot', 'mode', mode)
|
||
assert self._is_ok(resp), f'robot_mode: {resp}'
|
||
return resp
|
||
|
||
def get_robot_mode(self) -> str:
|
||
"""
|
||
查询当前机甲的运动模式。
|
||
|
||
Query for Robomaster's current movement mode.
|
||
|
||
:return: 三种模式之一,见enum MODE_*。 Movement mode, refer to enum MODE_*
|
||
"""
|
||
resp = self.do('robot', 'mode', '?')
|
||
assert resp in MODE_ENUMS, f'unexpected robot mode result: {resp}'
|
||
return resp
|
||
|
||
def chassis_speed(self, x: float = 0, y: float = 0, z: float = 0) -> str:
|
||
"""
|
||
更改底盘运动速度。
|
||
|
||
Update chassis speed.
|
||
|
||
:param x: x 轴向运动速度,单位 m/s speed in x axis, in m/s
|
||
:param y: y 轴向运动速度,单位 m/s speed in y axis, in m/s
|
||
:param z: z 轴向旋转速度,单位 °/s rotation speed in z axis, in °/s
|
||
:return: ok,否则raise。 ok, or raise certain exception.
|
||
"""
|
||
assert -3.5 <= x <= 3.5, f'x {x} is out of range'
|
||
assert -3.5 <= y <= 3.5, f'y {y} is out of range'
|
||
assert -600 <= z <= 600, f'z {z} is out of range'
|
||
resp = self.do('chassis', 'speed', 'x', x, 'y', y, 'z', z)
|
||
assert self._is_ok(resp), f'chassis_speed: {resp}'
|
||
return resp
|
||
|
||
def get_chassis_speed(self) -> ChassisSpeed:
|
||
"""
|
||
获取底盘速度。
|
||
|
||
Query for chassis speed.
|
||
|
||
:return: x 轴向运动速度(m/s),y 轴向运动速度(m/s),z 轴向旋转速度(°/s),w1 右前麦轮速度(rpm),w2 左前麦轮速速(rpm),w3 右后麦轮速度(rpm),w4 左后麦轮速度(rpm)。
|
||
speed in x axis(m/s), speed in y axis(m/s), rotation speed in z axis(°/s), w1(right front) wheel speed(rpm), w2(left front) wheel speed(rpm), w3(right back) wheel speed(rpm), w4(left back) wheel speed(rpm)
|
||
"""
|
||
resp = self.do('chassis', 'speed', '?')
|
||
ans = resp.split(' ')
|
||
assert len(ans) == 7, f'get_chassis_speed: {resp}'
|
||
return ChassisSpeed(x=float(ans[0]), y=float(ans[1]), z=float(ans[2]), w1=int(ans[3]), w2=int(ans[4]), w3=int(ans[5]), w4=int(ans[6]))
|
||
|
||
def chassis_wheel(self, w1: int = 0, w2: int = 0, w3: int = 0, w4: int = 0) -> str:
|
||
"""
|
||
更改底盘轮子速度。
|
||
|
||
Update chassis wheel rotation speed.
|
||
|
||
:param w1: 右前麦轮速度,单位 rpm w1(right front) wheel speed(rpm)
|
||
:param w2: 左前麦轮速度,单位 rpm w2(left front) wheel speed(rpm)
|
||
:param w3: 右后麦轮速度,单位 rpm w3(right back) wheel speed(rpm)
|
||
:param w4: 左后麦轮速度,单位 rpm w4(left back) wheel speed(rpm)
|
||
:return ok: ok,否则raise。 ok, or raise certain exception.
|
||
"""
|
||
for i, v in enumerate((w1, w2, w3, w4)):
|
||
assert -1000 <= v <= 1000, f'w{i + 1} {v} is out of range'
|
||
resp = self.do('chassis', 'wheel', 'w1', w1, 'w2', w2, 'w3', w3, 'w4', w4)
|
||
assert self._is_ok(resp), f'chassis_wheel: {resp}'
|
||
return resp
|
||
|
||
def chassis_move(self, x: float = 0, y: float = 0, z: float = 0, speed_xy: float = None, speed_z: float = None) -> str:
|
||
"""
|
||
控制底盘运动当指定位置,坐标轴原点为当前位置。
|
||
|
||
Make chassis move to specified location. The origin is current location.
|
||
|
||
:param x: x 轴向运动距离,单位 m movement in x axis, in meter
|
||
:param y: y 轴向运动距离,单位 m movement in y axis, in meter
|
||
:param z: z 轴向旋转角度,单位 ° movement in z axis, in degree
|
||
:param speed_xy: xy 轴向运动速度,单位 m/s speed in both x and y axis, in meter/second
|
||
:param speed_z: z 轴向旋转速度, 单位 °/s speed in z axis, in degree/second
|
||
:return ok: ok,否则raise。 ok, or raise certain exception.
|
||
"""
|
||
assert -5 <= x <= 5, f'x {x} is out of range'
|
||
assert -5 <= y <= 5, f'y {y} is out of range'
|
||
assert -1800 <= z <= 1800, f'z {z} is out of range'
|
||
assert speed_xy is None or 0 < speed_xy <= 3.5, f'speed_xy {speed_xy} is out of range'
|
||
assert speed_z is None or 0 < speed_z <= 600, f'speed_z {speed_z} is out of range'
|
||
cmd = ['chassis', 'move', 'x', x, 'y', y, 'z', z]
|
||
if speed_xy is not None:
|
||
cmd += ['vxy', speed_xy]
|
||
if speed_z is not None:
|
||
cmd += ['vz', speed_z]
|
||
resp = self.do(*cmd)
|
||
assert self._is_ok(resp), f'chassis_move: {resp}'
|
||
return resp
|
||
|
||
def get_chassis_position(self) -> ChassisPosition:
|
||
"""
|
||
获取底盘当前的位置(以上电时刻位置为原点)。
|
||
|
||
Query for chassis location. The origin is where Robomaster powers on.
|
||
|
||
:return: x 轴位置(m),y 轴位置(m),偏航角度(°)。 location consisting of x, y, z, in meter, meter, degree.
|
||
"""
|
||
resp = self.do('chassis', 'position', '?')
|
||
ans = resp.split(' ')
|
||
assert len(ans) == 3, f'get_chassis_position: {resp}'
|
||
return ChassisPosition(float(ans[0]), float(ans[1]), float(ans[2]))
|
||
|
||
def get_chassis_attitude(self) -> ChassisAttitude:
|
||
"""
|
||
获取底盘姿态信息。
|
||
|
||
Query for chassis attitude.
|
||
|
||
:return: pitch 轴角度(°),roll 轴角度(°),yaw 轴角度(°)。 pitch, roll, yaw in degree.
|
||
"""
|
||
resp = self.do('chassis', 'attitude', '?')
|
||
ans = resp.split(' ')
|
||
assert len(ans) == 3, f'get_chassis_attitude: {resp}'
|
||
return ChassisAttitude(float(ans[0]), float(ans[1]), float(ans[2]))
|
||
|
||
def get_chassis_status(self) -> ChassisStatus:
|
||
"""
|
||
获取底盘状态信息。
|
||
|
||
Query for chassis status.
|
||
|
||
:return: 底盘状态,详见 ChassisStatus chassis status, see class ChassisStatus.
|
||
"""
|
||
resp = self.do('chassis', 'status', '?')
|
||
ans = resp.split(' ')
|
||
assert len(ans) == 11, f'get_chassis_status: {resp}'
|
||
return ChassisStatus(*map(lambda x: bool(int(x)), ans))
|
||
|
||
def chassis_push_on(self, position_freq: int = None, attitude_freq: int = None, status_freq: int = None, all_freq: int = None) -> str:
|
||
"""
|
||
打开底盘中相应属性的信息推送,支持的频率 1, 5, 10, 20, 30, 50.
|
||
|
||
Enable chassis push of specified attribution. Supported frequencies are 1, 5, 10, 20, 30, 50.
|
||
|
||
:param position_freq: 位置推送频率,不设定则设为None. position push frequency, None for no-op.
|
||
:param attitude_freq: 姿态推送频率,不设定则设为None. attitude push frequency, None for no-op.
|
||
:param status_freq: 状态推送频率,不设定则设为None. status push frequency, None for no-op.
|
||
:param all_freq: 统一设置所有推送频率,设置则开启所有推送。 update all push frequency, this affects all attribution.
|
||
:return: ok,否则raise。 ok, or raise certain exception.
|
||
"""
|
||
valid_frequencies = (1, 5, 10, 20, 30, 50)
|
||
cmd = ['chassis', 'push']
|
||
if all_freq is not None:
|
||
assert all_freq in valid_frequencies, f'all_freq {all_freq} is not valid'
|
||
cmd += ['freq', all_freq]
|
||
else:
|
||
if position_freq is not None:
|
||
assert position_freq in valid_frequencies, f'position_freq {position_freq} is not valid'
|
||
cmd += ['position', SWITCH_ON, 'pfreq', position_freq]
|
||
if attitude_freq is not None:
|
||
assert attitude_freq in valid_frequencies, f'attitude_freq {attitude_freq} is not valid'
|
||
cmd += ['attitude', SWITCH_ON, 'afreq', attitude_freq]
|
||
if status_freq is not None:
|
||
assert status_freq in valid_frequencies, f'status_freq {status_freq} is not valid'
|
||
cmd += ['status', SWITCH_ON, 'sfreq', status_freq]
|
||
assert len(cmd) > 2, 'at least one argument should not be None'
|
||
resp = self.do(*cmd)
|
||
assert self._is_ok(resp), f'chassis_push_on: {resp}'
|
||
return resp
|
||
|
||
def chassis_push_off(self, position: bool = False, attitude: bool = False, status: bool = False, all: bool = False) -> str:
|
||
"""
|
||
关闭底盘中相应属性的信息推送。
|
||
|
||
Disable chassis push of specified attribution.
|
||
|
||
:param position: 是否关闭位置推送。 whether disable position push.
|
||
:param attitude: 是否关闭姿态推送。 whether disable attitude push.
|
||
:param status: 是否关闭状态推送。 whether disable status push.
|
||
:param all: 关闭所有推送。 whether disable all pushes.
|
||
:return: ok,否则raise。 ok, or raise certain exception.
|
||
"""
|
||
cmd = ['chassis', 'push']
|
||
if all or position:
|
||
cmd += ['position', SWITCH_OFF]
|
||
if all or attitude:
|
||
cmd += ['attitude', SWITCH_OFF]
|
||
if all or status:
|
||
cmd += ['status', SWITCH_OFF]
|
||
|
||
assert len(cmd) > 2, 'at least one argument should be True'
|
||
resp = self.do(*cmd)
|
||
assert self._is_ok(resp), f'chassis_push_off: {resp}'
|
||
return resp
|
||
|
||
def gimbal_speed(self, pitch: float, yaw: float) -> str:
|
||
"""
|
||
控制云台运动速度。
|
||
|
||
Update gimbal speed.
|
||
|
||
:param pitch: pitch 轴速度,单位 °/s Pitch speed in °/s
|
||
:param yaw: yaw 轴速度,单位 °/s yaw speed in °/s
|
||
:return: ok,否则raise。 ok, or raise certain exception.
|
||
"""
|
||
assert -450 <= pitch <= 450, f'pitch {pitch} is out of range'
|
||
assert -450 <= yaw <= 450, f'yaw {yaw} is out of range'
|
||
resp = self.do('gimbal', 'speed', 'p', pitch, 'y', yaw)
|
||
assert self._is_ok(resp), f'gimbal_speed: {resp}'
|
||
return resp
|
||
|
||
def gimbal_move(self, pitch: float = 0, yaw: float = 0, pitch_speed: float = None, yaw_speed: float = None) -> str:
|
||
"""
|
||
控制云台运动到指定位置,坐标轴原点为当前位置。
|
||
|
||
Make gimbal move to specified location. The origin is current location.
|
||
|
||
:param pitch: pitch 轴角度, 单位 ° pitch delta in degree
|
||
:param yaw: yaw 轴角度, 单位 ° yaw delta in degree
|
||
:param pitch_speed: pitch 轴运动速速,单位 °/s pitch speed in °/s
|
||
:param yaw_speed: yaw 轴运动速速,单位 °/s yaw speed in °/s
|
||
:return: ok,否则raise。 ok, or raise certain exception.
|
||
"""
|
||
assert -55 <= pitch <= 55, f'pitch {pitch} is out of range'
|
||
assert -55 <= yaw <= 55, f'yaw {yaw} is out of range'
|
||
assert pitch_speed is None or 0 < pitch_speed <= 540, f'pitch_speed {pitch_speed} is out of range'
|
||
assert yaw_speed is None or 0 < yaw_speed <= 540, f'yaw_speed {yaw_speed} is out of range'
|
||
cmd = ['gimbal', 'move', 'p', pitch, 'y', yaw]
|
||
if pitch_speed is not None:
|
||
cmd += ['vp', pitch_speed]
|
||
if yaw_speed is not None:
|
||
cmd += ['vy', yaw_speed]
|
||
resp = self.do(*cmd)
|
||
assert self._is_ok(resp), f'gimbal_move: {resp}'
|
||
return resp
|
||
|
||
def gimbal_moveto(self, pitch: float = 0, yaw: float = 0, pitch_speed: float = None, yaw_speed: float = None) -> str:
|
||
"""
|
||
控制云台运动到指定位置,坐标轴原点为上电位置。
|
||
|
||
Make gimbal move to specified location. The origin is gimbal center.
|
||
|
||
:param pitch: pitch 轴角度, 单位 ° pitch delta in degree
|
||
:param yaw: yaw 轴角度, 单位 ° yaw delta in degree
|
||
:param pitch_speed: pitch 轴运动速速,单位 °/s pitch speed in °/s
|
||
:param yaw_speed: yaw 轴运动速速,单位 °/s yaw speed in °/s
|
||
:return: ok,否则raise。 ok, or raise certain exception.
|
||
"""
|
||
assert -25 <= pitch <= 30, f'pitch {pitch} is out of range'
|
||
assert -250 <= yaw <= 250, f'yaw {yaw} is out of range'
|
||
assert pitch_speed is None or 0 < pitch_speed <= 540, f'pitch_speed {pitch_speed} is out of range'
|
||
assert yaw_speed is None or 0 < yaw_speed <= 540, f'yaw_speed {yaw_speed} is out of range'
|
||
cmd = ['gimbal', 'moveto', 'p', pitch, 'y', yaw]
|
||
if pitch_speed is not None:
|
||
cmd += ['vp', pitch_speed]
|
||
if yaw_speed is not None:
|
||
cmd += ['vy', yaw_speed]
|
||
resp = self.do(*cmd)
|
||
assert self._is_ok(resp), f'gimbal_moveto: {resp}'
|
||
return resp
|
||
|
||
def gimbal_suspend(self):
|
||
"""
|
||
使云台进入休眠状态。
|
||
|
||
Suspend(sleep) the gimbal.
|
||
|
||
:return: ok,否则raise。 ok, or raise certain exception.
|
||
"""
|
||
resp = self.do('gimbal', 'suspend')
|
||
assert self._is_ok(resp), f'gimbal_suspend: {resp}'
|
||
return resp
|
||
|
||
def gimbal_resume(self):
|
||
"""
|
||
控制云台从休眠状态中恢复
|
||
|
||
awake the gimbal if it is suspended(sleeping).
|
||
|
||
:return: ok,否则raise。 ok, or raise certain exception.
|
||
"""
|
||
resp = self.do('gimbal', 'resume')
|
||
assert self._is_ok(resp), f'gimbal_resume: {resp}'
|
||
return resp
|
||
|
||
def gimbal_recenter(self):
|
||
"""
|
||
控制云台回中。
|
||
|
||
Recenter the gimbal.
|
||
|
||
:return: ok,否则raise。 ok, or raise certain exception.
|
||
"""
|
||
resp = self.do('gimbal', 'recenter')
|
||
assert self._is_ok(resp), f'gimbal_recenter: {resp}'
|
||
return resp
|
||
|
||
def get_gimbal_attitude(self) -> GimbalAttitude:
|
||
"""
|
||
获取云台姿态信息。
|
||
|
||
Query for gimbal attitude.
|
||
|
||
:return: pitch 轴角度(°),yaw 轴角度(°) pitch, yaw in degree
|
||
"""
|
||
resp = self.do('gimbal', 'attitude', '?')
|
||
ans = resp.split(' ')
|
||
assert len(ans) == 2, f'get_gimbal_attitude: {resp}'
|
||
return GimbalAttitude(pitch=float(ans[0]), yaw=float(ans[1]))
|
||
|
||
def gimbal_push_on(self, attitude_freq: int = 5) -> str:
|
||
"""
|
||
打开云台中相应属性的信息推送,支持的频率 1, 5, 10, 20, 30, 50.
|
||
|
||
Enable gimbal attribution push. Supported frequencies are 1, 5, 10, 20, 30, 50.
|
||
|
||
:param attitude_freq: 姿态推送频率. attitude push frequency.
|
||
:return: ok,否则raise。 ok, or raise certain exception.
|
||
"""
|
||
valid_frequencies = (1, 5, 10, 20, 30, 50)
|
||
assert attitude_freq in valid_frequencies, f'invalid attitude_freq {attitude_freq}'
|
||
resp = self.do('gimbal', 'push', 'attitude', SWITCH_ON, 'afreq', attitude_freq)
|
||
assert self._is_ok(resp), f'gimbal_push_on: {resp}'
|
||
return resp
|
||
|
||
def gimbal_push_off(self, attitude: bool = True) -> str:
|
||
"""
|
||
关闭云台中相应属性的信息推送。
|
||
|
||
Disable gimbal push of specified attribution.
|
||
|
||
:param attitude: 关闭姿态推送。 whether disable attitude push.
|
||
:return: ok,否则raise。 ok, or raise certain exception.
|
||
"""
|
||
assert attitude, 'at least one augment should be True'
|
||
resp = self.do('gimbal', 'push', 'attitude', SWITCH_OFF)
|
||
assert self._is_ok(resp), f'gimbal_push_off: {resp}'
|
||
return resp
|
||
|
||
def armor_sensitivity(self, value: int) -> str:
|
||
"""
|
||
设置装甲板打击检测灵敏度。
|
||
|
||
Update armor sensitivity.
|
||
|
||
:param value: 装甲板灵敏度,数值越大,越容易检测到打击。默认灵敏度值为 5.
|
||
armor sensitivity, the bigger, the more sensitive. Default to 5.
|
||
:return: ok,否则raise。 ok, or raise certain exception.
|
||
"""
|
||
assert 1 <= value <= 10, f'value {value} is out of range'
|
||
resp = self.do('armor', 'sensitivity', value)
|
||
assert self._is_ok(resp), f'armor_sensitivity: {resp}'
|
||
return resp
|
||
|
||
def get_armor_sensitivity(self) -> int:
|
||
"""
|
||
获取装甲板打击检测灵敏度。
|
||
|
||
Query for armor sensitivity.
|
||
|
||
:return: 装甲板灵敏度 armor sensitivity.
|
||
"""
|
||
resp = self.do('armor', 'sensitivity', '?')
|
||
return int(resp)
|
||
|
||
def armor_event(self, attr: str, switch: bool) -> str:
|
||
"""
|
||
控制装甲板检测事件上报。
|
||
|
||
Enable or disable specified armor event.
|
||
|
||
:param attr: 事件属性名称,范围见 ARMOR_ENUMS. armor event name, see ARMOR_ENUMS.
|
||
:param switch: 是否开启上报 on/off
|
||
:return: ok,否则raise。 ok, or raise certain exception.
|
||
"""
|
||
assert attr in ARMOR_ENUMS, f'unexpected armor event attr {attr}'
|
||
resp = self.do('armor', 'event', attr, SWITCH_ON if switch else SWITCH_OFF)
|
||
assert self._is_ok(resp), f'armor_event: {resp}'
|
||
return resp
|
||
|
||
def sound_event(self, attr: str, switch: bool) -> str:
|
||
"""
|
||
控制声音识别事件上报。
|
||
|
||
Enable or disable specified sound event.
|
||
|
||
:param attr: 事件属性名称,范围见 SOUND_ENUMS. sound event name, see ARMOR_ENUMS.
|
||
:param switch: 是否开启上报 on/off
|
||
:return: ok,否则raise。 ok, or raise certain exception.
|
||
"""
|
||
assert attr in SOUND_ENUMS, f'unexpected armor event attr {attr}'
|
||
resp = self.do('sound', 'event', attr, SWITCH_ON if switch else SWITCH_OFF)
|
||
assert self._is_ok(resp), f'armor_event: {resp}'
|
||
return resp
|
||
|
||
def led_control(self, comp: str, effect: str, r: int, g: int, b: int) -> str:
|
||
"""
|
||
控制 LED 灯效。跑马灯效果仅可作用于云台两侧 LED。
|
||
|
||
Update LED effects. Note scrolling effect works only on gimbal LEDs.
|
||
|
||
:param comp: LED 编号,见 LED_ENUMS LED composition, see LED_ENUMS
|
||
:param effect: 灯效类型,见 LED_EFFECT_ENUMS effect type, see LED_EFFECT_ENUMS
|
||
:param r: RGB 红色分量值 RGB red value
|
||
:param g: RGB 绿色分量值 RGB green value
|
||
:param b: RGB 蓝色分量值 RGB blue value
|
||
:return: ok,否则raise。 ok, or raise certain exception.
|
||
"""
|
||
assert comp in LED_ENUMS, f'unknown comp {comp}'
|
||
assert effect in LED_EFFECT_ENUMS, f'unknown effect {effect}'
|
||
assert 0 <= r <= 255, f'r {r} is out of scope'
|
||
assert 0 <= g <= 255, f'g {g} is out of scope'
|
||
assert 0 <= b <= 255, f'b {b} is out of scope'
|
||
if effect == LED_EFFECT_SCROLLING:
|
||
assert comp in (LED_TOP_ALL, LED_TOP_LEFT, LED_TOP_RIGHT), 'scrolling effect works only on gimbal LEDs'
|
||
resp = self.do('led', 'control', 'comp', comp, 'r', r, 'g', g, 'b', b, 'effect', effect)
|
||
assert self._is_ok(resp), f'led_control: {resp}'
|
||
return resp
|
||
|
||
def ir_sensor_measure(self, switch: bool = True):
|
||
"""
|
||
打开/关闭所有红外传感器开关。
|
||
|
||
Enable or disable all IR sensor.
|
||
|
||
:param switch: 打开/关闭 on/off
|
||
:return: ok,否则raise。 ok, or raise certain exception.
|
||
"""
|
||
resp = self.do('ir_distance_sensor', 'measure', SWITCH_ON if switch else SWITCH_OFF)
|
||
assert self._is_ok(resp), f'ir_sensor_measure: {resp}'
|
||
return resp
|
||
|
||
def get_ir_sensor_distance(self, id: int) -> float:
|
||
"""
|
||
获取指定 ID 的红外深度传感器距离。
|
||
|
||
Query for distance reported by specified IR sensor.
|
||
|
||
:param id: 红外传感器的 ID ID of IR sensor
|
||
:return: 指定 ID 的红外传感器测得的距离值,单位 mm distance in mm
|
||
"""
|
||
assert 1 <= id <= 4, f'invalid IR sensor id {id}'
|
||
resp = self.do('ir_distance_sensor', 'distance', id, '?')
|
||
return float(resp)
|
||
|
||
def stream(self, switch: bool) -> str:
|
||
"""
|
||
视频流开关控制。
|
||
|
||
Enable or disable video stream.
|
||
|
||
:param switch: 打开/关闭 on/off
|
||
:return: ok,否则raise。 ok, or raise certain exception.
|
||
"""
|
||
resp = self.do('stream', SWITCH_ON if switch else SWITCH_OFF)
|
||
assert self._is_ok(resp), f'stream: {resp}'
|
||
return resp
|
||
|
||
def audio(self, switch: bool) -> str:
|
||
"""
|
||
音频流开关控制。
|
||
|
||
Enable or disable audio stream.
|
||
|
||
:param switch: 打开/关闭 on/off
|
||
:return: ok,否则raise。 ok, or raise certain exception.
|
||
"""
|
||
resp = self.do('audio', SWITCH_ON if switch else SWITCH_OFF)
|
||
assert self._is_ok(resp), f'audio: {resp}'
|
||
return resp
|
||
|
||
def blaster_fire(self) -> str:
|
||
"""
|
||
控制水弹枪发射一次。
|
||
|
||
Fire once.
|
||
|
||
:return: ok,否则raise。 ok, or raise certain exception.
|
||
"""
|
||
resp = self.do('blaster', 'fire')
|
||
assert self._is_ok(resp), f'blaster_fire: {resp}'
|
||
return resp
|