로보테크AI

융합_로보테크 AI 자율주행 로봇 개발자 과정-26/01/27[TURTLEBOT]

steezer 2026. 1. 27. 18:30

서버 수정

ㄴ입력 시간 부족으로 서버 안내랑 채팅이랑 겹침

ㄴ게임 시작 전 입장 중에 나가면 인원 수 오류

 

클라이언트 수정

ㄴ가독성을 위해 시스템 메시지만 붉은색으로 보이게

 

client.py
0.00MB
server.py
0.03MB
Rule.txt
0.00MB


1. 전체 구성

실행 단위
    서버 프로세스
        MafiaServer.serve_forever()가 서버 소켓을 열고 accept()로 접속을 받음
        접속 1명당 handle_client()를 별도 스레드로 실행
        게임 페이즈(낮/밤/투표 타이머 등)는 MafiaEngine.run()이 **asyncio 루프(별도 스레드)**에서 담당
        인원(8명) 충족 감시는 monitor_loop()가 asyncio로 주기 체크


    클라이언트 프로세스
        메인 스레드: input()으로 사용자 입력 받아 서버에 전송
        수신 스레드: recv_loop()가 서버 메시지를 계속 받아서 출력(시스템 메시지는 빨간색)

 

통신 프로토콜

    TCP 스트림 위에 UTF-8 텍스트 메시지를 그대로 주고받음

    메시지 구분은 실질적으로 한 번 send한 문자열 단위를 기대
    서버는 recv(buf)로 받은 데이터를 .strip()해서 명령으로 처리

 

2. 서버 측 데이터/상태 설계

ServerConfig (dataclass, immutable)
서버/게임 파라미터 모음
    네트워크: host, port, backlog, buf, encoding
    정원/시작: capacity=8, start_delay
    페이즈 시간: day_chat, day_vote, revote, night 및 경고 타이밍
    모니터 루프 주기: monitor_interval

GlobalState (전역 공유 상태 컨테이너)
서버 전체가 공유하는 단 하나의 상태 객체

모든 수정은 state.lock(RLock) 안에서만

 

주요 필드:
    서버/게임 상태
         capacity: 정원
         started: 게임 시작 여부
         phase: "lobby" | "day_chat" | "day_vote" | "night" | "ended"
         night_count: 몇 번째 밤인지
         shutdown: 서버 종료 플래그

    세션/플레이어 매핑
         conn_by_nick: {nick -> socket}
         nick_by_conn: {socket -> nick}

    설계 요구(닉/IP/직업 관리)
         ip_by_nick: {nick -> ip}
         job_by_nick: {nick -> job} (내부용, 외부 공개는 특정 규칙에서만)
         ips_by_job: {job -> set(ip)} (내부용)
         alive: set(nick) 생존자 집합

    낮 투표
         vote_token: {nick -> bool} (한 번만 투표 가능 토큰)
         day_votes: {voter_nick -> target_nick}

    밤 행동
         night_kill_votes: {mafia_nick -> target_nick} (마피아 다수결)
         night_heal: {doctor_nick -> target_nick}

    기자 기능(지연 공개)
         reporter_peek_target: 밤에 기자가 조사 예약한 타겟
         pending_report_reveal: “다음날 아침” 전체 공개할 문구

    편의 메서드:
         player_count()
         alive_list()

 

3. 서버 측 네트워크 송신 계층

SessionHub
“누구에게 어떻게 보낼지”를 담당하는 전송 유틸.

핵심 설계 포인트:
    소켓 목록을 얻을 때만 state.lock으로 스냅샷을 잡고,
     실제 sendall()은 락 밖에서 실행 → 송신 지연 때문에 전체 서버가 멈추는 문제 완화

메서드:
     _safe_send(conn, text): send 실패(OSError) 무시
     send_to(nick, text): 특정 유저에게만 전송
     broadcast(text): 전체 브로드캐스트
     mafia_broadcast(text): 생존 마피아에게만 전송(밤 채팅)
     close_all_clients(): 종료 시 모든 클라 소켓 shutdown/close

 

4. asyncio를 별도 스레드로 돌리는 구조

AsyncLoopThread
서버는 기본적으로 (클라별 스레드 + accept 루프) 구조인데,
게임 엔진은 타이머 기반 페이즈 전환이므로 asyncio 코루틴으로 처리

    loop = asyncio.new_event_loop()
    별도 스레드에서 loop.run_forever()
    create_task(coro)로 코루틴 등록(스레드 세이프하게 loop에 태스크 생성)

