添加自定义数据控制机器人

Goal: 通过添加自定义的数据类型,在bubble传递数据,控制机器人完成自定义的功能

先决条件

您具备了ROS的基本知识,掌握了如何 创建自定义 msg 和 srv 文件。 并能够了解 在Bubble中如何发送和接收数据

1 添加自定义消息类型

自定义的消息类型应该被放置于 bubble_interface 模块中,绝大部分需要使用比赛信息和识别器推理结果已在 game_msgsbbox_ex_msgs 中定义

对于自定义的控制信息,应被放置于 rmctrl_msgs 功能包下。

1.1 编写消息文件

rmctrl_msgs/mgs 下新建文件:

touch rmctrl_msgs/mgs/CustomDataType.msg

CustomDataType.msg 编写内容:

std_msgs/Header header
int8 test_datat_type1
float64 test_datat_type2
bool test_datat_type3

我们在 CustomDataType.msg 中添加了一个 int8 类型的变量 test_datat_type1float64 类型的变量 test_datat_type2bool 类型的变量 test_datat_type3 ,该消息类型包含 header

1.2 CMakeLists.txt

下面将自定义的消息类型添加到 CMakeLists.txt

rosidl_generate_interfaces(${PROJECT_NAME}
    ...
    "msg/CustomDataType.msg"
    DEPENDENCIES std_msgs
)

1.3 重新构建 rmctrl_msgs

colcon build --packages-select rmctrl_msgs --symlink-install

2 发送自定义消息至MCU

2.1 在BCP core中添加新的消息类型

在BCP分发层中,添加subscriber,修改 bubble_core/bubble_protocol/bubble_protocol/dispatch.py

from rmctrl_msgs.msg import CustomDataType

'''Some standard API interface code'''

# Expanded api definition
def init_robot(self, name):
    if name == "sentry_up":
    # Other robot API define codes
    # Suppose AI robot that needs to send a custom message
    elif name == "standard":
        self.custom_data_sub = self.create_subscription(
            CustomDataType, '/core/custom_data_api', self.ex_custom_data_callback, 10)

def ex_custom_data_callback(self, msg: CustomDataType):
    custom_data_list = []
    custom_data_list.append(msg.test_datat_type1)
    custom_data_list.append(msg.test_datat_type2)
    custom_data_list.append(msg.test_datat_type3)
    self.robot_serial.send_data("custom_data", custom_data_list)

2.2 在协议中添加新的消息类型

在BCP协议层中定义上位机和下位机通讯的数据类型,修改 bubble_core/bubble_protocol/bubble_protocol/protocol.py

custom_data_info = OrderedDict()
custom_data_info["test_datat_type1_mean"] = [TYPE_FOR_CTYPE["uint8"], 1, 1]
custom_data_info["test_datat_type2_mean"] = [TYPE_FOR_CTYPE["int32"], 1, 1000]
custom_data_info["test_datat_type3_mean"] = [TYPE_FOR_CTYPE["uint8"], 1, 1]


ID = {
    "chassis":     [0x10, chassis_info],
    # Other ID API define codes
    # Suppose ID is 85
    "custom_data": [0x55, custom_data_info],
}

2.3 解释代码

首先包含我们刚刚定义的 CustomDataType 消息类型

from rmctrl_msgs.msg import CustomDataType

在分发层中,我们创建了一个名为 custom_data_sub 的subscriber,接收来自 /core/custom_data_api 中的数据,数据类型为自定义的 CustomDataType,接收到数据后,会执行回调函数 ex_custom_data_callback :

self.custom_data_sub = self.create_subscription(
    CustomDataType, '/core/custom_data_api', self.ex_custom_data_callback, 10)

回调函数 ex_custom_data_callback 会将收到的数据存放到一个列表中,BCP core根据数据的定义发送至下位机:

def ex_custom_data_callback(self, msg: CustomDataType):
    custom_data_list = []
    custom_data_list.append(msg.test_datat_type1)
    custom_data_list.append(msg.test_datat_type2)
    custom_data_list.append(msg.test_datat_type3)
    self.robot_serial.send_data("custom_data", custom_data_list)

之后,我们需要定义发送至MCU的数据格式,我们设置了三个元素分别以 uint8int32uint8 的格式,放缩1、1000、1的倍数发送:

custom_data_info = OrderedDict()
custom_data_info["test_data_type1_mean"] = [TYPE_FOR_CTYPE["uint8"], 1, 1]
custom_data_info["test_data_type2_mean"] = [TYPE_FOR_CTYPE["int32"], 1, 1000]
custom_data_info["test_data_type3_mean"] = [TYPE_FOR_CTYPE["uint8"], 1, 1]

Note

使用 OrderedDict() 定义发送数据类型,仅仅是为了保持对python 3.6及更低版本解释器的兼容

最后我们定义 custom_data 的ID为85:

ID = {
        # ...
        "custom_data": [0x55, custom_data_info],
    }

3 接受来自MCU的自定义消息

Attention

在后续的Bubble版本中,可能对机器人状态发布的BCP部分发生小范围的改动

3.1 添加机器人状态定义

在机器人状态中,添加自定义的机器人状态信息。修改 bubble_core/bubble_protocol/bubble_protocol/protocol.py

STATUS = {
    # Other status define codes
    "custom_data": copy.deepcopy(custom_data_info)
}

(可选地)对于实时性要求较高的数据,可以采用 实时API 的方式发送,修改代码:

REALTIME_CALLBACK = {
    # Other realtime status define codes
   "custom_data": None
}

3.1 发布自定义的机器人状态信息

修改 bubble_core/bubble_protocol/bubble_protocol/protocol.py

def status_init(self):
    # non-realtime publisher api
    self.custom_data_pub = self.node.create_publisher(
        CustomDataType, '/status/custom_data', 10)

def non_realtime_status(self):
    custom_data_msg = CustomDataType()
    custom_data_msg.test_data_type1 = int(self.status["custom_data"]["test_data_type1_mean"][IDX_VAL])
    custom_data_msg.test_data_type2 = float(self.status["custom_data"]["test_data_type2_mean"][IDX_VAL])
    custom_data_msg.test_data_type3 = bool(self.status["custom_data"]["test_data_type3_mean"][IDX_VAL])
    self.custom_data_pub.publish(custom_data_msg)

(可选地)对于实时状态信息:

def status_init(self):
    def custom_data_callback():
        custom_data_msg = CustomDataType()
        custom_data_msg.test_data_type1 = int(self.status["custom_data"]["test_data_type1_mean"][IDX_VAL])
        custom_data_msg.test_data_type2 = float(self.status["custom_data"]["test_data_type2_mean"][IDX_VAL])
        custom_data_msg.test_data_type3 = bool(self.status["custom_data"]["test_data_type3_mean"][IDX_VAL])
        self.custom_data_pub.publish(custom_data_msg)

    # real-time publisher api
    self.custom_data_pub = self.node.create_publisher(
        CustomDataType, '/status/custom_data', 10)
    self.realtime_callback["custom_data"] = custom_data_callback