서버 수정
ㄴ입력 시간 부족으로 서버 안내랑 채팅이랑 겹침
ㄴ게임 시작 전 입장 중에 나가면 인원 수 오류
클라이언트 수정
ㄴ가독성을 위해 시스템 메시지만 붉은색으로 보이게
1. 전체 구성
실행 단위
서버 프로세스
MafiaServer.serve_forever()가 서버 소켓을 열고 accept()로 접속을 받음
접속 1명당 handle_client()를 별도 스레드로 실행
게임 페이즈(낮/밤/투표 타이머 등)는 MafiaEngine.run()이 **asyncio 루프(별도 스레드)**에서 담당
인원(8명) 충족 감시는 monitor_loop()가 asyncio로 주기 체크
클라이언트 프로세스
메인 스레드: input()으로 사용자 입력 받아 서버에 전송
수신 스레드: recv_loop()가 서버 메시지를 계속 받아서 출력(시스템 메시지는 빨간색)
통신 프로토콜
TCP 스트림 위에 UTF-8 텍스트 메시지를 그대로 주고받음
메시지 구분은 실질적으로 한 번 send한 문자열 단위를 기대
서버는 recv(buf)로 받은 데이터를 .strip()해서 명령으로 처리
2. 서버 측 데이터/상태 설계
ServerConfig (dataclass, immutable)
서버/게임 파라미터 모음
네트워크: host, port, backlog, buf, encoding
정원/시작: capacity=8, start_delay
페이즈 시간: day_chat, day_vote, revote, night 및 경고 타이밍
모니터 루프 주기: monitor_interval
GlobalState (전역 공유 상태 컨테이너)
서버 전체가 공유하는 단 하나의 상태 객체
모든 수정은 state.lock(RLock) 안에서만
주요 필드:
서버/게임 상태
capacity: 정원
started: 게임 시작 여부
phase: "lobby" | "day_chat" | "day_vote" | "night" | "ended"
night_count: 몇 번째 밤인지
shutdown: 서버 종료 플래그
세션/플레이어 매핑
conn_by_nick: {nick -> socket}
nick_by_conn: {socket -> nick}
설계 요구(닉/IP/직업 관리)
ip_by_nick: {nick -> ip}
job_by_nick: {nick -> job} (내부용, 외부 공개는 특정 규칙에서만)
ips_by_job: {job -> set(ip)} (내부용)
alive: set(nick) 생존자 집합
낮 투표
vote_token: {nick -> bool} (한 번만 투표 가능 토큰)
day_votes: {voter_nick -> target_nick}
밤 행동
night_kill_votes: {mafia_nick -> target_nick} (마피아 다수결)
night_heal: {doctor_nick -> target_nick}
기자 기능(지연 공개)
reporter_peek_target: 밤에 기자가 조사 예약한 타겟
pending_report_reveal: “다음날 아침” 전체 공개할 문구
편의 메서드:
player_count()
alive_list()
3. 서버 측 네트워크 송신 계층
SessionHub
“누구에게 어떻게 보낼지”를 담당하는 전송 유틸.
핵심 설계 포인트:
소켓 목록을 얻을 때만 state.lock으로 스냅샷을 잡고,
실제 sendall()은 락 밖에서 실행 → 송신 지연 때문에 전체 서버가 멈추는 문제 완화
메서드:
_safe_send(conn, text): send 실패(OSError) 무시
send_to(nick, text): 특정 유저에게만 전송
broadcast(text): 전체 브로드캐스트
mafia_broadcast(text): 생존 마피아에게만 전송(밤 채팅)
close_all_clients(): 종료 시 모든 클라 소켓 shutdown/close
4. asyncio를 별도 스레드로 돌리는 구조
AsyncLoopThread
서버는 기본적으로 (클라별 스레드 + accept 루프) 구조인데,
게임 엔진은 타이머 기반 페이즈 전환이므로 asyncio 코루틴으로 처리
loop = asyncio.new_event_loop()
별도 스레드에서 loop.run_forever()
create_task(coro)로 코루틴 등록(스레드 세이프하게 loop에 태스크 생성)
즉,
스레드(네트워크/클라 처리) ↔ asyncio(게임 타이머/페이즈) 가 공존
공유 데이터는 GlobalState.lock으로 동기화
5. 게임 엔진
MafiaEngine
페이즈 전환과 승리 판정, 밤/낮 처리의 단일 진실 소스
중요 원칙:
타이머/페이즈 전환은 코루틴 1개(run)에서만 수행
따라서 페이즈가 꼬일 위험을 줄임
주요 필드:
_started_once: 인원 충족 시 엔진이 중복 시작되는 것을 방지
_shutdown_cb: 게임 종료 시 서버 shutdown 호출
흐름: run()
1.정원 충족 방송 → start_delay 대기
2.대기 중 이탈로 인원 부족이면 시작 취소(엔진 리셋)
3.시작 확정:
state.started=True, phase="day_chat"
전체에 “게임 시작” 방송
_send_private_jobs()로 각자에게 직업 개인 통지
4.루프:
_check_winner() → 승자 있으면 phase ended, 방송 후 shutdown
“아침 시작” 시점에 기자 예약 공개가 있으면 _broadcast_pending_report_if_any()
_day_chat() → _day_vote() → _night() 순서 반복
각 결과를 방송(낮 처형, 밤 사망/구조 등)
낮 채팅: _day_chat()
phase="day_chat"
day_chat_warn_after 경과 시 “10초 뒤 투표” 경고
시간 만료까지 sleep 루프(0.2s)
낮 투표: _day_vote() -> Optional[str]
phase="day_vote"
day_votes 초기화, vote_token(1인 1표) 지급
제한 시간 종료 후 _tally_top()로 최다 득표자 계산
단독 1명: 즉시 alive에서 제거하고 리턴
동점: revote(재투표) 1회
재투표도 동점이면 랜덤 처형
_tally_top(allow_only=None):
생존자만 유효표로 카운트
allow_only가 있으면 그 후보들만 유효(재투표용)
밤: _night() -> (killed, saved)
phase="night", night_count += 1
밤 행동 버퍼 초기화(night_kill_votes, night_heal, reporter_peek_target)
제한 시간 후 _resolve_night()로 결과 계산
기자는 2번째 밤부터 /peek 가능
결과는 즉시 공개하지 않고 pending_report_reveal에 저장
다음 루프 초반(“아침”)에 전체에게 공개(기자가 죽어도 공개됨)
_resolve_night():
마피아 kill 투표는 다수결(동점이면 랜덤)
의사 heal도 다수결(동점이면 랜덤, 사실상 의사는 1명이므로 거의 단일)
kill_target과 heal_target이 같으면 사망 없음 + saved 반환
아니면 kill_target을 alive에서 제거
승리 판정: _check_winner()
마피아가 모두 죽음 → 시민 승
마피아 수 >= 시민 수 → 마피아 승
6. 서버
MafiaServer
서버 실행/접속/등록/이탈 처리의 중심
생성 시 준비
GlobalState, SessionHub
닉네임 풀: ANIMALS 섞어서 8명에게 배정
직업 풀: JOBS 섞어서 8명에게 배정
AsyncLoopThread + MafiaEngine 구성
stop_event로 종료 제어
serve_forever()
1.async_thread.start()
2.monitor_loop() 코루틴 1회 시작(중복 방지)
3.TCP 서버 소켓 생성:
SO_REUSEADDR
bind(host,port), listen(backlog)
settimeout(1.0)로 accept를 주기적으로 깨어나 stop_event 확인
4.accept 성공 시:
threading.Thread(target=handle_client, ...)로 클라 전용 스레드 생성
monitor_loop()
일정 주기(monitor_interval)로 상태 점검
조건:
phase=="lobby", started==False, player_count>=capacity면 engine.start_once()
즉, “8명 모이면 엔진 시작” 트리거를 여기서 담당.
_register_player(conn, addr) -> Optional[str]
정원 초과면 None
닉/직업/IP 배정 후 다음을 등록:
conn_by_nick, nick_by_conn
ip_by_nick, job_by_nick
alive.add(nick)
ips_by_job[job].add(ip)
_unregister_player_dead(conn, nick) -> bool
공통 정리:
alive.discard(nick)
conn_by_nick / nick_by_conn 매핑 제거
ips_by_job에서도 제거
게임 시작 전(lobby) 이탈:
ip_by_nick, job_by_nick 정리
닉/직업 풀에 반환(+ shuffle)
“대기실 퇴장”으로 처리(사망 아님) → False
게임 시작 후 이탈:
곧바로 alive에서 제거되었으므로 “이탈 사망 처리” → True
handle_client(conn, addr)
클라 스레드의 메인 루프
접속 직후:
1._register_player()로 즉시 등록(클라 입력 기다리지 않음)
2.매뉴얼(MANUAL) 전송
3.본인에게 닉/IP 안내
4.전체에 입장 방송
5.로비면 현재 인원 방송
메시지 처리 루프:
exit → bye 보내고 종료
/help → 매뉴얼 재전송
이후 핵심 분기(phase, 직업, 생존 여부에 따라):
1.사망자면: “관전만” 안내 후 더 이상의 명령 불가
2.낮 채팅(day_chat): 슬래시(/)로 시작 안 하면 전체 방송
포맷: nick(ip): msg
3.낮 투표(/vote 닉):
phase가 day_vote일 때만 가능
생존자에게만 투표 가능
vote_token[nick]로 1회 제한
4.밤(night)이 아니면: “명령 확인” 안내
5.밤인데 시민이면: “밤 행동 없음”
6.밤 역할 명령:
마피아 살인 /kill 닉 → night_kill_votes[mafia_nick]=target
의사 치료 /heal 닉 → night_heal[doctor_nick]=target
경찰 조사 /check 닉 → 즉시 경찰 본인에게만 직업 공개
기자 조사 /peek 닉 → 2번째 밤부터, 다음날 아침 전체 공개 예약
종료 처리(finally):
_unregister_player_dead()로 이탈 처리
결과에 따라 [퇴장] ... (이탈로 사망 처리) 또는 (대기실 퇴장) 브로드캐스트
로비면 현재 인원도 갱신 방송
소켓 close
7. 클라이언트 구조
상수
HOST, PORT, ENCODING
recv_loop(sock, stop)
별도 스레드로 동작
sock.recv(4096) 반복
서버가 끊으면 stop set
출력 색상:
[시스템], [결과], [페이즈]로 시작하면 빨간색(ANSI escape)
아니면 일반 출력
main()
소켓 connect
수신 스레드 시작
메인 스레드에서 사용자 입력을 서버로 sendall
exit 입력 시 종료
8. 서버-클라이언트 연결 동작 시퀀스
접속~게임시작
1.클라이언트 connect(HOST,PORT)
2.서버 accept → handle_client 스레드 생성
3.서버가 즉시 닉/직업/IP 배정 후 매뉴얼 + 안내 메시지 push
4.8명 모이면 monitor_loop()가 감지 → engine.start_once() 호출
5.엔진이 start_delay 후 시작 확정 → 페이즈 진행
페이즈별 권한/입력 처리
day_chat: 일반 채팅만(슬래시 명령 아닌 텍스트)
day_vote: /vote 닉만
night: 역할별 명령(/kill, /heal, /check, /peek, /m)
alive 아닌 플레이어는 모든 행동 차단(관전만)
9. 사용된 클래스/함수 요약
| 구성요소 | 역할 |
| ServerConfig | 서버/게임 파라미터 |
| GlobalState | 단일 공유 상태 + 락 |
| SessionHub | 송신(broadcast/mafia/개인) + 안전 send |
| AsyncLoopThread | asyncio 루프를 별도 스레드에서 운영 |
| MafiaEngine | 게임 페이즈, 타이머, 승리판정(코루틴 중심) |
| MafiaServer | accept/클라 스레드/등록/이탈/종료/모니터링 |
| 서버 main() | 서버 실행 엔트리 |
| 클라이언트 recv_loop | 서버 수신 전용 스레드 |
| 클라이언트 main() | connect + 입력/전송 루프 |
자율주행
TurtleBot3
https://emanual.robotis.com/docs/en/platform/turtlebot3/appendix_lds_02/
ROBOTIS e-Manual
emanual.robotis.com
모터 엔코더
https://www.stepperonline.co.kr/support/what-is-the-motor-encoder-ko
모터 인코더는 무엇입니까?
모터 엔코더는 모터에 장착된 로터리 엔코더이며 모터 샤프트의 속도 및/또는 위치를 추적하여 제어 시스템에서 애플리케이션의 모니터링에 사용되는 폐쇄 루프 피드백 신호를 제공하는 전기
www.stepperonline.co.kr
실제 로봇 주행 실습은 2주뒤로
import tkinter as tk
import socket
SERVER_IP = "192.168.9.187"
SERVER_PORT = 9999
def send_command(command):
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(0.5)
s.connect((SERVER_IP, SERVER_PORT))
s.sendall(command.encode("utf-8"))
except Exception as e:
print("에러", e)
root=tk.Tk()
root.title("브릿지 조종")
root.geometry("400x450")
btn_frame = tk.Frame(root)
btn_frame.pack(pady=20)
def create_btn(text, cmd, r, c):
btn = tk.Button(btn_frame, text=text, width=10, height= 2, command=lambda: send_command(cmd))
btn.grid(row=r, column=c, padx=5, pady=5)
return btn
create_btn("forward", "w", 0, 1)
create_btn("left", "a", 1, 0)
create_btn("stop", "s", 1, 1)
create_btn("right", "d", 1, 2)
create_btn("backward", "x", 2, 1)
root.bind('<w>', lambda e:send_command("w"))
root.bind('<a>', lambda e:send_command("a"))
root.bind('<s>', lambda e:send_command("s"))
root.bind('<d>', lambda e:send_command("d"))
root.bind('<x>', lambda e:send_command("x"))
root.mainloop()


