로보테크AI

융합_로보테크 AI 자율주행 로봇 개발자 과정-26/01/23[TCP]

steezer 2026. 1. 23. 18:30

TCP

Transmission Control Protocol

인터넷에서 데이터를 안정적이고 순서대로 주고받기 위한 핵심 통신규약(프로토콜)

연결설정: Three Way Handshake

ㄴ1)상대에게 통신을 하고 싶다는 메시지를 보낸다(SYN)

ㄴ2)상대는 그 메시지에 대한 응답 + 나도 통신 준비가 되었다는 메시지를 보낸다(SYN-ACK)

ㄴ3)2번에서 받은 메시지에 응답한다(ACK)

 

UDP

TCP에 비해 데이터가 누락되지만 속도가 빠름

 

소켓

떨어져 있는 두 호스트를 연결해주는 도구로서 인터페이스의 역할을 함

프로세스가 네트워크로 데이터를 내보내거나 혹은 그로부터 데이터를 가져오는 창구 역할

ㄴ프로토콜: 어떤 시스템이 다른 시스템과 통신을 원홯하게 수용하도록 해주는 통신 규약

ㄴIP: 전 세계 컴퓨터에 부여된 고유 식별 주소

ㄴ포트: 네트워크 상에서 통신하기 위해 호스트 내부적으로 프로세스가 할당받아야하는 고유 숫자

서버

클라이언트 소켓의 연결 요청을 대기하고, 연결 요청이 오면 클라이언트 소켓을 생성하여 통신이 가능하게 한다

클라이언트

실제로 데이터 송수신이 일어나는 클라이언트 소켓

소켓 종류

스트림(TCP)

데이터그램(UDP)

# TCP 핵심 특징
# 1. 연결지향(3way handshake)
# syn 데이터 보내도 되는지 체크
# syn-ack 데이터 받을 준비 되었음 알림
# ack 데이터 전송 시작
#
# 2. 순서 보장
# 시퀀스 넘버 : 패킷에 번호를 부여해서, 수신측에서 도착한 데이터를 순서대로 풀 수 있도록 함
#
# 확인 응답 ack : 데이터를 받았다는 응답, 이 응답이 오지 않으면 데이터 유실로 판단하고 재전송 함
#
# 3. 흐름제어
# 받는 쪽 처리 속도가 느리면 보내는 속도를 줄인다.
#
# 4. 전이중 통신
# 데이터가 양방향으로 동시에 흐를 수 있다.
#
# TCP vs UDP
# TCP는 신뢰성 확인 과정으로 => 속도가 느리다, 데이터 순서 보장
# UDP는 실시간 강조한느 환경에 적합, 확인과정이 없어 빠르다
class MySocket:
    """시연용 클래스
      - 효율성이 아니라 명확성을 위해 코드 되었습니다
    """

    def __init__(self, sock=None):
        if sock is None:
            self.sock = socket.socket(
                            socket.AF_INET, socket.SOCK_STREAM)
        else:
            self.sock = sock

    def connect(self, host, port):
        self.sock.connect((host, port))

    def mysend(self, msg):
        totalsent = 0
        while totalsent < MSGLEN:
            sent = self.sock.send(msg[totalsent:])
            if sent == 0:
                raise RuntimeError("socket connection broken")
            totalsent = totalsent + sent

    def myreceive(self):
        chunks = []
        bytes_recd = 0
        while bytes_recd < MSGLEN:
            chunk = self.sock.recv(min(MSGLEN - bytes_recd, 2048))
            if chunk == b'':
                raise RuntimeError("socket connection broken")
            chunks.append(chunk)
            bytes_recd = bytes_recd + len(chunk)
        return b''.join(chunks)

 

https://docs.python.org/ko/3.13/howto/sockets.html

 

Socket Programming HOWTO

Author, Gordon McMillan,. Abstract: Sockets are used nearly everywhere, but are one of the most severely misunderstood technologies around. This is a 10,000 foot overview of sockets. It’s not reall...

docs.python.org

