-
Notifications
You must be signed in to change notification settings - Fork 17
1. Basic ROS2 Example
In this page, we introduce the basic ROS2 functions to understand the ROS2 system and how to make publisher and subscriber nodes. Based on the official ROS2 website, we generated the templates to build the publisher and subscriber, and both nodes. If you are a beginner to ROS2/ROS, you may want to copy and paste the template, and try to understand how to make the nodes in ROS2.
If you want to see the official documents, please visit this link: https://index.ros.org/doc/ros2/Tutorials/Writing-A-Simple-Py-Publisher-And-Subscriber/.
The contents of this page are as follows:
You can also read or download a PDF version of the SMARTmBOT Guide/Manual, and watch a full demo video of the SMARTmBOT at https://youtu.be/cn3vcqFgf90.
- Step 1. Create a workspace for ROS2.
$ mkdir -p ~/ros2_ws/src && cd ~/ros2_ws/src
$ cd ~/ros2_ws/
$ echo 'source /opt/ros/dashing/setup.bash' >> ~/.bashrc
$ echo 'source ~/ros2_ws/install/setup.bash' >> ~/.bashrc
$ colcon build --symlink-install
$ source install/setup.bash
- Step 2. Create a package having a node for a publisher and subscriber
$ cd ~/ros2_ws/src
$ ros2 pkg create --build-type ament_python --maintainer-email [email protected] --node-name pub_node pub_sub_pkg
$ cd ~/ros2_ws
$ colcon build --symlink-install
$ source install/setup.bash
- Step 3. Test the publisher node
$ ros2 run pub_sub_pkg pub_node
- Result:
Hi from pub_sub_pkg.
(1) Publisher Node
- Step 1. Go to the package folder, open pub_nano.py
$ cd ~/ros2_ws/src/pub_sub_pkg/pub_sub_pkg
$ nano pub_node.py
- Step 2. Edit pub_node.py to make publisher node.
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class MinimalPublisher(Node):
def __init__(self):
super().__init__('minimal_publisher')
self.publisher_ = self.create_publisher(String, 'topic', 10)
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0
def timer_callback(self):
msg = String()
msg.data = 'Hello World: %d' % self.i
self.publisher_.publish(msg)
self.get_logger().info('Publishing: "%s"' % msg.data)
self.i += 1
def main(args=None):
rclpy.init(args=args)
minimal_publisher = MinimalPublisher()
rclpy.spin(minimal_publisher)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
minimal_publisher.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
- Step 3. Test the node
$ cd ~/ros2_ws
$ colcon build --symlink-install
$ source install/setup.bash
ros2 run pub_sub_pkg pub_node
- Result:
$ ros2 run pub_sub_pkg pub_node
[INFO] [minimal_publisher]: Publishing: "Hello World: 0"
[INFO] [minimal_publisher]: Publishing: "Hello World: 1"
[INFO] [minimal_publisher]: Publishing: "Hello World: 2"
[INFO] [minimal_publisher]: Publishing: "Hello World: 3"
[INFO] [minimal_publisher]: Publishing: "Hello World: 4"
......................
......................
(2) Subscriber Node
- Step 1. Install ROS2 on the remote PC (e.g., laptop or desktop to see the message topics)
sudo locale-gen en_US en_US.UTF-8
sudo update-locale LC_ALL=en_US.UTF-8 LANG=en_US.UTF-8
export LANG=en_US.UTF-8
sudo apt update && sudo apt install curl gnupg2 lsb-release
curl -s https://raw.githubusercontent.com/ros/rosdistro/master/ros.asc | sudo apt-key add -
sudo sh -c 'echo "deb [arch=amd64,arm64] https://packages.ros.org/ros2/ubuntu
lsb_release -cs main" > /etc/apt/sources.list.d/ros2-latest.list'
sudo apt update && sudo apt install -y \
build-essential \
cmake \
git \
python3-colcon-common-extensions \
python3-lark-parser \
python3-pip \
python-rosdep \
python3-vcstool \
wget
# install some pip packages needed for testing
python3 -m pip install -U \
argcomplete \
flake8 \
flake8-blind-except \
flake8-builtins \
flake8-class-newline \
flake8-comprehensions \
flake8-deprecated \
flake8-docstrings \
flake8-import-order \
flake8-quotes \
pytest-repeat \
pytest-rerunfailures \
pytest \
pytest-cov \
pytest-runner \
setuptools
# install Fast-RTPS dependencies
sudo apt install --no-install-recommends -y \
libasio-dev \
libtinyxml2-dev
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws
wget https://raw.githubusercontent.com/ros2/ros2/dashing/ros2.repos
vcs import src < ros2.repos
sudo rosdep init
rosdep update
rosdep install --from-paths src --ignore-src --rosdistro dashing -y --skip-keys "console_bridge fastcdr fastrtps libopensplice67 libopensplice69 rti-connext-dds-5.3.1 urdfdom_headers"
cd ~/ros2_ws
colcon build --symlink-install
sudo apt install python3-argcomplete
- Step 2. Create a package for the new node.
$ cd ~/ros2_ws/src
$ ros2 pkg create --build-type ament_python --maintainer-email [email protected] --node-name sub_node pub_sub_pkg
$ cd ~/ros2_ws
$ colcon build --symlink-install
$ source install/setup.bash
or cd ~/ros2_ws && colcon build --symlink-install && source install/setup.bash
- Step 3. Add a subscribe node to the_pub_sub_pkg_ package
$ cd ~ros2_ws/src/pub_sub_pkg/pub_sub_pkg/
$ ls
- Result:
__init__.py pub_node.py __pycache__
- Step 4. Make a new python file (pub_node.py) for subscriber.
$ touch sub_node.py
- Step 5. Edit pub_node.py to make publisher node.
$ nano sub_node.py
or gedit
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class MinimalSubscriber(Node):
def __init__(self):
super().__init__('minimal_subscriber')
self.subscription = self.create_subscription(
String,
'topic',
self.listener_callback,
10)
self.subscription # prevent unused variable warning
def listener_callback(self, msg):
self.get_logger().info('I heard: "%s"' % msg.data)
def main(args=None):
rclpy.init(args=args)
minimal_subscriber = MinimalSubscriber()
rclpy.spin(minimal_subscriber)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
minimal_subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
- Step 6. Add an entry point on
setup.py
$ cd ~/ros2_ws/src/pub_sub_pkg
$ ls
- Result:
package.xml pub_sub_pkg/ resource/ setup.cfg setup.py test/
- Step 7. open and modify the
setup.py
nano setup.py
.........................
entry_points={
'console_scripts': [
'pub_node = pub_sub_pkg.pub_node:main'**,**
**'sub_node = pub_sub_pkg.sub_node:main',**
],
},
- Step 8. Test the node
$ cd ~/ros2_ws
$ colcon build --symlink-install
$ source install/setup.bash
or cd ~/ros2_ws && colcon build --symlink-install && source install/setup.bash
- Open another terminal to run the pub_node
#Terminal 1
ros2 run pub_sub_pkg sub_node
#Terminal 2
ros2 run pub_sub_pkg pub_node
- Result:
$ ros2 run pub_sub_pkg pub_node
[INFO] [minimal_publisher]: Publishing: "Hello World: 0"
[INFO] [minimal_publisher]: Publishing: "Hello World: 1"
[INFO] [minimal_publisher]: Publishing: "Hello World: 2"
[INFO] [minimal_publisher]: Publishing: "Hello World: 3"
[INFO] [minimal_publisher]: Publishing: "Hello World: 4"
(3) Subscriber and Publisher Node
- it is to create a new publish & subscribe node
import rclpy
from rclpy.node import Node
from std_msgs.msg import String, Int32
class Minimal_Subscriber_Publisher(Node):
def __init__(self):
super().__init__('Minimal_Subscriber_Publisher')
# Initialize and define Publishers
self.pub_name = self.create_publisher(Int32, 'wonsu0513/pub_test', 10)
# Setting timer for pulbisher
pub_name_timer_period = 0.5 # seconds
self.pub_name_timer = self.create_timer(pub_name_timer_period,self.pub_name_timer_callback)
self.pub_name_i = 0
# Initialize and define subscription (subscribe)
self.sub_name = self.create_subscription(
Int32,'wonsu0513/pub_test', self.sub_name_callback, 10)
self.sub_name # prevent unused variable warning
def sub_name_callback(self, msg):
self.get_logger().info('(sub) I heard: "%d"' % msg.data)
def pub_name_timer_callback(self):
msg = Int32()
msg.data = self.pub_name_i
self.pub_name.publish(msg)
# display data on the terminal (it is good to test the code)
self.get_logger().info('(pub): "%d"' % msg.data)
self.pub_name_i=self.pub_name_i+1
def main(args=None):
rclpy.init(args=args)
minimal_subscriber_publisher = Minimal_Subscriber_Publisher()
rclpy.spin(minimal_subscriber_publisher)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
minimal_subscriber_publisher.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
- Create a new folder for a launch file in the package folder.
mkdir /smart_mbot_ws/src/smartm_bot_pkg/launch
- Create a new python code as the launch file.
import os
from ament_index_python.packages import get_package_share_directory , get_search_paths
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
from launch import LaunchDescription
namespace_name = 'smartmbot_1'
def generate_launch_description():
config = os.path.join(
get_package_share_directory('smart_mbot_pkg'),
'config',
namespace_name+'__Writing_DC_Motor.yaml'
)
return LaunchDescription([
Node(
package='smart_mbot_pkg',
namespace= namespace_name,
executable='Reading_SPI_ADC',
name='Reading_SPI_ADC'
),
Node(
package='smart_mbot_pkg',
namespace= namespace_name,
executable='Writing_DC_Motor',
name='Writing_DC_Motor',
output='screen',
parameters = [config]
),
Node(
package='smart_mbot_pkg',
namespace= namespace_name,
executable='Reading_I2C_ToF',
name='Reading_I2C_ToF'
),
Node(
package='smart_mbot_pkg',
namespace=namespace_name,
executable='Writing_WB2813b_RGB_STRIP',
name='Writing_WB2813b_RGB_STRIP'
),
Node(
package='smart_mbot_pkg',
namespace=namespace_name,
executable='Writing_GPIO_SMD5050_LED',
name='Writing_GPIO_SMD5050_LED'
),
Node(
package='v4l2_camera',
namespace=namespace_name,
executable='v4l2_camera_node',
name='v4l2_camera_node'
)
])
- Lastly, modify the
setup.py
in the working space to detect the launch file in the launch folder.
import os
from glob import glob
from setuptools import setup
package_name = 'smart_mbot_pkg'
setup(
name=package_name,
version='0.0.1',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
(os.path.join('share', package_name, 'launch'), glob('launch/*.launch.py')),
(os.path.join('share', package_name, 'config'), glob('config/*.yaml'))
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='Wonse Jo and Jaeeun Kim',
maintainer_email='[email protected]',
description='This package is a fundamental pakage for SMARTmBOT. If you need more information, please visit our site; https://github.itap.purdue.edu/ByungcheolMinGroup/smart_mbot_ws',
license='GNU General Public License v3.0: gpl-3.0',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'Reading_SPI_ADC = smart_mbot_pkg.ros2_spi_adc_reading:main',
'Writing_DC_Motor = smart_mbot_pkg.ros2_dc_motor_control:main',
'Reading_I2C_ToF = smart_mbot_pkg.ros2_i2c_tof_reading:main',
'Writing_WB2813b_RGB_STRIP = smart_mbot_pkg.ros2_rgb_strip_ws2812b:main',
'Writing_GPIO_SMD5050_LED = smart_mbot_pkg.ros2_gpio_rgb_led:main',
],
},
)