笔者跟着鱼香ROS的ROS2学习之旅
学习参考:
【ROS2机器人入门到实战】
笔者的学习目录
参数是节点的一个配置值,你可以认为参数是一个节点的设置
ROS2支持的参数值的类型
bool 和bool[],布尔类型用来表示开关,比如我们可以控制雷达控制节点,开始扫描和停止扫描。
int64 和int64[],整形表示一个数字,含义可以自己来定义,这里我们可以用来表示李四节点写小说的周期值
float64 和float64[],浮点型,可以表示小数类型的参数值
string 和string[],字符串,可以用来表示雷达控制节点中真实雷达的ip地址
byte[],字节数组,这个可以用来表示图片,点云数据等信息
打开终端,运行小乌龟节点
# 终端1
ros2 run turtlesim turtlesim_node
# 终端2
ros2 run turtlesim turtle_teleop_key
查看参数信息
# 列举所有的参数
ros2 param list
# 参数的详细信息
ros2 param describe <node_name> <param_name>
# 举例
ros2 param describe /turtlesim background_b
# 查看参数值
ros2 param get /turtlesim background_b
设置参数
ros2 param set <node_name> <parameter_name> <value>
# 举例
ros2 param set /turtlesim background_r 44
保存参数
ros2 param dump <node_name>
# 举例
ros2 param dump /turtlesim
文件被保存成了yaml格式,用cat指令看一看
cat ./turtlesim.yaml
恢复参数值
ros2 param load /turtlesim ./turtlesim.yaml
启动节点时加载参数快照
ros2 run <package_name> <executable_name> --ros-args --params-file <file_name>
# 举例
ros2 run turtlesim turtlesim_node --ros-args --params-file ./turtlesim.yaml
cd alian_ws/
ros2 pkg create alian_parameters_rclpy --build-type ament_python --dependencies rclpy --destination-directory src --node-name parameters_basic --maintainer-name "alian"
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
class ParametersBasicNode(Node):
"""
创建一个ParametersBasicNode节点,并在初始化时输出一个话
"""
def __init__(self, name):
super().__init__(name)
self.get_logger().info(f"节点已启动:{name}!")
# 声明参数
self.declare_parameter('alian_level', 0)
# 获取参数
log_level = self.get_parameter("alian_level").value
# 设置参数
self.get_logger().set_level(log_level)
# 定时修改
self.timer = self.create_timer(0.5, self.timer_callback)
def timer_callback(self):
"""定时器回调函数"""
# 获取参数
log_level = self.get_parameter("alian_level").value
# 设置参数
self.get_logger().set_level(log_level)
print(
f"========================{log_level}=============================")
self.get_logger().debug("我是DEBUG级别的日志,我被打印出来了!")
self.get_logger().info("我是INFO级别的日志,我被打印出来了!")
self.get_logger().warn("我是WARN级别的日志,我被打印出来了!")
self.get_logger().error("我是ERROR级别的日志,我被打印出来了!")
self.get_logger().fatal("我是FATAL级别的日志,我被打印出来了!")
def main(args=None):
rclpy.init(args=args) # 初始化rclpy
node = ParametersBasicNode("parameters_basic") # 新建一个节点
rclpy.spin(node) # 保持节点运行,检测是否收到退出指令(Ctrl+C)
rclpy.shutdown() # 关闭rclpy
colcon build --packages-select alian_parameters_rclpy
source install/setup.bash
ros2 run alian_parameters_rclpy parameters_basic
1. 指定参数值测试
终端1:
ros2 run alian_parameters_rclpy parameters_basic --ros-args -p alian_level:=10
2. 动态设置参数测试
在终端1运行的前提下,打开终端2:
ros2 param list
ros2 param set /parameters_basic alian_level 40
参数是由服务构建出来了,而Action是由话题和服务共同构建出来的(一个Action = 三个服务+两个话题)
三个服务分别是: 1.目标传递服务 2.结果传递服务 3.取消执行服务
两个话题:1.反馈话题(服务发布,客户端订阅)2.状态话题(服务端发布,客户端订阅)
列举目前存在的动作
ros2 action list -t
获取接口的信息
ros2 interface show turtlesim/action/RotateAbsolute
查看动作信息
ros2 action info /turtle1/rotate_absolute
ros2 action send_goal /turtle1/rotate_absolute turtlesim/action/RotateAbsolute "{theta: 1.5}" --feedback
1. 创建接口功能包和接口文件
cd alian_ws/
ros2 pkg create robot_control_interfaces --build-type ament_cmake --destination-directory src --maintainer-name "alian"
mkdir -p src/robot_control_interfaces/action
touch src/robot_control_interfaces/action/MoveRobot.action
2. 修改packages.xml
<depend>rosidl_default_generators</depend>
<member_of_group>rosidl_interface_packages</member_of_group>
3. 修改CMakeLists.txt
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"action/MoveRobot.action"
)
4. 编写MoveRobot.action接口
# Goal: 要移动的距离
float32 distance
---
# Result: 最终的位置
float32 pose
---
# Feedback: 中间反馈的位置和状态
float32 pose
uint32 status
uint32 STATUS_MOVEING = 3
uint32 STATUS_STOP = 4
5. 编译生成接口
colcon build --packages-select robot_control_interfaces
1. 创建功能包和节点
cd alian_ws/
ros2 pkg create alian_action_rclpy --build-type ament_python --dependencies rclpy robot_control_interfaces --destination-directory src --node-name action_robot --maintainer-name "alian"
# 手动再创建action_control_02节点文件
touch src/alian_action_rclpy/alian_action_rclpy/action_control.py
#手动创建机器人类robot.py
touch src/alian_action_rclpy/alian_action_rclpy/robot.py
2. 节点文件1:robot.py 机器人类
from robot_control_interfaces.action import MoveRobot
import math
class Robot():
"""机器人类,模拟一个机器人"""
def __init__(self) -> None:
self.current_pose_ = 0.0
self.target_pose_ = 0.0
self.move_distance_ = 0.0
self.status_ = MoveRobot.Feedback
def get_status(self):
"""获取状态"""
return self.status_
def get_current_pose(self):
"""获取当前位置"""
return self.current_pose_
def close_goal(self):
"""接近目标"""
return math.fabs(self.target_pose_ - self.current_pose_) < 0.01
def stop_move(self):
"""停止移动"""
self.status_ = MoveRobot.Feedback.STATUS_STOP
def move_step(self):
"""移动一小步"""
direct = self.move_distance_ / math.fabs(self.move_distance_)
step = direct * math.fabs(self.target_pose_ - self.current_pose_) * 0.1
self.current_pose_ += step # 移动一步
print(f"移动了:{step}当前位置:{self.current_pose_}")
return self.current_pose_
def set_goal(self, distance):
"""设置目标"""
self.move_distance_ = distance
self.target_pose_ += distance # 更新目标位置
if self.close_goal():
self.stop_move()
return False
self.status_ = MoveRobot.Feedback.STATUS_MOVEING # 更新状态为移动
return True
3. 节点文件2:action_robot.py 机器人节点
#!/usr/bin/env python3
import time
# 导入rclpy相关库
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from rclpy.action.server import ServerGoalHandle
# 导入接口
from robot_control_interfaces.action import MoveRobot
# 导入机器人类
from alian_action_rclpy.robot import Robot
#from rclpy.executors import MultiThreadedExecutor
#from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
class ActionRobot(Node):
"""机器人端Action服务"""
def __init__(self,name):
super().__init__(name)
self.get_logger().info(f"节点已启动:{name}!")
self.robot_ = Robot()
self.action_server_ = ActionServer(
self, MoveRobot, 'move_robot', self.execute_callback
# ,callback_group=MutuallyExclusiveCallbackGroup()
)
def execute_callback(self, goal_handle: ServerGoalHandle):
"""执行回调函数,若采用默认handle_goal函数则会自动调用"""
self.get_logger().info('执行移动机器人')
feedback_msg = MoveRobot.Feedback()
self.robot_.set_goal(goal_handle.request.distance)
# rate = self.create_rate(2)
while rclpy.ok() and not self.robot_.close_goal():
# move
self.robot_.move_step()
# feedback
feedback_msg.pose = self.robot_.get_current_pose()
feedback_msg.status = self.robot_.get_status()
goal_handle.publish_feedback(feedback_msg)
# cancel check
if goal_handle.is_cancel_requested:
result = MoveRobot.Result()
result.pose = self.robot_.get_current_pose()
return result
# rate.sleep() # Rate会造成死锁,单线程执行器时不能使用
time.sleep(0.5)
goal_handle.succeed()
result = MoveRobot.Result()
result.pose = self.robot_.get_current_pose()
return result
def main(args=None):
"""主函数"""
rclpy.init(args=args)
action_robot = ActionRobot("action_robot")
# 采用多线程执行器解决rate死锁问题
# executor = MultiThreadedExecutor()
# executor.add_node(action_robot_02)
# executor.spin()
rclpy.spin(action_robot)
rclpy.shutdown()
4. 节点文件3:action_control.py 控制节点
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
# 导入Action接口
from robot_control_interfaces.action import MoveRobot
class ActionControl(Node):
"""Action客户端"""
def __init__(self, name):
super().__init__(name)
self.get_logger().info(f"节点已启动:{name}!")
self.action_client_ = ActionClient(self, MoveRobot, 'move_robot')
self.send_goal_timer_ = self.create_timer(1, self.send_goal)
def send_goal(self):
"""发送目标"""
self.send_goal_timer_.cancel()
goal_msg = MoveRobot.Goal()
goal_msg.distance = 5.0
self.action_client_.wait_for_server()
self._send_goal_future = self.action_client_.send_goal_async(goal_msg,
feedback_callback=self.feedback_callback)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
"""收到目标处理结果"""
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected :(')
return
self.get_logger().info('Goal accepted :)')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
"""获取结果反馈"""
result = future.result().result
self.get_logger().info(f'Result: {result.pose}')
def feedback_callback(self, feedback_msg):
"""获取回调反馈"""
feedback = feedback_msg.feedback
self.get_logger().info(f'Received feedback: {feedback.pose}')
def main(args=None):
"""主函数"""
rclpy.init(args=args)
action_robot = ActionControl("action_control")
rclpy.spin(action_robot)
rclpy.shutdown()
5. 修改setup.py
'console_scripts': [
'robot = alian_action_rclpy.robot:main',
'action_robot = alian_action_rclpy.action_robot:main',
'action_control = alian_action_rclpy.action_control:main'
6. 编译测试
cd alian_ws/
colcon build --packages-up-to alian_action_rclpy
# 运行机器人节点
source install/setup.bash
ros2 run alian_action_rclpy action_robot
# 新终端
source install/setup.bash
ros2 run alian_action_rclpy action_control