
响应式数据流常常会在不同的线程上执行操作,例如:
Flux.just(1, 2, 3)
.publishOn(Schedulers.parallel())
.map(i -> i * 2) // 在并行线程执行
.subscribeOn(Schedulers.single())
.subscribe(System.out::println); // 在单一线程执行这种线程切换可能导致:
在响应式编程中,推荐采用不可变对象(Immutable Objects)来避免线程安全问题:
// 好的实践 - 使用不可变对象
public class ImmutableData {
private final int value;
public ImmutableData(int value) {
this.value = value;
}
public int getValue() {
return value;
}
}应避免的状态共享模式包括:
Mono/Flux本身是线程安全的Atomic系列类处理简单状态:AtomicInteger counter = new AtomicInteger(0);
Flux.range(1, 10)
.parallel()
.runOn(Schedulers.parallel())
.doOnNext(i -> counter.incrementAndGet())
.sequential()
.blockLast();对于复杂操作,可以使用:
synchronized块(谨慎使用)ReentrantLock等显式锁ConcurrentHashMap根据场景选择合适的调度器:
Schedulers.immediate():在当前线程执行Schedulers.single():单一后台线程Schedulers.parallel():固定大小的线程池Schedulers.elastic():可扩展的线程池StepVerifier结合virtualTime测试异步行为StepVerifier.withVirtualTime(() ->
Flux.interval(Duration.ofSeconds(1)).take(5)
)
.expectSubscription()
.thenAwait(Duration.ofSeconds(5))
.expectNextCount(5)
.verifyComplete();Thread.currentThread().getName()调试Hooks.onOperatorDebug())背压机制本身需要考虑线程安全:
onBackpressureBuffer等操作符时需要考虑缓冲区大小和线程安全在响应式链中传递线程本地信息:
Mono.deferContextual(ctx ->
Mono.just("Hello " + ctx.get("user"))
)
.contextWrite(Context.of("user", "Alice"))
.subscribe(System.out::println);数据库访问的特殊考虑: