首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >用 Python 来实现客户端或服务器(TCP/IP 协议)

用 Python 来实现客户端或服务器(TCP/IP 协议)

原创
作者头像
时代疯
发布2025-01-22 16:32:20
发布2025-01-22 16:32:20
56400
代码可运行
举报
运行总次数:0
代码可运行

用 Python 来实现创建一个简单的客户端或服务器,可以使用 Python 的内置库 socket。这个库提供了对底层 TCP/IP 协议的访问,可以用来编写网络程序。

以下是一些简单的示例代码

示例 1:创建一个简单的 TCP 服务器

以下代码创建了一个简单的 TCP 服务器,会监听指定的端口,并接受客户端的连接。

代码语言:python
代码运行次数:0
运行
复制
import socket

def tcp_server():
    # 创建一个 TCP 套接字
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    
    # 绑定地址和端口
    host = "127.0.0.1"  # 本地主机地址
    port = 12345        # 监听端口
    server_socket.bind((host, port))
    
    # 开始监听
    server_socket.listen(5)
    print(f"Server listening on {host}:{port}")
    
    try:
        while True:
            # 接受客户端连接
            client_socket, client_address = server_socket.accept()
            print(f"Connection from {client_address}")
            
            # 接收客户端发送的数据
            data = client_socket.recv(1024)  # 缓冲区大小为 1024 字节
            print(f"Received data: {data.decode()}")
            
            # 向客户端发送响应
            client_socket.send("Hello from server!".encode())
            
            # 关闭客户端连接
            client_socket.close()
    except KeyboardInterrupt:
        print("Server is shutting down.")
    finally:
        server_socket.close()

if __name__ == "__main__":
    tcp_server()

示例 2:创建一个简单的 TCP 客户端

以下代码创建了一个简单的 TCP 客户端,它会连接到服务器并发送数据。

代码语言:python
代码运行次数:0
运行
复制
import socket

def tcp_client():
    # 创建一个 TCP 套接字
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    
    # 服务器地址和端口
    host = "127.0.0.1"
    port = 12345
    
    # 连接到服务器
    client_socket.connect((host, port))
    print(f"Connected to {host}:{port}")
    
    # 向服务器发送数据
    message = "Hello from client!"
    client_socket.send(message.encode())
    
    # 接收服务器响应
    response = client_socket.recv(1024)
    print(f"Received response: {response.decode()}")
    
    # 关闭连接
    client_socket.close()

if __name__ == "__main__":
    tcp_client()

说明

服务器端

  • 使用 socket.socket(socket.AF_INET, socket.SOCK_STREAM) 创建一个 TCP 套接字。
  • 使用 bind() 方法绑定到本地地址和端口。
  • 使用 listen() 方法开始监听连接。
  • 使用 accept() 方法接受客户端连接。
  • 使用 recv() 方法接收数据,使用 send() 方法发送数据。

客户端

  • 使用 socket.socket(socket.AF_INET, socket.SOCK_STREAM) 创建一个 TCP 套接字。
  • 使用 connect() 方法连接到服务器。
  • 使用 send() 方法发送数据,使用 recv() 方法接收服务器响应。

运行方式

  • 先运行服务器代码,然后运行客户端代码。
  • 客户端会连接到服务器,发送消息,并接收服务器的响应。

socket库的用途

1. UDP 通信

除了 TCP,socket 库也支持 UDP(用户数据报协议)。UDP 是一种无连接的协议,适合对实时性要求较高的应用(如视频流、游戏等),但不保证数据的可靠传输。

示例:UDP 服务器和客户端

UDP 服务器:

代码语言:python
代码运行次数:0
运行
复制
import socket

def udp_server():
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    host = "127.0.0.1"
    port = 12345
    server_socket.bind((host, port))
    print(f"UDP Server listening on {host}:{port}")

    while True:
        data, client_address = server_socket.recvfrom(1024)
        print(f"Received data from {client_address}: {data.decode()}")
        server_socket.sendto("Hello from UDP server!".encode(), client_address)

if __name__ == "__main__":
    udp_server()

UDP 客户端

代码语言:python
代码运行次数:0
运行
复制
import socket

def udp_client():
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    host = "127.0.0.1"
    port = 12345

    message = "Hello from UDP client!"
    client_socket.sendto(message.encode(), (host, port))

    response, server_address = client_socket.recvfrom(1024)
    print(f"Received response from {server_address}: {response.decode()}")

    client_socket.close()

if __name__ == "__main__":
    udp_client()