https://docs.python.org/release/3.9.16/library/socket.html?highlight=socket#module-socket

 

socket — Low-level networking interface — Python 3.9.16 documentation

socket — Low-level networking interface Source code: Lib/socket.py This module provides access to the BSD socket interface. It is available on all modern Unix systems, Windows, MacOS, and probably additional platforms. Note Some behavior may be platform

docs.python.org

servertest.py

# Echo server program
import socket

HOST = '127.0.0.1'                 # Symbolic name meaning all available interfaces
PORT = 50007              # Arbitrary non-privileged port
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind((HOST, PORT))
    s.listen(1)
    conn, addr = s.accept()
    with conn:
        print('Connected by', addr)
        while True:
            data = conn.recv(1024)
            print(data)
            print(type(data))
            if not data: break
            conn.sendall(data+b' [check]')

 

Connected by ('127.0.0.1', 65510)
b'Hello, world'
<class 'bytes'>
b''
<class 'bytes'>

 

clienttest.py

# Echo client program
import socket

HOST = '127.0.0.1'    # The remote host
PORT = 50007              # The same port as used by the server
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect((HOST, PORT))
    s.sendall(b'Hello, world')
    data = s.recv(1024)
    print(data)
print('Received', repr(data))

 

b'Hello, world [check]'
Received b'Hello, world [check]'

HOST 주소 바꾸면 양쪽에서 sever와 client 연결되서 동시 실행

 

서버, 클라이언트 대화 주고받기

server

import socket
import random
from datetime import date

HOST = '192.168.0.204'
PORT = 50007


def generate_reply(message: str) -> str:
    text = message.strip()

    if "날씨" in text:
        return random.choice(["맑습니다.", "흐립니다."])
    if "안녕" in text:
        return "반갑습니다."
    if "날짜" in text:
        today = date.today()
        return today.strftime("%Y.%m.%d입니다.")
    if "위치" in text:
        return "대전입니다."
    if text == "exit":
        return "연결을 종료합니다."

    return "무슨 말인지 잘 모르겠습니다."


with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
    server_socket.bind((HOST, PORT))
    server_socket.listen(1)

    conn, addr = server_socket.accept()
    with conn:
        print("Connected by", addr)

        while True:
            data = conn.recv(1024)
            if not data:
                break

            message = data.decode("utf-8")
            print("Client:", message)

            reply = generate_reply(message)
            conn.sendall(reply.encode("utf-8"))

            if message.strip() == "exit":
                break

 

client

import socket

HOST = '192.168.0.204'
PORT = 50007

print("입력: 안녕, 날씨, 날짜, 위치 / 종료: exit")
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
    client_socket.connect((HOST, PORT))

    while True:
        message = input("Client: ").strip()
        client_socket.sendall(message.encode("utf-8"))

        data = client_socket.recv(1024)
        reply = data.decode("utf-8")
        print("Server:", reply)

        if message == "exit":
            break

입력: 안녕, 날씨, 날짜, 위치 / 종료: exit

Client: 안녕
Server: 반갑습니다.
Client: 오늘 날씨 알려줘.
Server: 맑습니다.
Client: 오늘 날짜는?
Server: 2026.01.23입니다.
Client: 현재 위치는?
Server: 대전입니다.
Client: aexita
Server: 무슨 말인지 잘 모르겠습니다.
Client: exit
Server: 연결을 종료합니다.

종료 코드 0(으)로 완료된 프로세스

 

바이트 데이터를 문자열로 변환: 디코딩

data.decode('utf-8')

메세지를 보낼 때 문자열로 인코딩해서 전송

s.sendall("안녕하세요".encode('utf-8'))


import socket
from datetime import datetime

HOST = '192.168.0.204'
PORT = 50007
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind((HOST, PORT))
    s.listen(1)
    print("서버 실행")
    conn, addr = s.accept()
    with conn:
        print(f"{addr} 연결됨")
        while True:
            data = conn.recv(1024)
            if not data: break
            request = data.decode('utf-8').strip()
            if "시간" in request:
                now=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                response=f"현재 서버 시간 : {now}"
            elif "안녕" in request:
                response="안녕하세요"
            else:
                response="이해할 수 없는 명령어 입니다."
            conn.sendall(response.encode('utf-8'))
