ChatGPT 刚出的时候,让大伙很好奇的是它是如何实现的逐字输出的?答案就是 SSE (服务器发送事件)。随着实时数据和响应式编程的需求不断增加,服务器发送事件(Server-Sent Events,简称 SSE)在现代 Web 应用程序中越来越受欢迎。SSE 提供了一种轻量级的服务器推送数据给客户端的方式,适合用于监控、实时通知、股票价格更新等场景。
在 Spring Boot 3 中,结合响应式编程的理念,SSE 的实现变得更加简洁和高效。本文将详细介绍如何使用 Spring Boot 3 来实现 SSE 服务端推送,并讨论响应式编程在此过程中的重要性和优势。
服务器发送事件(SSE) 是一种从服务器向客户端推送数据的技术,属于 HTML5 的一部分。与传统的 HTTP 请求-响应模型不同,SSE 是单向的,服务器可以持续不断地向客户端发送数据,而客户端通过一次长连接持续接收这些更新。
相比 WebSocket,SSE 有以下特点:
Spring Boot 3 提供了对响应式编程的全面支持,基于 Project Reactor 实现异步、非阻塞的流式数据处理。而响应式编程非常适合实现 SSE,因为它允许我们以非阻塞的方式持续推送数据,而不会阻塞服务器的资源。
Spring WebFlux 是 Spring Boot 3 中用于构建响应式应用的核心框架,它可以无缝集成 SSE,为我们提供简单高效的服务器推送功能。
传统的阻塞式编程在处理长连接(如 SSE)时可能会占用大量服务器资源。响应式编程通过非阻塞 I/O 操作,不仅可以高效处理长时间的连接,还能在有新数据时立即推送给客户端。响应式流(如 Flux
)天然适合于这种流式数据推送场景。
我们将通过以下步骤实现一个简单的 SSE 服务端推送应用:
首先,创建一个新的 Spring Boot 3 项目,并确保引入了 spring-boot-starter-webflux
依赖。可以通过 Maven 或 Gradle 配置:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
</dependencies>
在 Spring WebFlux 中,SSE 通过返回 Flux<ServerSentEvent<T>>
这种响应流来实现。下面我们实现一个简单的 SSE 控制器,它会每隔一段时间向客户端推送当前的时间信息。
package com.coderjia.boot3webflux.controller;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.time.LocalTime;
/**
* @author CoderJia
* @create 2024/10/27 下午 07:03
* @Description
**/
@Controller
public class SseController {
@GetMapping("/sse/stream")
public Flux<ServerSentEvent<String>> streamSse() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> ServerSentEvent.<String>builder()
.id(String.valueOf(sequence))
.event("periodic-event")
.data("Current time: " + LocalTime.now())
.build());
}
}
Flux.interval(Duration.ofSeconds(1))
:创建一个每秒发出事件的响应式流。ServerSentEvent.builder()
:构建 ServerSentEvent
对象,它可以包含 id
、event
和 data
等信息,符合 SSE 规范。map()
:将流中的每个事件映射为 ServerSentEvent
,并附带当前的时间信息。客户端可以使用 JavaScript 原生的 EventSource
API 来接收服务器发送的 SSE 数据流。
示例 HTML + JavaScript 客户端
resources/static/index.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>SSE Example</title>
</head>
<body>
<h1>Server-Sent Events (SSE) Example</h1>
<div id="messages"></div>
<script src="https://unpkg.com/axios/dist/axios.min.js"></script>
<script>
const http = axios.create({
baseURL: 'http://localhost:8080/',
timeout: 100000,
responseType: 'stream',
onDownloadProgress: function (progressEvent) {
// 获取 messages 元素
const messagesElement = document.getElementById("messages");
// 清除现有内容
messagesElement.innerHTML = "";
// 添加新内容
const newElement = document.createElement("div");
newElement.innerHTML = progressEvent.event.currentTarget.responseText + "<br/>";
messagesElement.appendChild(newElement);
},
});
http.get('/sse/stream')
.then(function (response) {
// 处理成功情况
console.log(response);
})
.catch(function (error) {
// 处理错误情况
console.log(error);
})
.finally(function () {
// 总是会执行
});
</script>
</body>
</html>
解释
EventSource("/sse/stream")
:EventSource 是浏览器提供的一个用于和服务器建立连接,接收服务器发送事件的接口。在客户端发起与服务器的 SSE 长连接。服务器通过 /sse/stream
推送事件。onmessage
:处理服务器发送的消息,并将消息显示在页面上。onerror
:当连接发生错误时关闭连接,避免持续消耗资源。运行 Spring Boot 应用,并访问 /sse/stream
,可以看到服务器每秒钟向客户端推送一次当前时间信息。
header 里的 Content-Type 为 text/event-stream
。
可以通过浏览器打开 http://localhost:8080/
,在页面中将会每秒钟显示一次服务器推送的数据流。这就验证了 SSE 在 Spring Boot 3 中的实现。
为了模拟更真实的场景,可以增加一些随机数据或实时数据更新。假设我们希望推送随机的股票价格,我们可以这样修改:
@GetMapping("/sse/stocks")
public Flux<ServerSentEvent<String>> streamStockPrices() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> ServerSentEvent.<String>builder()
.id(String.valueOf(sequence))
.event("stock-update")
.data("Stock price: $" + ThreadLocalRandom.current().nextInt(100, 200))
.build());
}
在这个例子中,每秒推送一次随机的股票价格更新。
SSE 连接如果长时间没有数据传输,可能会被中断。为此,SSE 规范推荐发送 "ping" 消息来保持连接活跃。可以通过 ServerSentEvent
的 comment()
来发送心跳信息:
@GetMapping("/sse/stream-with-ping")
public Flux<ServerSentEvent<String>> streamWithPing() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> {
if (sequence % 5 == 0) { // 每5秒发送一次心跳
return ServerSentEvent.<String>builder()
.comment("ping")
.build();
} else {
return ServerSentEvent.<String>builder()
.data("Current time: " + LocalTime.now())
.build();
}
});
}
MediaType.TEXT_EVENT_STREAM
响应虽然 ServerSentEvent
是处理 SSE 的标准类,但你也可以直接返回 Flux<T>
,Spring 会自动将其转换为事件流。如果你想简化代码,可以这样写:
@GetMapping(value = "/sse/simple", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> simpleSse() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> "Current time: " + LocalTime.now());
}
这里直接返回 Flux<String>
,Spring WebFlux 会自动推送数据。
SSE 和 WebSocket 都是实时通信的重要技术,但它们有不同的适用场景:
对于简单的实时更新场景,如股票价格更新、推送通知等,SSE 更加轻量且易于实现。
Spring Boot 3 提供了简单、强大的 SSE 实现,结合响应式编程的特性,使得我们可以轻松构建高效的服务器推送应用。在实际项目中,SSE 非常适合用于推送实时数据或监控信息,尤其在需要轻量且可靠的单向通信时。通过 Spring WebFlux 和 Project Reactor,SSE 的实现可以以非阻塞的方式运行,极大提升了应用的并发处理能力。
希望这篇博客对你理解 Spring Boot 3 中的 SSE 服务端推送有所帮助,如果有任何问题或想法,欢迎讨论!