目录

ROS2-话题学习

ROS2-话题学习

强烈推荐教程:

构建功能包

create package demo_python_pkg

ros2 pkg create –build-type ament_python –license Apache-2.0 demo_python_pkg

自己写的代码放在./demo_python_pkg/demo_python_pkg目录下

发布者

import rclpy
from rclpy.node import Node
import requests
from example_interfaces.msg import String
from queue import Queue


class NovelPubNode(Node):
    def __init__(self, node_name):
        super().__init__(node_name)
        self.novels_queue = Queue()
        self.get_logger().info("NovelPubNode has been started")
        self.novel_pub = self.create_publisher(String, "novel_topic", 10)

        self.timer = self.create_timer(5,self.timer_callback)


    def timer_callback(self):
        if not self.novels_queue.empty():
            line = self.novels_queue.get()
            msg = String()
            msg.data = line
            self.novel_pub.publish(msg)
            self.get_logger().info(f"Published novel: {line}")       

    def download_novel(self, url):
        response = requests.get(url)
        response.encoding = 'utf-8'
        text = response.text
        for text_line in text.splitlines():
            self.novels_queue.put(text_line)

        self.get_logger().info(f"Downloaded novel: {text}")



def main():
    rclpy.init()
    node = NovelPubNode("novel_pub_node")
    node.download_novel("http://0.0.0.0:8000/novel_1.txt")
    rclpy.spin(node)
    node.shutdown()

订阅者

import time
import espeakng
import rclpy
from rclpy.node import Node
from example_interfaces.msg import String
from queue import Queue
import threading


class NovelSubNode(Node):
    def __init__(self, node_name):
        super().__init__(node_name)
        self.get_logger().info("NovelSubNode has been created!")
        self.novels_queue = Queue()
        self.create_subscription(String, "novel_topic", self.novel_callback, 10)
        self.say_thread = threading.Thread(target=self.say)
        self.say_thread.start()
        


    def novel_callback(self, msg):
        self.novels_queue.put(msg.data)
        


    def say(self):
        engine = espeakng.Speaker()
        engine.voice = "zh"
        while rclpy.ok():
            if not self.novels_queue.empty():
                novel = self.novels_queue.get()
                engine.say(novel)
                print("Said: " + novel)
                engine.wait()
            else:
                time.sleep(1)



def main():
    rclpy.init()
    node = NovelSubNode("novel_sub_node")
    rclpy.spin(node)
    rclpy.shutdown()
    

代码完成后,配置setup.py文件

https://i-blog.csdnimg.cn/direct/d6e934477bc845c2b6d8fd74bec0f942.png

格式为:“名字 = 包名.文件名:函数名”

在根目录运行以下终端命令

build package demo_python_pkg

colcon build

source setup.bash

source install/setup.bash

运行以上命令后,得到build、install、log文件夹

可执行的节点文件在以下文件夹

./install/demo_python_pkg/lib/demo_python_pkg

运行节点命令

ros2 run demo_python_pkg python_pub_node

ros2 run demo_python_pkg python_sub_node

其他常用命令

check if node is running

ros2 node list

check if topic is published

ros2 topic list

check topic content

ros2 topic echo /novel_topic

check topic speed

ros2 topic hz /novel_topic

check if service is available

ros2 service list