Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >SSE 第二篇

SSE 第二篇

作者头像
收心
修改于 2023-03-28 02:52:49
修改于 2023-03-28 02:52:49
1.7K20
代码可运行
举报
文章被收录于专栏:Java实战博客Java实战博客
运行总次数:0
代码可运行

上一篇文章只是简单帮大家梳理一下什么是SSE。这篇文章,则会放上真实Spring框架对SSE的封装了。框架封装了send方法,我们可以通过业务主动去给客户端推送事件。

我本来考虑实现服务器宕机重启后,SSE请求对象保持原有不变,实现前端SSE重连。但是经过实际操作,以及思考后,我发现此方案不能解决此问题。因为响应对象存储在服务端的JUC包下的Map中。我们无法通过Redis存储信息,然后重新获取原来的响应对象。SSE在服务端的响应对象与Session机制类似。也就是无法跨服务使用!所以,我们压根就不用考虑这个问题。这对于SSE来说是个伪需求!

先放上前端代码

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>SSE,服务器主动触发消息</title>
</head>
<body>
<h1>开发者模式,查看控制台、以及网络可以查看相关输出</h1>
消息类型是message消息:
<div id="ssediv">默认消息</div>
<br>
消息类型是diyEventType消息:
<div id="diyssediv">DIY SSE消息</div>
<br>
<br>
<br>
<br>
<div id="diybutton">
    <button type="button" onclick="connectSSE()">主动连接SSE服务器!</button>
    <button type="button" onclick="closeSSE()">关闭SSE连接!</button>
    <button type="button" onclick="diyclick()">点我模拟服务器发送消息!</button>
</div>
</body>
<script>
    var sse = new EventSource("http://localhost:8089/sse-plus");

    /**
     * 默认是没有指定eventTtpe的消息,但eventType就为message。
     * 等价于addEventListener("message" ...
     */
    sse.onmessage = function (ev) {
        console.info("这里只能处理eventType为message的消息")
        var elementById = document.getElementById("ssediv");
        elementById.innerHTML = ev.data;
    }

    /**
     * 添加指定类型消息处理,eventType是后台自定义的
     */
    sse.addEventListener("diyEventType", event => {
        console.info("自定义事件" + event.data)
        var elementById2 = document.getElementById("diyssediv");
        elementById2.innerHTML = event.data;
    })

    /**
     * SSE连接异常
     */
    sse.onerror = function () {
        alert("服务器已停止!")
    }

    /**
     * SSE连接成功
     */
    sse.onopen = function () {
        alert("服务器已连接!")
    }

    // 不要忘记关闭断开连接哦
    // sse.close()
</script>
<script>
    function diyclick() {
        var xmlHttpRequest = new XMLHttpRequest();
        xmlHttpRequest.open("get", "http://localhost:8089/sendMessage")
        xmlHttpRequest.send()
    }

    // 关闭SSE
    function closeSSE() {
        sse.close()
        console.info("服务器已关闭!")
    }

    // 连接SSE
    function connectSSE() {
        sse.close()
        sse = new EventSource("http://localhost:8089/sse-plus");
        sse.addEventListener("diyEventType", event => {
            console.info("自定义事件" + event.data)
            var elementById2 = document.getElementById("diyssediv");
            elementById2.innerHTML = event.data;
        })
        /**
         * 默认是没有指定eventTtpe的消息,但eventType就为message。
         * 等价于addEventListener("message" ...
         */
        sse.onmessage = function (ev) {
            console.info("这里只能处理eventType为message的消息")
            var elementById = document.getElementById("ssediv");
            elementById.innerHTML = ev.data;
        }
        /**
         * SSE连接异常
         */
        sse.onerror = function () {
            console.info("服务器已停止!")
        }

        /**
         * SSE连接成功
         */
        sse.onopen = function () {
            console.info("服务器已连接!")
        }
    }
</script>
</html>

