
在现代 Web 应用中,实时通信(如聊天、通知、数据推送)已成为刚需。WebSocket 协议因其全双工、低延迟的特性,成为实现实时功能的首选。然而,当应用部署在多个服务器实例(集群)上时,传统的单机 WebSocket 会话管理面临挑战:如何确保发送给特定用户的消息能准确送达其连接的服务器?如何在服务器扩容、缩容或故障时保证连接的稳定性和消息的可达性?

Nacos,作为阿里巴巴开源的服务发现、配置管理与服务管理平台,其强大的服务发现和元数据管理能力,为解决分布式 WebSocket 的难题提供了优雅的方案。本文将详细介绍如何使用 Spring Boot 搭建一个基于 Nacos 的分布式 WebSocket 服务器。
@ServerEndpoint)启动时,向 Nacos 注册一个服务(例如 websocket-service),并在服务的元数据 (Metadata) 中记录自身可处理的用户ID范围、连接数、或一个唯一的实例ID。当需要向用户 A 推送消息时,先查询 Nacos 服务列表,根据元数据定位到持有 A 连接的服务器实例地址。websocket-service 实例列表,结合自身的负载均衡策略(如轮询、一致性哈希)选择一个实例进行连接。消息路由则依赖于 Nacos 的服务发现 API,通过查询元数据找到目标连接所在的实例,再通过内部通信(如 HTTP、RPC、消息队列)将消息转发过去。com.alibaba.cloud:spring-cloud-starter-alibaba-nacos-discoverycom.alibaba.cloud:spring-cloud-starter-alibaba-nacos-config (用于状态同步)http://localhost:8848/nacos (默认地址) 可访问。Spring Web, WebSocket 依赖。server:
port: 8081 # 每个实例端口应不同,或通过启动参数指定
spring:
application:
name: websocket-service # 服务名称,所有 WebSocket 实例使用同一名称
cloud:
nacos:
discovery:
server-addr: localhost:8848 # Nacos Server 地址
# 将服务实例的 IP 和端口注册为 WebSocket 连接地址
# 注意:如果服务器有公网IP和内网IP,可能需要指定
# ip: 192.168.1.100 # 指定注册的IP
# port: ${server.port} # 通常自动获取 server.port
# (可选) 在元数据中添加自定义信息,用于标识实例能力或范围
metadata:
instance-id: ${spring.application.name}-${server.port} # 唯一实例ID
# websocket-port: 8081 # 如果 WebSocket 端口与 HTTP 端口不同,可单独声明
# user-range-start: 1
# user-range-end: 10000
# (可选) 配置中心
config:
server-addr: ${spring.cloud.nacos.discovery.server-addr}
file-extension: yaml
# data-id: websocket-state-config # 配置 Data ID
# group: DEFAULT_GROUP
# (重要) 配置 Tomcat (或其他容器) 的线程池,WebSocket 连接是长连接,需要足够线程
server:
tomcat:
threads:
max: 500 # 根据预期连接数调整使用 @ServerEndpoint 注解定义 WebSocket 端点。关键点: 我们需要一个中心化的 Session 管理器来跟踪所有连接。
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* WebSocket 服务端点
* 注意:@ServerEndpoint 不是 Spring Bean,不能直接注入 Spring Bean (如 Nacos ServiceManager)
* 需要通过静态方法或 ApplicationContext 获取。
*/
@ServerEndpoint("/ws/{userId}")
@Component // 为了让 Spring 扫描到,但实际由容器管理
public class WebSocketEndpoint {
// (方案一) 静态 Map 管理本实例的连接 (仅限单实例或配合外部存储)
private static final ConcurrentHashMap<String, Session> sessionMap = new ConcurrentHashMap<>();
private static final AtomicInteger connectionCount = new AtomicInteger(0);
// (方案二) 推荐:使用外部存储 (如 Redis) 或结合 Nacos Config 管理全局连接状态
// 这里为了简化演示,先用静态Map,后续会说明如何升级。
@OnOpen
public void onOpen(@PathParam("userId") String userId, Session session) {
// 1. 将用户ID与Session关联
sessionMap.put(userId, session);
int count = connectionCount.incrementAndGet();
System.out.println("用户 " + userId + " 连接成功。当前实例连接数: " + count);
// 2. (关键) 通知 Nacos 本实例状态更新 (可选,可通过定时任务或事件驱动)
// 例如:更新实例元数据中的连接数
updateNacosMetadata(userId, count);
}
@OnMessage
public void onMessage(String message, @PathParam("userId") String userId, Session session) {
System.out.println("收到来自用户 " + userId + " 的消息: " + message);
// 处理消息逻辑,例如:解析为指令、广播等
try {
session.getBasicRemote().sendText("服务器已收到: " + message);
} catch (IOException e) {
e.printStackTrace();
}
}
@OnClose
public void onClose(@PathParam("userId") String userId, Session session) {
sessionMap.remove(userId);
int count = connectionCount.decrementAndGet();
System.out.println("用户 " + userId + " 连接关闭。当前实例连接数: " + count);
// (可选) 通知 Nacos 状态更新
updateNacosMetadata(null, count);
}
@OnError
public void onError(Session session, Throwable error) {
System.err.println("WebSocket 发生错误: " + error.getMessage());
error.printStackTrace();
}
/**
* 发送消息给指定用户 (在本实例内)
*/
public static void sendMessageToUser(String userId, String message) {
Session session = sessionMap.get(userId);
if (session != null && session.isOpen()) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
System.err.println("发送消息给用户 " + userId + " 失败: " + e.getMessage());
// 可能需要清理 sessionMap
}
} else {
System.out.println("用户 " + userId + " 不在本实例连接或连接已关闭。");
// (重要) 需要触发跨实例查找和转发逻辑
// 1. 查询 Nacos 获取所有 websocket-service 实例
// 2. 遍历实例元数据,查找哪个实例持有 userId 的连接
// 3. 调用该实例的 HTTP API 或通过 MQ 发送消息
// 这里简化处理:打印日志,实际应调用路由服务
System.out.println("尝试通过 Nacos 发现并路由消息到用户 " + userId + " 所在的实例...");
// TODO: 调用 NacosDiscoveryService 进行路由
}
}
/**
* 广播消息给本实例所有用户
*/
public static void broadcastMessage(String message) {
sessionMap.forEach((userId, session) -> {
if (session.isOpen()) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
System.err.println("广播消息失败: " + e.getMessage());
}
}
});
}
/**
* (可选) 更新 Nacos 实例元数据,例如连接数
* 需要获取 Nacos NamingService 实例
*/
private void updateNacosMetadata(String userId, int currentCount) {
// TODO: 通过 Nacos SDK (NamingService) 调用 updateInstance 更新本实例元数据
// 例如: 将 "connections" 元数据更新为 currentCount
// 这需要在 Spring 容器中获取 NamingService Bean
// NacosDiscoveryService nacosDiscoveryService = ...;
// nacosDiscoveryService.updateInstanceMetadata(currentCount);
}
}创建一个 Spring Service 来封装 Nacos 服务发现和消息路由逻辑。
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Service
public class NacosWebSocketRoutingService {
@Autowired
private NamingService namingService; // 由 Nacos Discovery Starter 自动注入
@Autowired
private RestTemplate restTemplate; // 用于调用其他实例的 HTTP API
private static final String WEBSOCKET_SERVICE_NAME = "websocket-service";
private static final String MESSAGE_ROUTE_PATH = "/api/route/message"; // 其他实例提供的消息转发API
/**
* 根据用户ID,查找其连接所在的服务器实例
* @param userId 用户ID
* @return 目标实例的地址 (host:port),或 null
*/
public String findInstanceByUserId(String userId) {
try {
// 1. 获取 websocket-service 的所有健康实例
ServiceInfo serviceInfo = namingService.getService(WEBSOCKET_SERVICE_NAME);
List<Instance> instances = serviceInfo.getHosts().stream()
.filter(Instance::isHealthy) // 只考虑健康实例
.collect(Collectors.toList());
// 2. (简化) 遍历实例,检查其元数据是否包含该用户ID
// 更优方案:使用一致性哈希、用户ID分片等策略,元数据中存储用户ID范围
for (Instance instance : instances) {
Map<String, String> metadata = instance.getMetadata();
// 示例:元数据中存储了 "users=user1,user2,user3" 或 "user-range=1-1000"
String usersStr = metadata.get("users");
if (usersStr != null && usersStr.contains(userId)) {
return instance.getIp() + ":" + instance.getPort();
}
// 或者检查用户ID是否在 range 内
// String range = metadata.get("user-range");
// if (isUserIdInRange(userId, range)) { ... }
}
// (更现实) 如果元数据没有直接存储用户列表,可能需要一个全局的用户-实例映射表
// 该表可存储在 Redis 或 Nacos Config 中,由各实例维护和监听。
} catch (NacosException e) {
e.printStackTrace();
}
return null;
}
/**
* 向指定实例发送消息 (通过 HTTP API)
* @param targetInstanceAddress 目标实例地址 (host:port)
* @param userId 目标用户ID
* @param message 消息内容
*/
public void sendMessageToInstance(String targetInstanceAddress, String userId, String message) {
String url = "http://" + targetInstanceAddress + MESSAGE_ROUTE_PATH;
RouteMessageRequest request = new RouteMessageRequest(userId, message);
try {
// 调用目标实例的 API
restTemplate.postForObject(url, request, String.class);
} catch (Exception e) {
System.err.println("向实例 " + targetInstanceAddress + " 转发消息失败: " + e.getMessage());
}
}
/**
* 主路由方法:向用户发送消息
* @param userId 用户ID
* @param message 消息内容
*/
public void routeMessageToUser(String userId, String message) {
// 1. 首先检查本实例是否持有连接 (优化:减少网络调用)
if (WebSocketEndpoint.isLocalUserConnected(userId)) {
WebSocketEndpoint.sendMessageToUser(userId, message);
return;
}
// 2. 查询 Nacos 发现目标实例
String targetInstance = findInstanceByUserId(userId);
if (targetInstance != null) {
// 3. 转发消息到目标实例
sendMessageToInstance(targetInstance, userId, message);
} else {
System.out.println("用户 " + userId + " 未找到连接实例,可能已离线。");
// 处理离线消息或通知发送方
}
}
// (可选) 定期或事件驱动更新本实例在 Nacos 的元数据
public void updateLocalInstanceMetadata(Map<String, String> metadata) {
try {
namingService.updateInstance(WEBSOCKET_SERVICE_NAME,
namingService.getGroupName(), // 或从配置获取
"your-instance-ip", // 需要获取本机IP
namingService.getPort(), // 获取本机端口
metadata); // 合并或覆盖元数据
} catch (NacosException e) {
e.printStackTrace();
}
}
}
// 路由消息请求 DTO
class RouteMessageRequest {
private String userId;
private String message;
// constructors, getters, setters
public RouteMessageRequest(String userId, String message) {
this.userId = userId;
this.message = message;
}
// ... getters and setters
}为了让其他实例能将消息转发给本实例的特定用户,需要暴露一个 HTTP API。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MessageRouteController {
@Autowired
private NacosWebSocketRoutingService routingService;
/**
* 接收来自其他 WebSocket 实例的路由消息
*/
@PostMapping("/api/route/message")
public String routeMessage(@RequestBody RouteMessageRequest request) {
// 在本实例内查找并发送消息
WebSocketEndpoint.sendMessageToUser(request.getUserId(), request.getMessage());
return "OK";
}
}在业务逻辑中(如另一个 Controller),需要向用户推送消息时:
@RestController
public class NotificationController {
@Autowired
private NacosWebSocketRoutingService routingService;
@PostMapping("/notify")
public String notifyUser(@RequestParam String userId, @RequestParam String message) {
// 使用路由服务发送消息,自动处理跨实例问题
routingService.routeMessageToUser(userId, message);
return "通知已发送";
}
}@ServerEndpoint 与 Spring Bean: @ServerEndpoint 由 WebSocket 容器管理,不是 Spring Bean。无法直接 @Autowired。解决方案:@PostConstruct 方法中将 ApplicationContext 赋值给静态变量,然后在 WebSocketEndpoint 中通过 ApplicationContext 获取所需的 Service Bean。ServerEndpointConfig.Configurator: 自定义 Configurator,在 getEndpointInstance 方法中通过 Spring 容器创建 WebSocketEndpoint 实例。userId -> instanceId 映射表。WebSocketEndpoint 在 @OnOpen/@OnClose 时更新此表,并监听表的变化(通过 Redis Pub/Sub 或 Nacos Config Listener)来处理用户迁移或实例故障。@OnPing, @OnPong),并配合 Nacos 的健康检查,确保能及时感知连接中断。通过 Spring Boot 整合 Nacos,我们构建了一个具备服务发现能力的分布式 WebSocket 服务器。Nacos 解决了连接发现和实例管理的核心问题。虽然 @ServerEndpoint 与 Spring 的集成需要特殊处理,但通过静态方法、ApplicationContext 或自定义 Configurator 可以克服。为了实现高效的全局状态管理和消息路由,强烈建议引入 Redis 或消息队列作为辅助组件。
这种架构使得 WebSocket 服务具备了良好的可扩展性、高可用性和弹性,能够轻松应对用户量的增长和服务器的动态变化,是构建大型实时应用的理想选择。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。