-
기본 예시)
서버
import socket s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_address = ('localhost', 61001) s.bind(server_address) s.listen() client_s, client_addr = s.accept() for i in range(10): data = client_s.recv(1024) data = data.decode() print(data) client_s.close() print('종료')클라이언트
import socket s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_address = ('localhost', 61001) s.connect(server_address) while True: data = input('') if data == 'q': break data = data.encode() s.send(data) s.close()서로 채팅 주고받기 - 보내고 받는 스레드를 만들면 동시에 실행이 가능하다.
예시)
서버
import socket import threading def Send(sock): while True: rawdata = input('') data = rawdata.encode() sock.send(data) if rawdata == 'q': break sock.close() def Recv(sock): while True: data = sock.recv(1024) if not data: break data = data.decode() print("Client:", data) if(data == 'q'): break sock.close() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_address = ('localhost', 61001) s.bind(server_address) s.listen() client_socket, client_addr = s.accept() recvThread = threading.Thread(target=Recv, args=(client_socket,)) sendThread = threading.Thread(target=Send, args=(client_socket,)) recvThread.start() sendThread.start() recvThread.join() sendThread.join()클라이언트
import socket import threading def Send(sock): while True: rawdata = input('') data = rawdata.encode() sock.send(data) if rawdata == 'q': break sock.close() def Recv(sock): while True: data = sock.recv(1024) if not data: break data = data.decode() print("Server:", data) if(data == 'q'): break sock.close() server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_address = ('localhost', 61001) server_socket.connect(server_address) recvThread = threading.Thread(target=Recv, args=(server_socket,)) sendThread = threading.Thread(target=Send, args=(server_socket,)) recvThread.start() sendThread.start() recvThread.join() sendThread.join()논 블로킹 소켓
def Recv(sock): sock.setblocking(False) while True: time.sleep(1) try: data = sock.recv(1024) if not data: break data = data.decode() print("Server:", data) if(data == 'q'): break except BlockingIOError: print("아무것도 일어나지 않았다") sock.close()논 블로킹 소켓을 사용하려면 socket.setblocking(False)를 하면 된다.
아무것도 받지 않으면 BlockingIOError를 일으키므로 try except가 필요하다.
참고: https://docs.python.org/ko/3.15/howto/sockets.html#non-blocking-sockets
'python' 카테고리의 다른 글
PyQt5 라이브러리 (0) 2025.12.08 asyncio 라이브러리 (0) 2025.08.23 selenium v4.0 (python) (0) 2023.04.10 파이썬 sorted 정렬 조건 정해주기 (0) 2022.07.08 파이썬 리스트 인덱스 여러개 찾기 (0) 2022.06.30