在 Python 中,有多种库可以用于创建服务器和客户端,以便在本地端口上处理数据传输。以下是一些常用的库和示例代码:
socket
库socket
库是 Python 标准库的一部分,用于低级别的网络通信。它可以用于创建 TCP 和 UDP 服务器和客户端。
服务器代码:
import socket
def start_server(host='127.0.0.1', port=65432):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((host, port))
s.listen()
print(f'Server listening on {host}:{port}')
conn, addr = s.accept()
with conn:
print(f'Connected by {addr}')
while True:
data = conn.recv(1024)
if not data:
break
print(f'Received: {data.decode()}')
conn.sendall(data)
if __name__ == "__main__":
start_server()
客户端代码:
import socket
def start_client(host='127.0.0.1', port=65432):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((host, port))
s.sendall(b'Hello, server')
data = s.recv(1024)
print(f'Received: {data.decode()}')
if __name__ == "__main__":
start_client()
asyncio
库asyncio
库用于编写异步网络代码。它可以用于创建高效的异步服务器和客户端。
服务器代码:
import asyncio
async def handle_client(reader, writer):
addr = writer.get_extra_info('peername')
print(f'Connected by {addr}')
while True:
data = await reader.read(100)
if not data:
break
print(f'Received: {data.decode()}')
writer.write(data)
await writer.drain()
writer.close()
await writer.wait_closed()
async def start_server(host='127.0.0.1', port=65432):
server = await asyncio.start_server(handle_client, host, port)
addr = server.sockets[0].getsockname()
print(f'Server listening on {addr}')
async with server:
await server.serve_forever()
if __name__ == "__main__":
asyncio.run(start_server())
客户端代码:
import asyncio
async def start_client(host='127.0.0.1', port=65432):
reader, writer = await asyncio.open_connection(host, port)
writer.write(b'Hello, server')
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()}')
writer.close()
await writer.wait_closed()
if __name__ == "__main__":
asyncio.run(start_client())
http.server
和 requests
库如果您需要处理 HTTP 请求,可以使用 http.server
库创建一个简单的 HTTP 服务器,并使用 requests
库创建客户端。
服务器代码:
from http.server import BaseHTTPRequestHandler, HTTPServer
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(b'Hello, client')
def start_server(host='127.0.0.1', port=8000):
server = HTTPServer((host, port), SimpleHTTPRequestHandler)
print(f'Server listening on {host}:{port}')
server.serve_forever()
if __name__ == "__main__":
start_server()
客户端代码:
import requests
def start_client(url='http://127.0.0.1:8000'):
response = requests.get(url)
print(f'Received: {response.text}')
if __name__ == "__main__":
start_client()
小程序·云开发官方直播课(数据库方向)
云原生正发声
云+社区开发者大会(杭州站)
云+社区技术沙龙[第17期]
小程序·云开发官方直播课(数据库方向)
T-Day
云+社区沙龙online [国产数据库]
云+未来峰会
云+社区技术沙龙[第8期]
云+社区技术沙龙[第22期]
云+社区技术沙龙[第12期]
领取专属 10元无门槛券
手把手带您无忧上云