즉,

    스레드(네트워크/클라 처리) ↔ asyncio(게임 타이머/페이즈) 가 공존

    공유 데이터는 GlobalState.lock으로 동기화

 

5. 게임 엔진

MafiaEngine
페이즈 전환과 승리 판정, 밤/낮 처리의 단일 진실 소스

 

중요 원칙:

    타이머/페이즈 전환은 코루틴 1개(run)에서만 수행

    따라서 페이즈가 꼬일 위험을 줄임

 

주요 필드:
    _started_once: 인원 충족 시 엔진이 중복 시작되는 것을 방지
    _shutdown_cb: 게임 종료 시 서버 shutdown 호출

 

흐름: run()
    1.정원 충족 방송 → start_delay 대기


    2.대기 중 이탈로 인원 부족이면 시작 취소(엔진 리셋)


    3.시작 확정:
        state.started=True, phase="day_chat"
        전체에 “게임 시작” 방송
        _send_private_jobs()로 각자에게 직업 개인 통지


    4.루프:
        _check_winner() → 승자 있으면 phase ended, 방송 후 shutdown
         “아침 시작” 시점에 기자 예약 공개가 있으면 _broadcast_pending_report_if_any()
         _day_chat() → _day_vote() → _night() 순서 반복
         각 결과를 방송(낮 처형, 밤 사망/구조 등)


낮 채팅: _day_chat()
    phase="day_chat"
     day_chat_warn_after 경과 시 “10초 뒤 투표” 경고
     시간 만료까지 sleep 루프(0.2s)


낮 투표: _day_vote() -> Optional[str]
     phase="day_vote"
     day_votes 초기화, vote_token(1인 1표) 지급
     제한 시간 종료 후 _tally_top()로 최다 득표자 계산
          단독 1명: 즉시 alive에서 제거하고 리턴
          동점: revote(재투표) 1회
               재투표도 동점이면 랜덤 처형


_tally_top(allow_only=None):
     생존자만 유효표로 카운트
     allow_only가 있으면 그 후보들만 유효(재투표용)

 

밤: _night() -> (killed, saved)
     phase="night", night_count += 1
     밤 행동 버퍼 초기화(night_kill_votes, night_heal, reporter_peek_target)
     제한 시간 후 _resolve_night()로 결과 계산
     기자는 2번째 밤부터 /peek 가능
          결과는 즉시 공개하지 않고 pending_report_reveal에 저장
          다음 루프 초반(“아침”)에 전체에게 공개(기자가 죽어도 공개됨)

 

_resolve_night():
     마피아 kill 투표는 다수결(동점이면 랜덤)
     의사 heal도 다수결(동점이면 랜덤, 사실상 의사는 1명이므로 거의 단일)
     kill_target과 heal_target이 같으면 사망 없음 + saved 반환
     아니면 kill_target을 alive에서 제거

 

승리 판정: _check_winner()
     마피아가 모두 죽음 → 시민 승
     마피아 수 >= 시민 수 → 마피아 승

 

6. 서버

MafiaServer
서버 실행/접속/등록/이탈 처리의 중심

생성 시 준비
    GlobalState, SessionHub
    닉네임 풀: ANIMALS 섞어서 8명에게 배정
    직업 풀: JOBS 섞어서 8명에게 배정
    AsyncLoopThread + MafiaEngine 구성
    stop_event로 종료 제어

serve_forever()
    1.async_thread.start()
    2.monitor_loop() 코루틴 1회 시작(중복 방지)
    3.TCP 서버 소켓 생성:
        SO_REUSEADDR
        bind(host,port), listen(backlog)
        settimeout(1.0)로 accept를 주기적으로 깨어나 stop_event 확인
    4.accept 성공 시:
        threading.Thread(target=handle_client, ...)로 클라 전용 스레드 생성

monitor_loop()
    일정 주기(monitor_interval)로 상태 점검
    조건:
        phase=="lobby", started==False, player_count>=capacity면 engine.start_once()
즉, “8명 모이면 엔진 시작” 트리거를 여기서 담당.

_register_player(conn, addr) -> Optional[str]
    정원 초과면 None
    닉/직업/IP 배정 후 다음을 등록:
        conn_by_nick, nick_by_conn
        ip_by_nick, job_by_nick
        alive.add(nick)
        ips_by_job[job].add(ip)

