本文将围绕 Reactor 框架,深入剖析响应式流的核心机制,重点讲解背压(Backpressure)的实现原理与实际应用。通过理论结合实践,希望帮助你真正掌握 Java 世界的响应式异步编程。
响应式编程(Reactive Programming)是一种声明式的编程范式,强调数据流和变化传播。它最初的设计目标是应对异步数据流的处理问题,主要特点有:
Reactive Streams 是由 Java 业界几大厂商联合制定的一个标准接口,用于异步流的处理,核心接口包括:
Publisher<T>
:发布数据的源。Subscriber<T>
:消费数据的订阅者。Subscription
:连接 Publisher 和 Subscriber,处理订阅和取消订阅。Processor<T, R>
:既是 Subscriber 也是 Publisher,可用于数据处理和桥接。Java 9 中引入的 java.util.concurrent.Flow
是该规范的标准实现。
Reactor 是由 Spring 团队维护的响应式编程库,底层基于 Reactive Streams 接口,是 Spring WebFlux 的核心引擎。它提供了两个核心类型:
Mono<T>
:表示 0 或 1 个元素的异步序列。Flux<T>
:表示 0 到 N 个元素的异步序列。Reactor 的设计目标包括:
Mono<String> mono = Mono.just("Hello");
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
你也可以从集合、流、异步回调中构建:
Flux<String> fromList = Flux.fromIterable(Arrays.asList("A", "B", "C"));
Flux<Integer> range = Flux.range(1, 10);
Mono<String> fromFuture = Mono.fromFuture(CompletableFuture.supplyAsync(() -> "Async"));
Reactor 提供了丰富的操作符用于数据处理和流控制,例如:
map
, flatMap
filter
, distinct
reduce
, collectList
merge
, zip
, combineLatest
onErrorResume
, retry
, doOnError
subscribeOn
, publishOn
示例:
Flux.range(1, 5)
.map(i -> i * 2)
.filter(i -> i % 3 == 0)
.subscribe(System.out::println);
在异步系统中,生产者和消费者处理能力往往不一致。例如:
此时,如果没有控制策略,缓冲区可能迅速被填满,导致内存溢出或系统崩溃。
背压机制的作用就是让消费者通知生产者:“请慢一点,我跟不上了。”
Reactive Streams 规范原生支持背压。流程如下:
Subscriber
调用 Subscription.request(n)
请求 n 条数据。Publisher
仅在收到请求后才推送数据。request()
,则不会接收到任何数据。Flux<Integer> flux = Flux.range(1, 1000);
flux.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(10); // 仅请求 10 条
}
@Override
protected void hookOnNext(Integer value) {
System.out.println("Received: " + value);
if (value == 10) {
cancel(); // 手动取消订阅
}
}
});
Reactor 默认是响应式拉模式(pull-based),支持以下策略:
onBackpressureBuffer
、onBackpressureDrop
等指定处理方式。Flux.range(1, 10000)
.onBackpressureBuffer(100,
dropped -> System.out.println("Dropped: " + dropped))
.publishOn(Schedulers.parallel(), 10)
.subscribe(System.out::println);
Schedulers.immediate()
:在当前线程执行。Schedulers.single()
:单线程执行。Schedulers.parallel()
:适用于 CPU 密集型任务。Schedulers.elastic()
:适用于 I/O 密集型任务。Schedulers.boundedElastic()
:最大线程数量受限,可重用。Mono.fromCallable(() -> {
System.out.println("IO: " + Thread.currentThread().getName());
return "result";
})
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.parallel())
.map(data -> {
System.out.println("CPU: " + Thread.currentThread().getName());
return data.toUpperCase();
})
.subscribe(System.out::println);
注意:subscribeOn 影响数据源的执行线程,publishOn 影响后续操作的执行线程。
假设我们正在构建一个异步数据处理服务,从数据库获取数据,做复杂计算后写入 Redis 缓存。我们使用 Reactor 实现非阻塞式处理,支持背压。
public class DataProcessor {
private final ReactiveRepository repository;
private final ReactiveRedisTemplate<String, String> redisTemplate;
public Mono<Void> processAll() {
return repository.fetchAll()
.publishOn(Schedulers.boundedElastic()) // 数据库 I/O
.map(this::heavyCompute)
.flatMap(data -> redisTemplate.opsForValue()
.set(data.getId(), data.toJson()))
.then(); // 返回 Mono<Void>
}
private Data heavyCompute(Data input) {
// CPU 密集型任务
return input.enrich().transform();
}
}
repository.fetchAll()
.onBackpressureBuffer(1000,
d -> System.out.println("Dropped data: " + d.getId()))
.limitRate(100) // 限制每次最多拉取 100 个元素
.subscribe(data -> process(data));
StepVerifier.create(Mono.just("hello").map(String::toUpperCase))
.expectNext("HELLO")
.verifyComplete();
Flux.range(1, 5)
.log()
.map(i -> i * 2)
.subscribe(System.out::println);
checkpoint()
定位错误someFlux
.checkpoint("Before transformation")
.map(this::someRiskyMethod)
.checkpoint("After transformation")
.subscribe();
Spring 5 引入了 WebFlux 模块,使用 Netty 作为非阻塞服务器,底层完全基于 Reactor。
@RestController
@RequestMapping("/users")
public class UserController {
@GetMapping("/{id}")
public Mono<User> getUser(@PathVariable String id) {
return userService.findById(id);
}
@GetMapping
public Flux<User> listUsers() {
return userService.findAll();
}
}
public interface UserRepository extends ReactiveCrudRepository<User, String> {
Flux<User> findByAgeGreaterThan(int age);
}
.then()
来表明只关心完成信号。.flatMap()
而不是 .map()
处理异步逻辑。block()
。误区 | 正确做法 |
---|---|
直接调用 block() 获取值 | 在测试中可用,生产环境应避免 |
所有操作都用 subscribe() | 尽量构建数据流,交由 WebFlux 管理 |
忽略线程切换 | 使用 subscribeOn 与 publishOn 明确切换 |
不处理错误流 | 始终加上 .onErrorXxx() 操作 |