首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Spring Boot 整合 Nacos 实现分布式 WebSocket 服务

Spring Boot 整合 Nacos 实现分布式 WebSocket 服务

原创
作者头像
技术文章分析
修改2025-08-08 14:14:13
修改2025-08-08 14:14:13
50400
代码可运行
举报
文章被收录于专栏:技术技术
运行总次数:0
代码可运行

在现代 Web 应用中,实时通信(如聊天、通知、数据推送)已成为刚需。WebSocket 协议因其全双工、低延迟的特性,成为实现实时功能的首选。然而,当应用部署在多个服务器实例(集群)上时,传统的单机 WebSocket 会话管理面临挑战:如何确保发送给特定用户的消息能准确送达其连接的服务器?如何在服务器扩容、缩容或故障时保证连接的稳定性和消息的可达性?

Nacos,作为阿里巴巴开源的服务发现、配置管理与服务管理平台,其强大的服务发现和元数据管理能力,为解决分布式 WebSocket 的难题提供了优雅的方案。本文将详细介绍如何使用 Spring Boot 搭建一个基于 Nacos 的分布式 WebSocket 服务器。

核心挑战与 Nacos 的解决方案
  1. 连接发现 (Connection Discovery):
    • 问题: 用户 A 连接到 Server-1 的 WebSocket,用户 B 想给 A 发消息,但 B 的请求可能被负载均衡到 Server-2 或 Server-3。Server-2/3 如何知道用户 A 的连接在 Server-1 上?
    • Nacos 方案: 每个 WebSocket 服务器实例(@ServerEndpoint)启动时,向 Nacos 注册一个服务(例如 websocket-service),并在服务的元数据 (Metadata) 中记录自身可处理的用户ID范围、连接数、或一个唯一的实例ID。当需要向用户 A 推送消息时,先查询 Nacos 服务列表,根据元数据定位到持有 A 连接的服务器实例地址。
  2. 连接管理与状态同步 (Connection Management & State Sync):
    • 问题: 服务器实例重启或下线时,其上的 WebSocket 连接会断开。如何通知其他实例或客户端进行重连?如何保证连接状态(如用户在线状态)的最终一致性?
    • Nacos 方案: Nacos 的服务健康检查机制能自动感知实例的上下线。结合 Nacos 的配置中心功能,可以将关键的连接状态信息(如用户-实例映射表、在线状态)存储在 Nacos 配置中,由各实例监听变更,实现状态的分布式同步。
  3. 负载均衡与路由 (Load Balancing & Routing):
    • 问题: 客户端如何连接到合适的 WebSocket 服务器?消息如何路由到目标连接所在的服务器?
    • Nacos 方案: 客户端连接时,可以从 Nacos 获取所有可用的 websocket-service 实例列表,结合自身的负载均衡策略(如轮询、一致性哈希)选择一个实例进行连接。消息路由则依赖于 Nacos 的服务发现 API,通过查询元数据找到目标连接所在的实例,再通过内部通信(如 HTTP、RPC、消息队列)将消息转发过去。
技术栈
  • Spring Boot: 2.7.x / 3.x (推荐 3.x)
  • Spring Websocket: 提供 WebSocket 支持。
  • Nacos Server: 2.x 版本。
  • Nacos Discovery Spring Boot Starter: com.alibaba.cloud:spring-cloud-starter-alibaba-nacos-discovery
  • (可选) Nacos Config Spring Boot Starter: com.alibaba.cloud:spring-cloud-starter-alibaba-nacos-config (用于状态同步)
  • (可选) 消息中间件 (如 RabbitMQ, RocketMQ): 用于跨实例消息转发,解耦更彻底。
实现步骤
1. 环境准备
  • 启动 Nacos Server:Nacos GitHub Releases 下载并启动单机或集群模式的 Nacos 服务。确保 http://localhost:8848/nacos (默认地址) 可访问。
  • 创建 Spring Boot 项目: 使用 Spring Initializr 创建项目,添加 Spring Web, WebSocket 依赖。
  • 添加 Nacos 依赖 (pom.xml):xml深色版本<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> <version>2022.0.0.0</version> <!-- 注意版本兼容性,需匹配 Spring Boot 版本 --> </dependency> <!-- 如果需要配置中心同步状态 --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId> <version>2022.0.0.0</version> </dependency>
2. 配置 Nacos (application.yml)
代码语言:javascript
代码运行次数:0
运行
复制
server:
  port: 8081 # 每个实例端口应不同,或通过启动参数指定

