添加自定义数据控制机器人
Goal: 通过添加自定义的数据类型,在bubble传递数据,控制机器人完成自定义的功能
目录
先决条件
您具备了ROS的基本知识,掌握了如何 创建自定义 msg 和 srv 文件。 并能够了解 在Bubble中如何发送和接收数据。
1 添加自定义消息类型
自定义的消息类型应该被放置于 bubble_interface
模块中,绝大部分需要使用比赛信息和识别器推理结果已在 game_msgs
和 bbox_ex_msgs
中定义
对于自定义的控制信息,应被放置于 rmctrl_msgs
功能包下。
1.1 编写消息文件
在 rmctrl_msgs/mgs
下新建文件:
touch rmctrl_msgs/mgs/CustomDataType.msg
在 CustomDataType.msg
编写内容:
std_msgs/Header header
int8 test_datat_type1
float64 test_datat_type2
bool test_datat_type3
我们在 CustomDataType.msg
中添加了一个 int8
类型的变量 test_datat_type1
、 float64
类型的变量 test_datat_type2
、 bool
类型的变量 test_datat_type3
,该消息类型包含 header
。
1.2 CMakeLists.txt
下面将自定义的消息类型添加到 CMakeLists.txt
:
rosidl_generate_interfaces(${PROJECT_NAME}
...
"msg/CustomDataType.msg"
DEPENDENCIES std_msgs
)
1.3 重新构建 rmctrl_msgs
包
colcon build --packages-select rmctrl_msgs --symlink-install
2 发送自定义消息至MCU
2.1 在BCP core中添加新的消息类型
在BCP分发层中,添加subscriber,修改 bubble_core/bubble_protocol/bubble_protocol/dispatch.py
:
from rmctrl_msgs.msg import CustomDataType
'''Some standard API interface code'''
# Expanded api definition
def init_robot(self, name):
if name == "sentry_up":
# Other robot API define codes
# Suppose AI robot that needs to send a custom message
elif name == "standard":
self.custom_data_sub = self.create_subscription(
CustomDataType, '/core/custom_data_api', self.ex_custom_data_callback, 10)
def ex_custom_data_callback(self, msg: CustomDataType):
custom_data_list = []
custom_data_list.append(msg.test_datat_type1)
custom_data_list.append(msg.test_datat_type2)
custom_data_list.append(msg.test_datat_type3)
self.robot_serial.send_data("custom_data", custom_data_list)
2.2 在协议中添加新的消息类型
在BCP协议层中定义上位机和下位机通讯的数据类型,修改 bubble_core/bubble_protocol/bubble_protocol/protocol.py
:
custom_data_info = OrderedDict()
custom_data_info["test_datat_type1_mean"] = [TYPE_FOR_CTYPE["uint8"], 1, 1]
custom_data_info["test_datat_type2_mean"] = [TYPE_FOR_CTYPE["int32"], 1, 1000]
custom_data_info["test_datat_type3_mean"] = [TYPE_FOR_CTYPE["uint8"], 1, 1]
ID = {
"chassis": [0x10, chassis_info],
# Other ID API define codes
# Suppose ID is 85
"custom_data": [0x55, custom_data_info],
}
2.3 解释代码
首先包含我们刚刚定义的 CustomDataType
消息类型
from rmctrl_msgs.msg import CustomDataType
在分发层中,我们创建了一个名为 custom_data_sub
的subscriber,接收来自 /core/custom_data_api
中的数据,数据类型为自定义的 CustomDataType
,接收到数据后,会执行回调函数 ex_custom_data_callback
:
self.custom_data_sub = self.create_subscription(
CustomDataType, '/core/custom_data_api', self.ex_custom_data_callback, 10)
回调函数 ex_custom_data_callback
会将收到的数据存放到一个列表中,BCP core根据数据的定义发送至下位机:
def ex_custom_data_callback(self, msg: CustomDataType):
custom_data_list = []
custom_data_list.append(msg.test_datat_type1)
custom_data_list.append(msg.test_datat_type2)
custom_data_list.append(msg.test_datat_type3)
self.robot_serial.send_data("custom_data", custom_data_list)
之后,我们需要定义发送至MCU的数据格式,我们设置了三个元素分别以 uint8
、 int32
、 uint8
的格式,放缩1、1000、1的倍数发送:
custom_data_info = OrderedDict()
custom_data_info["test_data_type1_mean"] = [TYPE_FOR_CTYPE["uint8"], 1, 1]
custom_data_info["test_data_type2_mean"] = [TYPE_FOR_CTYPE["int32"], 1, 1000]
custom_data_info["test_data_type3_mean"] = [TYPE_FOR_CTYPE["uint8"], 1, 1]
Note
使用 OrderedDict()
定义发送数据类型,仅仅是为了保持对python 3.6及更低版本解释器的兼容
最后我们定义 custom_data
的ID为85:
ID = {
# ...
"custom_data": [0x55, custom_data_info],
}
3 接受来自MCU的自定义消息
Attention
在后续的Bubble版本中,可能对机器人状态发布的BCP部分发生小范围的改动
3.1 添加机器人状态定义
在机器人状态中,添加自定义的机器人状态信息。修改 bubble_core/bubble_protocol/bubble_protocol/protocol.py
:
STATUS = {
# Other status define codes
"custom_data": copy.deepcopy(custom_data_info)
}
(可选地)对于实时性要求较高的数据,可以采用 实时API
的方式发送,修改代码:
REALTIME_CALLBACK = {
# Other realtime status define codes
"custom_data": None
}
3.1 发布自定义的机器人状态信息
修改 bubble_core/bubble_protocol/bubble_protocol/protocol.py
:
def status_init(self):
# non-realtime publisher api
self.custom_data_pub = self.node.create_publisher(
CustomDataType, '/status/custom_data', 10)
def non_realtime_status(self):
custom_data_msg = CustomDataType()
custom_data_msg.test_data_type1 = int(self.status["custom_data"]["test_data_type1_mean"][IDX_VAL])
custom_data_msg.test_data_type2 = float(self.status["custom_data"]["test_data_type2_mean"][IDX_VAL])
custom_data_msg.test_data_type3 = bool(self.status["custom_data"]["test_data_type3_mean"][IDX_VAL])
self.custom_data_pub.publish(custom_data_msg)
(可选地)对于实时状态信息:
def status_init(self):
def custom_data_callback():
custom_data_msg = CustomDataType()
custom_data_msg.test_data_type1 = int(self.status["custom_data"]["test_data_type1_mean"][IDX_VAL])
custom_data_msg.test_data_type2 = float(self.status["custom_data"]["test_data_type2_mean"][IDX_VAL])
custom_data_msg.test_data_type3 = bool(self.status["custom_data"]["test_data_type3_mean"][IDX_VAL])
self.custom_data_pub.publish(custom_data_msg)
# real-time publisher api
self.custom_data_pub = self.node.create_publisher(
CustomDataType, '/status/custom_data', 10)
self.realtime_callback["custom_data"] = custom_data_callback