import socket

HOST = '192.168.0.204'
PORT = 50007
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect((HOST, PORT))
    print("서버 연결됨")
    while True:
        msg=input("서버에 보낼 메시지를 입력하세요: ")
        if not msg: continue
        s.sendall(msg.encode('utf-8'))
        data = s.recv(1024)
        print(f"서버로부터 받은 답장 : {data.decode('utf-8')}")

서버 실행
('192.168.0.204', 52933) 연결됨

 

서버 연결됨
서버에 보낼 메시지를 입력하세요: 123
서버로부터 받은 답장 : 이해할 수 없는 명령어 입니다.
서버에 보낼 메시지를 입력하세요: 시간
서버로부터 받은 답장 : 현재 서버 시간 : 2026-01-23 12:25:59
서버에 보낼 메시지를 입력하세요: 안녕
서버로부터 받은 답장 : 안녕하세요

 

멀티스레드

# 순차실행
# 동시실행
# 동기
# 비동기
# 멀티스레드
# 멀티프로세스

import time
import threading
def task(name):
    print(f"{name}시작")
    time.sleep(2)
    print(f"{name}완료")

thread1=threading.Thread(target=task, args=("1번 스레드",))
thread2=threading.Thread(target=task, args=("2번 스레드",))
thread1.start() # 스레드 작업 시작
thread2.start()
thread1.join() # 작업 끝날 때까지 메인 프로그램은 대기
thread2.join()
print("모든 작업 완료")

1번 스레드시작
2번 스레드시작
1번 스레드완료
2번 스레드완료
모든 작업 완료

 

import time
import threading
import random
def task(name):
    print(f"{name}시작")
    time.sleep(random.randint(1,3))
    print(f"{name}완료")

thread1=threading.Thread(target=task, args=("1번 스레드",))
thread2=threading.Thread(target=task, args=("2번 스레드",))
thread1.start() # 스레드 작업 시작
thread2.start()
thread1.join() # 작업 끝날 때까지 메인 프로그램은 대기
print("1번 작업 완료")
thread2.join()
print("모든 작업 완료")

1번 스레드시작
2번 스레드시작
2번 스레드완료
1번 스레드완료
1번 작업 완료
모든 작업 완료

 

동기: 요청을 보내고 응답이 올때까지 기다렸다가 다음 작업을 순차적으로 처리하는 방식

비동기: 요청 후 응답을 기다리지 않고 다른 작업을 먼저 처리하다가 결과가 오면 나중에 처리하는 방식

 

import threading
import time

count=0 # 공유 데이터

def increment():
    global count
    for _ in range(100):
        temp=count
        time.sleep(0.0001)
        count=temp+1
threads=[]
for i in range(10):
    t=threading.Thread(target=increment)
    threads.append(t)
    t.start()
for t in threads:
    t.join()
print("최종",count)

최종: 103

 

# thread 에서 lock 객체

import threading
import time
count=0

lock = threading.Lock()
def increment():
    global count
    for _ in range(100):
        lock.acquire()
        try:
            temp=count
            time.sleep(0.0001)
            count=temp+1
        finally:
            lock.release()

threads=[]
for i in range(10):
    t=threading.Thread(target=increment)
    threads.append(t)
    t.start()
for t in threads:
    t.join()
print("최종",count)

최종 1000

 

lock.acquire() 잠금=> 진입 스레드가 작업을 독점하고 타 스레드들은 이 구간 앞에서 대기

lock.release() 잠금 해제

 

동기 => 비효율적/순차적
비동기 => 병렬//효율적

 

import time
def delivery(name):
    print(f"{name} 시작")
    time.sleep(1)
    print(f"{name} 완료")

start=time.time()
delivery("111")
delivery("222")
print(f"{time.time() - start}")

