添加自定义数据控制机器人 =========================== **Goal:** 通过添加自定义的数据类型,在bubble传递数据,控制机器人完成自定义的功能 .. contents:: 目录 :depth: 2 :local: 先决条件 ------------------------------ 您具备了ROS的基本知识,掌握了如何 `创建自定义 msg 和 srv 文件 `__。 并能够了解 `在Bubble中如何发送和接收数据 <控制机器人并接收机器人数据.html>`__。 1 添加自定义消息类型 ------------------------------ 自定义的消息类型应该被放置于 ``bubble_interface`` 模块中,绝大部分需要使用比赛信息和识别器推理结果已在 ``game_msgs`` 和 ``bbox_ex_msgs`` 中定义 对于自定义的控制信息,应被放置于 ``rmctrl_msgs`` 功能包下。 1.1 编写消息文件 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 在 ``rmctrl_msgs/mgs`` 下新建文件: .. code-block:: console touch rmctrl_msgs/mgs/CustomDataType.msg 在 ``CustomDataType.msg`` 编写内容: .. code-block:: yaml std_msgs/Header header int8 test_datat_type1 float64 test_datat_type2 bool test_datat_type3 我们在 ``CustomDataType.msg`` 中添加了一个 ``int8`` 类型的变量 ``test_datat_type1`` 、 ``float64`` 类型的变量 ``test_datat_type2`` 、 ``bool`` 类型的变量 ``test_datat_type3`` ,该消息类型包含 ``header`` 。 1.2 ``CMakeLists.txt`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 下面将自定义的消息类型添加到 ``CMakeLists.txt`` : .. code-block:: console rosidl_generate_interfaces(${PROJECT_NAME} ... "msg/CustomDataType.msg" DEPENDENCIES std_msgs ) 1.3 重新构建 ``rmctrl_msgs`` 包 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. code-block:: console colcon build --packages-select rmctrl_msgs --symlink-install 2 发送自定义消息至MCU ------------------------------ 2.1 在BCP core中添加新的消息类型 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 在BCP分发层中,添加subscriber,修改 ``bubble_core/bubble_protocol/bubble_protocol/dispatch.py`` : .. code-block:: python from rmctrl_msgs.msg import CustomDataType '''Some standard API interface code''' # Expanded api definition def init_robot(self, name): if name == "sentry_up": # Other robot API define codes # Suppose AI robot that needs to send a custom message elif name == "standard": self.custom_data_sub = self.create_subscription( CustomDataType, '/core/custom_data_api', self.ex_custom_data_callback, 10) def ex_custom_data_callback(self, msg: CustomDataType): custom_data_list = [] custom_data_list.append(msg.test_datat_type1) custom_data_list.append(msg.test_datat_type2) custom_data_list.append(msg.test_datat_type3) self.robot_serial.send_data("custom_data", custom_data_list) 2.2 在协议中添加新的消息类型 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 在BCP协议层中定义上位机和下位机通讯的数据类型,修改 ``bubble_core/bubble_protocol/bubble_protocol/protocol.py`` : .. code-block:: python custom_data_info = OrderedDict() custom_data_info["test_datat_type1_mean"] = [TYPE_FOR_CTYPE["uint8"], 1, 1] custom_data_info["test_datat_type2_mean"] = [TYPE_FOR_CTYPE["int32"], 1, 1000] custom_data_info["test_datat_type3_mean"] = [TYPE_FOR_CTYPE["uint8"], 1, 1] ID = { "chassis": [0x10, chassis_info], # Other ID API define codes # Suppose ID is 85 "custom_data": [0x55, custom_data_info], } 2.3 解释代码 ^^^^^^^^^^^^^^^^^^^^^ 首先包含我们刚刚定义的 ``CustomDataType`` 消息类型 .. code-block:: python from rmctrl_msgs.msg import CustomDataType 在分发层中,我们创建了一个名为 ``custom_data_sub`` 的subscriber,接收来自 ``/core/custom_data_api`` 中的数据,数据类型为自定义的 ``CustomDataType``,接收到数据后,会执行回调函数 ``ex_custom_data_callback`` : .. code-block:: python self.custom_data_sub = self.create_subscription( CustomDataType, '/core/custom_data_api', self.ex_custom_data_callback, 10) 回调函数 ``ex_custom_data_callback`` 会将收到的数据存放到一个列表中,BCP core根据数据的定义发送至下位机: .. code-block:: python def ex_custom_data_callback(self, msg: CustomDataType): custom_data_list = [] custom_data_list.append(msg.test_datat_type1) custom_data_list.append(msg.test_datat_type2) custom_data_list.append(msg.test_datat_type3) self.robot_serial.send_data("custom_data", custom_data_list) 之后,我们需要定义发送至MCU的数据格式,我们设置了三个元素分别以 ``uint8`` 、 ``int32`` 、 ``uint8`` 的格式,放缩1、1000、1的倍数发送: .. code-block:: python custom_data_info = OrderedDict() custom_data_info["test_data_type1_mean"] = [TYPE_FOR_CTYPE["uint8"], 1, 1] custom_data_info["test_data_type2_mean"] = [TYPE_FOR_CTYPE["int32"], 1, 1000] custom_data_info["test_data_type3_mean"] = [TYPE_FOR_CTYPE["uint8"], 1, 1] .. note:: 使用 ``OrderedDict()`` 定义发送数据类型,仅仅是为了保持对python 3.6及更低版本解释器的兼容 最后我们定义 ``custom_data`` 的ID为85: .. code-block:: python ID = { # ... "custom_data": [0x55, custom_data_info], } 3 接受来自MCU的自定义消息 ---------------------------- .. attention:: 在后续的Bubble版本中,可能对机器人状态发布的BCP部分发生小范围的改动 3.1 添加机器人状态定义 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 在机器人状态中,添加自定义的机器人状态信息。修改 ``bubble_core/bubble_protocol/bubble_protocol/protocol.py`` : .. code-block:: python STATUS = { # Other status define codes "custom_data": copy.deepcopy(custom_data_info) } (可选地)对于实时性要求较高的数据,可以采用 ``实时API`` 的方式发送,修改代码: .. code-block:: python REALTIME_CALLBACK = { # Other realtime status define codes "custom_data": None } 3.1 发布自定义的机器人状态信息 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 修改 ``bubble_core/bubble_protocol/bubble_protocol/protocol.py`` : .. code-block:: python def status_init(self): # non-realtime publisher api self.custom_data_pub = self.node.create_publisher( CustomDataType, '/status/custom_data', 10) def non_realtime_status(self): custom_data_msg = CustomDataType() custom_data_msg.test_data_type1 = int(self.status["custom_data"]["test_data_type1_mean"][IDX_VAL]) custom_data_msg.test_data_type2 = float(self.status["custom_data"]["test_data_type2_mean"][IDX_VAL]) custom_data_msg.test_data_type3 = bool(self.status["custom_data"]["test_data_type3_mean"][IDX_VAL]) self.custom_data_pub.publish(custom_data_msg) (可选地)对于实时状态信息: .. code-block:: python def status_init(self): def custom_data_callback(): custom_data_msg = CustomDataType() custom_data_msg.test_data_type1 = int(self.status["custom_data"]["test_data_type1_mean"][IDX_VAL]) custom_data_msg.test_data_type2 = float(self.status["custom_data"]["test_data_type2_mean"][IDX_VAL]) custom_data_msg.test_data_type3 = bool(self.status["custom_data"]["test_data_type3_mean"][IDX_VAL]) self.custom_data_pub.publish(custom_data_msg) # real-time publisher api self.custom_data_pub = self.node.create_publisher( CustomDataType, '/status/custom_data', 10) self.realtime_callback["custom_data"] = custom_data_callback