Skip to content

1. Basic ROS2 Example

SMART Lab at Purdue University edited this page May 27, 2021 · 20 revisions

Introduction

In this page, we introduce the basic ROS2 functions to understand the ROS2 system and how to make publisher and subscriber nodes. Based on the official ROS2 website, we generated the templates to build the publisher and subscriber, and both nodes. If you are a beginner to ROS2/ROS, you may want to copy and paste the template, and try to understand how to make the nodes in ROS2.

If you want to see the official documents, please visit this link: https://index.ros.org/doc/ros2/Tutorials/Writing-A-Simple-Py-Publisher-And-Subscriber/.

The contents of this page are as follows:

  1. Creating a ROS2 Package (python)

  2. Publisher & Subscriber Nodes

  3. Launch File

You can also read or download a PDF version of the SMARTmBOT Guide/Manual, and watch a full demo video of the SMARTmBOT at https://youtu.be/cn3vcqFgf90.

1. Creating a ROS2 Package (python)

  • Step 1. Create a workspace for ROS2.

$ mkdir -p ~/ros2_ws/src && cd ~/ros2_ws/src

$ cd ~/ros2_ws/

$ echo 'source /opt/ros/dashing/setup.bash' >> ~/.bashrc

$ echo 'source ~/ros2_ws/install/setup.bash' >> ~/.bashrc

$ colcon build --symlink-install

$ source install/setup.bash

  • Step 2. Create a package having a node for a publisher and subscriber

$ cd ~/ros2_ws/src

$ ros2 pkg create --build-type ament_python --maintainer-email [email protected] --node-name pub_node pub_sub_pkg

$ cd ~/ros2_ws

$ colcon build --symlink-install

$ source install/setup.bash

  • Step 3. Test the publisher node

$ ros2 run pub_sub_pkg pub_node

  • Result:
Hi from pub_sub_pkg.

2. Creating Publisher & Subscriber Node

(1) Publisher Node

  • Step 1. Go to the package folder, open pub_nano.py

$ cd ~/ros2_ws/src/pub_sub_pkg/pub_sub_pkg

$ nano pub_node.py

  • Step 2. Edit pub_node.py to make publisher node.
import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class MinimalPublisher(Node):
    def __init__(self):
        super().__init__('minimal_publisher')
        self.publisher_ = self.create_publisher(String, 'topic', 10)
        timer_period = 0.5  # seconds
        self.timer = self.create_timer(timer_period, self.timer_callback)
        self.i = 0
    def timer_callback(self):
        msg = String()
        msg.data = 'Hello World: %d' % self.i
        self.publisher_.publish(msg)
        self.get_logger().info('Publishing: "%s"' % msg.data)
        self.i += 1

def main(args=None):
    rclpy.init(args=args)
    minimal_publisher = MinimalPublisher()
    rclpy.spin(minimal_publisher)
    # Destroy the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    minimal_publisher.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()
  • Step 3. Test the node

$ cd ~/ros2_ws

$ colcon build --symlink-install

$ source install/setup.bash

ros2 run pub_sub_pkg pub_node

  • Result:
$ ros2 run pub_sub_pkg pub_node 
[INFO] [minimal_publisher]: Publishing: "Hello World: 0"
[INFO] [minimal_publisher]: Publishing: "Hello World: 1"
[INFO] [minimal_publisher]: Publishing: "Hello World: 2"
[INFO] [minimal_publisher]: Publishing: "Hello World: 3"
[INFO] [minimal_publisher]: Publishing: "Hello World: 4"
......................
......................

(2) Subscriber Node

  • Step 1. Install ROS2 on the remote PC (e.g., laptop or desktop to see the message topics)

sudo locale-gen en_US en_US.UTF-8

sudo update-locale LC_ALL=en_US.UTF-8 LANG=en_US.UTF-8

export LANG=en_US.UTF-8

sudo apt update && sudo apt install curl gnupg2 lsb-release

curl -s https://raw.githubusercontent.com/ros/rosdistro/master/ros.asc | sudo apt-key add -

sudo sh -c 'echo "deb [arch=amd64,arm64] https://packages.ros.org/ros2/ubuntu lsb_release -cs main" > /etc/apt/sources.list.d/ros2-latest.list'