111 시작
111 완료
222 시작
222 완료
2.0012545585632324

 

import asyncio
import time

async def delivery_async(name):
    print(f"{name} 시작")
    await asyncio.sleep(2)
    print(f"{name} 완료")

async def main():
    start = time.time()
    await asyncio.gather(
        delivery_async("1111"),
        delivery_async("2222")
    )
    print(f"{time.time()-start}")

asyncio.run(main())

1111 시작
2222 시작
1111 완료
2222 완료
2.0015242099761963

 

멀티스레드 동시 접속 서버

import socket
import threading
from dataclasses import dataclass
from datetime import datetime
from typing import Tuple

@dataclass(frozen=True)
class ServerConfig:
    # 서버 동작에 필요한 설정 값을 한 곳에 모아두는 설정 객체
    host: str = "192.168.0.204"   # 서버가 바인딩할 IP (보통 0.0.0.0으로 하면 모든 인터페이스에서 수신 가능)
    port: int = 50007            # 서버가 수신할 TCP 포트
    backlog: int = 50            # listen 대기 큐 크기 (동시에 들어오는 연결 요청을 얼마나 큐잉할지)
    buffer_size: int = 1024      # recv로 한 번에 읽을 최대 바이트 수
    encoding: str = "utf-8"      # 요청/응답 문자열 인코딩


class RequestHandler:
    # '요청(request) -> 응답(response)'을 만드는 도메인 로직 담당 클래스
    def __init__(self, encoding: str) -> None:
        self._encoding = encoding

    def handle(self, request_bytes: bytes) -> bytes:
        # 클라이언트가 보낸 raw bytes를 받아서, 응답 bytes를 만들어 반환
        # 네트워크에서 받은 바이트 데이터를 문자열로 해석
        # errors="replace": 인코딩 오류가 나면 예외를 터뜨리지 말고 대체 문자로 치환해 서비스 지속성을 높임
        request = request_bytes.decode(self._encoding, errors="replace").strip()

        # 규칙 기반 응답(키워드 포함 여부)
        if "시간" in request:
            now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            response = f"현재 서버 시간 : {now}"
            return response.encode(self._encoding)

        if "안녕" in request:
            return "안녕하세요".encode(self._encoding)

        # 위 조건에 걸리지 않으면 기본 응답
        return "이해할 수 없는 명령어 입니다.".encode(self._encoding)

class ClientSession:
    #하나의 클라이언트 연결(소켓) 단위를 처리
    def __init__(
        self,
        conn: socket.socket,
        addr: Tuple[str, int],
        config: ServerConfig,
        handler: RequestHandler,
    ) -> None:
        self._conn = conn          # 이 클라이언트와 통신하는 전용 소켓
        self._addr = addr          # (ip, port) 형태의 클라이언트 주소
        self._config = config      # 서버 설정(버퍼 크기, 인코딩 등)
        self._handler = handler    # 요청->응답 로직

    def run(self) -> None:
        """
        이 연결에서 발생하는 통신을 끝날 때까지 처리

        흐름
         recv로 요청 수신
         handler로 응답 생성
         sendall로 응답 전송
         클라이언트가 연결을 끊거나 예외가 나면 종료
        """
        try:
            print(f"{self._addr} 연결됨")

            while True:
                # TCP는 스트림이므로 '메시지 단위'를 보장하지 않는다.
                # 여기서는 간단한 예제로 "recv 한번당 하나의 요청"처럼 취급하지만,
                # 실제 서비스에서는 줄(\n) 단위 프로토콜이나 길이 프레이밍 등을 써야 안정적이다.
                data = self._conn.recv(self._config.buffer_size)

                # data가 비어 있으면: 클라이언트가 정상적으로 소켓을 닫았다는 의미
                if not data:
                    return

                # 네트워크 로직과 분리된 handler가 응답 bytes를 만든다.
                response = self._handler.handle(data)

                # sendall: 내부적으로 모든 바이트가 전송될 때까지 반복 전송한다.
                self._conn.sendall(response)

        except (ConnectionResetError, BrokenPipeError):
            # 클라이언트가 강제로 종료했거나,
            # 전송 중 파이프가 끊긴 경우 등 운영에서 자주 볼 수 있는 예외들
            return
        except OSError:
            # 기타 소켓/OS 레벨 예외.
            # 운영용 코드에서는 여기서 로깅을 남기는 편이 좋다.
            return
        finally:
            # 어떤 경로로든 run()이 끝나면 반드시 소켓을 정리한다.
            # 소켓을 닫지 않으면 파일 디스크립터 누수로 서버가 망가질 수 있다.
            try:
                self._conn.close()
            except OSError:
                pass

            print(f"{self._addr} 연결 종료")