로봇의 현재 좌표 x, y
로봇이 바라보는 방향 각도
목적지 좌표 x, y
8방향의 센서값 (인덱스 0~7) 장애물과의 거리
주행상태정보
#server->client 데이터 전송 구조
"""
protocol - json으로 묶어서 보낸다.
1.로봇의 현재좌표 x,y
2.로봇이 바라보는 방향 각도
3.목적지 좌표x,y
4.8방향의 센서값 (인덱스0~7) 장애물과의 거리
5.주행상태정보 finished 도착 ok 주행중
"""
import socket
import json
import time
import math
class SmartRobot:
def __init__(self,name,server_ip):
self.name = name
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_address=(server_ip,9999)
self.state=None
def connect(self):
try:
self.sock.connect(self.server_address)
msg=json.dumps({"action":"set_name","name":self.name})
self.sock.sendall((msg+"\n").encode())
print("연결완료")
return True
except:
print("연결실패")
return False
def send_drive(self,l_speed,r_speed):
try:
msg=json.dumps({"action":"drive","l_dist":l_speed,"r_dist":r_speed})
self.sock.sendall((msg+"\n").encode())
response=self.sock.recv(2048).decode().strip()
if response:
self.state=json.loads(response.split('\n')[-1])
print(self.state)
return True
except:
return False
return False
def run(self):
if not self.connect():return
self.send_drive(0,0)
while self.state:
if self.state.get('result')=='finished':
print("등록 완료")
break
############################
#알고리즘 작성
#self.state['current_pos_x']:로봇 현재 x위치
#self.state['current_pos_y']:로봇 현재 y위치
#self.state['current_theta']:현재 로봇 방향(각)
#self.state['goal_x'] : 목적지 x
#self.state['goal_y'] : 목적지 y
#self.state['sensor'] : 8방향 거리 센서 리스트 (인덱스 0~7)
sensors=self.state['sensor']
front_dist=sensors[0]
l_speed=4
r_speed=4
if sensors[0]<25:
l_speed=3
r_speed=-3
elif sensors[1]<20 or sensors[2]<20:
l_speed=4
r_speed=2
elif sensors[7]<20 or sensors[6]<20:
l_speed=2
r_speed=4
#1.양옆 센서 값 기반으로 더 넓은 곳 찾기
#2.목적지 좌표 활용 arctan
#3.라이다 8방향 값 기반 장애물 감지
#4.유클리드 거리
#############################
if not self.send_drive(l_speed,r_speed): break
time.sleep(0.05)
self.sock.close()
if __name__ == "__main__":
team_name='abc'
ser_ip="192.168.0.187"
SmartRobot(team_name,ser_ip).run()
'로보테크AI' 카테고리의 다른 글
| 융합_로보테크 AI 자율주행 로봇 개발자 과정-26/01/29 (0) | 2026.01.29 |
|---|---|
| 융합_로보테크 AI 자율주행 로봇 개발자 과정-26/01/28 (1) | 2026.01.28 |
| 융합_로보테크 AI 자율주행 로봇 개발자 과정-26/01/26 (0) | 2026.01.26 |
| 융합_로보테크 AI 자율주행 로봇 개발자 과정-26/01/23[TCP] (1) | 2026.01.23 |
| 융합_로보테크 AI 자율주행 로봇 개발자 과정-26/01/22 (0) | 2026.01.22 |