
PM10 기준으로 0~30으로 히트맵 표현
프로젝트 구조
group4_ws/
├── src/
│ ├── air_clean_robot/ # 메인 공기청정 제어 패키지
│ │ ├── air_clean_robot/ # Python 소스 코드
│ │ ├── launch/ # Launch 파일들
│ │ └── config/ # 설정(YAML) 파일
│ └── dust_mapping/ # 먼지 매핑 시각화 패키지
│ ├── dust_mapping/ # Python 소스 코드
│ ├── launch/
│ └── config/
├── build/ # 컴파일된 결과물 (자동 생성)
├── install/ # 설치된 패키지 (자동 생성)
└── log/ # 빌드 로그 (자동 생성)
시스템 개요
1. air_clean_robot 패키지
공기질 기반 자동 탐색 및 무드 매니지먼트 시스템
2. dust_mapping 패키지
실시간 미세먼지 히트맵 매핑 및 시각화 시스템
핵심 컴포넌트 상세 분석
air_clean_robot 패키지
1. Serial Air Quality Node (serial_air_quality_node.py)
- 목적: 아두이노 시리얼 포트로부터 PMS7003 센서 데이터 읽기
- 입력: USB Serial (/dev/ttyACM0) - PM1.0, PM2.5, PM10 데이터
- 출력: /air_quality (JSON 형식 String 메시지)
- 주요 매개변수:
- serial_port: "/dev/ttyACM0"
- baudrate: 9600
- zone_id: "A"
- warmup_sec: 10.0 (초기 안정화 시간)
- reconnect_sec: 3.0 (재연결 주기)
- 데이터 형식:
{"zone": "A", "pm1_0": 12, "pm2_5": 35, "pm10": 50, "source": "serial"}
- 지원 포맷: PM1.0:12,PM2.5:35, 12,35,50, PM1.0=12
2. Sim Air Quality Publisher (sim_air_quality_publisher)
- 목적: 실 센서 없이 테스트할 수 있는 시뮬레이션 노드
- 출력: /air_quality (dirty/normal 모드)
- 모드: dirty (PM2.5=45) 또는 normal (PM2.5=12)
3. Air Quality Task Manager (air_quality_task_manager.py)
- 핵심: FSM(Finite State Machine) 기반 자동 제어 시스템
- 상태 전이: IDLE → GO_TO_DIRTY_ZONE → CLEANING → RETURN_HOME → IDLE
동작 원리:
1. /air_quality 토픽 구독
2. 이동평균 필터로 PM2.5 추적 (window_size: 5개 샘플)
3. 임계값 초과 시(pm25_high_threshold: 35μg/m³) 오염 지역으로 이동
4. 도착 후 cleaning_duration_sec: 30초간 정화 수행
5. 정화 완료 후 대기 위치(home)로 복귀
주요 매개변수:
pm25_high_threshold: 35.0 # 오염 판정 기준
pm25_low_threshold: 25.0 # 정화 완료 기준
cleaning_duration_sec: 30.0 # 정화 소요 시간
use_nav2: true # 실제 네비게이션 사용 여부
4. Manual Nav Controller (manual_nav_controller.py)
- 목적: 수동 네비게이션 명령 처리
- 입력: /air_clean_command (String)
- 지원 명령: "node1", "node2", "node3", "home"
- 기능: 사전 정의된 위치로 Nav2 목표 전송
5. A Path Follower (astar_follower.py)*
- 목적: Nav2 대신 커스텀 A* 경로 계획 및 주행 제어
- 특징:
- 실시간 OccupancyGrid 기반 경로 재계획
- Pure Pursuit 알고리즘 기반 주행
- 라이다 기반 동적 장애물 회피
- 정적 맵 장애물 인플레이션
- 주요 토픽:
- /map, /amcl_pose, /goal_pose
- /cmd_vel (출력 제어 명령)
- /astar_path (시각화용 경로)
6. Air Clean GUI (air_clean_gui.py)
- 목적: Tkinter 기반 GUI 제어 인터페이스
- 기능:
- 실시간 카메라 영상 표시
- 지도 및 로봇 위치 시각화
- 수동 네비게이션 버튼
- 공기질 실시간 모니터링
dust_mapping 패키지
7. MQTT PM Listener (mqtt_pm_listener.py)
- 목적: MQTT 브로커로부터 실시간 PM 센서 데이터 수신
- 지원 백엔드: paho-mqtt 또는 mosquitto_sub CLI
- 입력: MQTT 토픽 (sensor/pm/zoneA/#)
- 출력: /dust/pm (JSON)
8. Dust Mapper (dust_mapper.py)
- 목적: 로봇 위치 + PM 데이터를 결합하여 맵 셀 단위 평균 계산
- 기능:
- cell_size_m: 0.10m 격자 맵 생성
- CSV로 데이터 로깅 (dust_logs/dust_samples.csv)
- 평균/최대/카운트 저장
9. Dust Map Viewer (dust_map_viewer.py)
- 목적: Tkinter 기준 실시간 미세먼지 히트맵 시각화
- 색상 스케일:
- 0-5 μg/m³: 파란색 (안전)
- 5-10 μg/m³: 청록색
- 10-15 μg/m³: 초록색
- 15-20 μg/m³: 노란색
- 20-25 μg/m³: 주황색
- 25-30 μg/m³: 빨간색
- 30+ μg/m³: 어두운 빨간색 (위험)
주요 ROS2 토픽 흐름
AIR QUALITY CHAIN
PMS7003 센서 → Arduino → serial_node → /air_quality → task_manager → Nav2
DUST MAPPING CHAIN
MQTT Broker → mqtt_listener → /dust/pm → dust_mapper → /dust/cells →
dust_viewer
설정 파일 (air_clean_params.yaml)
공통 위치 설정 (3개 노드 공유)
node_1: (2.30, -0.01) # 오염 지역 1
node_2: (2.35, -2.70) # 오염 지역 2
node_3: (-0.09, -2.53) # 오염 지역 3
home: (0.04, -0.04) # 대기 위치
A* 제어 파라미터
linear_speed: 0.08 m/s # 선속도
max_angular_speed: 0.45 rad/s # 최대 각속도
goal_tolerance: 0.12 m # 목표 도착 허용 오차
inflation_radius_cells: 5 # 장애물 인플레이션 (0.25m)
use_scan_obstacles: true # 동적 장애물 감지 활성화
실행 방법
1. 실 센서 모드
ros2 launch air_clean_robot air_clean_serial.launch.py
2. 시뮬레이션 모드
ros2 launch air_clean_robot air_clean_sihttp://m.launch.py
3. 먼지 매핑 모드
ros2 launch dust_mapping dust_map.launch.py
의존성 분석
air_clean_robot 패키지 의존성
- ROS2: rclpy, std_msgs, geometry_msgs, nav_msgs, sensor_msgs, nav2_msgs,
action_msgs
- Python: pyserial, tkinter, Pillow
- 하드웨어: PMS7003 센서, Arduino, TurtleBot3
dust_mapping 패키지 의존성
- ROS2: rclpy, std_msgs, geometry_msgs, nav_msgs
- Python: numpy, PIL, paho-mqtt/mosquitto
- 외부: MQTT Broker (192.168.0.55:1883)
핵심 특징
1. 이중 시스템: 실시간 제어(air_clean_robot) + 데이터 시각화(dust_mapping)
병렬 운영
2. 유연성: 시리얼/MQTT 중복 지원, Nav2/Custom A* 전환 가능
3. 안정성: 이동평균 필터링, 재연결 로직, FSM 기반 안전 제어
4. 확장성: 새로운 오염 지역 추가 용이, 파라미터 기반 설정
요약
ROS2 기반 스마트 공기청정 로봇:
- 입력: PMS7003 미세먼지 센서 (시리얼 또는 MQTT)
- 처리: 이동평균 필터링, FSM 기반 자동 결정
- 제어: Nav2 또는 A* 커스텀 네비게이션
- 출력: 실시간 히트맵 시각화, CSV 로깅, GUI 제어 인터페이스
전체 시스템은 독립적으로 실행 가능한 두 패키지로 구성되어 있으며, 공통된
지도 프레임워크를 기반으로 실내 공기질 모니터링 및 자동 정화를 수행
1. 센서 데이터 수집: Serial Air Quality Node
#!/usr/bin/env python3
import json
import re
import time
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import serial
class SerialAirQualityNode(Node):
def __init__(self):
super().__init__('serial_air_quality_node')
# 매개변수 선언
self.declare_parameter('serial_port', '/dev/ttyACM0')
self.declare_parameter('baudrate', 9600)
self.declare_parameter('zone_id', 'A')
self.declare_parameter('publish_topic', '/air_quality')
self.declare_parameter('warmup_sec', 10.0)
# 시리얼 설정
self.serial_port = self.get_parameter('serial_port').value
self.baudrate = int(self.get_parameter('baudrate').value)
self.publisher = self.create_publisher(String, '/air_quality', 10)
# 0.1초마다 콜백 실행
self.timer = self.create_timer(0.1, self.timer_callback)
def timer_callback(self):
try:
raw = self.serial_conn.readline()
line = raw.decode('utf-8', errors='ignore').strip()
# PM1.0:12,PM2.5:35,PM10:50 형식 파싱
values = self.parse_line(line)
if len(values) == 3:
payload = {
'zone': self.zone_id,
'pm1_0': values['pm1_0'],
'pm2_5': values['pm2_5'],
'pm10': values['pm10'],
'source': 'serial'
}
msg = String()
msg.data = json.dumps(payload)
self.publisher.publish(msg)
except serial.SerialException:
self.serial_conn = None
def main(args=None):
rclpy.init(args=args)
node = SerialAirQualityNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
2. 자동 제어 FSM: Air Quality Task Manager
#!/usr/bin/env python3
import json
from enum import Enum
from collections import defaultdict, deque
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from nav2_msgs.action import NavigateToPose
from rclpy.action import ActionClient
from action_msgs.msg import GoalStatus
class State(Enum):
IDLE = 'IDLE'
GO_TO_DIRTY_ZONE = 'GO_TO_DIRTY_ZONE'
CLEANING = 'CLEANING'
RETURN_HOME = 'RETURN_HOME'
ERROR = 'ERROR'
class AirQualityTaskManager(Node):
def __init__(self):
super().__init__('air_quality_task_manager')
# 매개변수 선언
self.declare_parameter('pm25_high_threshold', 35.0)
self.declare_parameter('pm25_low_threshold', 25.0)
self.declare_parameter('window_size', 5)
# PM2.5 임계값 설정
self.pm25_high_threshold = 35.0 # 오염 감지 기준
self.pm25_low_threshold = 25.0 # 정화 완료 기준
self.window_size = 5 # 이동평균 창 크기
# 상태 저장소
self.pm25_windows = defaultdict(lambda: deque(maxlen=5))
self.state = State.IDLE
self.goal_in_progress = False
# 구독 및 클라이언트
self.subscription = self.create_subscription(
String, '/air_quality', self.air_quality_callback, 10)
self.nav_client = ActionClient(self, NavigateToPose,
'navigate_to_pose')
# 1초마다 FSM 실행
self.timer = self.create_timer(1.0, self.fsm_tick)
def air_quality_callback(self, msg):
# JSON 파싱: {"zone": "A", "pm2_5": 35, ...}
payload = json.loads(msg.data)
pm25 = float(payload['pm2_5'])
# 이동평균 업데이트
self.pm25_windows['A'].append(pm25)
def fsm_tick(self):
if self.state == State.IDLE:
self.handle_idle()
elif self.state == State.CLEANING:
self.handle_cleaning()
def handle_idle(self):
# PM2.5 평균 계산
avg = sum(self.pm25_windows['A']) / len(self.pm25_windows['A'])
# 오염 감지 → 오염 지역으로 이동
if avg >= self.pm25_high_threshold:
self.send_navigation_goal(
x=1.0, y=0.0, yaw=0.0, # 오염 지역 좌표
on_success_state=State.CLEANING
)
self.state = State.GO_TO_DIRTY_ZONE
def handle_cleaning(self):
# 30초간 정화 수행
elapsed = time.monotonic() - self.cleaning_start_time
if elapsed >= 30.0:
# 정화 완료 → home으로 복귀
self.send_navigation_goal(
x=0.0, y=0.0, yaw=0.0, # home 좌표
on_success_state=State.IDLE
)
self.state = State.RETURN_HOME
def main(args=None):
rclpy.init(args=args)
node = AirQualityTaskManager()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
3. 경로 계획 & 주행: A Follower*
#!/usr/bin/env python3
import heapq
import math
import numpy as np
import rclpy
from rclpy.node import Node
from nav_msgs.msg import OccupancyGrid, Path
from geometry_msgs.msg import PoseWithCovarianceStamped, Twist
from std_msgs.msg import Bool
def run_astar(map_data, start, goal, width, height):
"""A* 경로 계획 알고리즘"""
start_node = (0, start) # (f-cost, position)
open_list = [start_node]
best_cost = {start: 0}
# 8방향 이동 (상하좌우 + 대각선)
moves = [(0,1), (1,0), (0,-1), (-1,0), (1,1), (1,-1), (-1,1), (-1,-1)]
while open_list:
f_cost, current = heapq.heappop(open_list)
if current == goal:
# 경로 재구성
path = []
while current in came_from:
path.append(current)
current = came_from[current]
return list(reversed(path))
for dy, dx in moves:
next_pos = (current[0] + dy, current[1] + dx)
if 0 <= next_pos[0] < height and 0 <= next_pos[1] < width:
if map_data[next_pos[0], next_pos[1]] == 0: # 빈 공간
new_cost = best_cost[current] + 1
if new_cost < best_cost.get(next_pos, float('inf')):
best_cost[next_pos] = new_cost
f_cost = new_cost +
math.hypot(next_pos[0]-goal[0], next_pos[1]-goal[1])
heapq.heappush(open_list, (f_cost, next_pos))
came_from[next_pos] = current
class AStarFollower(Node):
def __init__(self):
super().__init__('astar_follower')
# 구독
self.create_subscription(OccupancyGrid, '/map', self.map_callback,
10)
self.create_subscription(PoseWithCovarianceStamped, '/amcl_pose',
self.pose_callback, 10)
self.create_subscription(Bool, '/shoe_detected',
self.shoe_callback, 10)
# 발행
self.cmd_pub = self.create_publisher(Twist, '/cmd_vel', 10)
# 제어 루프 (10Hz)
self.create_timer(0.1, self.control_loop)
self.path = []
self.current_pose = None
def control_loop(self):
if not self.path or not self.current_pose:
return
# Pure Pursuit: 목표점 추종
target = self.path[min(3, len(self.path)-1)] # lookahead
dx = target[0] - self.current_pose[0]
dy = target[1] - self.current_pose[1]
# 각도 계산
angle = math.atan2(dy, dx)
twist = Twist()
twist.linear.x = 0.1 # 전진 속도
twist.angular.z = angle * 1.5 # 비례 제어
self.cmd_pub.publish(twist)
def main(args=None):
rclpy.init(args=args)
node = AStarFollower()
rclpy.spin(node)
if __name__ == '__main__':
main()
4. MQTT 데이터 수신: MQTT PM Listener
#!/usr/bin/env python3
import json
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import paho.mqtt.client as mqtt
class MqttPmListener(Node):
def __init__(self):
super().__init__('mqtt_pm_listener')
# MQTT 브로커 설정
self.declare_parameter('broker_host', '192.168.0.55')
self.declare_parameter('mqtt_topic', 'sensor/pm/zoneA/#')
self.declare_parameter('output_topic', '/dust/pm')
self.broker = '192.168.0.55'
self.topic = 'sensor/pm/zoneA/#'
# ROS 퍼블리셔
self.pub = self.create_publisher(String, '/dust/pm', 10)
# MQTT 클라이언트
self.client = mqtt.Client()
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.connect(self.broker, 1883)
self.client.loop_start()
def on_connect(self, client, userdata, flags, rc):
self.client.subscribe(self.topic)
self.get_logger().info('MQTT 연결 완료')
def on_message(self, client, userdata, msg):
# MQTT 메시지를 ROS 메시지로 변환
payload = json.loads(msg.payload)
payload['mqtt_topic'] = msg.topic
payload['source'] = 'mqtt'
ros_msg = String()
ros_msg.data = json.dumps(payload)
self.pub.publish(ros_msg)
def main(args=None):
rclpy.init(args=args)
node = MqttPmListener()
rclpy.spin(node)
if __name__ == '__main__':
main()
핵심 코드 요약
| 노드 | 역할 | 핵심 로직 | 비고 |
| serial_air_quality_node | 센서 데이터 수집 | 시리얼 파싱, JSON 변환 | 실제 하드웨어 연결 |
| air_quality_task_manager | 자동 제어 | FSM + 이동평균 필터 | PM2.5 기준으로 자동 이동 |
| astar_follower | 경로 계획/ 주행 | A* + Pure Pursuit | Nav2 대체 커스텀 네비게이션 |
| mqtt_pm_listener | MQTT 데이터 수신 | paho-mqtt 구독 | 무선 센서 데이터 연동 |
이 4개 노드가 전체 시스템의 핵심 데이터 흐름과 의사결정을 담당
시간 여유로 아두이노 + 라즈베리 파이 연결 실험
아래는 기초적 메뉴얼
(나중에 심화 프로젝트 시간 남거나 최종 때 사용 대비)
Arduino-ROS2 범용 브릿지 시스템 구축 매뉴얼
문서 개요
본 문서는 라즈베리파이(ROS2 Humble) 환경에서 아두이노 우노를 USB로 연결하여 다양한 센서 및 액추에이터(LED, 부저, 서보 등)를 범용적으로 제어하기 위한 시스템 구축 절차를 기술한다. 모듈 추가 시 아두이노 펌웨어를 재작성하지 않고 ROS2 설정 파일(YAML) 수정만으로 확장 가능한 구조를 목표로 한다.
1. 시스템 개요
1.1 구성
| 구성 요소 | 역할 |
| 아두이노 우노 | 핀 단위 입출력을 수행하는 범용 실행기. 센서 데이터를 주기 발행 |
| 라즈베리파이 | ROS2 노드를 통해 아두이노에 명령을 전달하고 데이터를 수신 |
| USB 케이블 | 양방향 시리얼 통신 채널. 보드레이트 115200 사용 |
1.2 설계 원칙
아두이노는 "핀 번호와 동작을 알려주면 그대로 수행하는 기계"로 동작한다. 모듈별 의미(LED, 부저 등)는 라즈베리파이 측 YAML 설정 파일에서만 정의한다. 따라서 모듈 추가/변경 시 아두이노 펌웨어는 수정하지 않는다.
1.3 통신 프로토콜
JSON Lines 방식을 사용한다. 한 줄에 하나의 JSON 객체, 끝에 줄바꿈(\n).
라즈베리 -> 아두이노 (명령)
| cmd | 파라미터 | 용도 |
| pin_mode | pin, mode | 핀 모드 설정 (out/in/in_pullup) |
| digital | pin, value | digitalWrite (0 또는 1) |
| pwm | pin, value | analogWrite (0~255) |
| tone | pin, freq, dur | 부저 음 출력 (Hz, ms) |
| no_tone | pin | 부저 끄기 |
| servo | pin, angle | 서보 각도 (0~180) |
| servo_detach | pin | 서보 해제 |
| ping | - | 연결 확인 |
아두이노 -> 라즈베리 (응답/데이터)
| type | 내용 |
| sensor | 주기적 센서 데이터 |
| ack | 명령 수신 확인 |
| error | 잘못된 명령에 대한 오류 메시지 |
| pong | ping에 대한 응답 |
2. 사전 준비
2.1 하드웨어
- 아두이노 우노 1대
- USB-B 케이블 1개
- 라즈베리파이 (TurtleBot3 탑재 기준, Ubuntu + ROS2 Humble 설치 완료 상태)
- 사용할 센서/액추에이터 (예: HC-SR04, DHT11, LED, 부저, SG90 서보 등)
2.2 소프트웨어 (라즈베리파이)
아래 명령을 라즈베리파이 터미널에서 순서대로 실행한다.
sudo apt update
sudo apt install python3-serial
sudo usermod -a -G dialout $USER
usermod 실행 후에는 반드시 재로그인 또는 재부팅한다. 재로그인 없이는 시리얼 포트 접근 권한이 적용되지 않는다.
2.3 소프트웨어 (아두이노 IDE)
Arduino IDE의 라이브러리 매니저(Tools -> Manage Libraries)에서 다음 라이브러리를 설치한다.
- ArduinoJson (Benoit Blanchon)
- DHT sensor library (Adafruit) — DHT11/DHT22 사용 시
- Servo (기본 내장, 별도 설치 불필요)
3. 아두이노 펌웨어 업로드
3.1 스케치 작성
Arduino IDE에서 새 스케치를 생성하고, 아래 코드 전체를 붙여넣는다.
#include <ArduinoJson.h>
#include <Servo.h>
#include <DHT.h>
// ==================== 사용자 설정 영역 ====================
#define DHT_PIN 4
#define DHT_TYPE DHT11
#define ULTRASONIC_TRIG 9
#define ULTRASONIC_ECHO 10
const unsigned long SENSOR_INTERVAL_MS = 100;
// ==================== 내부 상태 ====================
DHT dht(DHT_PIN, DHT_TYPE);
unsigned long lastSensorPublish = 0;
const uint8_t MAX_SERVOS = 4;
struct ServoSlot {
uint8_t pin;
Servo servo;
bool used;
};
ServoSlot servos[MAX_SERVOS];
// ==================== 유틸 ====================
void sendAck(const char* cmd, int pin) {
StaticJsonDocument<128> doc;
doc["type"] = "ack";
doc["cmd"] = cmd;
if (pin >= 0) doc["pin"] = pin;
serializeJson(doc, Serial);
Serial.println();
}
void sendError(const char* msg, int pin = -1) {
StaticJsonDocument<128> doc;
doc["type"] = "error";
doc["msg"] = msg;
if (pin >= 0) doc["pin"] = pin;
serializeJson(doc, Serial);
Serial.println();
}
// ==================== 서보 슬롯 관리 ====================
Servo* getOrAttachServo(uint8_t pin) {
for (uint8_t i = 0; i < MAX_SERVOS; i++) {
if (servos[i].used && servos[i].pin == pin) return &servos[i].servo;
}
for (uint8_t i = 0; i < MAX_SERVOS; i++) {
if (!servos[i].used) {
servos[i].pin = pin;
servos[i].servo.attach(pin);
servos[i].used = true;
return &servos[i].servo;
}
}
return nullptr;
}
void detachServo(uint8_t pin) {
for (uint8_t i = 0; i < MAX_SERVOS; i++) {
if (servos[i].used && servos[i].pin == pin) {
servos[i].servo.detach();
servos[i].used = false;
return;
}
}
}
// ==================== 명령 처리 ====================
void handleCommand(const String& line) {
StaticJsonDocument<200> doc;
if (deserializeJson(doc, line)) {
sendError("bad json");
return;
}
const char* cmd = doc["cmd"];
if (!cmd) {
sendError("no cmd");
return;
}
if (strcmp(cmd, "pin_mode") == 0) {
int pin = doc["pin"] | -1;
const char* mode = doc["mode"] | "";
if (pin < 0) { sendError("invalid pin", pin); return; }
if (strcmp(mode, "out") == 0) pinMode(pin, OUTPUT);
else if (strcmp(mode, "in") == 0) pinMode(pin, INPUT);
else if (strcmp(mode, "in_pullup") == 0) pinMode(pin, INPUT_PULLUP);
else { sendError("invalid mode", pin); return; }
sendAck(cmd, pin);
}
else if (strcmp(cmd, "digital") == 0) {
int pin = doc["pin"] | -1;
int value = doc["value"] | 0;
if (pin < 0) { sendError("invalid pin", pin); return; }
digitalWrite(pin, value ? HIGH : LOW);
sendAck(cmd, pin);
}
else if (strcmp(cmd, "pwm") == 0) {
int pin = doc["pin"] | -1;
int value = doc["value"] | 0;
if (pin < 0) { sendError("invalid pin", pin); return; }
value = constrain(value, 0, 255);
analogWrite(pin, value);
sendAck(cmd, pin);
}
else if (strcmp(cmd, "tone") == 0) {
int pin = doc["pin"] | -1;
int freq = doc["freq"] | 0;
int dur = doc["dur"] | 0;
if (pin < 0 || freq <= 0) { sendError("invalid tone", pin); return; }
if (dur > 0) tone(pin, freq, dur);
else tone(pin, freq);
sendAck(cmd, pin);
}
else if (strcmp(cmd, "no_tone") == 0) {
int pin = doc["pin"] | -1;
if (pin < 0) { sendError("invalid pin", pin); return; }
noTone(pin);
sendAck(cmd, pin);
}
else if (strcmp(cmd, "servo") == 0) {
int pin = doc["pin"] | -1;
int angle = doc["angle"] | -1;
if (pin < 0 || angle < 0 || angle > 180) {
sendError("invalid servo", pin);
return;
}
Servo* s = getOrAttachServo(pin);
if (!s) { sendError("no servo slot", pin); return; }
s->write(angle);
sendAck(cmd, pin);
}
else if (strcmp(cmd, "servo_detach") == 0) {
int pin = doc["pin"] | -1;
if (pin < 0) { sendError("invalid pin", pin); return; }
detachServo(pin);
sendAck(cmd, pin);
}
else if (strcmp(cmd, "ping") == 0) {
StaticJsonDocument<64> r;
r["type"] = "pong";
serializeJson(r, Serial);
Serial.println();
}
else {
sendError("unknown cmd");
}
}
// ==================== 센서 읽기 ====================
float readUltrasonicCm() {
digitalWrite(ULTRASONIC_TRIG, LOW);
delayMicroseconds(2);
digitalWrite(ULTRASONIC_TRIG, HIGH);
delayMicroseconds(10);
digitalWrite(ULTRASONIC_TRIG, LOW);
long duration = pulseIn(ULTRASONIC_ECHO, HIGH, 30000);
if (duration == 0) return -1.0;
return duration * 0.0343 / 2.0;
}
void publishSensors() {
StaticJsonDocument<200> doc;
doc["type"] = "sensor";
doc["ultrasonic"] = readUltrasonicCm();
doc["temp"] = dht.readTemperature();
doc["hum"] = dht.readHumidity();
serializeJson(doc, Serial);
Serial.println();
}
// ==================== 메인 ====================
void setup() {
Serial.begin(115200);
pinMode(ULTRASONIC_TRIG, OUTPUT);
pinMode(ULTRASONIC_ECHO, INPUT);
dht.begin();
for (uint8_t i = 0; i < MAX_SERVOS; i++) servos[i].used = false;
}
void loop() {
unsigned long now = millis();
if (now - lastSensorPublish >= SENSOR_INTERVAL_MS) {
lastSensorPublish = now;
publishSensors();
}
if (Serial.available()) {
String line = Serial.readStringUntil('\n');
line.trim();
if (line.length() > 0) handleCommand(line);
}
}
3.2 업로드 절차
- 아두이노 우노를 PC에 USB로 연결한다.
- Tools -> Board에서 Arduino Uno를 선택한다.
- Tools -> Port에서 아두이노가 잡힌 포트를 선택한다.
- 우측 화살표 버튼(Upload)을 눌러 업로드한다.
- 업로드 완료 후 Tools -> Serial Monitor를 열고 보드레이트를 115200으로 맞춘다.
- 1초 단위로 다음과 같은 JSON이 출력되면 정상이다.
{"type":"sensor","ultrasonic":-1,"temp":nan,"hum":nan}
센서가 미연결 상태이므로 값이 -1 또는 nan으로 나오는 것은 정상이다.
4. ROS2 패키지 구성
4.1 디렉터리 구조
라즈베리파이의 ROS2 워크스페이스 (~/turtlebot3_ws/src 등) 하위에 다음 구조로 패키지를 생성한다.
arduino_bridge/
├── package.xml
├── setup.py
├── setup.cfg
├── resource/
│ └── arduino_bridge
├── config/
│ └── pin_map.yaml
├── launch/
│ └── arduino_bridge.launch.py
└── arduino_bridge/
├── __init__.py
├── arduino_bridge_node.py
└── device_manager_node.py
4.2 패키지 생성
워크스페이스로 이동하여 패키지 골격을 생성한다.
cd ~/turtlebot3_ws/src
ros2 pkg create --build-type ament_python arduino_bridge
cd arduino_bridge
mkdir -p config launch
이후 아래 4.3~4.8의 파일들을 각각 해당 경로에 작성한다.
4.3 package.xml
<?xml version="1.0"?>
<package format="3">
<name>arduino_bridge</name>
<version>0.1.0</version>
<description>Generic Arduino bridge for ROS2</description>
<maintainer email="steeze1213@gmail.com">steeze1213</maintainer>
<license>MIT</license>
<exec_depend>rclpy</exec_depend>
<exec_depend>std_msgs</exec_depend>
<exec_depend>python3-serial</exec_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>
4.4 setup.py
from setuptools import setup
from glob import glob
package_name = 'arduino_bridge'
setup(
name=package_name,
version='0.1.0',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages', ['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
('share/' + package_name + '/config', glob('config/*.yaml')),
('share/' + package_name + '/launch', glob('launch/*.launch.py')),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='steeze1213',
maintainer_email='steeze1213@gmail.com',
description='Generic Arduino bridge for ROS2',
license='MIT',
entry_points={
'console_scripts': [
'arduino_bridge_node = arduino_bridge.arduino_bridge_node:main',
'device_manager_node = arduino_bridge.device_manager_node:main',
],
},
)
4.5 config/pin_map.yaml
이 파일이 본 시스템의 핵심 설정 파일이다. 모듈 추가/변경 시 이 파일만 수정한다.
device_manager:
ros__parameters:
devices:
- name: status_led
type: digital_out
pin: 13
- name: pwm_led
type: pwm_out
pin: 6
- name: buzzer
type: tone_out
pin: 8
- name: pan_servo
type: servo
pin: 11
init_angle: 90
- name: button
type: digital_in
pin: 2
pullup: true
지원되는 type은 다음과 같다.
| type | 자동 생성 토픽 | 메시지 타입 | 설명 |
| digital_out | /devices/{name}/set | Bool | true/false로 ON/OFF |
| pwm_out | /devices/{name}/set | Int32 | 0~255 값으로 PWM 출력 |
| tone_out | /devices/{name}/beep | String | "주파수,지속시간(ms)" 형식 |
| servo | /devices/{name}/angle | Int32 | 0~180 각도 |
| digital_in | (현재 출력 토픽 없음, 모드 설정만) | - | 입력 핀 모드 등록 |
4.6 arduino_bridge/arduino_bridge_node.py
import json
import threading
import serial
import rclpy
from rclpy.node import Node
from std_msgs.msg import String, Float32
class ArduinoBridge(Node):
def __init__(self):
super().__init__('arduino_bridge')
self.declare_parameter('port', '/dev/ttyACM0')
self.declare_parameter('baudrate', 115200)
port = self.get_parameter('port').value
baud = self.get_parameter('baudrate').value
self.ser = serial.Serial(port, baud, timeout=1.0)
self._write_lock = threading.Lock()
self.get_logger().info(f'Connected to {port} @ {baud}')
self.pub_raw = self.create_publisher(String, '/arduino/raw_in', 50)
self.pub_ultrasonic = self.create_publisher(Float32, '/sensors/ultrasonic', 10)
self.pub_temp = self.create_publisher(Float32, '/sensors/temperature', 10)
self.pub_hum = self.create_publisher(Float32, '/sensors/humidity', 10)
self.sub_cmd = self.create_subscription(
String, '/arduino/raw_out', self.on_cmd, 50
)
self._stop = False
self._thread = threading.Thread(target=self.read_loop, daemon=True)
self._thread.start()
def read_loop(self):
while not self._stop and rclpy.ok():
try:
raw = self.ser.readline().decode('utf-8', errors='ignore').strip()
if not raw:
continue
self.pub_raw.publish(String(data=raw))
self.dispatch(raw)
except Exception as e:
self.get_logger().error(f'Serial read error: {e}')
def dispatch(self, raw: str):
try:
data = json.loads(raw)
except json.JSONDecodeError:
self.get_logger().warn(f'Bad JSON: {raw}')
return
if data.get('type') == 'sensor':
if 'ultrasonic' in data:
self.pub_ultrasonic.publish(Float32(data=float(data['ultrasonic'])))
if 'temp' in data:
self.pub_temp.publish(Float32(data=float(data['temp'])))
if 'hum' in data:
self.pub_hum.publish(Float32(data=float(data['hum'])))
def on_cmd(self, msg: String):
line = msg.data.strip() + '\n'
with self._write_lock:
self.ser.write(line.encode('utf-8'))
def destroy_node(self):
self._stop = True
if self.ser.is_open:
self.ser.close()
super().destroy_node()
def main():
rclpy.init()
node = ArduinoBridge()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
4.7 arduino_bridge/device_manager_node.py
import json
import rclpy
from rclpy.node import Node
from std_msgs.msg import String, Bool, Int32
class DeviceManager(Node):
def __init__(self):
super().__init__('device_manager')
self.declare_parameter('devices', [])
devices = self.get_parameter('devices').value or []
self.cmd_pub = self.create_publisher(String, '/arduino/raw_out', 50)
self.create_subscription(String, '/arduino/raw_in', self.on_raw_in, 50)
self._device_pins = {}
self._pin_to_device = {}
for dev in devices:
self._register(dev)
self.get_logger().info(f'Registered {len(devices)} devices')
def send(self, payload: dict):
msg = String()
msg.data = json.dumps(payload)
self.cmd_pub.publish(msg)
def _register(self, dev: dict):
name = dev['name']
dtype = dev['type']
pin = int(dev['pin'])
self._device_pins[name] = pin
self._pin_to_device[pin] = (name, dtype)
if dtype == 'digital_out':
self.send({'cmd': 'pin_mode', 'pin': pin, 'mode': 'out'})
self.create_subscription(
Bool, f'/devices/{name}/set',
lambda msg, p=pin: self.send({'cmd': 'digital', 'pin': p, 'value': 1 if msg.data else 0}),
10
)
elif dtype == 'pwm_out':
self.send({'cmd': 'pin_mode', 'pin': pin, 'mode': 'out'})
self.create_subscription(
Int32, f'/devices/{name}/set',
lambda msg, p=pin: self.send({'cmd': 'pwm', 'pin': p, 'value': int(msg.data)}),
10
)
elif dtype == 'tone_out':
self.create_subscription(
String, f'/devices/{name}/beep',
lambda msg, p=pin: self._handle_beep(p, msg.data),
10
)
elif dtype == 'servo':
init_angle = int(dev.get('init_angle', 90))
self.send({'cmd': 'servo', 'pin': pin, 'angle': init_angle})
self.create_subscription(
Int32, f'/devices/{name}/angle',
lambda msg, p=pin: self.send({'cmd': 'servo', 'pin': p, 'angle': int(msg.data)}),
10
)
elif dtype == 'digital_in':
mode = 'in_pullup' if dev.get('pullup', False) else 'in'
self.send({'cmd': 'pin_mode', 'pin': pin, 'mode': mode})
else:
self.get_logger().warn(f'Unknown device type: {dtype}')
def _handle_beep(self, pin: int, payload: str):
try:
parts = payload.split(',')
freq = int(parts[0])
dur = int(parts[1]) if len(parts) > 1 else 300
self.send({'cmd': 'tone', 'pin': pin, 'freq': freq, 'dur': dur})
except Exception as e:
self.get_logger().warn(f'Bad beep payload: {payload} ({e})')
def on_raw_in(self, msg: String):
try:
data = json.loads(msg.data)
except json.JSONDecodeError:
return
if data.get('type') == 'error':
self.get_logger().warn(f'Arduino error: {data}')
def main():
rclpy.init()
node = DeviceManager()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
4.8 launch/arduino_bridge.launch.py
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch_ros.actions import Node
def generate_launch_description():
pkg_share = get_package_share_directory('arduino_bridge')
pin_map = os.path.join(pkg_share, 'config', 'pin_map.yaml')
return LaunchDescription([
Node(
package='arduino_bridge',
executable='arduino_bridge_node',
name='arduino_bridge',
parameters=[{'port': '/dev/ttyACM0', 'baudrate': 115200}],
output='screen',
),
Node(
package='arduino_bridge',
executable='device_manager_node',
name='device_manager',
parameters=[pin_map],
output='screen',
),
])
5. 빌드 및 실행
5.1 빌드
cd ~/turtlebot3_ws
colcon build --packages-select arduino_bridge
source install/setup.bash
빌드 에러가 나는 경우 4.6, 4.7, 4.8의 파일 경로와 들여쓰기를 다시 확인한다.
5.2 시리얼 포트 확인
아두이노가 라즈베리파이에 연결된 후, 다음 명령으로 포트명을 확인한다.
ls /dev/ttyACM* /dev/ttyUSB*
대부분의 경우 /dev/ttyACM0으로 잡힌다. 다른 번호가 나오면 launch 파일의 port 파라미터를 해당 값으로 수정한다.
5.3 실행
ros2 launch arduino_bridge arduino_bridge.launch.py
정상 실행 시 다음과 같은 로그가 출력된다.
[arduino_bridge]: Connected to /dev/ttyACM0 @ 115200
[device_manager]: Registered 5 devices
6. 동작 확인
다음 명령들을 별도의 터미널에서 실행하여 각 모듈이 정상 동작하는지 확인한다. 새 터미널마다 source ~/turtlebot3_ws/install/setup.bash를 먼저 실행해야 한다.
6.1 LED 제어 (digital_out)
ros2 topic pub --once /devices/status_led/set std_msgs/Bool "data: true"
ros2 topic pub --once /devices/status_led/set std_msgs/Bool "data: false"
6.2 LED 밝기 제어 (pwm_out)
ros2 topic pub --once /devices/pwm_led/set std_msgs/Int32 "data: 128"
ros2 topic pub --once /devices/pwm_led/set std_msgs/Int32 "data: 0"
6.3 부저 (tone_out)
ros2 topic pub --once /devices/buzzer/beep std_msgs/String "data: '1000,300'"
1000,300은 1000Hz로 300ms 동안 울리라는 의미.
6.4 서보 (servo)
ros2 topic pub --once /devices/pan_servo/angle std_msgs/Int32 "data: 0"
ros2 topic pub --once /devices/pan_servo/angle std_msgs/Int32 "data: 90"
ros2 topic pub --once /devices/pan_servo/angle std_msgs/Int32 "data: 180"
6.5 센서 데이터 수신
ros2 topic echo /sensors/ultrasonic
ros2 topic echo /sensors/temperature
ros2 topic echo /sensors/humidity
6.6 디버깅용 원시 통신 확인
ros2 topic echo /arduino/raw_in
아두이노가 보내는 모든 JSON 메시지를 그대로 볼 수 있다. 통신 문제 진단 시 사용한다.
7. 모듈 추가 절차
신규 모듈을 추가할 때의 표준 절차이다. 아두이노 펌웨어는 수정하지 않는다.
7.1 작업 순서
- 아두이노에 모듈을 물리적으로 연결한다 (해당 핀에 결선).
- ~/turtlebot3_ws/src/arduino_bridge/config/pin_map.yaml을 연다.
- devices: 리스트 하단에 새 항목을 추가한다.
- 워크스페이스를 다시 빌드한다.
cd ~/turtlebot3_ws
colcon build --packages-select arduino_bridge
source install/setup.bash
5. 실행 중인 launch를 종료(Ctrl+C) 후 재실행한다.
7.2 추가 예시
7번 핀에 새 LED를 추가하는 경우, pin_map.yaml에 다음을 추가한다.
- name: warning_led
type: digital_out
pin: 7
재실행 후 아래 명령으로 즉시 사용 가능하다.
ros2 topic pub --once /devices/warning_led/set std_msgs/Bool "data: true"
8. 트러블슈팅
| 증상 | 원인 및 조치 |
| Permission denied: '/dev/ttyACM0' | dialout 그룹 미적용. sudo usermod -a -G dialout $USER 후 재로그인 |
| could not open port | 다른 프로세스(시리얼 모니터 등)가 포트 점유 중. Arduino IDE의 Serial Monitor를 닫는다 |
| 아두이노 응답 없음 | USB 케이블 연결 확인, 보드레이트 115200 일치 확인, ros2 topic echo /arduino/raw_in으로 진단 |
| device_manager: Registered 0 devices | YAML 들여쓰기 오류. pin_map.yaml의 들여쓰기를 정확히 확인 (스페이스 2칸) |
| 명령은 보내지는데 모듈 동작 안 함 | 핀 번호 오기재 또는 결선 오류. 멀티미터로 핀 출력 확인 |
| Bad JSON: ... 경고 다수 발생 | 시리얼 노이즈 또는 부분 수신. USB 케이블 교체 또는 보드레이트 재확인 |
| unknown cmd 에러 | 펌웨어가 구버전. 본 매뉴얼 3.1의 최신 스케치 재업로드 필요 |
9. 시스템 데이터 흐름 요약
[사용자/타 노드]
|
| ros2 topic pub /devices/{name}/set
v
[device_manager_node]
| YAML 매핑 -> JSON 명령 생성
v
[/arduino/raw_out]
v
[arduino_bridge_node]
| 시리얼 write
v
[USB] -> [Arduino] -> 핀 제어
|
| ack / sensor 데이터
v
[USB] -> [arduino_bridge_node]
|
v
[/arduino/raw_in], [/sensors/...]
'로보테크AI' 카테고리의 다른 글
| 융합_로보테크 AI 자율주행 로봇 개발자 과정-26/05/06 (0) | 2026.05.06 |
|---|---|
| 융합_로보테크 AI 자율주행 로봇 개발자 과정-26/05/04 (0) | 2026.05.04 |
| 융합_로보테크 AI 자율주행 로봇 개발자 과정-26/04/28 (0) | 2026.04.28 |
| 융합_로보테크 AI 자율주행 로봇 개발자 과정-26/04/24 (0) | 2026.04.24 |
| 융합_로보테크 AI 자율주행 로봇 개발자 과정-26/04/23 (0) | 2026.04.23 |