通过定时器实现
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class Simple_Node(Node):
def __init__(self):
super().__init__('simple_node')
# Create a timer that will gate the node actions twice a second
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, self.node_callback)
#
def node_callback(self):
self.get_logger().info('simple_node is alive')
def main(args=None):
rclpy.init(args=args)
my_node = Simple_Node() # instantiate my simple_node
rclpy.spin(my_node) # execute simple_node
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
my_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
CPP实现
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
class SimpleNode : public rclcpp::Node {
public:
SimpleNode() : Node("simple_node"), count_(0) {
// 创建发布者
publisher_ = this->create_publisher<std_msgs::msg::String>("topic", 10);
// 创建定时器,每秒触发一次
timer_ = this->create_wall_timer(std::chrono::seconds(1),
std::bind(&SimpleNode::timer_callback, this));
}
private:
void timer_callback() {
auto message = std_msgs::msg::String();
message.data = "Help me body, you are my only hope";
RCLCPP_INFO(this->get_logger(), message.data);
publisher_->publish(message);
count_++;
}
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
rclcpp::TimerBase::SharedPtr timer_;
size_t count_;
};
int main(int argc, char *argv[]) {
rclcpp::init(argc, argv);
auto node = std::make_shared<SimpleNode>();
rclcpp::spin(node);
rclcpp::shutdown();
return 0;
}
导入必要的库和模块:
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
这里导入了ROS 2的Python库(rclpy
),以及一些ROS 2消息类型(String
)。
创建自定义节点类:
class Simple_Node(Node):
创建一个继承自Node
的自定义节点类。
初始化节点:
def __init__(self):
super().__init__('simple_node')
在节点类的构造函数中,调用super().__init__('simple_node')
来初始化节点,指定节点的名称为simple_node
。
创建定时器:
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, self.node_callback)
创建一个定时器,每0.5秒触发一次,定时器触发时会调用self.node_callback
方法。
回调函数:
def node_callback(self):
self.get_logger().info('simple_node is alive')
这是定时器触发时执行的回调函数。在这个例子中,它仅仅是输出日志信息,指示节点仍然在运行。
主函数:
def main(args=None):
rclpy.init(args=args)
my_node = Simple_Node() # instantiate my simple_node
rclpy.spin(my_node) # execute simple_node
my_node.destroy_node()
rclpy.shutdown()
主函数进行ROS 2的初始化,实例化自定义节点Simple_Node
,执行节点(通过rclpy.spin(my_node)
),然后在结束时销毁节点并关闭ROS。
这个脚本创建了一个简单的ROS节点,该节点每0.5秒输出一条日志信息。你可以通过在node_callback
方法中添加其他功能来扩展该节点的行为。
使用多线程实现
import threading
import rclpy
rclpy.init()
node = rclpy.create_node('simple_node')
# Spin in a separate thread
thread = threading.Thread(target=rclpy.spin, args=(node, ), daemon=True)
thread.start()
rate = node.create_rate(2)
try:
while rclpy.ok():
print('Help me body, you are my only hope')
rate.sleep()
except KeyboardInterrupt:
pass
rclpy.shutdown()
thread.join()
cpp代码
#include <chrono>
#include <thread>
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
class SimpleNode : public rclcpp::Node {
public:
SimpleNode() : Node("simple_node"), count_(0) {
// 创建发布者
publisher_ = this->create_publisher<std_msgs::msg::String>("topic", 10);
// 创建定时器,每秒触发一次
timer_ = this->create_wall_timer(std::chrono::seconds(1),
std::bind(&SimpleNode::timer_callback, this));
}
private:
void timer_callback() {
auto message = std_msgs::msg::String();
message.data = "Help me body, you are my only hope";
RCLCPP_INFO(this->get_logger(), message.data);
publisher_->publish(message);
count_++;
}
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
rclcpp::TimerBase::SharedPtr timer_;
size_t count_;
};
void spinNode(rclcpp::Node::SharedPtr node) {
rclcpp::spin(node);
}
int main(int argc, char *argv[]) {
rclcpp::init(argc, argv);
auto node = std::make_shared<SimpleNode>();
// 创建一个新线程,该线程的目标是执行ROS节点的spin函数
std::thread spinThread(spinNode, node);
spinThread.detach(); // 让线程在后台运行
rclcpp::Rate rate(2);
while (rclcpp::ok()) {
std::cout << "Help me body, you are my only hope" << std::endl;
rate.sleep();
}
rclcpp::shutdown();
// 主线程退出时等待ROS节点的spin线程结束
if (spinThread.joinable()) {
spinThread.join();
}
return 0;
}
导入必要的库和模块:
import threading
import rclpy
导入了threading
模块用于多线程操作,以及ROS 2的Python库。
ROS节点初始化:
rclpy.init()
node = rclpy.create_node('simple_node')
这里初始化了ROS 2,创建了一个名为simple_node
的ROS节点。
使用多线程执行节点:
thread = threading.Thread(target=rclpy.spin, args=(node, ), daemon=True)
thread.start()
创建一个新线程,该线程的目标是执行ROS节点的spin
函数。daemon=True
表示这是一个守护线程,当主线程退出时,守护线程也会退出。
创建定时器:
rate = node.create_rate(2)
创建一个ROS节点的定时器,频率为2Hz。
循环输出信息:
try:
while rclpy.ok():
print('Help me body, you are my only hope')
rate.sleep()
except KeyboardInterrupt:
pass
在一个循环中,每隔0.5秒输出一条信息。这里使用了rate.sleep()
来控制循环的频率。
关闭节点和等待线程结束:
rclpy.shutdown()
thread.join()
在键盘中断(Ctrl+C)之后,关闭ROS节点并等待线程结束。
from rclpy.colock import Clock
time_now=Clock().now()
这样就可以将参数传入Mynode中
# my_node.py
import rclpy
from rclpy.node import Node
from rclpy.utilities import get_launch_context
class MyNode(Node):
def __init__(self, my_parameter, another_parameter):
super().__init__('my_node')
# 使用传递的参数
self.my_parameter = my_parameter
self.another_parameter = another_parameter
self.get_logger().info(f'My parameter: {self.my_parameter}')
self.get_logger().info(f'Another parameter: {self.another_parameter}')
def main(args=None):
rclpy.init(args=args)
# 获取参数
launch_context = get_launch_context()
my_parameter = launch_context.launch_configurations['my_parameter'] or 'default_value'
another_parameter = launch_context.launch_configurations['another_parameter'] or 'default_value'
my_node = MyNode(my_parameter, another_parameter)
rclpy.spin(my_node)
rclpy.shutdown()
if __name__ == '__main__':
main()