spring:
  application:
    name: websocket-service # 服务名称,所有 WebSocket 实例使用同一名称
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848 # Nacos Server 地址
        # 将服务实例的 IP 和端口注册为 WebSocket 连接地址
        # 注意:如果服务器有公网IP和内网IP,可能需要指定
        # ip: 192.168.1.100 # 指定注册的IP
        # port: ${server.port} # 通常自动获取 server.port
        # (可选) 在元数据中添加自定义信息,用于标识实例能力或范围
        metadata:
          instance-id: ${spring.application.name}-${server.port} # 唯一实例ID
          # websocket-port: 8081 # 如果 WebSocket 端口与 HTTP 端口不同,可单独声明
          # user-range-start: 1
          # user-range-end: 10000
      # (可选) 配置中心
      config:
        server-addr: ${spring.cloud.nacos.discovery.server-addr}
        file-extension: yaml
        # data-id: websocket-state-config # 配置 Data ID
        # group: DEFAULT_GROUP

# (重要) 配置 Tomcat (或其他容器) 的线程池,WebSocket 连接是长连接,需要足够线程
server:
  tomcat:
    threads:
      max: 500 # 根据预期连接数调整
3. 创建 WebSocket 服务端点 (ServerEndpoint)

使用 @ServerEndpoint 注解定义 WebSocket 端点。关键点: 我们需要一个中心化的 Session 管理器来跟踪所有连接。

代码语言:javascript
代码运行次数:0
运行
复制
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * WebSocket 服务端点
 * 注意:@ServerEndpoint 不是 Spring Bean,不能直接注入 Spring Bean (如 Nacos ServiceManager)
 * 需要通过静态方法或 ApplicationContext 获取。
 */
@ServerEndpoint("/ws/{userId}")
@Component // 为了让 Spring 扫描到,但实际由容器管理
public class WebSocketEndpoint {

    // (方案一) 静态 Map 管理本实例的连接 (仅限单实例或配合外部存储)
    private static final ConcurrentHashMap<String, Session> sessionMap = new ConcurrentHashMap<>();
    private static final AtomicInteger connectionCount = new AtomicInteger(0);

    // (方案二) 推荐:使用外部存储 (如 Redis) 或结合 Nacos Config 管理全局连接状态
    // 这里为了简化演示,先用静态Map,后续会说明如何升级。

    @OnOpen
    public void onOpen(@PathParam("userId") String userId, Session session) {
        // 1. 将用户ID与Session关联
        sessionMap.put(userId, session);
        int count = connectionCount.incrementAndGet();
        System.out.println("用户 " + userId + " 连接成功。当前实例连接数: " + count);

        // 2. (关键) 通知 Nacos 本实例状态更新 (可选,可通过定时任务或事件驱动)
        // 例如:更新实例元数据中的连接数
        updateNacosMetadata(userId, count);
    }

