로보테크AI

융합_로보테크 AI 자율주행 로봇 개발자 과정-26/04/29

steezer 2026. 4. 29. 18:30

 

PM10 기준으로 0~30으로 히트맵 표현

 

프로젝트 구조

  group4_ws/
  ├── src/
  │   ├── air_clean_robot/           # 메인 공기청정 제어 패키지
  │   │   ├── air_clean_robot/      # Python 소스 코드
  │   │   ├── launch/                # Launch 파일들
  │   │   └── config/                # 설정(YAML) 파일
  │   └── dust_mapping/              # 먼지 매핑 시각화 패키지
  │       ├── dust_mapping/         # Python 소스 코드
  │       ├── launch/
  │       └── config/
  ├── build/                         # 컴파일된 결과물 (자동 생성)
  ├── install/                       # 설치된 패키지 (자동 생성)
  └── log/                           # 빌드 로그 (자동 생성)

 

시스템 개요

 

 1. air_clean_robot 패키지
  공기질 기반 자동 탐색 및 무드 매니지먼트 시스템

  2. dust_mapping 패키지
  실시간 미세먼지 히트맵 매핑 및 시각화 시스템

 

 

핵심 컴포넌트 상세 분석

  air_clean_robot 패키지

  1. Serial Air Quality Node (serial_air_quality_node.py)

  - 목적: 아두이노 시리얼 포트로부터 PMS7003 센서 데이터 읽기
  - 입력: USB Serial (/dev/ttyACM0) - PM1.0, PM2.5, PM10 데이터
  - 출력: /air_quality (JSON 형식 String 메시지)
  - 주요 매개변수:
    - serial_port: "/dev/ttyACM0"
    - baudrate: 9600
    - zone_id: "A"
    - warmup_sec: 10.0 (초기 안정화 시간)
    - reconnect_sec: 3.0 (재연결 주기)
  - 데이터 형식:
  {"zone": "A", "pm1_0": 12, "pm2_5": 35, "pm10": 50, "source": "serial"}
  - 지원 포맷: PM1.0:12,PM2.5:35, 12,35,50, PM1.0=12

  2. Sim Air Quality Publisher (sim_air_quality_publisher)

  - 목적: 실 센서 없이 테스트할 수 있는 시뮬레이션 노드
  - 출력: /air_quality (dirty/normal 모드)
  - 모드: dirty (PM2.5=45) 또는 normal (PM2.5=12)

  3. Air Quality Task Manager (air_quality_task_manager.py)

  - 핵심: FSM(Finite State Machine) 기반 자동 제어 시스템
  - 상태 전이: IDLE → GO_TO_DIRTY_ZONE → CLEANING → RETURN_HOME → IDLE

  동작 원리:
  1. /air_quality 토픽 구독
  2. 이동평균 필터로 PM2.5 추적 (window_size: 5개 샘플)
  3. 임계값 초과 시(pm25_high_threshold: 35μg/m³) 오염 지역으로 이동
  4. 도착 후 cleaning_duration_sec: 30초간 정화 수행
  5. 정화 완료 후 대기 위치(home)로 복귀

  주요 매개변수:
  pm25_high_threshold: 35.0    # 오염 판정 기준
  pm25_low_threshold: 25.0      # 정화 완료 기준
  cleaning_duration_sec: 30.0   # 정화 소요 시간
  use_nav2: true                # 실제 네비게이션 사용 여부

  4. Manual Nav Controller (manual_nav_controller.py)

  - 목적: 수동 네비게이션 명령 처리
  - 입력: /air_clean_command (String)
  - 지원 명령: "node1", "node2", "node3", "home"
  - 기능: 사전 정의된 위치로 Nav2 목표 전송

  5. A Path Follower (astar_follower.py)*

  - 목적: Nav2 대신 커스텀 A* 경로 계획 및 주행 제어
  - 특징:
    - 실시간 OccupancyGrid 기반 경로 재계획
    - Pure Pursuit 알고리즘 기반 주행
    - 라이다 기반 동적 장애물 회피
    - 정적 맵 장애물 인플레이션
  - 주요 토픽:
    - /map, /amcl_pose, /goal_pose
    - /cmd_vel (출력 제어 명령)
    - /astar_path (시각화용 경로)

  6. Air Clean GUI (air_clean_gui.py)

  - 목적: Tkinter 기반 GUI 제어 인터페이스
  - 기능:
    - 실시간 카메라 영상 표시
    - 지도 및 로봇 위치 시각화
    - 수동 네비게이션 버튼
    - 공기질 실시간 모니터링

  dust_mapping 패키지

  7. MQTT PM Listener (mqtt_pm_listener.py)

  - 목적: MQTT 브로커로부터 실시간 PM 센서 데이터 수신
  - 지원 백엔드: paho-mqtt 또는 mosquitto_sub CLI
  - 입력: MQTT 토픽 (sensor/pm/zoneA/#)
  - 출력: /dust/pm (JSON)

  8. Dust Mapper (dust_mapper.py)

  - 목적: 로봇 위치 + PM 데이터를 결합하여 맵 셀 단위 평균 계산
  - 기능:
    - cell_size_m: 0.10m 격자 맵 생성
    - CSV로 데이터 로깅 (dust_logs/dust_samples.csv)
    - 평균/최대/카운트 저장

  9. Dust Map Viewer (dust_map_viewer.py)

  - 목적: Tkinter 기준 실시간 미세먼지 히트맵 시각화
  - 색상 스케일:
    - 0-5 μg/m³: 파란색 (안전)
    - 5-10 μg/m³: 청록색
    - 10-15 μg/m³: 초록색
    - 15-20 μg/m³: 노란색
    - 20-25 μg/m³: 주황색
    - 25-30 μg/m³: 빨간색
    - 30+ μg/m³: 어두운 빨간색 (위험)

 


  주요 ROS2 토픽 흐름

AIR QUALITY CHAIN
  PMS7003 센서 → Arduino → serial_node → /air_quality → task_manager → Nav2

DUST MAPPING CHAIN
  MQTT Broker → mqtt_listener → /dust/pm → dust_mapper → /dust/cells →
  dust_viewer


설정 파일 (air_clean_params.yaml)

  공통 위치 설정 (3개 노드 공유)

  node_1: (2.30, -0.01)   # 오염 지역 1
  node_2: (2.35, -2.70)   # 오염 지역 2
  node_3: (-0.09, -2.53)  # 오염 지역 3
  home:   (0.04, -0.04)   # 대기 위치

  A* 제어 파라미터

  linear_speed: 0.08 m/s          # 선속도
  max_angular_speed: 0.45 rad/s   # 최대 각속도
  goal_tolerance: 0.12 m         # 목표 도착 허용 오차
  inflation_radius_cells: 5      # 장애물 인플레이션 (0.25m)
  use_scan_obstacles: true       # 동적 장애물 감지 활성화


실행 방법

  1. 실 센서 모드
  ros2 launch air_clean_robot air_clean_serial.launch.py

  2. 시뮬레이션 모드
  ros2 launch air_clean_robot air_clean_sihttp://m.launch.py

  3. 먼지 매핑 모드
  ros2 launch dust_mapping dust_map.launch.py


의존성 분석

  air_clean_robot 패키지 의존성

  - ROS2: rclpy, std_msgs, geometry_msgs, nav_msgs, sensor_msgs, nav2_msgs,
  action_msgs
  - Python: pyserial, tkinter, Pillow
  - 하드웨어: PMS7003 센서, Arduino, TurtleBot3

  dust_mapping 패키지 의존성

  - ROS2: rclpy, std_msgs, geometry_msgs, nav_msgs
  - Python: numpy, PIL, paho-mqtt/mosquitto
  - 외부: MQTT Broker (192.168.0.55:1883)


핵심 특징

  1. 이중 시스템: 실시간 제어(air_clean_robot) + 데이터 시각화(dust_mapping)
   병렬 운영
  2. 유연성: 시리얼/MQTT 중복 지원, Nav2/Custom A* 전환 가능
  3. 안정성: 이동평균 필터링, 재연결 로직, FSM 기반 안전 제어
  4. 확장성: 새로운 오염 지역 추가 용이, 파라미터 기반 설정

 

요약

ROS2 기반 스마트 공기청정 로봇:
  - 입력: PMS7003 미세먼지 센서 (시리얼 또는 MQTT)
  - 처리: 이동평균 필터링, FSM 기반 자동 결정
  - 제어: Nav2 또는 A* 커스텀 네비게이션
  - 출력: 실시간 히트맵 시각화, CSV 로깅, GUI 제어 인터페이스

  전체 시스템은 독립적으로 실행 가능한 두 패키지로 구성되어 있으며, 공통된
  지도 프레임워크를 기반으로 실내 공기질 모니터링 및 자동 정화를 수행


1. 센서 데이터 수집: Serial Air Quality Node

#!/usr/bin/env python3
  import json
  import re
  import time
  import rclpy
  from rclpy.node import Node
  from std_msgs.msg import String
  import serial

  class SerialAirQualityNode(Node):
      def __init__(self):
          super().__init__('serial_air_quality_node')

          # 매개변수 선언
          self.declare_parameter('serial_port', '/dev/ttyACM0')
          self.declare_parameter('baudrate', 9600)
          self.declare_parameter('zone_id', 'A')
          self.declare_parameter('publish_topic', '/air_quality')
          self.declare_parameter('warmup_sec', 10.0)

          # 시리얼 설정
          self.serial_port = self.get_parameter('serial_port').value
          self.baudrate = int(self.get_parameter('baudrate').value)
          self.publisher = self.create_publisher(String, '/air_quality', 10)

          # 0.1초마다 콜백 실행
          self.timer = self.create_timer(0.1, self.timer_callback)

      def timer_callback(self):
          try:
              raw = self.serial_conn.readline()
              line = raw.decode('utf-8', errors='ignore').strip()

              # PM1.0:12,PM2.5:35,PM10:50 형식 파싱
              values = self.parse_line(line)
              if len(values) == 3:
                  payload = {
                      'zone': self.zone_id,
                      'pm1_0': values['pm1_0'],
                      'pm2_5': values['pm2_5'],
                      'pm10': values['pm10'],
                      'source': 'serial'
                  }
                  msg = String()
                  msg.data = json.dumps(payload)
                  self.publisher.publish(msg)

          except serial.SerialException:
              self.serial_conn = None

  def main(args=None):
      rclpy.init(args=args)
      node = SerialAirQualityNode()
      try:
          rclpy.spin(node)
      finally:
          node.destroy_node()
          rclpy.shutdown()

  if __name__ == '__main__':
      main()

 

2. 자동 제어 FSM: Air Quality Task Manager

#!/usr/bin/env python3
  import json
  from enum import Enum
  from collections import defaultdict, deque
  import rclpy
  from rclpy.node import Node
  from std_msgs.msg import String
  from nav2_msgs.action import NavigateToPose
  from rclpy.action import ActionClient
  from action_msgs.msg import GoalStatus

  class State(Enum):
      IDLE = 'IDLE'
      GO_TO_DIRTY_ZONE = 'GO_TO_DIRTY_ZONE'
      CLEANING = 'CLEANING'
      RETURN_HOME = 'RETURN_HOME'
      ERROR = 'ERROR'

  class AirQualityTaskManager(Node):
      def __init__(self):
          super().__init__('air_quality_task_manager')

          # 매개변수 선언
          self.declare_parameter('pm25_high_threshold', 35.0)
          self.declare_parameter('pm25_low_threshold', 25.0)
          self.declare_parameter('window_size', 5)

          # PM2.5 임계값 설정
          self.pm25_high_threshold = 35.0  # 오염 감지 기준
          self.pm25_low_threshold = 25.0   # 정화 완료 기준
          self.window_size = 5  # 이동평균 창 크기

          # 상태 저장소
          self.pm25_windows = defaultdict(lambda: deque(maxlen=5))
          self.state = State.IDLE
          self.goal_in_progress = False

          # 구독 및 클라이언트
          self.subscription = self.create_subscription(
              String, '/air_quality', self.air_quality_callback, 10)
          self.nav_client = ActionClient(self, NavigateToPose,
  'navigate_to_pose')

          # 1초마다 FSM 실행
          self.timer = self.create_timer(1.0, self.fsm_tick)

      def air_quality_callback(self, msg):
          # JSON 파싱: {"zone": "A", "pm2_5": 35, ...}
          payload = json.loads(msg.data)
          pm25 = float(payload['pm2_5'])

          # 이동평균 업데이트
          self.pm25_windows['A'].append(pm25)

      def fsm_tick(self):
          if self.state == State.IDLE:
              self.handle_idle()
          elif self.state == State.CLEANING:
              self.handle_cleaning()

      def handle_idle(self):
          # PM2.5 평균 계산
          avg = sum(self.pm25_windows['A']) / len(self.pm25_windows['A'])

          # 오염 감지 → 오염 지역으로 이동
          if avg >= self.pm25_high_threshold:
              self.send_navigation_goal(
                  x=1.0, y=0.0, yaw=0.0,  # 오염 지역 좌표
                  on_success_state=State.CLEANING
              )
              self.state = State.GO_TO_DIRTY_ZONE

      def handle_cleaning(self):
          # 30초간 정화 수행
          elapsed = time.monotonic() - self.cleaning_start_time
          if elapsed >= 30.0:
              # 정화 완료 → home으로 복귀
              self.send_navigation_goal(
                  x=0.0, y=0.0, yaw=0.0,  # home 좌표
                  on_success_state=State.IDLE
              )
              self.state = State.RETURN_HOME

  def main(args=None):
      rclpy.init(args=args)
      node = AirQualityTaskManager()
      try:
          rclpy.spin(node)
      finally:
          node.destroy_node()
          rclpy.shutdown()

 

 3. 경로 계획 & 주행: A Follower*

#!/usr/bin/env python3
  import heapq
  import math
  import numpy as np
  import rclpy
  from rclpy.node import Node
  from nav_msgs.msg import OccupancyGrid, Path
  from geometry_msgs.msg import PoseWithCovarianceStamped, Twist
  from std_msgs.msg import Bool

  def run_astar(map_data, start, goal, width, height):
      """A* 경로 계획 알고리즘"""
      start_node = (0, start)  # (f-cost, position)
      open_list = [start_node]
      best_cost = {start: 0}

      # 8방향 이동 (상하좌우 + 대각선)
      moves = [(0,1), (1,0), (0,-1), (-1,0), (1,1), (1,-1), (-1,1), (-1,-1)]

      while open_list:
          f_cost, current = heapq.heappop(open_list)

          if current == goal:
              # 경로 재구성
              path = []
              while current in came_from:
                  path.append(current)
                  current = came_from[current]
              return list(reversed(path))

          for dy, dx in moves:
              next_pos = (current[0] + dy, current[1] + dx)
              if 0 <= next_pos[0] < height and 0 <= next_pos[1] < width:
                  if map_data[next_pos[0], next_pos[1]] == 0:  # 빈 공간
                      new_cost = best_cost[current] + 1
                      if new_cost < best_cost.get(next_pos, float('inf')):
                          best_cost[next_pos] = new_cost
                          f_cost = new_cost +
  math.hypot(next_pos[0]-goal[0], next_pos[1]-goal[1])
                          heapq.heappush(open_list, (f_cost, next_pos))
                          came_from[next_pos] = current

  class AStarFollower(Node):
      def __init__(self):
          super().__init__('astar_follower')

          # 구독
          self.create_subscription(OccupancyGrid, '/map', self.map_callback,
   10)
          self.create_subscription(PoseWithCovarianceStamped, '/amcl_pose',
  self.pose_callback, 10)
          self.create_subscription(Bool, '/shoe_detected',
  self.shoe_callback, 10)

          # 발행
          self.cmd_pub = self.create_publisher(Twist, '/cmd_vel', 10)

          # 제어 루프 (10Hz)
          self.create_timer(0.1, self.control_loop)

          self.path = []
          self.current_pose = None

      def control_loop(self):
          if not self.path or not self.current_pose:
              return

          # Pure Pursuit: 목표점 추종
          target = self.path[min(3, len(self.path)-1)]  # lookahead
          dx = target[0] - self.current_pose[0]
          dy = target[1] - self.current_pose[1]

          # 각도 계산
          angle = math.atan2(dy, dx)
          twist = Twist()
          twist.linear.x = 0.1  # 전진 속도
          twist.angular.z = angle * 1.5  # 비례 제어
          self.cmd_pub.publish(twist)

  def main(args=None):
      rclpy.init(args=args)
      node = AStarFollower()
      rclpy.spin(node)

  if __name__ == '__main__':
      main()

 

4. MQTT 데이터 수신: MQTT PM Listener

#!/usr/bin/env python3
  import json
  import rclpy
  from rclpy.node import Node
  from std_msgs.msg import String
  import paho.mqtt.client as mqtt

  class MqttPmListener(Node):
      def __init__(self):
          super().__init__('mqtt_pm_listener')

          # MQTT 브로커 설정
          self.declare_parameter('broker_host', '192.168.0.55')
          self.declare_parameter('mqtt_topic', 'sensor/pm/zoneA/#')
          self.declare_parameter('output_topic', '/dust/pm')

          self.broker = '192.168.0.55'
          self.topic = 'sensor/pm/zoneA/#'

          # ROS 퍼블리셔
          self.pub = self.create_publisher(String, '/dust/pm', 10)

          # MQTT 클라이언트
          self.client = mqtt.Client()
          self.client.on_connect = self.on_connect
          self.client.on_message = self.on_message

          self.client.connect(self.broker, 1883)
          self.client.loop_start()

      def on_connect(self, client, userdata, flags, rc):
          self.client.subscribe(self.topic)
          self.get_logger().info('MQTT 연결 완료')

      def on_message(self, client, userdata, msg):
          # MQTT 메시지를 ROS 메시지로 변환
          payload = json.loads(msg.payload)
          payload['mqtt_topic'] = msg.topic
          payload['source'] = 'mqtt'

          ros_msg = String()
          ros_msg.data = json.dumps(payload)
          self.pub.publish(ros_msg)

  def main(args=None):
      rclpy.init(args=args)
      node = MqttPmListener()
      rclpy.spin(node)

  if __name__ == '__main__':
      main()

 

핵심 코드 요약

노드 역할 핵심 로직 비고
serial_air_quality_node 센서 데이터 수집 시리얼 파싱, JSON 변환 실제 하드웨어 연결
air_quality_task_manager 자동 제어 FSM + 이동평균 필터 PM2.5 기준으로 자동 이동
astar_follower 경로 계획/ 주행 A* + Pure Pursuit Nav2 대체 커스텀 네비게이션
mqtt_pm_listener MQTT 데이터 수신 paho-mqtt 구독 무선 센서 데이터 연동


  이 4개 노드가 전체 시스템의 핵심 데이터 흐름과 의사결정을 담당

 

 

 

 

시간 여유로 아두이노 + 라즈베리 파이 연결 실험

 

아래는 기초적 메뉴얼

(나중에 심화 프로젝트 시간 남거나 최종 때 사용 대비)


Arduino-ROS2 범용 브릿지 시스템 구축 매뉴얼

문서 개요

본 문서는 라즈베리파이(ROS2 Humble) 환경에서 아두이노 우노를 USB로 연결하여 다양한 센서 및 액추에이터(LED, 부저, 서보 등)를 범용적으로 제어하기 위한 시스템 구축 절차를 기술한다. 모듈 추가 시 아두이노 펌웨어를 재작성하지 않고 ROS2 설정 파일(YAML) 수정만으로 확장 가능한 구조를 목표로 한다.

 

1. 시스템 개요

1.1 구성

구성 요소 역할
아두이노 우노 핀 단위 입출력을 수행하는 범용 실행기. 센서 데이터를 주기 발행
라즈베리파이 ROS2 노드를 통해 아두이노에 명령을 전달하고 데이터를 수신
USB 케이블 양방향 시리얼 통신 채널. 보드레이트 115200 사용

1.2 설계 원칙

아두이노는 "핀 번호와 동작을 알려주면 그대로 수행하는 기계"로 동작한다. 모듈별 의미(LED, 부저 등)는 라즈베리파이 측 YAML 설정 파일에서만 정의한다. 따라서 모듈 추가/변경 시 아두이노 펌웨어는 수정하지 않는다.

 

1.3 통신 프로토콜

JSON Lines 방식을 사용한다. 한 줄에 하나의 JSON 객체, 끝에 줄바꿈(\n).

라즈베리 -> 아두이노 (명령)

cmd 파라미터 용도
pin_mode pin, mode 핀 모드 설정 (out/in/in_pullup)
digital pin, value digitalWrite (0 또는 1)
pwm pin, value analogWrite (0~255)
tone pin, freq, dur 부저 음 출력 (Hz, ms)
no_tone pin 부저 끄기
servo pin, angle 서보 각도 (0~180)
servo_detach pin 서보 해제
ping - 연결 확인

아두이노 -> 라즈베리 (응답/데이터)

type 내용
sensor 주기적 센서 데이터
ack 명령 수신 확인
error 잘못된 명령에 대한 오류 메시지
pong ping에 대한 응답

 

2. 사전 준비

2.1 하드웨어

  • 아두이노 우노 1대
  • USB-B 케이블 1개
  • 라즈베리파이 (TurtleBot3 탑재 기준, Ubuntu + ROS2 Humble 설치 완료 상태)
  • 사용할 센서/액추에이터 (예: HC-SR04, DHT11, LED, 부저, SG90 서보 등)

2.2 소프트웨어 (라즈베리파이)

아래 명령을 라즈베리파이 터미널에서 순서대로 실행한다.

sudo apt update
sudo apt install python3-serial
sudo usermod -a -G dialout $USER

 

usermod 실행 후에는 반드시 재로그인 또는 재부팅한다. 재로그인 없이는 시리얼 포트 접근 권한이 적용되지 않는다.

2.3 소프트웨어 (아두이노 IDE)

Arduino IDE의 라이브러리 매니저(Tools -> Manage Libraries)에서 다음 라이브러리를 설치한다.

  • ArduinoJson (Benoit Blanchon)
  • DHT sensor library (Adafruit) — DHT11/DHT22 사용 시
  • Servo (기본 내장, 별도 설치 불필요)

3. 아두이노 펌웨어 업로드

3.1 스케치 작성

Arduino IDE에서 새 스케치를 생성하고, 아래 코드 전체를 붙여넣는다.

#include <ArduinoJson.h>
#include <Servo.h>
#include <DHT.h>

// ==================== 사용자 설정 영역 ====================
#define DHT_PIN 4
#define DHT_TYPE DHT11
#define ULTRASONIC_TRIG 9
#define ULTRASONIC_ECHO 10

const unsigned long SENSOR_INTERVAL_MS = 100;

// ==================== 내부 상태 ====================
DHT dht(DHT_PIN, DHT_TYPE);
unsigned long lastSensorPublish = 0;

const uint8_t MAX_SERVOS = 4;
struct ServoSlot {
  uint8_t pin;
  Servo servo;
  bool used;
};
ServoSlot servos[MAX_SERVOS];

// ==================== 유틸 ====================
void sendAck(const char* cmd, int pin) {
  StaticJsonDocument<128> doc;
  doc["type"] = "ack";
  doc["cmd"] = cmd;
  if (pin >= 0) doc["pin"] = pin;
  serializeJson(doc, Serial);
  Serial.println();
}

void sendError(const char* msg, int pin = -1) {
  StaticJsonDocument<128> doc;
  doc["type"] = "error";
  doc["msg"] = msg;
  if (pin >= 0) doc["pin"] = pin;
  serializeJson(doc, Serial);
  Serial.println();
}

// ==================== 서보 슬롯 관리 ====================
Servo* getOrAttachServo(uint8_t pin) {
  for (uint8_t i = 0; i < MAX_SERVOS; i++) {
    if (servos[i].used && servos[i].pin == pin) return &servos[i].servo;
  }
  for (uint8_t i = 0; i < MAX_SERVOS; i++) {
    if (!servos[i].used) {
      servos[i].pin = pin;
      servos[i].servo.attach(pin);
      servos[i].used = true;
      return &servos[i].servo;
    }
  }
  return nullptr;
}

void detachServo(uint8_t pin) {
  for (uint8_t i = 0; i < MAX_SERVOS; i++) {
    if (servos[i].used && servos[i].pin == pin) {
      servos[i].servo.detach();
      servos[i].used = false;
      return;
    }
  }
}

// ==================== 명령 처리 ====================
void handleCommand(const String& line) {
  StaticJsonDocument<200> doc;
  if (deserializeJson(doc, line)) {
    sendError("bad json");
    return;
  }

  const char* cmd = doc["cmd"];
  if (!cmd) {
    sendError("no cmd");
    return;
  }

  if (strcmp(cmd, "pin_mode") == 0) {
    int pin = doc["pin"] | -1;
    const char* mode = doc["mode"] | "";
    if (pin < 0) { sendError("invalid pin", pin); return; }
    if (strcmp(mode, "out") == 0) pinMode(pin, OUTPUT);
    else if (strcmp(mode, "in") == 0) pinMode(pin, INPUT);
    else if (strcmp(mode, "in_pullup") == 0) pinMode(pin, INPUT_PULLUP);
    else { sendError("invalid mode", pin); return; }
    sendAck(cmd, pin);
  }
  else if (strcmp(cmd, "digital") == 0) {
    int pin = doc["pin"] | -1;
    int value = doc["value"] | 0;
    if (pin < 0) { sendError("invalid pin", pin); return; }
    digitalWrite(pin, value ? HIGH : LOW);
    sendAck(cmd, pin);
  }
  else if (strcmp(cmd, "pwm") == 0) {
    int pin = doc["pin"] | -1;
    int value = doc["value"] | 0;
    if (pin < 0) { sendError("invalid pin", pin); return; }
    value = constrain(value, 0, 255);
    analogWrite(pin, value);
    sendAck(cmd, pin);
  }
  else if (strcmp(cmd, "tone") == 0) {
    int pin = doc["pin"] | -1;
    int freq = doc["freq"] | 0;
    int dur = doc["dur"] | 0;
    if (pin < 0 || freq <= 0) { sendError("invalid tone", pin); return; }
    if (dur > 0) tone(pin, freq, dur);
    else tone(pin, freq);
    sendAck(cmd, pin);
  }
  else if (strcmp(cmd, "no_tone") == 0) {
    int pin = doc["pin"] | -1;
    if (pin < 0) { sendError("invalid pin", pin); return; }
    noTone(pin);
    sendAck(cmd, pin);
  }
  else if (strcmp(cmd, "servo") == 0) {
    int pin = doc["pin"] | -1;
    int angle = doc["angle"] | -1;
    if (pin < 0 || angle < 0 || angle > 180) {
      sendError("invalid servo", pin);
      return;
    }
    Servo* s = getOrAttachServo(pin);
    if (!s) { sendError("no servo slot", pin); return; }
    s->write(angle);
    sendAck(cmd, pin);
  }
  else if (strcmp(cmd, "servo_detach") == 0) {
    int pin = doc["pin"] | -1;
    if (pin < 0) { sendError("invalid pin", pin); return; }
    detachServo(pin);
    sendAck(cmd, pin);
  }
  else if (strcmp(cmd, "ping") == 0) {
    StaticJsonDocument<64> r;
    r["type"] = "pong";
    serializeJson(r, Serial);
    Serial.println();
  }
  else {
    sendError("unknown cmd");
  }
}

// ==================== 센서 읽기 ====================
float readUltrasonicCm() {
  digitalWrite(ULTRASONIC_TRIG, LOW);
  delayMicroseconds(2);
  digitalWrite(ULTRASONIC_TRIG, HIGH);
  delayMicroseconds(10);
  digitalWrite(ULTRASONIC_TRIG, LOW);
  long duration = pulseIn(ULTRASONIC_ECHO, HIGH, 30000);
  if (duration == 0) return -1.0;
  return duration * 0.0343 / 2.0;
}

void publishSensors() {
  StaticJsonDocument<200> doc;
  doc["type"] = "sensor";
  doc["ultrasonic"] = readUltrasonicCm();
  doc["temp"] = dht.readTemperature();
  doc["hum"] = dht.readHumidity();
  serializeJson(doc, Serial);
  Serial.println();
}

// ==================== 메인 ====================
void setup() {
  Serial.begin(115200);
  pinMode(ULTRASONIC_TRIG, OUTPUT);
  pinMode(ULTRASONIC_ECHO, INPUT);
  dht.begin();
  for (uint8_t i = 0; i < MAX_SERVOS; i++) servos[i].used = false;
}

void loop() {
  unsigned long now = millis();
  if (now - lastSensorPublish >= SENSOR_INTERVAL_MS) {
    lastSensorPublish = now;
    publishSensors();
  }
  if (Serial.available()) {
    String line = Serial.readStringUntil('\n');
    line.trim();
    if (line.length() > 0) handleCommand(line);
  }
}

 

3.2 업로드 절차

  1. 아두이노 우노를 PC에 USB로 연결한다.
  2. Tools -> Board에서 Arduino Uno를 선택한다.
  3. Tools -> Port에서 아두이노가 잡힌 포트를 선택한다.
  4. 우측 화살표 버튼(Upload)을 눌러 업로드한다.
  5. 업로드 완료 후 Tools -> Serial Monitor를 열고 보드레이트를 115200으로 맞춘다.
  6. 1초 단위로 다음과 같은 JSON이 출력되면 정상이다.
{"type":"sensor","ultrasonic":-1,"temp":nan,"hum":nan}

 

센서가 미연결 상태이므로 값이 -1 또는 nan으로 나오는 것은 정상이다.

 

4. ROS2 패키지 구성

4.1 디렉터리 구조

라즈베리파이의 ROS2 워크스페이스 (~/turtlebot3_ws/src 등) 하위에 다음 구조로 패키지를 생성한다.

더보기

arduino_bridge/
├── package.xml
├── setup.py
├── setup.cfg
├── resource/
│   └── arduino_bridge
├── config/
│   └── pin_map.yaml
├── launch/
│   └── arduino_bridge.launch.py
└── arduino_bridge/
    ├── __init__.py
    ├── arduino_bridge_node.py
    └── device_manager_node.py

4.2 패키지 생성

워크스페이스로 이동하여 패키지 골격을 생성한다.

cd ~/turtlebot3_ws/src
ros2 pkg create --build-type ament_python arduino_bridge
cd arduino_bridge
mkdir -p config launch

 

이후 아래 4.3~4.8의 파일들을 각각 해당 경로에 작성한다.

 

4.3 package.xml

<?xml version="1.0"?>
<package format="3">
  <name>arduino_bridge</name>
  <version>0.1.0</version>
  <description>Generic Arduino bridge for ROS2</description>
  <maintainer email="steeze1213@gmail.com">steeze1213</maintainer>
  <license>MIT</license>

  <exec_depend>rclpy</exec_depend>
  <exec_depend>std_msgs</exec_depend>
  <exec_depend>python3-serial</exec_depend>

  <export>
    <build_type>ament_python</build_type>
  </export>
</package>

 

4.4 setup.py

from setuptools import setup
from glob import glob

package_name = 'arduino_bridge'

setup(
    name=package_name,
    version='0.1.0',
    packages=[package_name],
    data_files=[
        ('share/ament_index/resource_index/packages', ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
        ('share/' + package_name + '/config', glob('config/*.yaml')),
        ('share/' + package_name + '/launch', glob('launch/*.launch.py')),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='steeze1213',
    maintainer_email='steeze1213@gmail.com',
    description='Generic Arduino bridge for ROS2',
    license='MIT',
    entry_points={
        'console_scripts': [
            'arduino_bridge_node = arduino_bridge.arduino_bridge_node:main',
            'device_manager_node = arduino_bridge.device_manager_node:main',
        ],
    },
)

 

4.5 config/pin_map.yaml

이 파일이 본 시스템의 핵심 설정 파일이다. 모듈 추가/변경 시 이 파일만 수정한다.

device_manager:
  ros__parameters:
    devices:
      - name: status_led
        type: digital_out
        pin: 13
      - name: pwm_led
        type: pwm_out
        pin: 6
      - name: buzzer
        type: tone_out
        pin: 8
      - name: pan_servo
        type: servo
        pin: 11
        init_angle: 90
      - name: button
        type: digital_in
        pin: 2
        pullup: true

 

지원되는 type은 다음과 같다.

type 자동 생성 토픽 메시지 타입 설명
digital_out /devices/{name}/set Bool true/false로 ON/OFF
pwm_out /devices/{name}/set Int32 0~255 값으로 PWM 출력
tone_out /devices/{name}/beep String "주파수,지속시간(ms)" 형식
servo /devices/{name}/angle Int32 0~180 각도
digital_in (현재 출력 토픽 없음, 모드 설정만) - 입력 핀 모드 등록

4.6 arduino_bridge/arduino_bridge_node.py

import json
import threading
import serial
import rclpy
from rclpy.node import Node
from std_msgs.msg import String, Float32


class ArduinoBridge(Node):
    def __init__(self):
        super().__init__('arduino_bridge')

        self.declare_parameter('port', '/dev/ttyACM0')
        self.declare_parameter('baudrate', 115200)
        port = self.get_parameter('port').value
        baud = self.get_parameter('baudrate').value

        self.ser = serial.Serial(port, baud, timeout=1.0)
        self._write_lock = threading.Lock()
        self.get_logger().info(f'Connected to {port} @ {baud}')

        self.pub_raw = self.create_publisher(String, '/arduino/raw_in', 50)
        self.pub_ultrasonic = self.create_publisher(Float32, '/sensors/ultrasonic', 10)
        self.pub_temp = self.create_publisher(Float32, '/sensors/temperature', 10)
        self.pub_hum = self.create_publisher(Float32, '/sensors/humidity', 10)

        self.sub_cmd = self.create_subscription(
            String, '/arduino/raw_out', self.on_cmd, 50
        )

        self._stop = False
        self._thread = threading.Thread(target=self.read_loop, daemon=True)
        self._thread.start()

    def read_loop(self):
        while not self._stop and rclpy.ok():
            try:
                raw = self.ser.readline().decode('utf-8', errors='ignore').strip()
                if not raw:
                    continue
                self.pub_raw.publish(String(data=raw))
                self.dispatch(raw)
            except Exception as e:
                self.get_logger().error(f'Serial read error: {e}')

    def dispatch(self, raw: str):
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            self.get_logger().warn(f'Bad JSON: {raw}')
            return

        if data.get('type') == 'sensor':
            if 'ultrasonic' in data:
                self.pub_ultrasonic.publish(Float32(data=float(data['ultrasonic'])))
            if 'temp' in data:
                self.pub_temp.publish(Float32(data=float(data['temp'])))
            if 'hum' in data:
                self.pub_hum.publish(Float32(data=float(data['hum'])))

    def on_cmd(self, msg: String):
        line = msg.data.strip() + '\n'
        with self._write_lock:
            self.ser.write(line.encode('utf-8'))

    def destroy_node(self):
        self._stop = True
        if self.ser.is_open:
            self.ser.close()
        super().destroy_node()


def main():
    rclpy.init()
    node = ArduinoBridge()
    try:
        rclpy.spin(node)
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()

 

4.7 arduino_bridge/device_manager_node.py

import json
import rclpy
from rclpy.node import Node
from std_msgs.msg import String, Bool, Int32


class DeviceManager(Node):
    def __init__(self):
        super().__init__('device_manager')

        self.declare_parameter('devices', [])
        devices = self.get_parameter('devices').value or []

        self.cmd_pub = self.create_publisher(String, '/arduino/raw_out', 50)
        self.create_subscription(String, '/arduino/raw_in', self.on_raw_in, 50)

        self._device_pins = {}
        self._pin_to_device = {}

        for dev in devices:
            self._register(dev)

        self.get_logger().info(f'Registered {len(devices)} devices')

    def send(self, payload: dict):
        msg = String()
        msg.data = json.dumps(payload)
        self.cmd_pub.publish(msg)

    def _register(self, dev: dict):
        name = dev['name']
        dtype = dev['type']
        pin = int(dev['pin'])
        self._device_pins[name] = pin
        self._pin_to_device[pin] = (name, dtype)

        if dtype == 'digital_out':
            self.send({'cmd': 'pin_mode', 'pin': pin, 'mode': 'out'})
            self.create_subscription(
                Bool, f'/devices/{name}/set',
                lambda msg, p=pin: self.send({'cmd': 'digital', 'pin': p, 'value': 1 if msg.data else 0}),
                10
            )

        elif dtype == 'pwm_out':
            self.send({'cmd': 'pin_mode', 'pin': pin, 'mode': 'out'})
            self.create_subscription(
                Int32, f'/devices/{name}/set',
                lambda msg, p=pin: self.send({'cmd': 'pwm', 'pin': p, 'value': int(msg.data)}),
                10
            )

        elif dtype == 'tone_out':
            self.create_subscription(
                String, f'/devices/{name}/beep',
                lambda msg, p=pin: self._handle_beep(p, msg.data),
                10
            )

        elif dtype == 'servo':
            init_angle = int(dev.get('init_angle', 90))
            self.send({'cmd': 'servo', 'pin': pin, 'angle': init_angle})
            self.create_subscription(
                Int32, f'/devices/{name}/angle',
                lambda msg, p=pin: self.send({'cmd': 'servo', 'pin': p, 'angle': int(msg.data)}),
                10
            )

        elif dtype == 'digital_in':
            mode = 'in_pullup' if dev.get('pullup', False) else 'in'
            self.send({'cmd': 'pin_mode', 'pin': pin, 'mode': mode})

        else:
            self.get_logger().warn(f'Unknown device type: {dtype}')

    def _handle_beep(self, pin: int, payload: str):
        try:
            parts = payload.split(',')
            freq = int(parts[0])
            dur = int(parts[1]) if len(parts) > 1 else 300
            self.send({'cmd': 'tone', 'pin': pin, 'freq': freq, 'dur': dur})
        except Exception as e:
            self.get_logger().warn(f'Bad beep payload: {payload} ({e})')

    def on_raw_in(self, msg: String):
        try:
            data = json.loads(msg.data)
        except json.JSONDecodeError:
            return
        if data.get('type') == 'error':
            self.get_logger().warn(f'Arduino error: {data}')


def main():
    rclpy.init()
    node = DeviceManager()
    try:
        rclpy.spin(node)
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()

 

4.8 launch/arduino_bridge.launch.py

import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch_ros.actions import Node


def generate_launch_description():
    pkg_share = get_package_share_directory('arduino_bridge')
    pin_map = os.path.join(pkg_share, 'config', 'pin_map.yaml')

    return LaunchDescription([
        Node(
            package='arduino_bridge',
            executable='arduino_bridge_node',
            name='arduino_bridge',
            parameters=[{'port': '/dev/ttyACM0', 'baudrate': 115200}],
            output='screen',
        ),
        Node(
            package='arduino_bridge',
            executable='device_manager_node',
            name='device_manager',
            parameters=[pin_map],
            output='screen',
        ),
    ])

 

5. 빌드 및 실행

5.1 빌드

cd ~/turtlebot3_ws
colcon build --packages-select arduino_bridge
source install/setup.bash

빌드 에러가 나는 경우 4.6, 4.7, 4.8의 파일 경로와 들여쓰기를 다시 확인한다.

5.2 시리얼 포트 확인

아두이노가 라즈베리파이에 연결된 후, 다음 명령으로 포트명을 확인한다.

ls /dev/ttyACM* /dev/ttyUSB*

대부분의 경우 /dev/ttyACM0으로 잡힌다. 다른 번호가 나오면 launch 파일의 port 파라미터를 해당 값으로 수정한다.

 

5.3 실행

ros2 launch arduino_bridge arduino_bridge.launch.py

정상 실행 시 다음과 같은 로그가 출력된다.

더보기

[arduino_bridge]: Connected to /dev/ttyACM0 @ 115200
[device_manager]: Registered 5 devices

 

6. 동작 확인

다음 명령들을 별도의 터미널에서 실행하여 각 모듈이 정상 동작하는지 확인한다. 새 터미널마다 source ~/turtlebot3_ws/install/setup.bash를 먼저 실행해야 한다.

6.1 LED 제어 (digital_out)

ros2 topic pub --once /devices/status_led/set std_msgs/Bool "data: true"
ros2 topic pub --once /devices/status_led/set std_msgs/Bool "data: false"

 

6.2 LED 밝기 제어 (pwm_out)

ros2 topic pub --once /devices/pwm_led/set std_msgs/Int32 "data: 128"
ros2 topic pub --once /devices/pwm_led/set std_msgs/Int32 "data: 0"

 

6.3 부저 (tone_out)

ros2 topic pub --once /devices/buzzer/beep std_msgs/String "data: '1000,300'"

1000,300은 1000Hz로 300ms 동안 울리라는 의미.

 

6.4 서보 (servo)

ros2 topic pub --once /devices/pan_servo/angle std_msgs/Int32 "data: 0"
ros2 topic pub --once /devices/pan_servo/angle std_msgs/Int32 "data: 90"
ros2 topic pub --once /devices/pan_servo/angle std_msgs/Int32 "data: 180"

 

6.5 센서 데이터 수신

ros2 topic echo /sensors/ultrasonic
ros2 topic echo /sensors/temperature
ros2 topic echo /sensors/humidity

 

6.6 디버깅용 원시 통신 확인

ros2 topic echo /arduino/raw_in

아두이노가 보내는 모든 JSON 메시지를 그대로 볼 수 있다. 통신 문제 진단 시 사용한다.

 

7. 모듈 추가 절차

신규 모듈을 추가할 때의 표준 절차이다. 아두이노 펌웨어는 수정하지 않는다.

7.1 작업 순서

  1. 아두이노에 모듈을 물리적으로 연결한다 (해당 핀에 결선).
  2. ~/turtlebot3_ws/src/arduino_bridge/config/pin_map.yaml을 연다.
  3. devices: 리스트 하단에 새 항목을 추가한다.
  4. 워크스페이스를 다시 빌드한다.
cd ~/turtlebot3_ws
   colcon build --packages-select arduino_bridge
   source install/setup.bash

   5. 실행 중인 launch를 종료(Ctrl+C) 후 재실행한다.

7.2 추가 예시

7번 핀에 새 LED를 추가하는 경우, pin_map.yaml에 다음을 추가한다.

- name: warning_led
        type: digital_out
        pin: 7

 

재실행 후 아래 명령으로 즉시 사용 가능하다.

ros2 topic pub --once /devices/warning_led/set std_msgs/Bool "data: true"

 

8. 트러블슈팅

증상 원인 및 조치
Permission denied: '/dev/ttyACM0' dialout 그룹 미적용. sudo usermod -a -G dialout $USER 후 재로그인
could not open port 다른 프로세스(시리얼 모니터 등)가 포트 점유 중. Arduino IDE의 Serial Monitor를 닫는다
아두이노 응답 없음 USB 케이블 연결 확인, 보드레이트 115200 일치 확인, ros2 topic echo /arduino/raw_in으로 진단
device_manager: Registered 0 devices YAML 들여쓰기 오류. pin_map.yaml의 들여쓰기를 정확히 확인 (스페이스 2칸)
명령은 보내지는데 모듈 동작 안 함 핀 번호 오기재 또는 결선 오류. 멀티미터로 핀 출력 확인
Bad JSON: ... 경고 다수 발생 시리얼 노이즈 또는 부분 수신. USB 케이블 교체 또는 보드레이트 재확인
unknown cmd 에러 펌웨어가 구버전. 본 매뉴얼 3.1의 최신 스케치 재업로드 필요

9. 시스템 데이터 흐름 요약

[사용자/타 노드]
      |
      | ros2 topic pub /devices/{name}/set
      v
[device_manager_node]
      | YAML 매핑 -> JSON 명령 생성
      v
[/arduino/raw_out]
      v
[arduino_bridge_node]
      | 시리얼 write
      v
[USB] -> [Arduino] -> 핀 제어
                |
                | ack / sensor 데이터
                v
[USB] -> [arduino_bridge_node]
              |
              v
       [/arduino/raw_in], [/sensors/...]