_unregister_player_dead(conn, nick) -> bool
    공통 정리:
        alive.discard(nick)
        conn_by_nick / nick_by_conn 매핑 제거
        ips_by_job에서도 제거

    게임 시작 전(lobby) 이탈:
        ip_by_nick, job_by_nick 정리
        닉/직업 풀에 반환(+ shuffle)
        “대기실 퇴장”으로 처리(사망 아님) → False

    게임 시작 후 이탈:
        곧바로 alive에서 제거되었으므로 “이탈 사망 처리” → True

handle_client(conn, addr)
클라 스레드의 메인 루프

접속 직후:
    1._register_player()로 즉시 등록(클라 입력 기다리지 않음)
    2.매뉴얼(MANUAL) 전송
    3.본인에게 닉/IP 안내
    4.전체에 입장 방송
    5.로비면 현재 인원 방송

메시지 처리 루프:
    exit → bye 보내고 종료
    /help → 매뉴얼 재전송

이후 핵심 분기(phase, 직업, 생존 여부에 따라):
1.사망자면: “관전만” 안내 후 더 이상의 명령 불가
2.낮 채팅(day_chat): 슬래시(/)로 시작 안 하면 전체 방송
    포맷: nick(ip): msg
3.낮 투표(/vote 닉):
    phase가 day_vote일 때만 가능
    생존자에게만 투표 가능
    vote_token[nick]로 1회 제한
4.밤(night)이 아니면: “명령 확인” 안내
5.밤인데 시민이면: “밤 행동 없음”
6.밤 역할 명령:
    마피아 살인 /kill 닉 → night_kill_votes[mafia_nick]=target
    의사 치료 /heal 닉 → night_heal[doctor_nick]=target
    경찰 조사 /check 닉 → 즉시 경찰 본인에게만 직업 공개
    기자 조사 /peek 닉 → 2번째 밤부터, 다음날 아침 전체 공개 예약
종료 처리(finally):
    _unregister_player_dead()로 이탈 처리
    결과에 따라 [퇴장] ... (이탈로 사망 처리) 또는 (대기실 퇴장) 브로드캐스트
    로비면 현재 인원도 갱신 방송
    소켓 close

 

7. 클라이언트 구조

상수
    HOST, PORT, ENCODING

recv_loop(sock, stop)
    별도 스레드로 동작
    sock.recv(4096) 반복
    서버가 끊으면 stop set
    출력 색상:
        [시스템], [결과], [페이즈]로 시작하면 빨간색(ANSI escape)
        아니면 일반 출력

main()
    소켓 connect
    수신 스레드 시작
    메인 스레드에서 사용자 입력을 서버로 sendall
    exit 입력 시 종료

 

8. 서버-클라이언트 연결 동작 시퀀스

접속~게임시작
1.클라이언트 connect(HOST,PORT)
2.서버 accept → handle_client 스레드 생성
3.서버가 즉시 닉/직업/IP 배정 후 매뉴얼 + 안내 메시지 push
4.8명 모이면 monitor_loop()가 감지 → engine.start_once() 호출
5.엔진이 start_delay 후 시작 확정 → 페이즈 진행

페이즈별 권한/입력 처리
    day_chat: 일반 채팅만(슬래시 명령 아닌 텍스트)
    day_vote: /vote 닉만
    night: 역할별 명령(/kill, /heal, /check, /peek, /m)
    alive 아닌 플레이어는 모든 행동 차단(관전만)

 

9. 사용된 클래스/함수 요약

구성요소 역할
ServerConfig 서버/게임 파라미터
GlobalState 단일 공유 상태 + 락
SessionHub 송신(broadcast/mafia/개인) + 안전 send
AsyncLoopThread asyncio 루프를 별도 스레드에서 운영
MafiaEngine 게임 페이즈, 타이머, 승리판정(코루틴 중심)
MafiaServer accept/클라 스레드/등록/이탈/종료/모니터링
서버 main() 서버 실행 엔트리
클라이언트 recv_loop 서버 수신 전용 스레드
클라이언트 main() connect + 입력/전송 루프

 


자율주행

TurtleBot3

https://emanual.robotis.com/docs/en/platform/turtlebot3/appendix_lds_02/

 

ROBOTIS e-Manual

 

emanual.robotis.com

 

모터 엔코더

https://www.stepperonline.co.kr/support/what-is-the-motor-encoder-ko

 

모터 인코더는 무엇입니까?

