로보테크AI

융합_로보테크 AI 자율주행 로봇 개발자 과정-26/05/04

steezer 2026. 5. 4. 18:30

전체 시스템 통합 + 웹에 yaml 맵 구현해서 시스템이랑 연결하기

 

ESP01 (펌웨어, 로봇 위)에서 PMS7003 UART 신호를 JSON 으로 변환해 MQTT 토픽 sensor/pm/zoneA 로 발행

데이터는 Mosquitto Broker (192.168.0.55:1883) 로 전달

 

sensor/pm/zoneA : ESP 에서 ROS2 와 웹으로 전달, PM 측정값

robot/1/pose : ROS2 에서 웹으로 전달, amcl_pose 를 JSON 으로 변환한 값

robot/1/battery : ROS2 에서 웹으로 전달, battery_state 를 JSON 으로 변환한 값

robot/1/dust : ROS2 에서 웹으로 전달, PM10 그리드셀 JSON

robot/1/cmd : 웹에서 ROS2 로 전달, 평문 명령

 

브로커의 한쪽에는 Ubuntu (ROS2 Humble) 환경

turtlebot3_bringup (RPi4), Nav2 + AMCL + map_server, dust_mapping (PM10 그리드셀 평균을 만드는 /dust/cells), air_clean_robot 패키지가 동작

 

air_clean_robot

amcl_pose_mqtt_publisher (ROS에서 MQTT로), battery_state_mqtt_publisher (ROS에서 MQTT로), dust_cells_mqtt_publisher (ROS에서 MQTT로), cmd_mqtt_subscriber (MQTT에서 ROS로), manual_nav_controller (명령 FSM) 노드들

 

브로커의 반대쪽에는 Windows (Spring Boot) 환경

MQTT subscribers (PM/pose/battery/dust), REST API (/api/robot/), Thymeleaf + JS 대시보드 (HTML5 Canvas 맵 + 히트맵, 맵 클릭하면 /api/robot/goto 호출), MySQL (PM/RobotStatus 로그) 가 동작

 

MQTT 가 가운데서 4 종류 토픽을 굴리고, 각 시스템은 자기 모국어(C++ / Python / Java) 로 publish/subscribe만 함

 

ROS2 쪽에서 dust_mapper 가 /amcl_pose 와 /dust/pm 을 묶어 셀 단위로 평균 생성

# group4_ws/src/dust_mapping/dust_mapping/dust_mapper.py
def sample(self):
    if self.latest_pm is None or self.latest_pose is None:
        return
    x, y, _ = self.latest_pose
    pm10 = float(self.latest_pm.get('pm10', 0.0))
    cell_x, cell_y = self.world_to_cell(x, y)
    key = (cell_x, cell_y)

    entry = self.cells.setdefault(key, {
        'sum_pm10': 0.0, 'count': 0,
        'x': self.cell_center_x(cell_x),
        'y': self.cell_center_y(cell_y),
    })
    entry['sum_pm10'] += pm10
    entry['count'] += 1

 

MQTT 가 모든 시스템의 글루

ROS2 와 Spring Boot 사이엔 rosbridge_websocket 같은 솔루션이 있지만

PM 센서를 위한 MQTT 브로커가 어차피 떠있어야함

 

 

토픽 컨벤션

토픽 sensor/pm/zoneA 는 ESP 에서 모두에게 전달되고

페이로드는 pm1_0, pm2_5, pm10, rssi 이며 PMS7003 측정값으로 사용

 

토픽 robot/1/pose 는 ROS 에서 웹으로 전달되고

페이로드는 robotId, x, y, yaw 이며 amcl_pose 변환에 사용

 

토픽 robot/1/battery 는 ROS 에서 웹으로 전달되고

페이로드는 voltage, percentage, present 등이며 battery_state 변환에 사용

 

토픽 robot/1/dust 는 ROS 에서 웹으로 전달되고

페이로드는 cell_size_m 과 cells 배열(x, y, pm10, count)이며 PM10 히트맵(retained)에 사용

 

토픽 robot/1/cmd 는 웹에서 ROS 로 전달되고

페이로드는 START, ESTOP, GOTO_XY 1.5,2.0 등이며 평문 명령에 사용