    @OnMessage
    public void onMessage(String message, @PathParam("userId") String userId, Session session) {
        System.out.println("收到来自用户 " + userId + " 的消息: " + message);
        // 处理消息逻辑,例如:解析为指令、广播等
        try {
            session.getBasicRemote().sendText("服务器已收到: " + message);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @OnClose
    public void onClose(@PathParam("userId") String userId, Session session) {
        sessionMap.remove(userId);
        int count = connectionCount.decrementAndGet();
        System.out.println("用户 " + userId + " 连接关闭。当前实例连接数: " + count);

        // (可选) 通知 Nacos 状态更新
        updateNacosMetadata(null, count);
    }

    @OnError
    public void onError(Session session, Throwable error) {
        System.err.println("WebSocket 发生错误: " + error.getMessage());
        error.printStackTrace();
    }
    /**
     * 发送消息给指定用户 (在本实例内)
     */
    public static void sendMessageToUser(String userId, String message) {
        Session session = sessionMap.get(userId);
        if (session != null && session.isOpen()) {
            try {
                session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                System.err.println("发送消息给用户 " + userId + " 失败: " + e.getMessage());
                // 可能需要清理 sessionMap
            }
        } else {
            System.out.println("用户 " + userId + " 不在本实例连接或连接已关闭。");
            // (重要) 需要触发跨实例查找和转发逻辑
            // 1. 查询 Nacos 获取所有 websocket-service 实例
            // 2. 遍历实例元数据,查找哪个实例持有 userId 的连接
            // 3. 调用该实例的 HTTP API 或通过 MQ 发送消息
            // 这里简化处理:打印日志,实际应调用路由服务
            System.out.println("尝试通过 Nacos 发现并路由消息到用户 " + userId + " 所在的实例...");
            // TODO: 调用 NacosDiscoveryService 进行路由
        }
    }

    /**
     * 广播消息给本实例所有用户
     */
    public static void broadcastMessage(String message) {
        sessionMap.forEach((userId, session) -> {
            if (session.isOpen()) {
                try {
                    session.getBasicRemote().sendText(message);
                } catch (IOException e) {
                    System.err.println("广播消息失败: " + e.getMessage());
                }
            }
        });
    }

    /**
     * (可选) 更新 Nacos 实例元数据,例如连接数
     * 需要获取 Nacos NamingService 实例
     */
    private void updateNacosMetadata(String userId, int currentCount) {
        // TODO: 通过 Nacos SDK (NamingService) 调用 updateInstance 更新本实例元数据
        // 例如: 将 "connections" 元数据更新为 currentCount
        // 这需要在 Spring 容器中获取 NamingService Bean
        // NacosDiscoveryService nacosDiscoveryService = ...;
        // nacosDiscoveryService.updateInstanceMetadata(currentCount);
    }
}
4. 创建 Nacos 服务发现与路由服务

创建一个 Spring Service 来封装 Nacos 服务发现和消息路由逻辑。

代码语言:javascript
代码运行次数:0
运行
复制
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Service
public class NacosWebSocketRoutingService {

    @Autowired
    private NamingService namingService; // 由 Nacos Discovery Starter 自动注入

    @Autowired
    private RestTemplate restTemplate; // 用于调用其他实例的 HTTP API

    private static final String WEBSOCKET_SERVICE_NAME = "websocket-service";
    private static final String MESSAGE_ROUTE_PATH = "/api/route/message"; // 其他实例提供的消息转发API

    /**
     * 根据用户ID,查找其连接所在的服务器实例
     * @param userId 用户ID
     * @return 目标实例的地址 (host:port),或 null
     */
    public String findInstanceByUserId(String userId) {
        try {
            // 1. 获取 websocket-service 的所有健康实例
            ServiceInfo serviceInfo = namingService.getService(WEBSOCKET_SERVICE_NAME);
            List<Instance> instances = serviceInfo.getHosts().stream()
                    .filter(Instance::isHealthy) // 只考虑健康实例
                    .collect(Collectors.toList());

            // 2. (简化) 遍历实例,检查其元数据是否包含该用户ID
            // 更优方案:使用一致性哈希、用户ID分片等策略,元数据中存储用户ID范围
            for (Instance instance : instances) {
                Map<String, String> metadata = instance.getMetadata();
                // 示例:元数据中存储了 "users=user1,user2,user3" 或 "user-range=1-1000"
                String usersStr = metadata.get("users");
                if (usersStr != null && usersStr.contains(userId)) {
                    return instance.getIp() + ":" + instance.getPort();
                }
                // 或者检查用户ID是否在 range 内
                // String range = metadata.get("user-range");
                // if (isUserIdInRange(userId, range)) { ... }
            }
            // (更现实) 如果元数据没有直接存储用户列表,可能需要一个全局的用户-实例映射表
            // 该表可存储在 Redis 或 Nacos Config 中,由各实例维护和监听。
        } catch (NacosException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 向指定实例发送消息 (通过 HTTP API)
     * @param targetInstanceAddress 目标实例地址 (host:port)
     * @param userId 目标用户ID
     * @param message 消息内容
     */
    public void sendMessageToInstance(String targetInstanceAddress, String userId, String message) {
        String url = "http://" + targetInstanceAddress + MESSAGE_ROUTE_PATH;
        RouteMessageRequest request = new RouteMessageRequest(userId, message);
        try {
            // 调用目标实例的 API
            restTemplate.postForObject(url, request, String.class);
        } catch (Exception e) {
            System.err.println("向实例 " + targetInstanceAddress + " 转发消息失败: " + e.getMessage());
        }
    }

    /**
     * 主路由方法:向用户发送消息
     * @param userId 用户ID
     * @param message 消息内容
     */
    public void routeMessageToUser(String userId, String message) {
        // 1. 首先检查本实例是否持有连接 (优化:减少网络调用)
        if (WebSocketEndpoint.isLocalUserConnected(userId)) {
            WebSocketEndpoint.sendMessageToUser(userId, message);
            return;
        }

        // 2. 查询 Nacos 发现目标实例
        String targetInstance = findInstanceByUserId(userId);
        if (targetInstance != null) {
            // 3. 转发消息到目标实例
            sendMessageToInstance(targetInstance, userId, message);
        } else {
            System.out.println("用户 " + userId + " 未找到连接实例,可能已离线。");
            // 处理离线消息或通知发送方
        }
    }

    // (可选) 定期或事件驱动更新本实例在 Nacos 的元数据
    public void updateLocalInstanceMetadata(Map<String, String> metadata) {
        try {
            namingService.updateInstance(WEBSOCKET_SERVICE_NAME,
                    namingService.getGroupName(), // 或从配置获取
                    "your-instance-ip", // 需要获取本机IP
                    namingService.getPort(), // 获取本机端口
                    metadata); // 合并或覆盖元数据
        } catch (NacosException e) {
            e.printStackTrace();
        }
    }
}

// 路由消息请求 DTO
class RouteMessageRequest {
    private String userId;
    private String message;

    // constructors, getters, setters
    public RouteMessageRequest(String userId, String message) {
        this.userId = userId;
        this.message = message;
    }
    // ... getters and setters
}
5. 创建消息转发 API (在每个实例上)

为了让其他实例能将消息转发给本实例的特定用户,需要暴露一个 HTTP API。

代码语言:javascript
代码运行次数:0
运行
复制
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageRouteController {

    @Autowired
    private NacosWebSocketRoutingService routingService;

    /**
     * 接收来自其他 WebSocket 实例的路由消息
     */
    @PostMapping("/api/route/message")
    public String routeMessage(@RequestBody RouteMessageRequest request) {
        // 在本实例内查找并发送消息
        WebSocketEndpoint.sendMessageToUser(request.getUserId(), request.getMessage());
        return "OK";
    }
}
6. 集成与调用

在业务逻辑中(如另一个 Controller),需要向用户推送消息时:

代码语言:javascript
代码运行次数:0
运行
复制
@RestController
public class NotificationController {

    @Autowired
    private NacosWebSocketRoutingService routingService;

    @PostMapping("/notify")
    public String notifyUser(@RequestParam String userId, @RequestParam String message) {
        // 使用路由服务发送消息,自动处理跨实例问题
        routingService.routeMessageToUser(userId, message);
        return "通知已发送";
    }
}
关键注意事项与优化
  1. @ServerEndpoint 与 Spring Bean: @ServerEndpoint 由 WebSocket 容器管理,不是 Spring Bean。无法直接 @Autowired。解决方案:
    • 静态方法 + ApplicationContext:@PostConstruct 方法中将 ApplicationContext 赋值给静态变量,然后在 WebSocketEndpoint 中通过 ApplicationContext 获取所需的 Service Bean。
    • 使用 ServerEndpointConfig.Configurator: 自定义 Configurator,在 getEndpointInstance 方法中通过 Spring 容器创建 WebSocketEndpoint 实例。
  2. 全局连接状态管理: 仅靠实例元数据难以维护所有用户的精确连接映射。强烈推荐结合 RedisNacos Config 存储一个全局的 userId -> instanceId 映射表。WebSocketEndpoint@OnOpen/@OnClose 时更新此表,并监听表的变化(通过 Redis Pub/Sub 或 Nacos Config Listener)来处理用户迁移或实例故障。
  3. 消息转发方式: HTTP API 简单直接,但性能开销较大。对于高吞吐场景,推荐使用 消息队列 (MQ)。发送方将消息发到 MQ 的特定主题/队列,所有 WebSocket 实例订阅该主题,收到消息后判断是否是本实例的用户,是则投递。
  4. 心跳与连接保活: 实现 WebSocket 心跳 (@OnPing, @OnPong),并配合 Nacos 的健康检查,确保能及时感知连接中断。
  5. 安全性: 对 WebSocket 连接进行身份验证(如 Token 校验),防止未授权访问。
  6. 性能与扩展: 监控连接数、消息吞吐量。根据负载动态调整服务器实例数量,Nacos 会自动更新服务列表。

通过 Spring Boot 整合 Nacos,我们构建了一个具备服务发现能力的分布式 WebSocket 服务器。Nacos 解决了连接发现和实例管理的核心问题。虽然 @ServerEndpoint 与 Spring 的集成需要特殊处理,但通过静态方法、ApplicationContext 或自定义 Configurator 可以克服。为了实现高效的全局状态管理和消息路由,强烈建议引入 Redis 或消息队列作为辅助组件

这种架构使得 WebSocket 服务具备了良好的可扩展性、高可用性和弹性,能够轻松应对用户量的增长和服务器的动态变化,是构建大型实时应用的理想选择。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 核心挑战与 Nacos 的解决方案
  • 技术栈
  • 实现步骤
    • 1. 环境准备
    • 2. 配置 Nacos (application.yml)
    • 3. 创建 WebSocket 服务端点 (ServerEndpoint)
    • 4. 创建 Nacos 服务发现与路由服务
    • 5. 创建消息转发 API (在每个实例上)
    • 6. 集成与调用
  • 关键注意事项与优化
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档