在网络编程的世界里,TCP 和 UDP 是传输层两大核心协议。TCP 以其可靠性(重传、排序、拥塞控制)著称,而 UDP 则以其简单、低延迟和无状态闻名。然而,在某些场景下,我们既需要 UDP 的低延迟和轻量级,又无法完全放弃对可靠性的要求,例如在线游戏、实时音视频通信、高速金融交易等。
这时,一个自然的想法就是在应用层,基于 UDP 之上,实现一套自己的可靠性机制。这就是所谓的“可靠 UDP”(Reliable UDP)。本文将深入探讨如何利用 Go 语言强大的并发特性,优雅地实现一个基础版本的可靠 UDP 协议。
在我们开始造轮子之前,必须先问:为什么不用现成的 TCP?
当然,选择可靠 UDP 也意味着你需要自己处理所有 TCP 已经帮你做好的事情,这是一个典型的权衡。
一个最基础的可靠 UDP 协议通常需要实现以下几个机制:
我们的 Go 语言实现将专注于前 4 个机制。
Go 语言的并发模型(Goroutines 和 Channels)非常适合编写高性能、高并发的网络程序。我们将利用这些特性来构建我们的可靠 UDP 服务器和客户端。
首先,我们需要定义应用层的数据包格式,它在原始的 UDP 负载之前添加我们的协议头。
package rudp
import (
"encoding/binary"
"errors"
)
// PacketType 代表数据包的类型
type PacketType byte
const (
// DataPacket 是携带应用数据的包
DataPacket PacketType = iota
// AckPacket 是确认包
AckPacket
)
// PacketHeader 是我们可靠UDP协议的头
type PacketHeader struct {
SeqNum uint32 // 序列号
Type PacketType // 包类型 (Data or Ack)
}
// Packet 代表一个完整的应用层数据包
type Packet struct {
Header PacketHeader
Data []byte
}
// Marshal 将Packet序列化为字节流,以便通过UDP发送
func (p *Packet) Marshal() []byte {
buf := make([]byte, 5+len(p.Data)) // 4(SeqNum) + 1(Type) + len(Data)
binary.BigEndian.PutUint32(buf[0:4], p.Header.SeqNum)
buf[4] = byte(p.Header.Type)
copy(buf[5:], p.Data)
return buf
}
// Unmarshal 从接收到的UDP负载中解析出Packet
func Unmarshal(data []byte) (*Packet, error) {
if len(data) < 5 {
return nil, errors.New("packet too small")
}
p := &Packet{}
p.Header.SeqNum = binary.BigEndian.Uint32(data[0:4])
p.Header.Type = PacketType(data[4])
p.Data = data[5:]
return p, nil
}
发送端需要维护一个发送窗口,记录已发送但未确认的包,并管理重传定时器。
package rudp
import (
"log"
"net"
"sync"
"time"
)
const maxSeqNum = 1024 // 序列号范围,简单的环形序列号
const rto = 200 * time.Millisecond // 初始重传超时时间
// SendSession 代表一个可靠的UDP发送会话
type SendSession struct {
conn *net.UDPConn
remoteAddr *net.UDPAddr
nextSeqNum uint32
mu sync.Mutex
pendingAcks map[uint32]*pendingPacket // 等待ACK的包映射
closed bool
}
type pendingPacket struct {
sendTime time.Time
data []byte
}
// NewSendSession 创建一个新的发送会话
func NewSendSession(conn *net.UDPConn, raddr *net.UDPAddr) *SendSession {
s := &SendSession{
conn: conn,
remoteAddr: raddr,
pendingAcks: make(map[uint32]*pendingPacket),
}
go s.ackReceiver() // 启动一个goroutine来接收ACK
go s.retransmitter() // 启动一个goroutine来处理重传
return s
}
// Send 可靠地发送数据
func (s *SendSession) Send(data []byte) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return errors.New("session closed")
}
seqNum := s.nextSeqNum
s.nextSeqNum = (s.nextSeqNum + 1) % maxSeqNum
// 创建数据包
pkt := &Packet{
Header: PacketHeader{SeqNum: seqNum, Type: DataPacket},
Data: data,
}
packedData := pkt.Marshal()
// 发送
_, err := s.conn.WriteToUDP(packedData, s.remoteAddr)
if err != nil {
return err
}
// 记录到等待ACK的map中
s.pendingAcks[seqNum] = &pendingPacket{
sendTime: time.Now(),
data: packedData,
}
log.Printf("Sent packet with seq: %d", seqNum)
return nil
}
// ackReceiver 接收ACK包并从pendingAcks中移除对应的包
func (s *SendSession) ackReceiver() {
buf := make([]byte, 1500) // MTU大小
for {
n, addr, err := s.conn.ReadFromUDP(buf)
if err != nil {
if s.closed {
return
}
log.Printf("Read error: %v", err)
continue
}
// 只处理目标地址的ACK包
if addr.String() != s.remoteAddr.String() {
continue
}
pkt, err := Unmarshal(buf[:n])
if err != nil {
log.Printf("Unmarshal error: %v", err)
continue
}
if pkt.Header.Type == AckPacket {
s.mu.Lock()
seq := pkt.Header.SeqNum
if _, exists := s.pendingAcks[seq]; exists {
delete(s.pendingAcks, seq)
log.Printf("Received ACK for seq: %d", seq)
} else {
log.Printf("Received ACK for unknown seq: %d", seq)
}
s.mu.Unlock()
}
}
}
// retransmitter 定期检查并重传超时的包
func (s *SendSession) retransmitter() {
ticker := time.NewTicker(rto / 2) // 检查频率是RTO的一半
defer ticker.Stop()
for range ticker.C {
if s.closed {
return
}
s.mu.Lock()
now := time.Now()
for seq, pp := range s.pendingAcks {
if now.Sub(pp.sendTime) > rto {
log.Printf("Retransmitting packet seq: %d", seq)
s.conn.WriteToUDP(pp.data, s.remoteAddr)
pp.sendTime = now // 更新发送时间
}
}
s.mu.Unlock()
}
}
// Close 关闭会话
func (s *SendSession) Close() {
s.mu.Lock()
defer s.mu.Unlock()
s.closed = true
s.conn.Close()
}
接收端负责接收数据包,按序列号排序(如果需要),并向发送端发送 ACK。
// RecvSession 代表一个可靠的UDP接收会话
type RecvSession struct {
conn *net.UDPConn
expectedSeq uint32
mu sync.Mutex
}
// NewRecvSession 创建一个新的接收会话
func NewRecvSession(conn *net.UDPConn) *RecvSession {
return &RecvSession{conn: conn}
}
// ListenAndServe 开始监听并处理到达的数据包
func (s *RecvSession) ListenAndServe(handler func([]byte)) error {
buf := make([]byte, 1500)
for {
n, addr, err := s.conn.ReadFromUDP(buf)
if err != nil {
return err
}
pkt, err := Unmarshal(buf[:n])
if err != nil {
log.Printf("Unmarshal error: %v", err)
continue
}
if pkt.Header.Type == DataPacket {
go s.handleDataPacket(pkt, addr, handler)
}
}
}
func (s *RecvSession) handleDataPacket(pkt *Packet, addr *net.UDPAddr, handler func([]byte)) {
s.mu.Lock()
defer s.mu.Unlock()
seq := pkt.Header.SeqNum
log.Printf("Received data packet seq: %d, expected: %d", seq, s.expectedSeq)
// 发送ACK,无论序列号是否预期,都ACK收到的这个seq
ackPkt := &Packet{
Header: PacketHeader{SeqNum: seq, Type: AckPacket},
Data: nil,
}
ackData := ackPkt.Marshal()
s.conn.WriteToUDP(ackData, addr)
log.Printf("Sent ACK for seq: %d", seq)
// 简单的按序交付:只有当序列号是预期的才交付给应用层
// 对于非预期的包(延迟、重复),我们ACK它但暂时不交付
// 更复杂的实现可以使用一个缓存队列来重新排序
if seq == s.expectedSeq {
handler(pkt.Data) // 将数据交付给应用层处理
s.expectedSeq = (s.expectedSeq + 1) % maxSeqNum
} else {
// 可以在这里记录日志或实现NACK逻辑
log.Printf("Out-of-order packet. Expected %d, got %d", s.expectedSeq, seq)
}
}
package main
import (
"fmt"
"log"
"net"
"yourmodulepath/rudp" // 替换为你的模块路径
)
func main() {
addr, err := net.ResolveUDPAddr("udp", ":10000")
if err != nil {
log.Fatal(err)
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
fmt.Println("Server listening on :10000")
session := rudp.NewRecvSession(conn)
// 定义应用层数据处理函数
handler := func(data []byte) {
fmt.Printf("Received reliable data: %s\n", string(data))
}
err = session.ListenAndServe(handler)
if err != nil {
log.Fatal(err)
}
}
package main
import (
"log"
"net"
"time"
"yourmodulepath/rudp" // 替换为你的模块路径
)
func main() {
serverAddr, err := net.ResolveUDPAddr("udp", "localhost:10000")
if err != nil {
log.Fatal(err)
}
// 使用一个本地UDP连接
conn, err := net.ListenUDP("udp", nil) // 随机本地端口
if err != nil {
log.Fatal(err)
}
defer conn.Close()
session := rudp.NewSendSession(conn, serverAddr)
defer session.Close()
for i := 0; i < 10; i++ {
msg := fmt.Sprintf("Hello Reliable UDP #%d", i)
err := session.Send([]byte(msg))
if err != nil {
log.Printf("Send failed: %v", err)
}
time.Sleep(500 * time.Millisecond)
}
// 等待所有ACK或超时
time.Sleep(5 * time.Second)
}
这个实现是一个极其基础的演示版本,离生产级别还相差甚远。在实际应用中,你需要考虑以下方面:
通过 Go 语言,我们利用其简洁的语法和强大的并发原语,相对轻松地实现了一个具备基本可靠性(确认、序列号、重传)的 UDP 协议。这很好地展示了在应用层实现定制化传输协议的可行性。
虽然这个实现很简单,但它提供了一个完美的起点和思想实验。你可以在此基础上,根据特定应用场景的需求,逐步添加更高级的功能,打造一个真正高性能、高可靠、量身定制的传输方案。
记住,造轮子不是为了替代 TCP,而是在 TCP 不符合要求时,提供另一种可能。