class ThreadedTcpServer:
    """
    멀티스레드 TCP 서버 본체
     서버 소켓을 열고(bind/listen)
     accept로 연결을 받으면
     연결마다 스레드를 하나 생성해 ClientSession.run()을 실행
    """
    def __init__(self, config: ServerConfig) -> None:
        self._config = config
        # 모든 연결에서 동일한 규칙을 쓰므로 handler는 서버에서 1개만 생성해 공유한다.
        # (현재 handler는 내부 상태가 없어 thread-safe)
        self._handler = RequestHandler(config.encoding)

    def serve_forever(self) -> None:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_sock:
            # 서버 재시작 시 "Address already in use"를 줄여주는 옵션.
            # 개발/운영 모두에서 유용하다.
            server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

            # 지정한 IP/PORT로 서버 소켓을 바인딩(해당 주소로 오는 연결을 받겠다는 의미)
            server_sock.bind((self._config.host, self._config.port))

            # listen: 이 시점부터 accept 가능한 "서버" 상태가 된다.
            server_sock.listen(self._config.backlog)

            print("서버 실행")

            while True:
                # 새 연결을 받을 때까지 블로킹된다.
                conn, addr = server_sock.accept()

                # 새 연결마다 독립 세션을 만들고
                session = ClientSession(conn, addr, self._config, self._handler)

                # 세션 처리를 전담할 스레드 생성
                # daemon=True:
                # - 메인 스레드가 종료되면(예: Ctrl+C) 데몬 스레드도 함께 종료된다.
                # - 개발 편의성은 좋지만, 운영에서는 graceful shutdown이 더 바람직할 수 있다.
                t = threading.Thread(target=session.run, daemon=True)
                t.start()

def main() -> None:
    # 설정을 만들고 서버 시작
    ThreadedTcpServer(ServerConfig()).serve_forever()

if __name__ == "__main__":
    main()

('192.168.0.133', 60426) 연결됨
('192.168.0.204', 54418) 연결됨
('192.168.0.133', 60426) 연결 종료
('192.168.0.204', 54418) 연결 종료

 

import socket
from datetime import datetime
import threading
from http.client import responses


def handle_client(conn, addr):
    print(f"{addr}접속")
    with conn:
        try:
            while True:
                data = conn.recv(1024)
                if not data: break
                request = data.decode('utf-8').strip()
                print(f"{addr}요청 : {request}")

                if "시간" in request:
                    now=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    response=f"현재 시간{now}"
                elif "안녕" in request:
                    response="안녕하세요"
                else:
                    response="이해 불가 명령어 입니다."
                conn.sendall(response.encode('utf-8'))
        except ConnectionResetError:
            pass
        print(f"{addr}연결 끊김")

HOST="192.168.0.204"
POST=50007

server=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.bind((HOST,POST))
server.listen(5) # 대기열 5
print("서버 실행")

while True:
    conn, addr = server.accept()
    client_thread = threading.Thread(target=handle_client, args=(conn, addr))
    client_thread.start()
    print(f"현 접속 인원{threading.active_count()-1}")

정원 수 제한

#MAX_CLIENT=3 # 스레드 수 제한방식
'''
current_count=threading.active_count()-1
if current_count>=MAX_CLIENT:
    print(f"[정원초과]{addr}접속 거부({current_count}/{MAX_CLIENT})")
    conn.sendall("서버가 정원 초과입니다.".encode('utf-8'))
    conn.close()
    continue
'''

 