粘贴Java代码

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    private static Integer sendTimes = 0;
    private static Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();

    @GetMapping(value = "/sse-plus")
    @ResponseBody
    public SseEmitter SseEmitter(HttpServletResponse response) {
        // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常,需在全局异常捕获:AsyncRequestTimeoutException
        SseEmitter sseEmitter = new SseEmitter(0L); // 单位ms,如果你设定了,会自动断开。如果前端有自动重试机制,间歇断开可减少连接被长久占用。
        response.setContentType("text/event-stream");   // 指定ContentType,不可变
        response.setCharacterEncoding("utf-8");         // 指定响应字符集,不可变,经测试非UTF-8则会中文乱码,但建议指定utf-8
        String clientId = UUID.randomUUID().toString();
        // 注册回调
        //  >> 回调1:长链接完成后回调接口(即关闭连接时调用)
        sseEmitter.onCompletion(() -> {
            sseCache.remove(clientId);
            log.info("SSE onCompletion: {}连接关闭时触发", clientId);
        });
        //  >> 回调2:出现异常会调用此方法
        sseEmitter.onError(new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) {
                sseCache.remove(clientId);
                log.info("SSE onError:{}出现异常", clientId);
            }
        });
        //  >> 回调3:出现连接超时,会调用此方法
        sseEmitter.onTimeout(() -> {
            sseCache.remove(clientId);
            log.error("SSE onTimeout:{}超时了", clientId);
        });
        sseCache.put(clientId, sseEmitter);
        log.info("创建新的sse连接,当前用户:{}", clientId);
        try {
            sseEmitter.send(SseEmitter.event().id(clientId).name("diyEventType").data("连接成功" + clientId));
        } catch (IOException e) {
            log.error("SSE: 给客户端发送消息异常,客户端ID:{}", clientId, e);
            throw new RuntimeException("给客户端发送消息异常!", e);
        }
        return sseEmitter;
    }

    /**
     * 长链接完成后回调接口(即关闭连接时调用)
     *
     * @param clientId 客户端ID
     **/
    private Runnable completionCallBack(String clientId) {
        return () -> {
            log.info("结束连接:{}", clientId);
        };
    }

    // 创建一个线程池,用于处理大批量用户掉线的问题
    private static ExecutorService executorService = new ThreadPoolExecutor(3,
            5,
            60L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(50),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy()); // 采用阻塞队列。不丢任何消息

    /**
     * 缺陷:此处发送消息时候,如果用户连接失效,服务器实际无法感知,只能通过再次调用send出现异常时候来判断用户已经断联
     * 如果采用重试机制,很容易造成阻塞。如果用户体量很大,建议采用MQ的方式,将消息甩到MQ。剩下由其他服务或线程来处理。
     *
     * @return
     */
    @GetMapping(value = "/sendMessage")
    @ResponseBody
    public String sendMessage() {
        TimeInterval timer = DateUtil.timer();
        sendTimes++;
        // 这里从缓存中拿到sse对象,调用send方法实现主动推送
        sseCache.forEach((clientId, sseEmitter) -> {
            try {
                // 建议每次发送都以新Obj:此对象是builder,调用一次,相当于append一次!
                SseEmitter.SseEventBuilder data = SseEmitter.event()
                        .reconnectTime(5000)
                        .name("diyEventType");
                // 直接调用对象的send方法就可以,自定义主动推送消息了
                data.id(clientId)
                        .data("宝贝Aa1😊 " + clientId + " : " + sendTimes, MediaType.APPLICATION_JSON)
                        .data("</br>当前连接用户数: " + sseCache.size())
                        .data("</br>发送本次耗时:" + timer.interval());
                sseEmitter.send(data);
            } catch (IOException e) {
                executorService.execute(() -> {
                    // 推送消息失败后,每隔3s推送一次,推送5次。如果不使用线程池,就会导致发消息时,重试机制导致其他用户消息无法处理!
                    for (int i = 0; i < 5; i++) {
                        try {
                            Thread.sleep(3000);
                            if (sseEmitter == null) {
                                log.error("消息推送出现异常:{}的第{}次消息重推失败,未创建长链接", clientId, i + 1);
                                continue;
                            }
                            // 这里data非全局消息,想办法抽离出去即可 sseEmitter.send(data);
                        } catch (Exception ex) {
                            log.error("消息推送出现异常:{}的第{}次消息重推失败", clientId, i + 1, ex);
                            continue;
                        }
                        if (i == 4) {
                            sseCache.remove(clientId);
                            log.error("由于用户{},消息推送老是失败,则不再尝试推送消息!", clientId);
                        }
                        //log.info("消息推送出现异常:{}的第{}次消息重推成功,{}", clientId, i + 1, data);
                        return;
                    }
                });

            }
        });
        return "消息推送成功!";
    }

