전면주차 후면주차 추가 + 개선
전체 움직임, 함수부터 정리
"""
TurtleBot3 (와플파이) 벽 따라 주행 + 1바퀴 후 출발지 복귀
요구사항 반영 요약
- 라이다 360개(0도=정면, 반시계 증가)
- 앞/좌/우/뒤 각각 20개 센서를 묶어(median) 판단
- 시작 시 좌/우 중 가까운 벽을 자동 선택하고 끝까지 유지
- 정지-회전(TURN) 방식 금지: 항상 "전진 + 곡률(각속도)"로 코너를 돈다
- 벽과 11자(평행) 유지: 옆벽 앞/뒤 섹터 차이(align_err)를 제어에 포함
- 비상 상황(너무 근접, 정면 매우 근접)에서만 정지/후진
- 1바퀴 완료(헤딩 누적 변화량) 이후에만 시작점(±10cm)에서 정지
- 디버깅 로그(jsonl) 저장
통신
- 센서: GET /control
- 명령: GET /control?lin=...&ang=...
"""
import json
import math
import statistics
import threading
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
import requests
# ============================================================
# 1) 설정값 (튜닝은 여기만)
# ============================================================
@dataclass
class Cfg:
base_url: str = "http://192.168.0.244:5000/control"
# --- 라이다 ---
lidar_len: int = 360
# 방향별 메인 센서 20개 사용: center ±10deg (20개)
beams: int = 20
half: int = 10 # [-10..+9] -> 20개
# 11자 평행 유지용: 옆벽의 앞/뒤 샘플 각도(메인 center에서 ±off)
side_fb_off_deg: int = 35
valid_min_cm: float = 1.0
valid_max_cm: float = 900.0
# --- 벽 추종 목표 ---
target_cm: float = 20.0
margin_cm: float = 3.0
# --- 속도/각속도 ---
v_nom: float = 0.13 # 직선 주행 기본 속도
v_min: float = 0.06 # 최소 속도(코너에서도 완전 정지 방지)
v_back: float = -0.06 # 비상 후진
w_max: float = 0.60 # 플랫폼 스펙에 맞춰 제한
# --- 코너/전방 ---
front_slow_cm: float = 85.0 # 이 안으로 들어오면 코너 곡률을 강하게
front_stop_cm: float = 12.0 # 이하면 정지(충돌 방지)
# --- 옆벽 안전 ---
side_stop_cm: float = 12.0 # 옆벽이 이하면 비상(충돌/긁힘 방지)
# --- 시작/복귀 ---
leave_tol_m: float = 0.40 # 출발 반경(이 이상 나가야 "나갔다" 인정)
finish_tol_m: float = 0.10 # 복귀 반경(±10cm)
leave_hold: int = 5 # 튐 방지 카운트
finish_hold: int = 8
# --- 1바퀴 판정(헤딩 누적 변화량) ---
lap_turn_rad: float = 2.0 * math.pi * 0.92 # 대략 1바퀴(여유 포함)
lap_min_time_s: float = 6.0 # 너무 빨리 끝나는 오판 방지
# --- 주기/통신 ---
sensor_poll_s: float = 0.03
control_period_s: float = 0.08
http_timeout_sensor: float = 0.15
http_timeout_cmd: float = 0.05
# --- 제어 이득(핵심) ---
# 1) 옆벽 거리 유지(=벽에서 일정거리)
Kd: float = 0.018
# 2) 11자 평행 유지(=옆벽과 평행)
Ka: float = 0.030
# 3) 코너 곡률(전방이 가까우면 코너 방향으로 자연스럽게 감기)
Kf: float = 0.55
# --- 조향 변화 제한(끊김 방지) ---
w_slew_per_step: float = 0.08
# --- 로그 ---
log_dir: str = "."
# ============================================================
# 2) 유틸
# ============================================================
def norm_angle(a: float) -> float:
"""각도 정규화(-pi ~ +pi)"""
return math.atan2(math.sin(a), math.cos(a))
def angle_delta(a: float, b: float) -> float:
"""a - b 의 최소각(라디안)"""
return norm_angle(a - b)
def dist_m(p: Dict[str, float], s: Tuple[float, float]) -> float:
return math.hypot(p["x"] - s[0], p["y"] - s[1])
# ============================================================
# 3) RobotIO (센서 수신 스레드)
# ============================================================
class RobotIO:
"""
- 센서 수신/제어를 분리하여, 센서가 잠깐 느려도 제어 루프가 덜 끊기도록 함
"""
def __init__(self, cfg: Cfg):
self.cfg = cfg
self._lock = threading.Lock()
self._running = True
self._session = requests.Session()
self._lidar: List[float] = []
self._pose: Dict[str, float] = {"x": 0.0, "y": 0.0, "a": 0.0}
self._start_pose: Optional[Tuple[float, float]] = None
self._start_heading: Optional[float] = None
self._start_buf: List[Tuple[float, float, float]] = []
threading.Thread(target=self._sensor_loop, daemon=True).start()
def _sensor_loop(self):
while self._running:
try:
r = self._session.get(self.cfg.base_url, timeout=self.cfg.http_timeout_sensor)
if r.status_code == 200:
j = r.json()
s = j.get("s", [])
p = j.get("p", {})
if isinstance(s, list) and len(s) >= self.cfg.lidar_len:
with self._lock:
self._lidar = s[: self.cfg.lidar_len]
self._pose["x"] = float(p.get("x", 0.0))
self._pose["y"] = float(p.get("y", 0.0))
self._pose["a"] = float(p.get("a", 0.0))
# 시작 위치 안정화(중앙값)
if self._start_pose is None:
self._start_buf.append((self._pose["x"], self._pose["y"], self._pose["a"]))
if len(self._start_buf) >= 10:
xs, ys, aas = zip(*self._start_buf)
self._start_pose = (statistics.median(xs), statistics.median(ys))
self._start_heading = statistics.median(aas)
self._start_buf.clear()
print(f"[START] 시작 위치 고정: {self._start_pose}", flush=True)
except Exception:
pass
time.sleep(self.cfg.sensor_poll_s)
def snap(self):
with self._lock:
return self._lidar[:], dict(self._pose), self._start_pose, self._start_heading
def cmd(self, v: float, w: float):
try:
self._session.get(
f"{self.cfg.base_url}?lin={v:.3f}&ang={w:.3f}",
timeout=self.cfg.http_timeout_cmd,
)
except Exception:
pass
def stop(self):
self._running = False
self.cmd(0.0, 0.0)
@property
def running(self):
return self._running
# ============================================================
# 4) Perception (방향별 20개 센서 median)
# ============================================================
class Perception:
def __init__(self, cfg: Cfg):
self.cfg = cfg
def _valid(self, v: float) -> bool:
return self.cfg.valid_min_cm < v < self.cfg.valid_max_cm
def _sector(self, center_deg: int) -> List[int]:
# 20개: [-10..+9]
return [(center_deg + i) % 360 for i in range(-self.cfg.half, self.cfg.half)]
def sector_median(self, lidar: List[float], center_deg: int) -> float:
vals = [lidar[i] for i in self._sector(center_deg) if self._valid(lidar[i])]
return statistics.median(vals) if vals else 999.0
def analyze(self, lidar: List[float], follow_side: str) -> Dict[str, float]:
# 메인 4방향(각 20개)
F = self.sector_median(lidar, 0)
L = self.sector_median(lidar, 90)
R = self.sector_median(lidar, 270)
B = self.sector_median(lidar, 180)
# 옆벽 평행 오차(11자 유지)
off = self.cfg.side_fb_off_deg
if follow_side == "right":
side_mid = R
side_front = self.sector_median(lidar, 270 - off)
side_back = self.sector_median(lidar, 270 + off)
else:
side_mid = L
side_front = self.sector_median(lidar, 90 - off)
side_back = self.sector_median(lidar, 90 + off)
align_err = side_front - side_back # 0이면 평행(11자)
# 디버깅을 위해 최소값도 같이
min_lr = min(L, R)
return {
"F": F, "L": L, "R": R, "B": B,
"side_mid": side_mid,
"side_front": side_front,
"side_back": side_back,
"align_err": align_err,
"min_lr": min_lr,
}
# ============================================================
# 5) Logger (jsonl)
# ============================================================
class TrajectoryLogger:
def __init__(self, cfg: Cfg):
ts = time.strftime("%Y%m%d_%H%M%S")
self.path = f"{cfg.log_dir}/wall_return_{ts}.jsonl"
self.f = open(self.path, "w", encoding="utf-8")
print(f"[LOG] {self.path}", flush=True)
def write(self, rec: Dict):
self.f.write(json.dumps(rec, ensure_ascii=False) + "\n")
self.f.flush()
def close(self):
self.f.close()
# ============================================================
# 6) 메인 컨트롤러
# ============================================================
class WallReturnController:
"""
동작 흐름
1) 시작 pose 안정화
2) 좌/우 중 가까운 벽을 자동 선택(한 번만)
3) 벽 따라 1바퀴(헤딩 누적 변화량) 돌기
- 항상 전진하면서 곡률 제어(타원형 궤적)
- 거리 유지 + 11자 평행 유지 + 전방 기반 코너 곡률
4) 1바퀴 완료 후 시작점(±10cm) 진입 시 정지
"""
def __init__(self, cfg: Cfg):
self.cfg = cfg
self.io = RobotIO(cfg)
self.perc = Perception(cfg)
self.log = TrajectoryLogger(cfg)
self.follow_side: Optional[str] = None # "left" or "right"
# 출발/복귀 상태
self.has_left = False
self.leave_cnt = 0
self.finish_cnt = 0
# 1바퀴 판정용
self.turn_acc = 0.0
self.prev_a: Optional[float] = None
self.t_start = time.time()
self.lap_done = False
# 끊김 방지(각속도 slew)
self.prev_w = 0.0
def _pick_side_auto(self, lidar: List[float]) -> str:
"""
'더 돌기 쉬운 방향'을 엄밀히 계산하려면 전방 여유/코너 구조까지 봐야 함.
여기서는 요구사항대로 "초기에 더 가까운 벽"을 따라가도록 단순/안정하게 선택.
"""
L = self.perc.sector_median(lidar, 90)
R = self.perc.sector_median(lidar, 270)
return "left" if L < R else "right"
def _slew_w(self, w: float) -> float:
dw = max(min(w - self.prev_w, self.cfg.w_slew_per_step), -self.cfg.w_slew_per_step)
self.prev_w += dw
return self.prev_w
def _update_lap(self, curr_a: float):
"""헤딩 변화 누적(정지 없이도 1바퀴 판단 가능)"""
if self.prev_a is None:
self.prev_a = curr_a
return
da = abs(angle_delta(curr_a, self.prev_a))
self.turn_acc += da
self.prev_a = curr_a
# 너무 이른 오판 방지(최소 시간)
if (not self.lap_done) and (time.time() - self.t_start) >= self.cfg.lap_min_time_s:
if self.turn_acc >= self.cfg.lap_turn_rad:
self.lap_done = True
print(f"[LAP] 1바퀴 완료 판정 (turn_acc={self.turn_acc:.2f}rad)", flush=True)
def _check_leave(self, dh: float):
"""시작점에서 충분히 벗어났는지"""
if self.has_left:
return
if dh > self.cfg.leave_tol_m:
self.leave_cnt += 1
if self.leave_cnt >= self.cfg.leave_hold:
self.has_left = True
print(f"[LEAVE] 출발 반경 이탈 확정 (dh={dh:.3f}m)", flush=True)
else:
self.leave_cnt = 0
def _check_finish(self, dh: float) -> bool:
"""
1바퀴 완료 이후에만 복귀 정지 허용
"""
if not (self.has_left and self.lap_done):
self.finish_cnt = 0
return False
if dh <= self.cfg.finish_tol_m:
self.finish_cnt += 1
if self.finish_cnt >= self.cfg.finish_hold:
return True
else:
self.finish_cnt = 0
return False
def _calc_control(self, env: Dict[str, float]) -> Tuple[float, float, str]:
"""
연속 주행용 제어
- dist_err: 옆벽 거리 유지
- align_err: 11자(평행) 유지
- front_term: 전방이 가까울수록 코너로 더 감기도록 곡률 가중
"""
F = env["F"]
side_mid = env["side_mid"]
align_err = env["align_err"]
# follow_side 기준 부호
# 오른쪽 벽 추종: w(+) = 우회전(시스템 정의상 +w=우회전)
# -> 오른쪽 벽에 너무 멀면(거리↑) 우회전(+)으로 붙어야 하므로 sign=+1
# 왼쪽 벽 추종: 너무 멀면 좌회전(-)으로 붙어야 하므로 sign=-1
sign = +1.0 if self.follow_side == "right" else -1.0
# 비상 정지 조건(최후 수단)
# - 정면이 매우 가까움
# - 옆벽이 매우 가까움(긁힘/충돌 방지)
if F <= self.cfg.front_stop_cm or side_mid <= self.cfg.side_stop_cm:
return 0.0, 0.0, "비상정지"
# 1) 거리 유지(벽에서 일정거리)
dist_err = (side_mid - self.cfg.target_cm) # +면 멀다, -면 가깝다
w_dist = self.cfg.Kd * dist_err * sign
# 2) 평행 유지(11자)
# align_err = side_front - side_back
# 오른쪽 벽 기준: 앞이 더 멀다(+)면 로봇이 벽에서 벌어지는 방향 -> 조금 우회전(+)로 다시 붙고 평행
w_align = self.cfg.Ka * align_err * sign
# 3) 코너 곡률(전방이 가까울수록 "코너 방향으로" 더 감기)
# 오른쪽 벽 추종이면 코너는 보통 우회전(+)로 돌아야 자연스럽게 외곽(빨간 궤적)
front_term = 0.0
if F < self.cfg.front_slow_cm:
front_term = (self.cfg.front_slow_cm - F) / self.cfg.front_slow_cm # 0~1
w_corner = self.cfg.Kf * front_term * sign
# 총 각속도
w = w_dist + w_align + w_corner
w = max(min(w, self.cfg.w_max), -self.cfg.w_max)
w = self._slew_w(w)
# 속도는 각속도 크기에 따라 자연스럽게 감속(정지 금지)
# - 직선이면 빠르게, 코너면 느리게
v = self.cfg.v_nom * (1.0 - min(0.7, abs(w) / self.cfg.w_max))
v = max(self.cfg.v_min, v)
# 목표거리 주변에서 불필요한 흔들림 줄이기(미세 deadband)
if abs(dist_err) <= self.cfg.margin_cm and abs(align_err) <= 2.0:
mode = "직선"
elif front_term > 0.2:
mode = "코너"
else:
mode = "추종"
return v, w, mode
def run(self):
print("[RUN] auto 벽 추종 + 1바퀴 복귀 시작", flush=True)
try:
# 센서 준비 대기
while True:
lidar, pose, start, _ = self.io.snap()
if lidar and start:
break
time.sleep(0.1)
# 벽 자동 선택(입력 없음)
self.follow_side = self._pick_side_auto(lidar)
print(f"[INIT] follow_side={self.follow_side}", flush=True)
while self.io.running:
t0 = time.time()
lidar, pose, start, _ = self.io.snap()
if not lidar or start is None:
time.sleep(0.05)
continue
# 1) 상태 업데이트
dh = dist_m(pose, start)
self._check_leave(dh)
self._update_lap(pose["a"])
# 2) 센서 요약
env = self.perc.analyze(lidar, self.follow_side)
# 3) 종료 조건(1바퀴 완료 후 시작점 진입)
if self._check_finish(dh):
self.io.cmd(0.0, 0.0)
print("[DONE] 출발지 복귀 완료(±10cm)", flush=True)
break
# 4) 제어 계산(연속 주행)
v, w, mode = self._calc_control(env)
self.io.cmd(v, w)
# 5) 로그 저장(디버깅)
self.log.write({
"t": time.time(),
"pose": pose,
"dh": dh,
"follow_side": self.follow_side,
"lap_done": self.lap_done,
"turn_acc": self.turn_acc,
"env": env,
"cmd": {"v": v, "w": w},
"mode": mode,
})
# 6) 주기 유지
dt = time.time() - t0
time.sleep(max(0.0, self.cfg.control_period_s - dt))
finally:
self.io.stop()
self.log.close()
# ============================================================
# 7) 실행 (입력 없음: auto만)
# ============================================================
if __name__ == "__main__":
WallReturnController(Cfg()).run()
움직임은 부드러워졌지만 돌아와서 벽에 박은 다음 멈추질 않음
처음부텉 다시 시작 + 시뮬레이션 설치로 연습
조건
터틀봇3 와플 파이
p값: 각도랑 위치
s값: 360도(정면 0도로 시작해서 시계 반대방향으로 증가)
ex)
{"p":{"a":1.639,"x":2.377,"y":6.848},"s":[84,84,88,94,94,93,92,92,92,91,90,90,89,89,89,88,88,88,88,87,87,87,87,87,87,87,87,87,87,87,87,87,88,88,88,88,88,89,89,89,90,90,91,91,92,92,93,94,94,95,0,0,96,97,98,98,99,100,100,97,93,90,90,88,86,86,84,82,82,80,79,75,75,74,73,73,72,71,70,70,69,0,0,68,68,68,67,66,66,66,65,65,65,64,64,64,64,63,63,63,63,63,63,63,63,63,63,63,63,63,63,63,63,63,64,64,64,64,65,65,65,66,66,66,66,67,67,68,68,0,0,70,71,71,72,73,73,0,73,75,75,76,77,77,78,80,81,81,83,84,84,86,88,88,89,92,94,94,96,98,98,101,104,107,107,111,114,114,118,0,0,169,121,119,119,117,114,114,113,111,110,110,109,108,108,107,106,106,105,105,104,104,104,103,103,103,103,103,103,102,103,103,102,102,102,103,103,103,103,104,104,104,105,105,105,106,106,107,107,108,109,109,111,107,97,97,91,86,86,82,78,78,74,71,68,68,65,63,63,61,59,57,57,56,54,54,53,51,51,50,49,48,48,47,46,46,45,44,43,43,43,42,42,41,40,40,40,39,39,39,38,38,38,38,37,37,37,37,37,37,36,36,36,36,36,36,36,36,36,36,36,0,36,36,36,36,36,36,36,36,36,36,37,37,37,37,37,37,37,38,38,38,38,38,38,39,39,39,40,40,40,41,41,41,42,42,43,43,44,0,0,45,46,46,47,47,49,49,50,51,51,52,54,55,55,57,58,58,61,63,63,65,0,67,67,70,73,73,76,80]}
동작 제어
http://192.168.0.244:5000/control?lin=0.0&ang=0.0 -> 정지 http://192.168.0.244:5000/control?lin=0.15&ang=0.0 -> 전진 http://192.168.0.244:5000/control?lin=-0.15&ang=0.0 -> 후진 http://192.168.0.244:5000/control?lin=0.0&ang=0.6 -> 우회전 http://192.168.0.244:5000/control?lin=0.0&ang=-0.6 ->좌회전
http://192.168.0.244:5000/control -> 센서 라이다값 받아오기
크기(mm)
281 * 306 * 141
가능한 동작
전진, 후진, 회전 겸 방향 전환(좌우(양쪽 바퀴 방향 반대로 해서))
목표(벽 따라 한 바퀴 돈 다음 출발지로 복귀)
아래 설명은 터틀봇이 우측벽을 더 가깝다고 선택했을 경우의 설명, 좌측 벽의 경우는 우측을 좌측으로 바꿔서 같은 방식으로 동작한다.
1.라이다 센서로 주변 공간을 인식
2.라이다 센서 기준으로 좌측(90), 우측(270) 중 상대적으로 가까운 곳을 선택
3.선택한 곳을 기준으로 벽과 11자로 터틀봇 정렬(우측벽을 선택-> 라이다의 270번 센서와 주변 센서 +-10개를 이용하여 우측벽과 터틀봇이 수평을 이루도록(11자형태) 한다)
4.터틀봇을 전진시키며 우측벽에 가까워지면(30cm 보다 가깝게) 방향을 조절해 계속 11자 형태(우측벽과 30cm+-5cm 간격을 유지하며)를 유지하며 전방에 벽이 감지되기 전까지(거리는 약 30cm) 전진한다.
5.전방에 벽이 감지되었을 때(40cm +-5cm), 벽과 터틀봇의 우측면이 수평이 되게 맞춘다(거리는 약 30cm 떨어진 상태로 수평을 맞춘다).
6.이어서 다시 전진(4번 동작)하고 벽을 마주치면 벽과 터틀봇 우측면이 수평이 되게 회전(5번 동작)한다.
7.4~6번 동작을 실행하던 중 터틀봇 기준으로 정면에 출발지점이 보일 경우 출발 지점을 향해 직선으로 간다.
8.출발 지점으로 가던 중 출발 지점과 가까워지면 속도를 기존의 절반으로 줄이고, 출발 지점으로부터 오차범위 이내(+-10cm)에 들어가면 멈춘다.
9.멈춘 다음 터틀봇을 회전하여 처음에 출발했던 방향과 같은 방향을 바라보게 한다.
코드
코드는 파이참에서 작성된다.
터틀봇 코드를 실행했을 때 감지되는 센서값과 위치값은 jsonl 형태로 계속해서 저장한다.
터틀봇을 실행했을 때 처음 위치를 (0,0)으로 인식해서 돌아올때도 (0,0)으로 돌아오게 한다.
터틀봇의 전진은 while문으로 실행해서 중간에 끊기지 않고(방향 전환은 제외) 계속해서 가게 한다.
모든 코드는 클래스, 함수를 구분하여 최대한 간결, 효율적으로 작성한다.
테스트X 임시 코드
import json
import math
import time
from dataclasses import dataclass
from enum import Enum, auto
from typing import Any, Dict, List, Optional, Tuple
import requests
# -----------------------------
# Utilities
# -----------------------------
def clamp(v: float, lo: float, hi: float) -> float:
return max(lo, min(hi, v))
def wrap_to_pi(rad: float) -> float:
"""Wrap angle to [-pi, pi]."""
while rad > math.pi:
rad -= 2.0 * math.pi
while rad < -math.pi:
rad += 2.0 * math.pi
return rad
def median(values: List[float]) -> float:
v = sorted(values)
n = len(v)
if n == 0:
return 0.0
mid = n // 2
if n % 2 == 1:
return v[mid]
return 0.5 * (v[mid - 1] + v[mid])
def now_ts() -> float:
return time.time()
# -----------------------------
# Data models
# -----------------------------
@dataclass
class Pose:
x: float
y: float
a: float # yaw (rad)
@dataclass
class SensorPacket:
pose_world: Pose
scan: List[float] # length 360 (as given), values in cm (?) or mm (?) or already meters.
raw: Dict[str, Any]
class Side(Enum):
LEFT = auto()
RIGHT = auto()
class State(Enum):
INIT = auto()
CHOOSE_WALL = auto()
ALIGN_WALL = auto()
FOLLOW_WALL = auto()
CORNER_ALIGN = auto()
GO_HOME = auto()
FINAL_ALIGN = auto()
STOP = auto()
# -----------------------------
# Robot HTTP client
# -----------------------------
class TurtlebotHTTP:
def __init__(self, base_url: str, timeout: float = 1.0):
self.base_url = base_url.rstrip("/")
self.timeout = timeout
def get_sensors(self) -> SensorPacket:
r = requests.get(f"{self.base_url}/control", timeout=self.timeout)
r.raise_for_status()
data = r.json()
p = data["p"]
s = data["s"]
pose = Pose(float(p["x"]), float(p["y"]), float(p["a"]))
scan = [float(v) for v in s]
return SensorPacket(pose_world=pose, scan=scan, raw=data)
def send_cmd(self, lin: float, ang: float) -> None:
# Keep it simple: fire-and-forget.
requests.get(
f"{self.base_url}/control",
params={"lin": f"{lin:.3f}", "ang": f"{ang:.3f}"},
timeout=self.timeout,
)
# -----------------------------
# Logger (jsonl)
# -----------------------------
class JSONLLogger:
def __init__(self, path: str):
self.path = path
def write(self, obj: Dict[str, Any]) -> None:
with open(self.path, "a", encoding="utf-8") as f:
f.write(json.dumps(obj, ensure_ascii=False) + "\n")
# -----------------------------
# Wall follower controller
# -----------------------------
@dataclass
class Config:
# speed
lin_fast: float = 0.15
lin_slow: float = 0.075
ang_max: float = 0.60
# distance thresholds (meters)
# NOTE: if your lidar values are in cm, set scan_unit_scale=0.01
desired_side_dist: float = 0.30
side_tol: float = 0.05
front_stop_dist: float = 0.40
front_tol: float = 0.05
# windows
side_center_deg_right: int = 270
side_center_deg_left: int = 90
side_window: int = 10
front_center_deg: int = 0
front_window: int = 5
# alignment parameters
align_diff_gain: float = 1.8 # how strongly to rotate during wall parallel alignment
follow_dist_gain: float = 2.0 # how strongly to steer based on side distance error
# go-home detection
home_bearing_deg_tol: float = 15.0
home_line_of_sight_margin: float = 0.15
# stopping / finishing
home_pos_tol: float = 0.10
final_yaw_tol_deg: float = 8.0
# control loop
dt: float = 0.10
# lidar unit scaling: if scan is in "cm", set to 0.01; if "mm", set to 0.001; if already meters, 1.0
scan_unit_scale: float = 0.01 # <-- 매우 중요. 값이 80~120 이런 식이면 보통 cm로 들어옵니다.
class WallFollowReturn:
def __init__(self, bot: TurtlebotHTTP, logger: JSONLLogger, cfg: Config):
self.bot = bot
self.logger = logger
self.cfg = cfg
self.state = State.INIT
self.side: Optional[Side] = None
self.start_pose_world: Optional[Pose] = None
self.start_yaw_world: Optional[float] = None
self.started_at = now_ts()
self.total_travel_est = 0.0 # crude travel estimate
self.prev_rel_pose: Optional[Pose] = None
# ---- Lidar helpers ----
def _scan_m(self, scan: List[float], idx: int) -> float:
v = scan[idx % 360]
if v <= 0:
return 0.0
return v * self.cfg.scan_unit_scale
def _window_median(self, scan: List[float], center: int, half_win: int) -> float:
vals = []
for d in range(-half_win, half_win + 1):
v = self._scan_m(scan, center + d)
if v > 0:
vals.append(v)
if not vals:
return 0.0
return median(vals)
def front_dist(self, scan: List[float]) -> float:
return self._window_median(scan, self.cfg.front_center_deg, self.cfg.front_window)
def side_dist(self, scan: List[float], side: Side) -> float:
center = self.cfg.side_center_deg_right if side == Side.RIGHT else self.cfg.side_center_deg_left
return self._window_median(scan, center, self.cfg.side_window)
def side_parallel_diff(self, scan: List[float], side: Side) -> float:
"""
벽과 평행(11자) 판단을 위한 근사:
side_center±k 의 거리 차이가 작아지면 평행에 가깝다고 본다.
diff > 0이면 한쪽이 더 멀다 → 그에 맞게 회전.
"""
center = self.cfg.side_center_deg_right if side == Side.RIGHT else self.cfg.side_center_deg_left
d1 = self._window_median(scan, center - self.cfg.side_window, 2)
d2 = self._window_median(scan, center + self.cfg.side_window, 2)
if d1 == 0.0 or d2 == 0.0:
return 0.0
return d1 - d2
# ---- Pose handling ----
def rel_pose(self, pose_world: Pose) -> Pose:
assert self.start_pose_world is not None
sx, sy, sa = self.start_pose_world.x, self.start_pose_world.y, self.start_pose_world.a
# translate
dx = pose_world.x - sx
dy = pose_world.y - sy
# rotate into start frame (so start yaw becomes 0 direction)
c = math.cos(-sa)
s = math.sin(-sa)
rx = c * dx - s * dy
ry = s * dx + c * dy
ra = wrap_to_pi(pose_world.a - sa)
return Pose(rx, ry, ra)
def update_travel_est(self, rel_pose: Pose) -> None:
if self.prev_rel_pose is None:
self.prev_rel_pose = rel_pose
return
dx = rel_pose.x - self.prev_rel_pose.x
dy = rel_pose.y - self.prev_rel_pose.y
self.total_travel_est += math.hypot(dx, dy)
self.prev_rel_pose = rel_pose
# ---- High-level decisions ----
def choose_wall(self, scan: List[float]) -> Side:
left = self.side_dist(scan, Side.LEFT)
right = self.side_dist(scan, Side.RIGHT)
# 0이면 무시. 둘다 0이면 기본 RIGHT.
if left == 0.0 and right == 0.0:
return Side.RIGHT
if left == 0.0:
return Side.RIGHT
if right == 0.0:
return Side.LEFT
return Side.LEFT if left < right else Side.RIGHT
def can_go_home_straight(self, rel_pose: Pose, scan: List[float]) -> bool:
# direction to home in robot frame
dx = -rel_pose.x
dy = -rel_pose.y
d_home = math.hypot(dx, dy)
if d_home < 0.25:
return True
bearing = math.atan2(dy, dx) # in start frame coordinates
# robot heading in start frame is rel_pose.a
bearing_err = wrap_to_pi(bearing - rel_pose.a)
tol = math.radians(self.cfg.home_bearing_deg_tol)
if abs(bearing_err) > tol:
return False
front = self.front_dist(scan)
if front == 0.0:
return False
return front > (d_home + self.cfg.home_line_of_sight_margin)
# ---- Control primitives ----
def stop(self) -> None:
self.bot.send_cmd(0.0, 0.0)
def rotate_in_place(self, ang: float) -> None:
self.bot.send_cmd(0.0, clamp(ang, -self.cfg.ang_max, self.cfg.ang_max))
def drive(self, lin: float, ang: float) -> None:
self.bot.send_cmd(clamp(lin, -self.cfg.lin_fast, self.cfg.lin_fast),
clamp(ang, -self.cfg.ang_max, self.cfg.ang_max))
# ---- Behaviors ----
def do_align_wall(self, scan: List[float], side: Side) -> bool:
"""
벽과 평행이 되도록 회전. 충분히 평행하면 True.
"""
diff = self.side_parallel_diff(scan, side)
# diff가 거의 0이면 평행에 가깝다
if abs(diff) < 0.02: # 2cm 정도(스케일에 따라 조정)
self.stop()
return True
# side에 따라 회전 방향이 달라질 수 있으므로 sign을 맞춘다
# 경험적으로: RIGHT 벽 기준 diff(앞쪽-뒤쪽) 형태에 따라 보정이 필요할 수 있음.
# 여기서는 기본적으로 diff가 +면 한쪽이 더 멀다 → 그쪽을 벽에 가깝게 하도록 회전
ang = -self.cfg.align_diff_gain * diff
self.rotate_in_place(ang)
return False
def do_corner_align(self, scan: List[float], side: Side) -> bool:
"""
정면에 벽이 가까울 때, 코너에서 회전해 새 벽을 옆으로 두도록 만든 뒤 평행 정렬.
"""
# 1) 먼저 크게 회전: 우측벽 추종이면 좌회전(코너에서 벽을 따라가려면)
# 환경에 따라 반대일 수 있어, 후속 평행정렬로 수렴하게 함.
turn_dir = -1.0 if side == Side.RIGHT else 1.0
self.rotate_in_place(turn_dir * 0.55)
# 정면이 충분히 열리고(벽이 멀어지고), 옆면 거리가 다시 잡히기 시작하면 종료 조건
front = self.front_dist(scan)
sd = self.side_dist(scan, side)
if front > (self.cfg.front_stop_dist + 0.20) and sd > 0.0:
self.stop()
return True
return False
def do_follow_wall_step(self, scan: List[float], side: Side, lin: float) -> None:
"""
전진하면서 측면 거리 유지. (요구한 "전진 while"은 run()에서 while로 구성)
"""
sd = self.side_dist(scan, side)
if sd == 0.0:
# 측면이 안 잡히면 약하게 원쪽(벽이 있을 것으로 예상되는 방향)으로 틀어 탐색
ang = 0.25 if side == Side.RIGHT else -0.25
self.drive(lin, ang)
return
err = (self.cfg.desired_side_dist - sd) # +면 너무 멀다/가깝다? (스케일 주의)
# err 정의: desired - actual
# actual이 더 크면(sd가 멀면) err 음수 → 벽 쪽으로 붙어야 함.
# RIGHT 벽이면 "우회전(+)" 하면 벽 쪽으로 간다고 가정(좌표계에 따라 반대일 수 있음)
direction = 1.0 if side == Side.RIGHT else -1.0
ang = direction * (-self.cfg.follow_dist_gain * err)
self.drive(lin, ang)
def do_go_home_step(self, rel_pose: Pose, scan: List[float]) -> None:
dx = -rel_pose.x
dy = -rel_pose.y
d = math.hypot(dx, dy)
if d <= self.cfg.home_pos_tol:
self.stop()
self.state = State.FINAL_ALIGN
return
bearing = math.atan2(dy, dx)
bearing_err = wrap_to_pi(bearing - rel_pose.a)
# 가까워질수록 속도 줄이기
lin = self.cfg.lin_slow if d < 0.50 else self.cfg.lin_fast
# 정면 장애물 아주 가까우면 정지(안전)
front = self.front_dist(scan)
if front != 0.0 and front < 0.25:
self.stop()
return
ang = clamp(2.0 * bearing_err, -self.cfg.ang_max, self.cfg.ang_max)
self.drive(lin, ang)
def do_final_align_step(self, rel_pose: Pose) -> None:
# 시작 yaw(=0)을 바라보게: rel_pose.a -> 0
err = wrap_to_pi(0.0 - rel_pose.a)
if abs(err) < math.radians(self.cfg.final_yaw_tol_deg):
self.stop()
self.state = State.STOP
return
self.rotate_in_place(clamp(2.0 * err, -self.cfg.ang_max, self.cfg.ang_max))
# ---- Main loop ----
def run(self) -> None:
"""
요구사항:
- 전진은 while로 끊기지 않게 (방향전환 제외)
- 센서/pose 계속 jsonl 저장
"""
print("Starting controller... (Ctrl+C to stop)")
try:
while self.state != State.STOP:
pkt = self.bot.get_sensors()
if self.start_pose_world is None:
self.start_pose_world = pkt.pose_world
self.start_yaw_world = pkt.pose_world.a
rel = self.rel_pose(pkt.pose_world)
self.update_travel_est(rel)
# log
self.logger.write({
"t": now_ts(),
"state": self.state.name,
"side": None if self.side is None else self.side.name,
"p_world": {"x": pkt.pose_world.x, "y": pkt.pose_world.y, "a": pkt.pose_world.a},
"p_rel": {"x": rel.x, "y": rel.y, "a": rel.a},
"s": pkt.scan,
"travel_est": self.total_travel_est,
})
scan = pkt.scan
if self.state == State.INIT:
self.stop()
self.state = State.CHOOSE_WALL
elif self.state == State.CHOOSE_WALL:
self.side = self.choose_wall(scan)
self.state = State.ALIGN_WALL
elif self.state == State.ALIGN_WALL:
assert self.side is not None
done = self.do_align_wall(scan, self.side)
if done:
self.state = State.FOLLOW_WALL
elif self.state == State.FOLLOW_WALL:
assert self.side is not None
# 7) "정면에 출발지점이 보일 경우" → GO_HOME
# 너무 일찍(시작 직후) 바로 집으로 가는 걸 막기 위해 최소 이동거리 조건 추가
if self.total_travel_est > 1.5 and self.can_go_home_straight(rel, scan):
self.stop()
self.state = State.GO_HOME
else:
# 4) 전진 유지 + 측면거리 유지
front = self.front_dist(scan)
# 전방 벽 감지시(40cm±5cm) → 코너 처리
if front != 0.0 and front < (self.cfg.front_stop_dist + self.cfg.front_tol):
self.stop()
self.state = State.CORNER_ALIGN
else:
# 전진 while 성격: 매 tick마다 전진 명령을 갱신하지만 "전진 상태"는 유지됨
self.do_follow_wall_step(scan, self.side, lin=self.cfg.lin_fast)
elif self.state == State.CORNER_ALIGN:
assert self.side is not None
done = self.do_corner_align(scan, self.side)
if done:
# 5) 코너 후 다시 벽과 평행 정렬
self.state = State.ALIGN_WALL
elif self.state == State.GO_HOME:
self.do_go_home_step(rel, scan)
elif self.state == State.FINAL_ALIGN:
self.do_final_align_step(rel)
time.sleep(self.cfg.dt)
# STOP
self.stop()
print("Finished. (STOP)")
except KeyboardInterrupt:
self.stop()
print("Stopped by user.")
except Exception as e:
self.stop()
raise e
# -----------------------------
# Entry point
# -----------------------------
def main() -> None:
base_url = "http://192.168.0.244:5000"
logger = JSONLLogger("turtlebot_run.jsonl")
bot = TurtlebotHTTP(base_url=base_url, timeout=1.0)
cfg = Config(
# 중요: 라이다 값이 예시처럼 80~120 형태면 cm일 확률이 높습니다.
scan_unit_scale=0.01,
desired_side_dist=0.30,
front_stop_dist=0.40,
)
controller = WallFollowReturn(bot=bot, logger=logger, cfg=cfg)
controller.run()
if __name__ == "__main__":
main()
'로보테크AI' 카테고리의 다른 글
| 융합_로보테크 AI 자율주행 로봇 개발자 과정-26/02/03 (0) | 2026.02.03 |
|---|---|
| 융합_로보테크 AI 자율주행 로봇 개발자 과정-26/02/02 (0) | 2026.02.02 |
| 융합_로보테크 AI 자율주행 로봇 개발자 과정-26/01/29 (0) | 2026.01.29 |
| 융합_로보테크 AI 자율주행 로봇 개발자 과정-26/01/28 (1) | 2026.01.28 |
| 융합_로보테크 AI 자율주행 로봇 개발자 과정-26/01/27[TURTLEBOT] (1) | 2026.01.27 |