접속 초과 대기

import socket
from datetime import datetime
import threading

pool=threading.Semaphore(5)
def handle_client(conn, addr):
    with pool:
        print(f"{addr}접속 사용 중 입장권 : {5-pool._value}")
        with conn:
            try:
                while True:
                    data = conn.recv(1024)
                    if not data: break
                    request = data.decode('utf-8').strip()
                    print(f"{addr}요청 : {request}")

                    if "시간" in request:
                        now=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                        response=f"현재 시간{now}"
                    elif "안녕" in request:
                        response="안녕하세요"
                    else:
                        response="이해 불가 명령어 입니다."
                    conn.sendall(response.encode('utf-8'))
            except ConnectionResetError:
                pass
            # print(f"{addr}연결 끊김")
        print(f"[퇴장]{addr}연결 끊김 : 입장권 회수")

HOST="192.168.0.204"
POST=50007

server=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.bind((HOST,POST))
server.listen(5) # 대기열 5
print("서버 실행")
#MAX_CLIENT=3 # 스레드 수 제한방식
while True:
    conn, addr = server.accept()
    '''
    current_count=threading.active_count()-1
    if current_count>=MAX_CLIENT:
        print(f"[정원초과]{addr}접속 거부({current_count}/{MAX_CLIENT})")
        conn.sendall("서버가 정원 초과입니다.".encode('utf-8'))
        conn.close()
        continue
    '''
    client_thread = threading.Thread(target=handle_client, args=(conn, addr))
    client_thread.start()
    print(f"현 접속 인원{threading.active_count()-1}")

 

스레드 수 체크 active_count : 실제 자리(줭원)체크에서 스레드 자체를 만들지 않음
접속 실패 

세마포어(대기 공간)
서버 부담 active count 방식보다는 더 있다.
연결은 되어있는데 응답이 없는 (에코 작업처리가 안되는)


PJ

 

DB x UI x

스레드 관리

세마포어

 

채팅: 터미널 전체

아이디(닉네임)

귓속말

 

투표

알람

운세 1회(토큰 1개, random10)

송금

 

1. 서버 1개로 구성

2. 서버 접속시 클라이언트에게 아이디 요구

3. 서버에 임시 딕셔너리

4. 사용자가 투표 기능 입력시 전체 채팅 금지(sendall)

5. 클라이언트 알림 "00님이 입장하셨(나갔)습니다"

6. 들어온 사람 목록

7. 메뉴얼(입장 시 기본 키)

 

서버 => 클라이언트(닉네임 중복 체크)

사용 메뉴얼, 접속 닉네임 목록, 사용할 닉네임

(메뉴얼: 귓속말, 투표, 알람, 운세(1), 송금(기본 10000원))

 

클라이언트 => 서버

닉네임 입력


PJ

 

DB x UI x

스레드 관리

세마포어

 

마피아게임

기본 8명

초과시 입장 불가

 

마피아2

경찰1

의사1

시민4(기자1 포함)

 

대기방: 클라이언트 => 서버

참여시 기본 메뉴얼 제공

스레드 생성

(if 인원 다 차면 30초 뒤에 시작한다고 서버에서 알림)

데이터베이스 훼손 안되게

(처음부터 닉네임 제공)

 

닉네임 입력(인원 8명 차면 이후부터 거절) ->

메뉴얼 ->

대기방(카운트) ->

직업 배정(랜덤) ->

반복(

    단체 채팅1 ->

    (밤전환)투표0.5(시간안에 못하면 패스) -> 

    변론(최다 투표자 말하기 + 죽일지 말지 Y/N) ->

    직업별 능력 사용0.5(기,마,경,의 순으로) ->

    투표 결과 발표0.5

) -> 

게임 종료

(if 마피아 전부 중도에 나가면 종료, 아니면 일단 진행 + 남은 인원의 절반 이상이 마피아면 마피아 승)