控制机器人并接收机器人数据 ============================= .. note:: 该文档主要描述了Bubble内部的通讯接口实现,关于上下位通讯串口协议及配置,请参见 ``木鸢通讯协议`` **Goal:** 通过Bubble控制机器人运动,并能得到当前机器人得状态信息 .. contents:: 目录 :depth: 2 :local: 先决条件 ------------------------------ 您具备了ROS的基本知识,掌握了如何编写一个 `节点 `__ ,已经成功运行了 `bubble_protocol` 模块,MCU能够通过BCP控制机器人运动。 1 控制机器人运动 ------------------------------ 通过 `bubble_protocol` 模块定义的API,能够控制机器人运动。 下面的例子展示了每隔0.5s使机器人云台相对于自身当前位置偏移0.01弧度。 1.1 编写代码 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. tabs:: .. group-tab:: Python .. code-block:: python :linenos: import rclpy from rclpy.node import Node from rmctrl_msgs.msg import Gimbal class MinimalPublisher(Node): def __init__(self): super().__init__('minimal_publisher') self.gimbal_pub = self.create_publisher(Gimbal, '/core/gimbal_api', 10) self.timer = self.create_timer(0.5, self.timer_callback) def timer_callback(self): gimbal_msg = Gimbal() gimbal_msg.mode = 1 gimbal_msg.header.stamp = self.get_clock().now().to_msg() gimbal_msg.yaw = 0.01 gimbal_msg.pitch = 0. gimbal_msg.roll = 0. self.gimbal_pub.publish(gimbal_msg) def main(args=None): rclpy.init(args=args) minimal_publisher = MinimalPublisher() rclpy.spin(minimal_publisher) minimal_publisher.destroy_node() rclpy.shutdown() if __name__ == '__main__': main() .. group-tab:: C++ .. code-block:: C++ // TODO 1.2 解释代码 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 首先引入了云台控制信息对应的msg和ros依赖包: .. code-block:: python import rclpy from rclpy.node import Node from rmctrl_msgs.msg import Gimbal 构建一个Node,该Node中包含一个timer,每隔0.5秒执行 ``timer_callback`` 函数: .. code-block:: python class MinimalPublisher(Node): def __init__(self): super().__init__('minimal_publisher') self.gimbal_pub = self.create_publisher(Gimbal, '/core/gimbal_api', 10) self.timer = self.create_timer(0.5, self.timer_callback) ``timer_callback`` 函数中实例化了 ``Gimbal`` 类型的消息对象,依次在消息中填入需要发送的数值,并通过 ``gimbal_pub`` 发送至 ``/core/gimbal_api`` topic下: .. code-block:: python def timer_callback(self): gimbal_msg = Gimbal() gimbal_msg.mode = 1 gimbal_msg.header.stamp = self.get_clock().now().to_msg() gimbal_msg.yaw = 0.01 gimbal_msg.pitch = 0. gimbal_msg.roll = 0. self.gimbal_pub.publish(gimbal_msg) 1.3 编译并运行代码 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 打开一个终端编译并运行当前节点 .. code-block:: console colcon build --packages-select YOUR_PKG . install/setup.bash ros2 run YOUR_PKG YOUR_EXEC 在另一个终端中打开BCP core .. code-block:: console . install/setup.bash ros2 launch bubble_protocol bcp_api_core_launch.py 如果你已经正确的配置了机器人,你应该看到机器人每隔0.5秒,云台绕yaw轴偏转0.1rad。 2 接收来自机器人的状态信息 ------------------------------------------------- BCP中定义了有关机器人的状态信息,这些信息包括了: * 机器人当前机械结构的相对位置,类型在 `rmctrl_msgs` 功能包的中定义。如云台当前偏转角度等。 * 裁判系统发送的比赛信息,类型在 `game_msgs` 功能包中定义。如比赛剩余时间、机器人剩余血量等。 下面的例子展示了接收机器人的云台偏移量,并在终端中打印。 1.1 编写代码 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. tabs:: .. group-tab:: Python .. code-block:: python import rclpy from rclpy.node import Node from rmctrl_msgs.msg import Gimbal class MinimalSubscriber(Node): def __init__(self): super().__init__("MinimalSubscriber") # robot gimbal state subscriber self.gimbal_sub = self.create_subscription( Gimbal, '/status/gimbal', self.gimbal_callback, 10) # show gimbal data by timer period = 150 self.create_timer(1/period, self.sub_timer) # robot gimbal info list self.gimbal_info = [] def gimbal_callback(self, data: Gimbal): self.gimbal_info.append(data) def sub_timer(self): if not self.gimbal_info: data = self.gimbal_info.pop() self.get_logger().info("recived data gimbal, yaw: {}, pitch: {}, roll: {}".format( data.yaw, data.pitch, data.roll)) def main(args=None): rclpy.init(args=args) minimal_subscriber = MinimalSubscriber() rclpy.spin(minimal_subscriber) minimal_subscriber.destroy_node() rclpy.shutdown() if __name__ == '__main__': main() .. group-tab:: C++ .. code-block:: C++ // TODO 2.2 解释代码 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 首先引入了云台控制信息对应的msg和ros依赖包: .. code-block:: python import rclpy from rclpy.node import Node from rmctrl_msgs.msg import Gimbal 构建一个Node,该Node中包含一个订阅者 ``gimbal_sub`` 和一个定时器,定时器每秒执行 ``period`` 次 ``sub_timer`` 函数。 当BCP core接收到MUC发送的云台信息时,会通过topic ``/status/gimbal`` 发送,当节点接收到topic发送的信息时,执行 ``self.gimbal_callback`` 函数。 此外还定义了一个列表 ``self.gimbal_info`` 存放接收到的信息: .. code-block:: python class MinimalSubscriber(Node): def __init__(self): super().__init__("MinimalSubscriber") # robot gimbal state subscriber self.gimbal_sub = self.create_subscription( Gimbal, '/status/gimbal', self.gimbal_callback, 10) # show gimbal data by timer period = 150 self.create_timer(1/period, self.sub_timer) # robot gimbal info list self.gimbal_info = [] ``self.gimbal_callback`` 函数会将收到的消息放入 ``self.gimbal_info`` 中: .. code-block:: python def gimbal_callback(self, data: Gimbal): self.gimbal_info.append(data) 当 ``self.gimbal_callback`` 中存在数据时,定时器会将其中的信息打印出来: .. code-block:: python def sub_timer(self): if not self.gimbal_info: data = self.gimbal_info.pop() self.get_logger().info("recived gimbal data, yaw: {}, pitch: {}, roll: {}".format( data.yaw, data.pitch, data.roll)) 2.3 编译并运行代码 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 打开一个终端编译并运行当前节点 .. code-block:: console colcon build --packages-select YOUR_PKG . install/setup.bash ros2 run YOUR_PKG YOUR_EXEC 在另一个终端中打开BCP core .. code-block:: console . install/setup.bash ros2 launch bubble_protocol bcp_api_core_launch.py 您可以运动机器人的云台,此时,机器人的云台数据应该被正确的打印出来: .. code-block:: console [INFO] [minimal_subscriber]: Publishing: recived gimbal data, yaw: 0.0, pitch: 0.0, roll: 0.0 [INFO] [minimal_subscriber]: Publishing: recived gimbal data, yaw: 0.0, pitch: 0.0, roll: 0.0 [INFO] [minimal_subscriber]: Publishing: recived gimbal data, yaw: 0.0, pitch: 0.0, roll: 0.0 [INFO] [minimal_subscriber]: Publishing: recived gimbal data, yaw: 0.0, pitch: 0.0, roll: 0.0 [INFO] [minimal_subscriber]: Publishing: recived gimbal data, yaw: 0.0, pitch: 0.0, roll: 0.0 [INFO] [minimal_subscriber]: Publishing: recived gimbal data, yaw: 0.0, pitch: 0.0, roll: 0.0 ... .. note:: 打印的频率取决于MUC发送数据的频率,数值的大小取决于机器人坐标系的定义。一般地,在Bubble中定义的数值可以参阅 `机器人坐标系定义 `__ 。