# group4_ws/src/air_clean_robot/air_clean_robot/amcl_pose_mqtt_publisher.py
class AmclPoseMqttPublisher(Node):
    def __init__(self):
        super().__init__('amcl_pose_mqtt_publisher')
        # ... declare_parameter / get_parameter ...

        self.mqtt_client = mqtt.Client()
        self.mqtt_client.connect(self.mqtt_host, self.mqtt_port, 60)
        self.mqtt_client.loop_start()

        self.create_subscription(
            PoseWithCovarianceStamped, self.pose_topic,
            self.pose_callback, pose_qos,
        )
        self.create_timer(self.publish_interval_sec, self.publish_pose)

    def pose_callback(self, msg):
        q = msg.pose.pose.orientation
        yaw = math.atan2(2.0 * (q.w * q.z + q.x * q.y),
                         1.0 - 2.0 * (q.y * q.y + q.z * q.z))
        self.latest_pose = {
            'robotId': 1,
            'x': float(msg.pose.pose.position.x),
            'y': float(msg.pose.pose.position.y),
            'yaw': float(yaw),
        }

    def publish_pose(self):
        if self.latest_pose is None:
            return
        self.mqtt_client.publish(self.mqtt_topic, json.dumps(self.latest_pose))

 

# group4_ws/src/air_clean_robot/air_clean_robot/cmd_mqtt_subscriber.py
class CmdMqttSubscriber(Node):
    def __init__(self):
        super().__init__('cmd_mqtt_subscriber')
        # ...
        self.pub = self.create_publisher(String, '/air_clean_command', 10)
        self.mqtt_client.on_message = self.on_message
        self.mqtt_client.connect(self.mqtt_host, self.mqtt_port, 60)
        self.mqtt_client.loop_start()

    def on_message(self, client, userdata, msg):
        text = msg.payload.decode('utf-8', errors='replace').strip()
        if text:
            self.pub.publish(String(data=text))

 

웹에서 발행 가능한 명령은 8개

START / STOP / IDLE / GO_CHARGE / ESTOP / RELEASE / PATROL / CLEAN / GOTO_XY x,y yaw

이걸 manual_nav_controller 한 노드가 모두 해석

핵심은 ESTOP 게이트 와 순찰 큐

ESTOP 게이트

비상정지 후엔 RELEASE 외엔 어떤 명령도 받지 않음

한 줄 플래그 로직

def command_callback(self, msg):
    raw = msg.data.strip()
    command = raw.lower()

    if self.estop_active and command not in ('release',):
        self.get_logger().warning(f'ESTOP 활성 — "{msg.data}" 무시. 먼저 RELEASE 보내.')
        return

    # ... 명령별 분기 ...
    elif command == 'estop':
        self.patrol_queue.clear()
        self.cancel_current_goal('estop')
        self.cmd_vel_pub.publish(Twist())          # 정지
        self.cleaner_pub.publish(Bool(data=False)) # 청정기 OFF
        self.estop_active = True

 

순찰 큐

def start_patrol(self):
    self.patrol_queue = ['node 1', 'node 2', 'node 3', 'home']
    self.advance_patrol()

def advance_patrol(self):
    if not self.patrol_queue:
        self.get_logger().info('순찰 완료')
        return
    target = self.patrol_queue.pop(0)
    if target == 'home':
        self.send_goal(self.home_x, self.home_y, self.home_yaw, 'home (patrol)')
    else:
        self.send_node_goal(target)

def goal_result_callback(self, future, label):
    self.goal_in_progress = False
    self.current_goal_handle = None
    result = future.result()
    if result.status == GoalStatus.STATUS_SUCCEEDED:
        if self.patrol_queue:
            self.advance_patrol()  # 다음 단계
    else:
        self.patrol_queue.clear()  # 실패 시 전체 중단

 

 

좌표 변환 공식

ROS 의 SLAM 맵 (map_cleaned.pgm) 메타데이터

image: map_cleaned.pgm
resolution: 0.05
origin: [-1.78, -5.08, 0]

 

const MAP_INFO = {
    width: 103, height: 126,
    resolution: 0.05,
    originX: -1.78, originY: -5.08,
    scale: 5
};

function worldToPixel(wx, wy) {
    const px = (wx - MAP_INFO.originX) / MAP_INFO.resolution;
    const py = (wy - MAP_INFO.originY) / MAP_INFO.resolution;
    return {
        x: px * MAP_INFO.scale,
        y: (MAP_INFO.height - py) * MAP_INFO.scale  // ★ y 뒤집기
    };
}

function canvasToWorld(cx, cy) {
    const wx = cx / MAP_INFO.scale * MAP_INFO.resolution + MAP_INFO.originX;
    const wy = (MAP_INFO.height - cy / MAP_INFO.scale)
               * MAP_INFO.resolution + MAP_INFO.originY;
    return { x: wx, y: wy };
}

 

Spring 의 goto 엔드포인트