特殊说明: 以上文章,均是我实际操作,写出来的笔记资料,不会盗用别人文章!烦请各位,请勿直接盗用!转载记得标注来源!

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
2 条评论
热度
最新
咨询一个问题,关于这个SSE,我本地启动项目调试是正常的,浏览器可以正常接受服务端的消息推送,但是部署在云上,浏览器就收不到消息了,ngnix相关的缓存配置也关闭了,请教下解决思路
咨询一个问题,关于这个SSE,我本地启动项目调试是正常的,浏览器可以正常接受服务端的消息推送,但是部署在云上,浏览器就收不到消息了,ngnix相关的缓存配置也关闭了,请教下解决思路
11点赞举报
打开 network 看下你浏览器报错,看下你服务器地址对不对哦。
打开 network 看下你浏览器报错,看下你服务器地址对不对哦。
回复回复点赞举报
推荐阅读
编辑精选文章
换一批
SSE 服务端消息推送
SSE 它是基于 HTTP 协议的,一般意义上的 HTTP 协议是无法做到服务端主动向客户端推送消息的。有一种变通方法,就是服务器向客户端声明,发送的是流信息,本质上,这种通信就是以流信息的方式。
默存
2022/12/03
2.2K0
SSE – Server Sent Events – 服务端主动推送
SSE是一个轻量级协议,相对简单;WebSocket是一种较重的协议,相对复杂。但SSE只支持单向交互(服务器给客户发送),Websocket支持双向交互。
收心
2022/11/22
3K0
SSE – Server Sent Events – 服务端主动推送
使用SSE对接清华chatGLM模型
用户10136162
2023/12/14
6270
重学Springboot系列之服务器推送技术
若干年前,所有的请求都是由浏览器端发起,浏览器本身并没有接受请求的能力。所以一些特殊需求都是用ajax轮询的方式来实现的。比如:
大忽悠爱学习
2021/12/07
2.4K0
重学Springboot系列之服务器推送技术
基于SpringBoot与RabbitMQ的SSE分布式广播推送设计
实现逻辑简单说明,分布式系统,当前项目有一个局部刷新的业务场景,后端处理完数据需要实时推送到前端,之前的处理办法是 WebSocket + redis ,但是 WebSocket 老断,后面考虑做一些报文加密之类的,考虑 WebSocket协议升级全双工通信之后Servlet过滤器之类不再适用,而且也没有客户端推送的需求,所以考虑使用 SSE 加 MQ 的方式,可以基于当前Web安全框架,不需要额外编码
山河已无恙
2025/03/27
2470
基于SpringBoot与RabbitMQ的SSE分布式广播推送设计
【SpringBoot WEB 系列】SSE 服务器发送事件详解
SSE 全称Server Sent Event,直译一下就是服务器发送事件,一般的项目开发中,用到的机会不多,可能很多小伙伴不太清楚这个东西,到底是干啥的,有啥用
一灰灰blog
2020/04/24
6.3K0
【SpringBoot WEB 系列】SSE 服务器发送事件详解
我有 7种 实现web实时消息推送的方案,7种!
做了一个小破站,现在要实现一个站内信web消息推送的功能,对,就是下图这个小红点,一个很常用的功能。
程序员小富
2022/07/19
11.6K0
我有 7种 实现web实时消息推送的方案,7种!
SpringBoot实现网页消息推送的5种方法
项目开发中,实时消息推送已成为提升用户体验的关键技术。无论是聊天应用、通知系统、实时数据展示,还是协同办公场景,都需要服务器能够主动向客户端推送消息。本文将详细介绍SpringBoot中实现网页消息推送的几种主流方案,帮助开发者根据实际需求选择最合适的技术。
阿珍
2025/05/28
2190
SpringBoot实现网页消息推送的5种方法
SpringBoot入门建站全系列(三十七)WebSSE做简单的聊天室
提到服务端数据推送,你可以一下子就想到了Websocket,WebSocket是一种全新的协议,随着HTML5草案的不断完善,越来越多的现代浏览器开始全面支持WebSocket技术了,它将TCP的Socket(套接字)应用在了webpage上,从而使通信双方建立起一个保持在活动状态连接通道。
品茗IT
2020/12/23
1.6K0
SpringBoot入门建站全系列(三十七)WebSSE做简单的聊天室
09. Springboot集成sse服务端推流
如果项目中有一个场景,假设对接ChatGPT或对接天气类接口的时候,需要服务端主动往客户端进行消息推送或推流。通常的做法有:
有一只柴犬
2024/01/28
11.1K0
09. Springboot集成sse服务端推流
服务器推送技术
之前所有的请求都是浏览器发起,浏览器本身没有接受请求的能力。所以一些特殊需求都是用ajax轮询的方式来实现的。
全栈程序员站长
2022/09/07
1.9K0
服务器推送技术
SpringBoot集成websocket实现后端向页面发送消息
1、引入依赖: compile "org.springframework.boot:spring-boot-starter-web:${verSpringBoot}" compile "org.springframework.boot:spring-boot-starter-websocket:${verSpringBoot}" 2、添加Webscoket配置: /** * ServerEndpointExporter 作用 * * 这个Bean会自动注册使用@Se
JQ实验室
2022/02/10
1.3K0
WebSocket开发(一对一聊天)功能
在之前的文章:Spring Boot使用WebSocket模拟聊天 中简单的建立了Spring boot项目并集成了websocket实现了一些入门demo,本篇文章则是在之前的基础上增加一对一私聊和统计在线人数等功能。
余生大大
2022/11/02
1.6K0
WebSocket开发(一对一聊天)功能
SSE(Server-sent events)技术在web端消息推送和实时聊天中的使用
最近在公司闲着没事研究了几天,终于搞定了SSE从理论到实际应用,中间还是有一些坑的。 1.SSE简介 SSE(Server-sent events)翻译过来为:服务器发送事件。是基于http协议,和WebSocket的全双工通道(web端和服务端相互通信)相比,SSE只是单通道(服务端主动推送数据到web端),但正是由于此特性,在不需要客户端频繁发送消息给服务端,客户端却需要实时或频繁显示服务端数据的业务场景中可以使用。如:新邮件提示,在浏览网页时提示有新信息或新博客,监控系统实时显示数据。。。 在web端
用户1558882
2018/04/03
5.5K0
SSE(Server-sent events)技术在web端消息推送和实时聊天中的使用
springboot单向推送给客户端SseEmitter
SseEmitter 是 Spring Framework 提供的一个类,用于处理服务器向客户端推送事件(Server-Sent Events, SSE)的功能。与 WebSocket 不同,SSE 是单向的,服务器可以推送数据到客户端,但客户端不能推送数据到服务器。SSE 适用于场景包括实时消息通知、进度更新、数据流推送等。Spring Boot 中通过 SseEmitter 实现 SSE 通信简单且高效。
阿超
2024/09/08
2.4K0
Spring 实现 3 种异步流式接口,干掉接口超时烦恼
这题我熟,直接上异步接口,使用 Callable、WebAsyncTask 和 DeferredResult、CompletableFuture等均可实现。
程序员小富
2024/10/14
5060
SSE打扮你的AI应用,让它美美哒
因为,行文字数所限,有些概念可能会一带而过亦或者提供对应的学习资料。请大家酌情观看。
前端柒八九
2024/07/15
4060
SSE打扮你的AI应用,让它美美哒
为什么ChatGPT采用SSE协议而不是Websocket?
在探索ChatGPT的使用过程中,我们发现GPT采用了流式数据返回的方式。理论上,这种情况可以通过全双工通信协议实现持久化连接,或者依赖于基于EventStream的事件流。然而,ChatGPT选择了后者,也就是本文即将深入探讨的SSE(Server-Sent Events)技术。
程序新视界
2023/12/30
3.3K0
为什么ChatGPT采用SSE协议而不是Websocket?
AI大模型文本流如何持续吐到前端,服务端实时通信技术 SSE(Server-Sent Events) 认知
99%的焦虑都来自于虚度时间和没有好好做事,所以唯一的解决办法就是行动起来,认真做完事情,战胜焦虑,战胜那些心里空荡荡的时刻,而不是选择逃避。不要站在原地想象困难,行动永远是改变现状的最佳方式
山河已无恙
2025/02/25
5260
AI大模型文本流如何持续吐到前端,服务端实时通信技术 SSE(Server-Sent Events) 认知
Spring 4.x 支持异步请求处理
前两天看Spring框架参考手册,发现SpringMVC在4.0版本上支持异步请求处理。废话不多说,开始异步请求之旅。
zhangheng
2020/04/28
1.7K0
相关推荐
SSE 服务端消息推送
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验