我对spring流量很陌生,我有一个客户端应用程序,它消耗服务器发送的事件,事件是由服务器随机发布的,没有固定的延迟。但是,如果没有事件,消费者在60秒后抛出io.netty.handler.timeout.ReadTimeoutException: null
。
服务器端事件使用者代码
webClient.get()
.uri("http://localhost:8080/events")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(type)
.subscribe(event -> process(event));
我需要客户连接,即使有很长一段时间没有事件。
完全例外
[36mr.netty.http.client.HttpClientConnect [...] The connection observed an error
io.netty.handler.timeout.ReadTimeoutException: null
reactor.Flux.MonoFlatMapMany.1 onError(org.springframework.web.reactive.function.client.WebClientRequestException: nested exception is io.netty.handler.timeout.ReadTimeoutException)
reactor.Flux.MonoFlatMapMany.1
org.springframework.web.reactive.function.client.WebClientRequestException: nested exception is io.netty.handler.timeout.ReadTimeoutException
at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141) ~[spring-webflux-5.3.5.jar:5.3.5]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
发布于 2021-06-01 14:57:24
在服务器发送事件的Mozilla描述中有一个注释:
冒号作为行的第一个字符本质上是一个注释,并被忽略。备注:注释行可用于防止连接超时;服务器可以定期发送评论以保持连接正常运行。
因此,定期发送评论可以使连接保持活跃。那么,我们如何发送评论呢?
好吧,spring有类ServerSentEvent,它有函数ServerSentEvent#comment。因此,如果我们将这个类与例如Flux#interval结合使用,我们就可以在只包含注释keep alive
的事件中合并。
下面是我不久前构建的一个项目的一个例子
@Bean
public RouterFunction<ServerResponse> foobars() {
return route()
.path("/api", builder -> builder
.GET("/foobar/{id}", accept(TEXT_EVENT_STREAM), request -> ok()
.contentType(MediaType.TEXT_EVENT_STREAM)
.header("Cache-Control", "no-transform")
.body(Flux.merge(foobarHandler.stream(request.pathVariable("id")),
Flux.interval(Duration.ofSeconds(15)).map(aLong -> ServerSentEvent.<List<FoobarResponse>>builder()
.comment("keep alive").build())), new ParameterizedTypeReference<ServerSentEvent<List<FoobarResponse>>>(){}))
.build();
}
发布于 2022-07-08 08:35:54
Webflux使用默认的超时回退,最终将显示io.netty.handler.timeout.ReadTimeoutException: null
。可以通过将自定义超时回退到超时值方法来防止此错误:
public final Flux<T> timeout(Duration timeout, @Nullable Publisher<? extends T> fallback);
此外,您还可以使用onErrorContinue、onErrorReturn、.等方法。要正确处理Flux中的异常,例如:
return webClient.get().uri(url).retrieve().bodyToFlux(String.class)
.timeout(timeout, Mono.error(new ReadTimeoutException("Timeout")))
.onErrorContinue((e, i) -> {
// Log the error here.
});
如果您想在默认情况下禁用所有这些日志,可以将该行添加到文件application.properties中。
logging.level.reactor.netty.http.client.HttpClient=OFF
https://stackoverflow.com/questions/67792501
复制相似问题