@PostMapping("/goto")
public Map<String, Object> gotoXy(@RequestBody Map<String, Object> body) {
    double x = ((Number) body.get("x")).doubleValue();
    double y = ((Number) body.get("y")).doubleValue();
    Object yawObj = body.get("yaw");

    if (isStartBlockedByBattery(robotBatteryService.getLatestBattery())) {
        return Map.of("ok", false, "error", "battery low");
    }

    String cmd = (yawObj == null)
            ? String.format(Locale.US, "GOTO_XY:%.4f,%.4f", x, y)
            : String.format(Locale.US, "GOTO_XY:%.4f,%.4f,%.4f",
                            x, y, ((Number) yawObj).doubleValue());

    mqttCommandPublisher.publishCommand(cmd);
    return Map.of("ok", true, "sent", cmd);
}

 

yaw 자동 결정

웹은 좌표만 보내고 yaw 는 안 보냄

도착 방향을 어디로 둘지가 애매하므로, 출발점에서 도착점 벡터의 atan2 로 자동 결정

def compute_auto_yaw(self, target_x, target_y):
    if self.latest_pose is None:
        return 0.0  # AMCL 가 아직 못 박혔으면 fallback
    cx, cy, current_yaw = self.latest_pose
    dx, dy = target_x - cx, target_y - cy
    if abs(dx) < 1e-3 and abs(dy) < 1e-3:
        return current_yaw  # 제자리 클릭이면 현재 방향 유지
    return math.atan2(dy, dx)

 

PM10 히트맵

ROS 의 dust_mapper 가 그리드 셀별 PM10 평균을 만들면, dust_cells_mqtt_publisher 가 MQTT 로 retained 발행

브라우저는 이걸 받아 canvas 오버레이에 색상 사각형으로 그림

색상 보간은 stops 사이 선형

const PM10_STOPS = [
    [0,  [30,136,229]], [5,  [0,188,212]],   [10, [67,160,71]],
    [15, [253,216,53]], [20, [251,140,0]],   [25, [239,83,80]],
    [30, [183,28,28]]
];

function pm10Color(v) {
    v = Math.max(0, Math.min(30, v));
    for (let i = 0; i < PM10_STOPS.length - 1; i++) {
        const [a, ca] = PM10_STOPS[i];
        const [b, cb] = PM10_STOPS[i + 1];
        if (v <= b) {
            const t = (v - a) / Math.max(1e-6, b - a);
            return ca.map((c, k) => Math.round(c + (cb[k] - c) * t));
        }
    }
    return PM10_STOPS[PM10_STOPS.length - 1][1];
}

 

function renderDust() {
    const ctx = canvas.getContext('2d');
    ctx.clearRect(0, 0, canvas.width, canvas.height);

    const cellPx = (latestDustPayload.cell_size_m / MAP_INFO.resolution) * MAP_INFO.scale;
    for (const c of latestDustPayload.cells) {
        const px = worldToPixel(c.x, c.y);
        const [r, g, b] = pm10Color(c.pm10);
        const alpha = Math.min(0.65, 0.18 + Math.min(c.count, 20) * 0.025);
        ctx.fillStyle = `rgba(${r},${g},${b},${alpha})`;
        ctx.fillRect(px.x - cellPx/2, px.y - cellPx/2, cellPx, cellPx);
    }
}

 

배터리 안전 게이트

리튬 배터리 잔량이 20% 미만이면 START / PATROL 명령을 차단

ESTOP / 충전소 복귀는 항상 허용 안전 명령은 막으면 안됨

 

if ("start".equals(normalizedCommand) || "patrol".equals(normalizedCommand)) {
    if (isStartBlockedByBattery(battery)) {
        return blockedStatus("배터리 부족 - 출동 불가");
    }
}
// ESTOP / GO_CHARGE / IDLE / RELEASE / CLEAN 은 게이트 통과
function setMoveButtonsDisabled(disabled) {
    document.getElementById('startButton').disabled = disabled;
    document.getElementById('patrolButton').disabled = disabled;
}

function updateBatteryDisplay(battery) {
    if (!battery.present || battery.percentage < 20) {
        setMoveButtonsDisabled(true);
    } else {
        setMoveButtonsDisabled(false);
    }
}

 

장애물 처리

윈도우 와 리눅스 라인엔딩

find ~/group4_ws/src -name '*.py' -exec dos2unix {} +

 

retained MQTT 메시지 deadlock

mosquitto_pub -h 192.168.0.55 -t robot/1/dust -r -n

 

AMCL pose 의 QoS

pose_qos = QoSProfile(depth=10)
pose_qos.reliability = ReliabilityPolicy.RELIABLE
pose_qos.durability = DurabilityPolicy.TRANSIENT_LOCAL
self.create_subscription(
    PoseWithCovarianceStamped, '/amcl_pose',
    self.pose_callback, pose_qos,
)