在通信的大舞台上,UDP是一位默默贡献的明星。而当它与Spring Boot和Netty联手,再搭配Go语言的模拟设备,将掀起异步通信的新篇章。今天,我们将一同踏入这个奇妙的领域,揭开Spring Boot和Netty在UDP通信中的神秘面纱。
异步通信具有许多优势,特别是在处理大量连接、高并发和I/O密集型操作时。
将Spring Boot与Netty结合是为了利用Netty的高性能网络通信能力,而Spring Boot则提供了便捷的开发和集成环境。下面是详细介绍如何搭建一个高效的UDP服务端,使用Spring Boot和Netty实现。
首先,在Spring Boot项目的pom.xml
中添加Netty的依赖:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.69.Final</version> <!-- 替换为最新版本 -->
</dependency>
创建一个UDP服务端,使用Netty实现。下面是一个简单的示例:
package com.todoitbo.baseSpringbootDasmart.netty.server;
import com.todoitbo.baseSpringbootDasmart.netty.handler.UdpHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
/**
* @author todoitbo
* @date 2023/11/29
*/
@Slf4j
public class NettyUdpServer {
private final int nettyPort;
public static Channel channel;
public NettyUdpServer(int port) {
this.nettyPort = port;
}
/**
* 启动服务
*
* @throws InterruptedException
*/
public void start() throws InterruptedException {
// 连接管理线程池
EventLoopGroup mainGroup = new NioEventLoopGroup(2);
EventLoopGroup workGroup = new NioEventLoopGroup(8);
try {
// 工作线程池
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(mainGroup)
// 指定 nio 通道,支持 UDP
.channel(NioDatagramChannel.class)
// 广播模式
.option(ChannelOption.SO_BROADCAST, true)
// 设置读取缓冲区大小为 10M
.option(ChannelOption.SO_RCVBUF, 1024 * 1024 * 10)
// 设置发送缓冲区大小为 10M
.option(ChannelOption.SO_SNDBUF, 1024 * 1024 * 10)
// 线程池复用缓冲区
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
// 指定 socket 地址和端口
.localAddress(new InetSocketAddress(nettyPort))
// 添加通道 handler
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
protected void initChannel(NioDatagramChannel nioDatagramChannel) throws Exception {
nioDatagramChannel.pipeline()
// 指定工作线程,提高并发性能
.addLast(workGroup,new UdpHandler());
}
});
// 异步绑定服务器,调用sync()方法阻塞等待直到绑定完成
ChannelFuture sync = bootstrap.bind().sync();
channel = sync.channel();
log.info("---------- [init] UDP netty server start ----------");
// 阻塞等待服务器关闭
channel.closeFuture().sync();
} finally {
// 释放资源
mainGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
创建一个简单的UDP消息处理器,用于处理接收到的消息,且使用CompletableFuture来实现异步收发
package com.todoitbo.baseSpringbootDasmart.netty.handler;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static com.todoitbo.baseSpringbootDasmart.controller.SysUploadController.socketAddressMap;
/**
* @author todoitbo
* @date 2023/11/29
*/
public class UdpHandler extends SimpleChannelInboundHandler<DatagramPacket> {
// 使用 CompletableFuture 用于异步获取客户端的响应
public static CompletableFuture<String> responseFuture = new CompletableFuture<>();
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
// 从DatagramPacket中获取数据和发送者信息
byte[] data;
int len = packet.content().readableBytes();
if (packet.content().hasArray()) {
data = packet.content().array();
} else {
data = new byte[len];
packet.content().getBytes(packet.content().readerIndex(), data);
}
String senderAddress = packet.sender().getAddress().getHostAddress();
int senderPort = packet.sender().getPort();
// 处理接收到的数据
String message = new String(data);
System.out.println("Received message from " + senderAddress + ":" + senderPort + " - " + message);
if (message.contains("test")) {
responseFuture.complete(message);
}
// 构建响应消息
String response = "Hello, client!";
byte[] responseData = response.getBytes();
// 创建响应的DatagramPacket并发送给发送者
InetSocketAddress senderSocketAddress = new InetSocketAddress(senderAddress, senderPort);
socketAddressMap.put("test", senderSocketAddress);
DatagramPacket responsePacket = new DatagramPacket(Unpooled.copiedBuffer(responseData), senderSocketAddress);
ctx.writeAndFlush(responsePacket);
}
/*@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 处理异常情况
cause.printStackTrace();
ctx.close();
}*/
// 在接口调用后等待客户端响应的方法
public static String waitForClientResponse() {
try {
// 使用 CompletableFuture 的 get 方法来阻塞等待客户端的响应
String s = responseFuture.get(500, TimeUnit.MILLISECONDS);
responseFuture = new CompletableFuture<>();
return s; // 等待时间为 1 秒
} catch (Exception e) {
// 发生超时或其他异常,可以根据实际情况处理
return "456"; // 超时返回默认值 "456"
}
}
}
⚠️:注意 在某些上下文中,将 CompletableFuture 声明为 public static 可行,但请注意这并不总是一个最佳实践。做出这个决定时需要考虑以下几点: 线程安全性 - CompletableFuture 是线程安全的,但是如果你在多个线程中设置其结果,你可能会遇到异常,因为 CompletableFuture 的结果只能被设置一次。 共享状态 - 任何可以访问这个 public static 变量的代码都可以改变其状态。这可能会导致你的代码难于理解和维护。 生命周期 - 这个 CompletableFuture 的生命周期与应用程序的生命周期一致,除非显式地设置为 null。 这可能在某些情况下会导致内存泄漏。 如果是为了协调或表示一个跨类或跨方法的异步操作的结果,使用 public static CompletableFuture 是可以接受的。但你需要意识到在静态上下文中共享的状态可能会导致的问题,并以适当的同步机制处理它们。
创建一个Spring Boot应用,并在应用启动时启动UDP服务端:
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class BaseSpringbootDasmartApplication {
/* static {
AspectLogEnhance.enhance();}//进行日志增强,自动判断日志框架*/
public static void main(String[] args) {
// System.setProperty("log4j2.isThreadContextMapInheritable", Boolean.TRUE.toString());
SpringApplication.run(BaseSpringbootDasmartApplication.class, args);
try {
// new NettyWebsocketServer(13025).run();
new NettyUdpServer(13026).start();
} catch (Exception e) {
throw new BusinessException("-----启动失败-----", e.getMessage()).setCause(e).setLog();
}
}
}
@GetMapping("/login/{message}")
public String login(@PathVariable String message) throws NacosException, InterruptedException {
byte[] responseData = message.getBytes();
// 创建响应的DatagramPacket并发送给发送者
DatagramPacket responsePacket = new DatagramPacket(Unpooled.copiedBuffer(responseData), socketAddressMap.get("test"));
NettyUdpServer.channel.writeAndFlush(responsePacket);
// 客户端是否响应,响应返回传入值,否则返回456,响应时间不超过0.5s,如果10.5s还未响应,则返回456
return UdpHandler.waitForClientResponse();
}
下面是一个简单的Go语言程序,用于模拟UDP客户端,发送和接收指令。在这个例子中,我们使用Go的
net
包来处理UDP通信。下面的代码可以直接放到main中
// @Author todoitbo 2023/11/29 14:26:00
package utils
import (
"context"
"fmt"
"net"
"strings"
"sync"
)
// UDPClient 是一个简单的 UDP 客户端
type UDPClient struct {
conn *net.UDPConn
mu sync.Mutex
}
// NewUDPClient 创建一个新的 UDP 客户端
func NewUDPClient(serverAddr string) (*UDPClient, error) {
client := &UDPClient{}
addr, err := net.ResolveUDPAddr("udp", serverAddr)
if err != nil {
return nil, err
}
conn, err := net.DialUDP("udp", nil, addr)
if err != nil {
return nil, err
}
client.conn = conn
message := []byte("你好")
_, err = conn.Write(message)
return client, nil
}
// Close 关闭 UDP 客户端连接
func (c *UDPClient) Close() {
c.mu.Lock()
defer c.mu.Unlock()
if c.conn != nil {
c.conn.Close()
}
}
// ListenForMessages 启动 Goroutine 监听服务端的实时消息
func (c *UDPClient) ListenForMessages(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done() // 在 Goroutine 结束时通知 WaitGroup
buffer := make([]byte, 1024)
for {
select {
case <-ctx.Done():
// 收到关闭信号,结束 Goroutine
return
default:
c.mu.Lock()
conn := c.conn
c.mu.Unlock()
if conn == nil {
// 客户端连接已关闭
return
}
n, _, err := conn.ReadFromUDP(buffer)
if err != nil {
fmt.Println("Error reading from server:", err)
return
}
// 处理收到的消息,可以根据实际需求进行逻辑处理
message := string(buffer[:n])
if strings.Contains(message, "test") {
c.SendMessage(message)
}
// hexString := hex.EncodeToString(message)
// 将 3600 转换为字符串
/*expectedValue := "3600"
if hexString == expectedValue {
c.SendMessage("test")
}else {
c.SendMessage(message)
}*/
fmt.Println("Received message from server: ", message)
}
}
}
// SendMessage 向服务端发送消息
func (c *UDPClient) SendMessage(message string) error {
c.mu.Lock()
defer c.mu.Unlock()
if c.conn == nil {
return fmt.Errorf("client connection is closed")
}
_, err := c.conn.Write([]byte(message))
return err
}
func InitUDPClient(ctx context.Context, wg *sync.WaitGroup, serverAddr string) (*UDPClient, error) {
client, err := NewUDPClient(serverAddr)
if err != nil {
return nil, err
}
// 启动 Goroutine 监听服务端的实时消息
wg.Add(1)
go client.ListenForMessages(ctx, wg)
return client, nil
}
func init() {
InitUDPClient(context.Background(), &sync.WaitGroup{}, "127.0.0.1:13026")
}
⚠️:上面代码需要注意的地方
1️⃣:通用udp客户端建立
2️⃣:ListenForMessages 启动 Goroutine 监听服务端的实时消息
3️⃣:消息的处理,这里我使用的是字符串来接收,真正的设备应该是接收16进制的指令。
1️⃣:运行Spring Boot应用,UDP服务端将会在13026端口启动。你可以使用UDP客户端发送消息到该端口,然后在控制台看到服务端输出的消息。
2️⃣:运行go程序,可以在springboot控制台看到打印如下
3️⃣:调用接口,可以同时看到go程序,与springboot打印数据如下
这只是一个简单的示例,实际应用中可能需要根据具体需求进行更复杂的处理和逻辑。 Netty提供了强大的异步事件模型,适用于构建高性能、可伸缩的网络应用程序,而Spring Boot则为我们提供了更便捷的开发体验和集成环境。通过整合Spring Boot和Netty,你可以在网络通信方面获得更好的性能和灵活性。
性能优化和调优在高负载环境中是至关重要的,特别是在UDP通信这种无连接、不可靠的场景中。以下是一些性能优化的技巧和在高负载环境中调整UDP通信以获得最佳性能的建议:
在进行性能优化和调优时,需要根据具体的应用场景和性能测试结果进行调整。优化的效果可能因应用的特性而异,因此在实施之前最好进行充分的性能测试。
UDP通信的主要特性是无连接和不可靠,相对于TCP,它缺乏内建的安全性机制,因此在UDP通信中需要额外关注安全性问题。以下是一些UDP通信中的安全性问题和如何实现UDP通信的加密传输的建议:
实现UDP通信的加密传输需要综合考虑数据的机密性、完整性和身份验证等因素。选择合适的安全机制和协议取决于具体的应用场景和安全需求。
深深感谢你阅读完整篇文章,希望你从中获得了些许收获。如果觉得有价值,欢迎点赞、收藏,并关注我的更新,期待与你共同分享更多技术与思考。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有