어제 하던 smartrobot 이어서
#server->client 데이터 전송 구조
"""
protocol - json으로 묶어서 보낸다.
1.로봇의 현재좌표 x,y
2.로봇이 바라보는 방향 각도
3.목적지 좌표x,y
4.8방향의 센서값 (인덱스0~7) 장애물과의 거리
5.주행상태정보 finished 도착 ok 주행중
"""
import socket
import json
import time
import math
class SmartRobot:
def __init__(self,name,server_ip):
self.name = name
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_address=(server_ip,9999)
self.state=None
def connect(self):
try:
self.sock.connect(self.server_address)
msg=json.dumps({"action":"set_name","name":self.name})
self.sock.sendall((msg+"\n").encode())
print("연결완료")
return True
except:
print("연결실패")
return False
def send_drive(self,l_speed,r_speed):
try:
msg=json.dumps({"action":"drive","l_dist":l_speed,"r_dist":r_speed})
self.sock.sendall((msg+"\n").encode())
response=self.sock.recv(2048).decode().strip()
if response:
self.state=json.loads(response.split('\n')[-1])
print(self.state)
return True
except:
return False
return False
def run(self):
if not self.connect():return
self.send_drive(0,0)
while self.state:
if self.state.get('result')=='finished':
print("등록 완료")
break
############################
#알고리즘 작성
#self.state['current_pos_x']:로봇 현재 x위치
#self.state['current_pos_y']:로봇 현재 y위치
#self.state['current_theta']:현재 로봇 방향(각)
#self.state['goal_x'] : 목적지 x
#self.state['goal_y'] : 목적지 y
#self.state['sensor'] : 8방향 거리 센서 리스트 (인덱스 0~7)
sensors=self.state['sensor']
front_dist=sensors[0]
l_speed=4
r_speed=4
if sensors[0]<25:
l_speed=3
r_speed=-3
elif sensors[1]<20 or sensors[2]<20:
l_speed=4
r_speed=2
elif sensors[7]<20 or sensors[6]<20:
l_speed=2
r_speed=4
#1.양옆 센서 값 기반으로 더 넓은 곳 찾기
#2.목적지 좌표 활용 arctan
#math.atan2(y,x) : (0,0)에서 (x,y)지점을 바라볼 때의 각도를 구하는 함수
#탄젠트는 기울기를 구할 때
#아크탄젠트는 각도를 구할 때
#3.라이다 8방향 값 기반 장애물 감지
#4.유클리드 거리
#dx는 differentX의 의미, 시작점 x와 도착점 x의 차이를 의미
#############################
if not self.send_drive(l_speed,r_speed): break
time.sleep(0.05)
self.sock.close()
if __name__ == "__main__":
team_name='abc'
ser_ip="192.168.0.187"
SmartRobot(team_name,ser_ip).run()
# server -> client 데이터 전송 구조
"""
protocol - json으로 묶어서 보낸다.
1.로봇의 현재좌표 x,y
2.로봇이 바라보는 방향 각도 (rad)
3.목적지 좌표x,y
4.8방향의 센서값 (인덱스0~7) 장애물과의 거리
5.주행상태정보 finished 도착 ok 주행중
"""
import socket
import json
import time
import math
def normalize_angle(angle):
"""각도를 -pi ~ pi 범위로 정규화"""
while angle > math.pi:
angle -= 2 * math.pi
while angle < -math.pi:
angle += 2 * math.pi
return angle
class SmartRobot:
def __init__(self, name, server_ip):
self.name = name
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_address = (server_ip, 9999)
self.state = None
# ===== 이동거리 계산용 =====
self.prev_x = None
self.prev_y = None
self.total_distance = 0.0
def connect(self):
try:
self.sock.connect(self.server_address)
msg = json.dumps({"action": "set_name", "name": self.name})
self.sock.sendall((msg + "\n").encode())
print("[INFO] 서버 연결 완료")
return True
except Exception as e:
print("[ERROR] 서버 연결 실패:", e)
return False
def send_drive(self, l_speed, r_speed):
try:
msg = json.dumps({
"action": "drive",
"l_dist": l_speed,
"r_dist": r_speed
})
self.sock.sendall((msg + "\n").encode())
response = self.sock.recv(2048).decode().strip()
if response:
self.state = json.loads(response.split('\n')[-1])
return True
except Exception as e:
print("[ERROR] drive 전송 실패:", e)
return False
return False
def run(self):
if not self.connect():
return
# 최초 정지
self.send_drive(0, 0)
step = 0
while self.state:
step += 1
if self.state.get('result') == 'finished':
print("[INFO] 목적지 도착 - 주행 종료")
print(f"[RESULT] 총 이동거리: {self.total_distance:.2f}")
break
# =============================
# 상태 값 추출
# =============================
x = self.state['current_pos_x']
y = self.state['current_pos_y']
theta = self.state['current_theta']
goal_x = self.state['goal_x']
goal_y = self.state['goal_y']
sensors = self.state['sensor']
# =============================
# 이동거리 누적
# =============================
if self.prev_x is None:
self.prev_x = x
self.prev_y = y
else:
step_dist = math.hypot(x - self.prev_x, y - self.prev_y)
self.total_distance += step_dist
self.prev_x = x
self.prev_y = y
# =============================
# 디버깅 출력
# =============================
print(f"\n[STEP {step}]")
print(f"현재 위치: ({x:.2f}, {y:.2f}) θ={math.degrees(theta):.1f}°")
print(f"목적지 위치: ({goal_x:.2f}, {goal_y:.2f})")
print(f"센서값: {sensors}")
print(f"누적 이동거리: {self.total_distance:.2f}")
# =============================
# 1. 목적지 방향
# =============================
goal_theta = math.atan2(goal_y - y, goal_x - x)
angle_error = normalize_angle(goal_theta - theta)
# =============================
# 2. 장애물 회피 벡터
# =============================
repulse_x = 0.0
repulse_y = 0.0
sensor_angles = [
0,
math.pi / 4,
math.pi / 2,
3 * math.pi / 4,
math.pi,
-3 * math.pi / 4,
-math.pi / 2,
-math.pi / 4
]
for i, (dist, ang) in enumerate(zip(sensors, sensor_angles)):
if dist < 35:
strength = (35 - dist) / 35
repulse_x -= strength * math.cos(theta + ang)
repulse_y -= strength * math.sin(theta + ang)
print(f" [회피] 센서{i} dist={dist} strength={strength:.2f}")
if repulse_x != 0 or repulse_y != 0:
avoid_theta = math.atan2(repulse_y, repulse_x)
avoid_error = normalize_angle(avoid_theta - theta)
else:
avoid_error = 0.0
# =============================
# 3. 최종 조향 각도
# =============================
final_error = normalize_angle(
0.72 * angle_error + 0.28 * avoid_error
)
# =============================
# 4. 바퀴 속도 계산 (가변 회전)
# =============================
base_speed = 10
base_turn_gain = 8.0
max_turn_gain = 8.0
avoid_threshold = 10.0
min_dist = min(sensors)
if min_dist < avoid_threshold:
ratio = (avoid_threshold - min_dist) / avoid_threshold
turn_gain = base_turn_gain + ratio * (max_turn_gain - base_turn_gain)
else:
turn_gain = base_turn_gain
l_speed = base_speed - turn_gain * final_error
r_speed = base_speed + turn_gain * final_error
print(f"회전 게인: {turn_gain:.2f} (min_dist={min_dist})")
# =============================
# 5. 근접 장애물 → 강제 회전
# =============================
if sensors[0] < 10:
print("[EMERGENCY] 정면 매우 근접 → 강제 회전")
if sensors[1] + sensors[2] > sensors[6] + sensors[7]:
l_speed = -5
r_speed = 5
else:
l_speed = 5
r_speed = -5
# =============================
# 6. 속도 제한
# =============================
l_speed = max(min(l_speed, 5.0), -5.0)
r_speed = max(min(r_speed, 5.0), -5.0)
print(f"모터 명령: L={l_speed:.2f}, R={r_speed:.2f}")
# =============================
# 명령 전송
# =============================
if not self.send_drive(l_speed, r_speed):
print("[ERROR] 주행 명령 실패")
break
time.sleep(0.05)
self.sock.close()
print("[INFO] 소켓 종료")
print(f"[FINAL RESULT] 최종 누적 이동거리: {self.total_distance:.2f}")
if __name__ == "__main__":
team_name = "1조"
ser_ip = "192.168.0.187"
SmartRobot(team_name, ser_ip).run()
센서로 장애물 감지해서 미리 피함 -> 장애물이 많아지면 멈춤
import socket
import json
import time
import math
class SmartRobot:
def __init__(self, name, server_ip):
# --- [설정 및 통신 초기화] ---
self.name = name
self.server_address = (server_ip, 9999) # 서버 주소와 포트 번호
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.state = None # 서버에서 받은 로봇의 현재 상태 저장용
# --- [주행 밸런스 파라미터] ---
self.MAX_SPEED = 7.0 # 바퀴의 최대 속도 제한
self.BASE_SPEED = 6.0 # 평상시 전진 속도
self.TURN_GAIN = 2.8 # 회전 민감도 (높을수록 더 팍팍 꺾임)
# --- [상태 관리 및 탈출용 변수] ---
self.prev_x, self.prev_y = None, None # 이전 위치 저장 (이동 여부 확인용)
self.stuck_count = 0 # 멈춰있는 시간(카운트) 측정
self.escape_timer = 0 # 강제 탈출 동작 지속 시간
# [통신] 서버에 접속 시도
def connect(self):
try:
self.sock.connect(self.server_address)
# 서버에 로봇 이름 등록 (최초 1회)
msg = json.dumps({"action": "set_name", "name": self.name})
self.sock.sendall((msg + "\n").encode())
print(f"[{self.name}] 연결 성공!")
return True
except Exception as e:
print(f"연결 실패: {e}")
return False
# [통신] 바퀴 속도 명령을 보내고 현재 상태 데이터를 받아옴
def send_drive(self, l_speed, r_speed):
try:
# 주행 명령 작성
msg = json.dumps({"action": "drive", "l_dist": float(l_speed), "r_dist": float(r_speed)})
self.sock.sendall((msg + "\n").encode())
# 서버의 응답(센서, 위치 데이터 등) 수신
response = self.sock.recv(2048).decode().strip()
if response:
# 데이터가 여러 개 올 경우 가장 마지막 데이터를 파싱
self.state = json.loads(response.split('\n')[-1])
return True
except:
return False
# [메인 엔진] 로봇의 전체 동작 사이클 관리
def run(self):
if not self.connect(): return
self.send_drive(0, 0) # 최초 상태를 받아오기 위한 빈 명령
while self.state:
# 도착 여부 확인
if self.state.get("result") == "finished":
print("🎯 목적지 도착 완료!")
break
# 1단계: 주변 상황 파악 (Sensing)
sensors, pos, goal, theta = self.parse_state()
self.check_stuck(pos) # 내가 어딘가 껴있지는 않은가?
# 2단계: 어떻게 움직일지 결정 (Thinking)
l_speed, r_speed = self.decide_movement(sensors, pos, goal, theta)
# 3단계: 속도 제한 및 최종 전송 (Acting)
l_speed = max(-self.MAX_SPEED, min(self.MAX_SPEED, l_speed))
r_speed = max(-self.MAX_SPEED, min(self.MAX_SPEED, r_speed))
if not self.send_drive(l_speed, r_speed): break
time.sleep(0.03) # 0.03초마다 반복 (주기 조절)
self.sock.close()
# [분석] 서버 데이터를 변수별로 정리
def parse_state(self):
s = self.state.get("sensor", [1000] * 8) # 센서 데이터 (없으면 1000)
s = [d if (d is not None and d >= 0) else 1000 for d in s] # 데이터 정제
pos = (self.state["current_pos_x"], self.state["current_pos_y"]) # 내 위치
goal = (self.state["goal_x"], self.state["goal_y"]) # 목표 위치
theta = self.state["current_theta"] # 내가 바라보는 각도
return s, pos, goal, theta
# [분석] 이동 거리를 체크하여 갇혔는지 판단
def check_stuck(self, pos):
if self.prev_x is not None:
# 이전 위치와 현재 위치 사이의 거리 계산
moved = math.hypot(pos[0] - self.prev_x, pos[1] - self.prev_y)
# 0.5 미만으로 움직였다면 멈춰있는 것으로 간주하고 카운트 업
self.stuck_count = self.stuck_count + 1 if moved < 0.5 else 0
self.prev_x, self.prev_y = pos[0], pos[1] # 현재 위치를 다음 루프를 위해 저장
# 8번 연속으로 못 움직였다면 탈출 모드 가동 (15루프 동안 후진)
if self.stuck_count > 8 and self.escape_timer <= 0:
print("🚨 갇힘 감지! 강제 탈출 시퀀스 가동")
self.escape_timer = 15
# [핵심 로직] 센서와 목적지를 고려하여 실제 바퀴 속도 결정
def decide_movement(self, s, pos, goal, theta):
# 1. 탈출 모드: 갇혔을 때는 모든 걸 무시하고 후진 회전
if self.escape_timer > 0:
self.escape_timer -= 1
if self.escape_timer == 0: self.stuck_count = 0 # 타이머 끝나면 카운트 리셋
return -5.0, -2.0 # 뒤로 빠지면서 살짝 틈을 만듦
# 2. 장애물 회피 판단
front_risk = min(s[0], s[1], s[7]) # 정면과 대각선 앞쪽 중 위험한 쪽
left_space = s[1] + s[2] # 왼쪽 여유 공간 합계
right_space = s[7] + s[6] # 오른쪽 여유 공간 합계
# [상황 A] 비상 상황: 벽이 너무 가까울 때 (12 미만)
if front_risk < 12:
# 공간이 넓은 쪽으로 바퀴를 반대로 돌려 제자리 회전
return (-4.5, 6.5) if left_space > right_space else (6.5, -4.5)
# [상황 B] 주의 상황: 벽이 다가올 때 (12~25)
elif front_risk < 25:
# 벽에 가까울수록 전진 속도를 줄여서 충돌 방지
scale = max(0.4, front_risk / 30.0)
# 여유 공간 쪽으로 부드러운 곡선 회전
return (1.0, 6.0 * scale) if left_space > right_space else (6.0 * scale, 1.0)
# [상황 C] 일반 주행: 장애물이 없을 때 목적지 향하기
# 목적지까지의 각도 계산
target_rad = math.atan2(goal[1] - pos[1], goal[0] - pos[0])
# 내 각도와 목표 각도의 차이 계산 (-pi ~ pi 범위로 정규화)
diff = (target_rad - theta + math.pi) % (2 * math.pi) - math.pi
# 각도 차이에 TURN_GAIN을 곱해 좌우 바퀴 속도 조절 (P-제어)
l_speed = self.BASE_SPEED - (self.TURN_GAIN * diff)
r_speed = self.BASE_SPEED + (self.TURN_GAIN * diff)
return l_speed, r_speed
if __name__ == "__main__":
# "1조"라는 이름으로 지정된 IP 서버에 접속하여 실행
SmartRobot("1조", "192.168.0.187").run()
import socket
import json
import time
import math
class SmartRobot:
def __init__(self, name, server_ip):
self.name = name
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_address = (server_ip, 9999)
self.state = None
# ===== 로봇청소기용 상태 =====
self.mode = "GOAL" # GOAL / AVOID / BACK
self.prev_x = None
self.prev_y = None
self.stuck_count = 0
def connect(self):
try:
self.sock.connect(self.server_address)
msg = json.dumps({"action": "set_name", "name": self.name})
self.sock.sendall((msg + "\n").encode())
print("연결완료")
return True
except:
print("연결실패")
return False
def send_drive(self, l_speed, r_speed):
try:
msg = json.dumps({
"action": "drive",
"l_dist": l_speed,
"r_dist": r_speed
})
self.sock.sendall((msg + "\n").encode())
response = self.sock.recv(2048).decode().strip()
if response:
self.state = json.loads(response.split('\n')[-1])
print(self.state)
return True
except:
return False
return False
def run(self):
if not self.connect():
return
self.send_drive(0, 0)
while self.state:
if self.state.get("result") == "finished":
print("🎯 목적지 도착")
break
# ===============================
# 1️⃣ 센서 결측치 / 이상치 처리
# ===============================
sensors = self.state.get("sensor")
if sensors is None or len(sensors) != 8:
self.send_drive(0, 0)
time.sleep(0.05)
continue
clean = []
for d in sensors:
if d is None or d < 0:
clean.append(1000)
elif d > 1000:
clean.append(1000)
else:
clean.append(d)
sensors = clean
front = sensors[0]
left = min(sensors[1], sensors[2])
right = min(sensors[6], sensors[7])
# ===============================
# 2️⃣ 위치 변화 감지 (막힘 체크)
# ===============================
x = self.state["current_pos_x"]
y = self.state["current_pos_y"]
if self.prev_x is not None:
moved = math.sqrt((x - self.prev_x) ** 2 + (y - self.prev_y) ** 2)
if moved < 2:
self.stuck_count += 1
else:
self.stuck_count = 0
self.prev_x = x
self.prev_y = y
# ===============================
# 3️⃣ FSM 상태 전환
# ===============================
if front < 20:
self.mode = "AVOID"
if front < 15 and left < 20 and right < 20:
self.mode = "BACK"
if self.stuck_count > 8:
self.mode = "BACK"
# ===============================
# 4️⃣ 행동 결정 (로봇청소기 핵심)
# ===============================
l_speed = 0
r_speed = 0
if self.mode == "GOAL":
base = 7
theta = self.state["current_theta"]
gx = self.state["goal_x"]
gy = self.state["goal_y"]
target = math.atan2(gy - y, gx - x)
diff = target - theta
while diff > math.pi:
diff -= 2 * math.pi
while diff < -math.pi:
diff += 2 * math.pi
turn_gain = 1.2
l_speed = base - turn_gain * diff
r_speed = base + turn_gain * diff
elif self.mode == "AVOID":
if left > right:
l_speed = -3
r_speed = 6
else:
l_speed = 6
r_speed = -3
if front > 35:
self.mode = "GOAL"
elif self.mode == "BACK":
l_speed = -5
r_speed = -5
time.sleep(0.2)
# 회전 후 재탐색
if left > right:
l_speed = -3
r_speed = 6
else:
l_speed = 6
r_speed = -3
self.stuck_count = 0
self.mode = "GOAL"
# ===============================
# 5️⃣ 속도 제한
# ===============================
l_speed = max(-7, min(7, l_speed))
r_speed = max(-7, min(7, r_speed))
# ===============================
if not self.send_drive(l_speed, r_speed):
break
time.sleep(0.05)
self.sock.close()
if __name__ == "__main__":
team_name = "111"
server_ip = "192.168.0.187"
SmartRobot(team_name, server_ip).run()
빠르긴 한데 가끔 대각선으로 장애물을 뚫고 지나감
해결: trun_gain = 2.3
# import math
# print(math.atan2(600,600))
#
# #360도 : 2파이 라디안
# #1파이라디안 180
# #1/2 라디안: 90
# #1/4 라디안: 45
# print(math.pi/4)
#
# rad=0.7853
# deg=math.degrees(math.pi/4)
# print(deg)
#
# server -> client 데이터 전송 구조
"""
protocol - json
1. current_pos_x, current_pos_y
2. current_theta
3. goal_x, goal_y
4. sensor : 8방향 거리 센서 (0~7)
5. result : finished / running
"""
import socket
import json
import time
import math
class SmartRobot:
def __init__(self, name, server_ip):
self.name = name
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_address = (server_ip, 9999)
self.state = None
# ===== 로봇청소기용 상태 =====
self.mode = "GOAL" # GOAL / AVOID / BACK
self.prev_x = None
self.prev_y = None
self.stuck_count = 0
def connect(self):
try:
self.sock.connect(self.server_address)
msg = json.dumps({"action": "set_name", "name": self.name})
self.sock.sendall((msg + "\n").encode())
print("연결완료")
return True
except:
print("연결실패")
return False
def send_drive(self, l_speed, r_speed):
try:
msg = json.dumps({
"action": "drive",
"l_dist": l_speed,
"r_dist": r_speed
})
self.sock.sendall((msg + "\n").encode())
response = self.sock.recv(2048).decode().strip()
if response:
self.state = json.loads(response.split('\n')[-1])
print(self.state)
return True
except:
return False
return False
def run(self):
if not self.connect():
return
self.send_drive(0, 0)
while self.state:
if self.state.get("result") == "finished":
print("🎯 목적지 도착")
break
# ===============================
# 1️⃣ 센서 결측치 / 이상치 처리
# ===============================
sensors = self.state.get("sensor")
if sensors is None or len(sensors) != 8:
self.send_drive(0, 0)
time.sleep(0.05)
continue
clean = []
for d in sensors:
if d is None or d < 0:
clean.append(1000)
elif d > 1000:
clean.append(1000)
else:
clean.append(d)
sensors = clean
front = sensors[0]
left = min(sensors[1], sensors[2])
right = min(sensors[6], sensors[7])
# ===============================
# 2️⃣ 위치 변화 감지 (막힘 체크)
# ===============================
x = self.state["current_pos_x"]
y = self.state["current_pos_y"]
if self.prev_x is not None:
moved = math.sqrt((x - self.prev_x) ** 2 + (y - self.prev_y) ** 2)
if moved < 2:
self.stuck_count += 1
else:
self.stuck_count = 0
self.prev_x = x
self.prev_y = y
# ===============================
# 3️⃣ FSM 상태 전환
# ===============================
if front < 20:
self.mode = "AVOID"
if front < 15 and left < 20 and right < 20:
self.mode = "BACK"
if self.stuck_count > 8:
self.mode = "BACK"
# ===============================
# 4️⃣ 행동 결정 (로봇청소기 핵심)
# ===============================
l_speed = 0
r_speed = 0
if self.mode == "GOAL":
base = 7
theta = self.state["current_theta"]
gx = self.state["goal_x"]
gy = self.state["goal_y"]
target = math.atan2(gy - y, gx - x)
diff = target - theta
while diff > math.pi:
diff -= 2 * math.pi
while diff < -math.pi:
diff += 2 * math.pi
turn_gain = 2.3
l_speed = base - turn_gain * diff
r_speed = base + turn_gain * diff
elif self.mode == "AVOID":
if left > right:
l_speed = -3
r_speed = 6
else:
l_speed = 6
r_speed = -3
if front > 35:
self.mode = "GOAL"
elif self.mode == "BACK":
l_speed = -5
r_speed = -5
time.sleep(0.2)
# 회전 후 재탐색
if left > right:
l_speed = -3
r_speed = 6
else:
l_speed = 6
r_speed = -3
self.stuck_count = 0
self.mode = "GOAL"
# ===============================
# 5️⃣ 속도 제한
# ===============================
l_speed = max(-7, min(7, l_speed))
r_speed = max(-7, min(7, r_speed))
# ===============================
if not self.send_drive(l_speed, r_speed):
break
time.sleep(0.05)
self.sock.close()
if __name__ == "__main__":
team_name = "1조2.3"
server_ip = "192.168.0.187"
SmartRobot(team_name, server_ip).run()
import json
import time
from urllib.parse import urlencode
from urllib.request import urlopen, Request
from urllib.error import URLError, HTTPError
BASE = "http://192.168.0.244:5000/control"
# ===== 설정 =====
STOP_THRESHOLD = 70 # 70 정도(이하)면 정지
WINDOW = 2 # 정면 주변 ±2 (총 5개)
STOP_N = 2 # 2번 연속 위험이면 STOP
HZ = 20
PERIOD = 1.0 / HZ
# 속도(요청 URL 기준)
LIN_FWD = 0.15
LIN_STOP = 0.0
ANG_STRAIGHT = 0.0
def http_get_text(url: str, timeout: float = 1.0):
req = Request(url, method="GET")
with urlopen(req, timeout=timeout) as r:
body = r.read().decode("utf-8", errors="replace").strip()
return r.status, body
def get_sensors():
"""센서 라이다값: GET /control -> {"s":[...]}"""
try:
status, body = http_get_text(BASE, timeout=1.0)
if status != 200:
return None
j = json.loads(body) if body else None
if not j or "s" not in j:
return None
s = j["s"]
if not isinstance(s, list) or len(s) == 0:
return None
return [1000 if (v is None or v < 0) else int(v) for v in s]
except (URLError, HTTPError, json.JSONDecodeError):
return None
def send_cmd(lin: float, ang: float) -> bool:
"""이동 제어: GET /control?lin=<float>&ang=<float> (사용자 수정 포맷 반영)"""
qs = urlencode({"lin": float(lin), "ang": float(ang)})
url = f"{BASE}?{qs}"
try:
http_get_text(url, timeout=1.0)
return True
except (URLError, HTTPError):
return False
def wrap_idxs(n, center, window):
return [((center + k) % n) for k in range(-window, window + 1)]
def robust_min(vals):
"""노이즈 1개 튐 완화: 최소값 대신 2번째 최소값 사용"""
a = sorted(vals)
return a[1] if len(a) >= 2 else a[0]
def run_forward_stop(seconds=10.0):
# 시작 정지
send_cmd(LIN_STOP, 0.0)
time.sleep(0.1)
s0 = get_sensors()
if not s0:
print("센서 수신 실패: /control 응답 확인 필요")
return
n = len(s0)
print(f"센서 길이: {n}")
if n == 36:
# ✅ 사용자 확정: 0번 인덱스가 정면
front_idx = 0
watch_idxs = wrap_idxs(36, front_idx, WINDOW) # [34,35,0,1,2]
print(f"FRONT_IDX={front_idx} (정면), 감시 인덱스={watch_idxs}")
elif n == 8:
watch_idxs = [7, 0, 1]
print(f"감시 인덱스(8): {watch_idxs}")
else:
print(f"지원하지 않는 센서 길이: {n}")
return
print(f"STOP_THRESHOLD={STOP_THRESHOLD}, WINDOW=±{WINDOW}, STOP_N={STOP_N}")
print("CTRL+C로 중지")
danger_cnt = 0
t_end = time.time() + seconds
try:
while time.time() < t_end:
s = get_sensors()
if not s or len(s) != n:
send_cmd(LIN_STOP, 0.0)
print("\r[SENSOR FAIL] -> STOP", end="", flush=True)
time.sleep(PERIOD)
continue
vals = [s[i] for i in watch_idxs]
risk = robust_min(vals) # 정면 주변 "골고루" 감시 + 노이즈 완화
if risk <= STOP_THRESHOLD:
danger_cnt += 1
else:
danger_cnt = 0
if danger_cnt >= STOP_N:
send_cmd(LIN_STOP, 0.0)
mode = "STOP"
else:
send_cmd(LIN_FWD, ANG_STRAIGHT)
mode = "GO"
print(
f"\r[{mode}] risk={risk:>4} idxs={watch_idxs} vals={vals}",
end="",
flush=True
)
time.sleep(PERIOD)
except KeyboardInterrupt:
print("\n\nCTRL+C -> STOP")
finally:
# 종료 시 확실히 정지
for _ in range(3):
send_cmd(LIN_STOP, 0.0)
time.sleep(0.05)
print("\n정지 완료")
if __name__ == "__main__":
run_forward_stop(seconds=10.0)
실제 봇으로 정지, 멈춤, 회전, 벽 따라 돌기
url 오타로 동작 X, 내일 이어서 완성
'로보테크AI' 카테고리의 다른 글
| 융합_로보테크 AI 자율주행 로봇 개발자 과정-26/01/30 (0) | 2026.01.30 |
|---|---|
| 융합_로보테크 AI 자율주행 로봇 개발자 과정-26/01/29 (0) | 2026.01.29 |
| 융합_로보테크 AI 자율주행 로봇 개발자 과정-26/01/27[TURTLEBOT] (1) | 2026.01.27 |
| 융합_로보테크 AI 자율주행 로봇 개발자 과정-26/01/26 (0) | 2026.01.26 |
| 융합_로보테크 AI 자율주행 로봇 개발자 과정-26/01/23[TCP] (1) | 2026.01.23 |