ros2 底盘控制程序实践

发布时间:2024年01月18日

定频率发布信息1

通过定时器实现

import rclpy
from rclpy.node import Node

from std_msgs.msg import String


class Simple_Node(Node):

    def __init__(self):
        super().__init__('simple_node')

        # Create a timer that will gate the node actions twice a second
        timer_period = 0.5  # seconds
        self.timer = self.create_timer(timer_period, self.node_callback)

    # 
    def node_callback(self):
        self.get_logger().info('simple_node is alive')



def main(args=None):
    rclpy.init(args=args)

    my_node = Simple_Node()   # instantiate my simple_node

    rclpy.spin(my_node)       # execute simple_node 

    # Destroy the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    my_node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

CPP实现

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"

class SimpleNode : public rclcpp::Node {
public:
  SimpleNode() : Node("simple_node"), count_(0) {
    // 创建发布者
    publisher_ = this->create_publisher<std_msgs::msg::String>("topic", 10);

    // 创建定时器,每秒触发一次
    timer_ = this->create_wall_timer(std::chrono::seconds(1),
                                     std::bind(&SimpleNode::timer_callback, this));
  }

private:
  void timer_callback() {
    auto message = std_msgs::msg::String();
    message.data = "Help me body, you are my only hope";
    RCLCPP_INFO(this->get_logger(), message.data);
    publisher_->publish(message);
    count_++;
  }

  rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
  rclcpp::TimerBase::SharedPtr timer_;
  size_t count_;
};

int main(int argc, char *argv[]) {
  rclcpp::init(argc, argv);
  auto node = std::make_shared<SimpleNode>();
  rclcpp::spin(node);
  rclcpp::shutdown();
  return 0;
}

  1. 导入必要的库和模块:

    import rclpy
    from rclpy.node import Node
    from std_msgs.msg import String
    

    这里导入了ROS 2的Python库(rclpy),以及一些ROS 2消息类型(String)。

  2. 创建自定义节点类:

    class Simple_Node(Node):
    

    创建一个继承自Node的自定义节点类。

  3. 初始化节点:

    def __init__(self):
        super().__init__('simple_node')
    

    在节点类的构造函数中,调用super().__init__('simple_node')来初始化节点,指定节点的名称为simple_node

  4. 创建定时器:

        timer_period = 0.5  # seconds
        self.timer = self.create_timer(timer_period, self.node_callback)
    

    创建一个定时器,每0.5秒触发一次,定时器触发时会调用self.node_callback方法。

  5. 回调函数:

    def node_callback(self):
        self.get_logger().info('simple_node is alive')
    

    这是定时器触发时执行的回调函数。在这个例子中,它仅仅是输出日志信息,指示节点仍然在运行。

  6. 主函数:

    def main(args=None):
        rclpy.init(args=args)
    
        my_node = Simple_Node()   # instantiate my simple_node
    
        rclpy.spin(my_node)       # execute simple_node 
    
        my_node.destroy_node()
        rclpy.shutdown()
    

    主函数进行ROS 2的初始化,实例化自定义节点Simple_Node,执行节点(通过rclpy.spin(my_node)),然后在结束时销毁节点并关闭ROS。

这个脚本创建了一个简单的ROS节点,该节点每0.5秒输出一条日志信息。你可以通过在node_callback方法中添加其他功能来扩展该节点的行为。

定频率发布信息2

使用多线程实现

import threading

import rclpy

rclpy.init()
node = rclpy.create_node('simple_node')

# Spin in a separate thread
thread = threading.Thread(target=rclpy.spin, args=(node, ), daemon=True)
thread.start()

rate = node.create_rate(2)

try:
    while rclpy.ok():
        print('Help me body, you are my only hope')
        rate.sleep()
except KeyboardInterrupt:
    pass

rclpy.shutdown()
thread.join()

cpp代码

#include <chrono>
#include <thread>
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"