2. 多线程或多进程网络服务

socket 库可以与 Python 的 threadingmultiprocessing 模块结合,实现多线程或多进程的网络服务,从而提高服务器的并发处理能力。

示例:多线程 TCP 服务器
代码语言:python
代码运行次数:0
运行
复制
import socket
import threading

def handle_client(client_socket, client_address):
    print(f"Connection from {client_address}")
    while True:
        data = client_socket.recv(1024)
        if not data:
            break
        print(f"Received data: {data.decode()}")
        client_socket.send(data)
    client_socket.close()
    print(f"Connection closed with {client_address}")

def tcp_server():
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    host = "127.0.0.1"
    port = 12345
    server_socket.bind((host, port))
    server_socket.listen(5)
    print(f"Server listening on {host}:{port}")

    while True:
        client_socket, client_address = server_socket.accept()
        client_thread = threading.Thread(target=handle_client, args=(client_socket, client_address))
        client_thread.start()

if __name__ == "__main__":
    tcp_server()

3. 自定义协议开发

socket 库允许你直接操作网络数据包,因此可以用于开发自定义协议。你可以定义自己的数据包格式,并通过 socket 进行数据的收发和解析。

示例:自定义协议
代码语言:python
代码运行次数:0
运行
复制
import socket

def custom_protocol_server():
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    host = "127.0.0.1"
    port = 12345
    server_socket.bind((host, port))
    server_socket.listen(5)
    print(f"Custom Protocol Server listening on {host}:{port}")

    while True:
        client_socket, client_address = server_socket.accept()
        print(f"Connection from {client_address}")

        # 接收数据并解析自定义协议
        data = client_socket.recv(1024)
        header = data[:4]  # 假设前4字节是自定义协议头
        payload = data[4:]  # 剩余部分是数据负载
        print(f"Header: {header}, Payload: {payload.decode()}")

        # 发送响应
        response = b"ACK" + payload
        client_socket.send(response)
        client_socket.close()

if __name__ == "__main__":
    custom_protocol_server()

4. 网络扫描和调试

socket 库可以用于网络扫描、端口检测等调试任务。例如,你可以编写一个简单的端口扫描器,检测某个主机上哪些端口是开放的。

示例:简单端口扫描器
代码语言:python
代码运行次数:0
运行
复制
import socket

def port_scan(host, start_port, end_port):
    open_ports = []
    for port in range(start_port, end_port + 1):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)  # 设置超时时间
        result = sock.connect_ex((host, port))
        if result == 0:
            open_ports.append(port)
        sock.close()
    return open_ports

if __name__ == "__main__":
    host = "127.0.0.1"
    start_port = 1
    end_port = 1024
    open_ports = port_scan(host, start_port, end_port)
    print(f"Open ports on {host}: {open_ports}")

5. 基于套接字的中间件或代理

你可以使用 socket 库实现简单的中间件或代理服务器,例如一个 HTTP 代理或 TCP 中转服务。

示例:简单的 TCP 中转代理
代码语言:python
代码运行次数:0
运行
复制
import socket
import threading

def forward_data(src_socket, dst_socket):
    while True:
        data = src_socket.recv(1024)
        if not data:
            break
        dst_socket.send(data)
    src_socket.close()
    dst_socket.close()

def tcp_proxy():
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind(("127.0.0.1", 8080))
    server_socket.listen(5)
    print("TCP Proxy listening on 127.0.0.1:8080")

    while True:
        client_socket, client_address = server_socket.accept()
        print(f"Connection from {client_address}")

        # 连接到目标服务器
        target_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        target_socket.connect(("127.0.0.1", 8000))

        # 创建两个线程进行双向数据转发
        client_to_target = threading.Thread(target=forward_data, args=(client_socket, target_socket))
        target_to_client = threading.Thread(target=forward_data, args=(target_socket, client_socket))
        client_to_target.start()
        target_to_client.start()

if __name__ == "__main__":
    tcp_proxy()

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 示例 1:创建一个简单的 TCP 服务器
  • 示例 2:创建一个简单的 TCP 客户端
  • 说明
  • socket库的用途
  • 1. UDP 通信
    • 示例:UDP 服务器和客户端
  • 2. 多线程或多进程网络服务
    • 示例:多线程 TCP 服务器
  • 3. 自定义协议开发
    • 示例:自定义协议
  • 4. 网络扫描和调试
    • 示例:简单端口扫描器
  • 5. 基于套接字的中间件或代理
    • 示例:简单的 TCP 中转代理
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档