sudo apt update && sudo apt install -y \
  build-essential \
  cmake \
  git \
  python3-colcon-common-extensions \
  python3-lark-parser \
  python3-pip \
  python-rosdep \
  python3-vcstool \
  wget
# install some pip packages needed for testing
python3 -m pip install -U \
  argcomplete \
  flake8 \
  flake8-blind-except \
  flake8-builtins \
  flake8-class-newline \
  flake8-comprehensions \
  flake8-deprecated \
  flake8-docstrings \
  flake8-import-order \
  flake8-quotes \
  pytest-repeat \
  pytest-rerunfailures \
  pytest \
  pytest-cov \
  pytest-runner \
  setuptools
# install Fast-RTPS dependencies
sudo apt install --no-install-recommends -y \
  libasio-dev \
  libtinyxml2-dev

mkdir -p ~/ros2_ws/src

cd ~/ros2_ws

wget https://raw.githubusercontent.com/ros2/ros2/dashing/ros2.repos

vcs import src < ros2.repos

sudo rosdep init

rosdep update

rosdep install --from-paths src --ignore-src --rosdistro dashing -y --skip-keys "console_bridge fastcdr fastrtps libopensplice67 libopensplice69 rti-connext-dds-5.3.1 urdfdom_headers"

cd ~/ros2_ws

colcon build --symlink-install

sudo apt install python3-argcomplete

  • Step 2. Create a package for the new node. $ cd ~/ros2_ws/src

$ ros2 pkg create --build-type ament_python --maintainer-email [email protected] --node-name sub_node pub_sub_pkg

$ cd ~/ros2_ws

$ colcon build --symlink-install

$ source install/setup.bash

or cd ~/ros2_ws && colcon build --symlink-install && source install/setup.bash

  • Step 3. Add a subscribe node to the_pub_sub_pkg_ package $ cd ~ros2_ws/src/pub_sub_pkg/pub_sub_pkg/ $ ls
  • Result:
__init__.py  pub_node.py  __pycache__
  • Step 4. Make a new python file (pub_node.py) for subscriber.

$ touch sub_node.py

  • Step 5. Edit pub_node.py to make publisher node.

$ nano sub_node.py or gedit

import rclpy
from rclpy.node import Node

from std_msgs.msg import String


class MinimalSubscriber(Node):

    def __init__(self):
        super().__init__('minimal_subscriber')
        self.subscription = self.create_subscription(
            String,
            'topic',
            self.listener_callback,
            10)
        self.subscription  # prevent unused variable warning

    def listener_callback(self, msg):
        self.get_logger().info('I heard: "%s"' % msg.data)


def main(args=None):
    rclpy.init(args=args)

    minimal_subscriber = MinimalSubscriber()

    rclpy.spin(minimal_subscriber)

    # Destroy the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    minimal_subscriber.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()
  • Step 6. Add an entry point on setup.py

$ cd ~/ros2_ws/src/pub_sub_pkg

$ ls

  • Result:
package.xml  pub_sub_pkg/ resource/    setup.cfg    setup.py     test/
  • Step 7. open and modify the setup.py

nano setup.py

.........................
entry_points={
        'console_scripts': [
            'pub_node = pub_sub_pkg.pub_node:main'**,**
            **'sub_node = pub_sub_pkg.sub_node:main',**
        ],
    },
  • Step 8. Test the node

$ cd ~/ros2_ws

$ colcon build --symlink-install

$ source install/setup.bash

or cd ~/ros2_ws && colcon build --symlink-install && source install/setup.bash

  • Open another terminal to run the pub_node

#Terminal 1

ros2 run pub_sub_pkg sub_node

#Terminal 2

ros2 run pub_sub_pkg pub_node

  • Result:
$ ros2 run pub_sub_pkg pub_node
[INFO] [minimal_publisher]: Publishing: "Hello World: 0"
[INFO] [minimal_publisher]: Publishing: "Hello World: 1"
[INFO] [minimal_publisher]: Publishing: "Hello World: 2"
[INFO] [minimal_publisher]: Publishing: "Hello World: 3"
[INFO] [minimal_publisher]: Publishing: "Hello World: 4"

(3) Subscriber and Publisher Node

  • it is to create a new publish & subscribe node
import rclpy
from rclpy.node import Node

from std_msgs.msg import String, Int32