class SimpleNode : public rclcpp::Node {
public:
  SimpleNode() : Node("simple_node"), count_(0) {
    // 创建发布者
    publisher_ = this->create_publisher<std_msgs::msg::String>("topic", 10);

    // 创建定时器,每秒触发一次
    timer_ = this->create_wall_timer(std::chrono::seconds(1),
                                     std::bind(&SimpleNode::timer_callback, this));
  }

private:
  void timer_callback() {
    auto message = std_msgs::msg::String();
    message.data = "Help me body, you are my only hope";
    RCLCPP_INFO(this->get_logger(), message.data);
    publisher_->publish(message);
    count_++;
  }

  rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
  rclcpp::TimerBase::SharedPtr timer_;
  size_t count_;
};

void spinNode(rclcpp::Node::SharedPtr node) {
  rclcpp::spin(node);
}

int main(int argc, char *argv[]) {
  rclcpp::init(argc, argv);
  auto node = std::make_shared<SimpleNode>();

  // 创建一个新线程,该线程的目标是执行ROS节点的spin函数
  std::thread spinThread(spinNode, node);
  spinThread.detach(); // 让线程在后台运行

  rclcpp::Rate rate(2);
  while (rclcpp::ok()) {
    std::cout << "Help me body, you are my only hope" << std::endl;
    rate.sleep();
  }

  rclcpp::shutdown();

  // 主线程退出时等待ROS节点的spin线程结束
  if (spinThread.joinable()) {
    spinThread.join();
  }

  return 0;
}

  1. 导入必要的库和模块:

    import threading
    import rclpy
    

    导入了threading模块用于多线程操作,以及ROS 2的Python库。

  2. ROS节点初始化:

    rclpy.init()
    node = rclpy.create_node('simple_node')
    

    这里初始化了ROS 2,创建了一个名为simple_node的ROS节点。

  3. 使用多线程执行节点:

    thread = threading.Thread(target=rclpy.spin, args=(node, ), daemon=True)
    thread.start()
    

    创建一个新线程,该线程的目标是执行ROS节点的spin函数。daemon=True表示这是一个守护线程,当主线程退出时,守护线程也会退出。

  4. 创建定时器:

    rate = node.create_rate(2)
    

    创建一个ROS节点的定时器,频率为2Hz。

  5. 循环输出信息:

    try:
        while rclpy.ok():
            print('Help me body, you are my only hope')
            rate.sleep()
    except KeyboardInterrupt:
        pass
    

    在一个循环中,每隔0.5秒输出一条信息。这里使用了rate.sleep()来控制循环的频率。

  6. 关闭节点和等待线程结束:

    rclpy.shutdown()
    thread.join()
    

    在键盘中断(Ctrl+C)之后,关闭ROS节点并等待线程结束。

获取当前时间

from rclpy.colock import Clock
time_now=Clock().now()

参数读取

这样就可以将参数传入Mynode中

# my_node.py

import rclpy
from rclpy.node import Node
from rclpy.utilities import get_launch_context

class MyNode(Node):
    def __init__(self, my_parameter, another_parameter):
        super().__init__('my_node')

        # 使用传递的参数
        self.my_parameter = my_parameter
        self.another_parameter = another_parameter

        self.get_logger().info(f'My parameter: {self.my_parameter}')
        self.get_logger().info(f'Another parameter: {self.another_parameter}')

def main(args=None):
    rclpy.init(args=args)

    # 获取参数
    launch_context = get_launch_context()
    my_parameter = launch_context.launch_configurations['my_parameter'] or 'default_value'
    another_parameter = launch_context.launch_configurations['another_parameter'] or 'default_value'

    my_node = MyNode(my_parameter, another_parameter)
    rclpy.spin(my_node)
    rclpy.shutdown()

if __name__ == '__main__':
    main()

文章来源:https://blog.csdn.net/qq2650326396/article/details/135616478
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。