모터 엔코더는 모터에 장착된 로터리 엔코더이며 모터 샤프트의 속도 및/또는 위치를 추적하여 제어 시스템에서 애플리케이션의 모니터링에 사용되는 폐쇄 루프 피드백 신호를 제공하는 전기

www.stepperonline.co.kr

실제 로봇 주행 실습은 2주뒤로

 

import tkinter as tk
import socket
SERVER_IP = "192.168.9.187"
SERVER_PORT = 9999
def send_command(command):
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(0.5)
            s.connect((SERVER_IP, SERVER_PORT))
            s.sendall(command.encode("utf-8"))
    except Exception as e:
        print("에러", e)
root=tk.Tk()
root.title("브릿지 조종")
root.geometry("400x450")
btn_frame = tk.Frame(root)
btn_frame.pack(pady=20)
def create_btn(text, cmd, r, c):
    btn = tk.Button(btn_frame, text=text, width=10, height= 2, command=lambda: send_command(cmd))
    btn.grid(row=r, column=c, padx=5, pady=5)
    return btn
create_btn("forward", "w", 0, 1)
create_btn("left", "a", 1, 0)
create_btn("stop", "s", 1, 1)
create_btn("right", "d", 1, 2)
create_btn("backward", "x", 2, 1)

root.bind('<w>', lambda e:send_command("w"))
root.bind('<a>', lambda e:send_command("a"))
root.bind('<s>', lambda e:send_command("s"))
root.bind('<d>', lambda e:send_command("d"))
root.bind('<x>', lambda e:send_command("x"))
root.mainloop()

 

로봇의 현재 좌표 x, y

로봇이 바라보는 방향 각도

목적지 좌표 x, y

8방향의 센서값 (인덱스 0~7) 장애물과의 거리

주행상태정보

#server->client 데이터 전송 구조
"""
protocol - json으로 묶어서 보낸다.
1.로봇의 현재좌표 x,y
2.로봇이 바라보는 방향 각도
3.목적지 좌표x,y
4.8방향의 센서값 (인덱스0~7) 장애물과의 거리
5.주행상태정보 finished 도착 ok 주행중
"""

import socket
import json
import time
import math

class SmartRobot:
    def __init__(self,name,server_ip):
        self.name = name
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_address=(server_ip,9999)
        self.state=None
    def connect(self):
        try:
            self.sock.connect(self.server_address)
            msg=json.dumps({"action":"set_name","name":self.name})
            self.sock.sendall((msg+"\n").encode())
            print("연결완료")
            return True
        except:
            print("연결실패")
            return False

    def send_drive(self,l_speed,r_speed):
        try:
            msg=json.dumps({"action":"drive","l_dist":l_speed,"r_dist":r_speed})
            self.sock.sendall((msg+"\n").encode())
            response=self.sock.recv(2048).decode().strip()
            if response:
                self.state=json.loads(response.split('\n')[-1])
                print(self.state)
                return True
        except:
            return False
        return False

    def run(self):
        if not self.connect():return
        self.send_drive(0,0)
        while self.state:
            if self.state.get('result')=='finished':
                print("등록 완료")
                break
            ############################
            #알고리즘 작성
            #self.state['current_pos_x']:로봇 현재 x위치
            #self.state['current_pos_y']:로봇 현재 y위치
            #self.state['current_theta']:현재 로봇 방향(각)
            #self.state['goal_x'] : 목적지 x
            #self.state['goal_y'] : 목적지 y
            #self.state['sensor'] : 8방향 거리 센서 리스트 (인덱스 0~7)
            sensors=self.state['sensor']
            front_dist=sensors[0]

            l_speed=4
            r_speed=4
            if sensors[0]<25:
                l_speed=3
                r_speed=-3
            elif sensors[1]<20 or sensors[2]<20:
                l_speed=4
                r_speed=2
            elif sensors[7]<20 or sensors[6]<20:
                l_speed=2
                r_speed=4
            #1.양옆 센서 값 기반으로 더 넓은 곳 찾기
            #2.목적지 좌표 활용 arctan
            #3.라이다 8방향 값 기반 장애물 감지
            #4.유클리드 거리
            #############################

            if not self.send_drive(l_speed,r_speed): break
            time.sleep(0.05)
        self.sock.close()
if __name__ == "__main__":
    team_name='abc'
    ser_ip="192.168.0.187"
    SmartRobot(team_name,ser_ip).run()