class Minimal_Subscriber_Publisher(Node):
    def __init__(self):
        super().__init__('Minimal_Subscriber_Publisher')
        # Initialize and define Publishers
        self.pub_name = self.create_publisher(Int32, 'wonsu0513/pub_test', 10)

        # Setting timer for pulbisher
        pub_name_timer_period = 0.5  # seconds
        self.pub_name_timer = self.create_timer(pub_name_timer_period,self.pub_name_timer_callback)
        self.pub_name_i = 0

        # Initialize and define subscription (subscribe)
        self.sub_name = self.create_subscription(
            Int32,'wonsu0513/pub_test', self.sub_name_callback, 10)
        self.sub_name  # prevent unused variable warning

    def sub_name_callback(self, msg):
        self.get_logger().info('(sub) I heard: "%d"' % msg.data)

    def pub_name_timer_callback(self):
        msg = Int32()
        msg.data = self.pub_name_i
        self.pub_name.publish(msg)
        # display data on the terminal (it is good to test the code)
        self.get_logger().info('(pub): "%d"' % msg.data)
        self.pub_name_i=self.pub_name_i+1


def main(args=None):
    rclpy.init(args=args)

    minimal_subscriber_publisher = Minimal_Subscriber_Publisher()

    rclpy.spin(minimal_subscriber_publisher)

    # Destroy the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    minimal_subscriber_publisher.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

3. Launch File

  • Create a new folder for a launch file in the package folder.

mkdir /smart_mbot_ws/src/smartm_bot_pkg/launch

  • Create a new python code as the launch file.
import os
from ament_index_python.packages import get_package_share_directory , get_search_paths
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
from launch import LaunchDescription


namespace_name = 'smartmbot_1'

def generate_launch_description():
    config = os.path.join(
        get_package_share_directory('smart_mbot_pkg'),
         'config',
          namespace_name+'__Writing_DC_Motor.yaml'
        )

    return LaunchDescription([
        Node(
            package='smart_mbot_pkg',
            namespace= namespace_name,
            executable='Reading_SPI_ADC',
            name='Reading_SPI_ADC'
        ),

        Node(
            package='smart_mbot_pkg',
            namespace= namespace_name,
            executable='Writing_DC_Motor',
            name='Writing_DC_Motor',
            output='screen',
            parameters = [config]
        ),

        Node(
            package='smart_mbot_pkg',
            namespace= namespace_name,
            executable='Reading_I2C_ToF',
            name='Reading_I2C_ToF'
        ),

        Node(
            package='smart_mbot_pkg',
            namespace=namespace_name,
            executable='Writing_WB2813b_RGB_STRIP',
            name='Writing_WB2813b_RGB_STRIP'
        ),

        Node(
            package='smart_mbot_pkg',
            namespace=namespace_name,
            executable='Writing_GPIO_SMD5050_LED',
            name='Writing_GPIO_SMD5050_LED'
        ),

        Node(
            package='v4l2_camera',
            namespace=namespace_name,
            executable='v4l2_camera_node',
            name='v4l2_camera_node'
        )
    ])
  • Lastly, modify the setup.py in the working space to detect the launch file in the launch folder.
import os
from glob import glob
from setuptools import setup

package_name = 'smart_mbot_pkg'

setup(
    name=package_name,
    version='0.0.1',
    packages=[package_name],
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
        (os.path.join('share', package_name, 'launch'), glob('launch/*.launch.py')),
        (os.path.join('share', package_name, 'config'), glob('config/*.yaml'))
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='Wonse Jo and Jaeeun Kim',
    maintainer_email='[email protected]',
    description='This package is a fundamental pakage for SMARTmBOT. If you need more information, please visit our site; https://github.itap.purdue.edu/ByungcheolMinGroup/smart_mbot_ws',
    license='GNU General Public License v3.0: gpl-3.0',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
            'Reading_SPI_ADC = smart_mbot_pkg.ros2_spi_adc_reading:main',
            'Writing_DC_Motor = smart_mbot_pkg.ros2_dc_motor_control:main',
            'Reading_I2C_ToF = smart_mbot_pkg.ros2_i2c_tof_reading:main',
            'Writing_WB2813b_RGB_STRIP = smart_mbot_pkg.ros2_rgb_strip_ws2812b:main',
            'Writing_GPIO_SMD5050_LED = smart_mbot_pkg.ros2_gpio_rgb_led:main',
        ],
    },
)