首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >使用ReactiveElasticsearchClient时配置size.codec.max内存大小

使用ReactiveElasticsearchClient时配置size.codec.max内存大小
EN

Stack Overflow用户
提问于 2019-12-13 16:23:18
回答 4查看 39.9K关注 0票数 20

我正在使用Spring-DataElasticSearch3.2.3和spring 2.2.0的ReactiveElasticsearchClient。当升级到spring 2.2.2时,org.springframework.core.io.buffer.DataBufferLimitException:超出了缓冲区的最大字节数限制: 262144。

它被暗示要使用Spring.codec.max内存大小,但我还是得到了相同的例外。

整个例外:

代码语言:javascript
运行
复制
org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer : 262144
    at org.springframework.core.io.buffer.LimitedDataBufferList.raiseLimitException(LimitedDataBufferList.java:101)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoCollect] :
    reactor.core.publisher.Flux.collect(Flux.java:3273)
    org.springframework.core.io.buffer.DataBufferUtils.join(DataBufferUtils.java:553)
Error has been observed at the following site(s):
    |_     Flux.collect ⇢ at org.springframework.core.io.buffer.DataBufferUtils.join(DataBufferUtils.java:553)
    |_      Mono.filter ⇢ at org.springframework.core.io.buffer.DataBufferUtils.join(DataBufferUtils.java:554)
    |_         Mono.map ⇢ at org.springframework.core.io.buffer.DataBufferUtils.join(DataBufferUtils.java:555)
    |_         Mono.map ⇢ at org.springframework.core.codec.AbstractDataBufferDecoder.decodeToMono(AbstractDataBufferDecoder.java:96)
    |_       checkpoint ⇢ Body from POST http://localhost:9200/_bulk?timeout=1m [DefaultClientResponse]
    |_         Mono.map ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.readResponseBody(DefaultReactiveElasticsearchClient.java:669)
    |_    Mono.doOnNext ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.readResponseBody(DefaultReactiveElasticsearchClient.java:670)
    |_     Mono.flatMap ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.readResponseBody(DefaultReactiveElasticsearchClient.java:671)
    |_ Mono.flatMapMany ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.sendRequest(DefaultReactiveElasticsearchClient.java:591)
    |_ Flux.publishNext ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.bulk(DefaultReactiveElasticsearchClient.java:448)
    |_     Flux.flatMap ⇢ at com.energisme.ds.reactive.aggregation.service.SensorAggregationService.save(SensorAggregationService.java:32)
    |_         Flux.map ⇢ at com.energisme.ds.reactive.aggregation.service.SensorAggregationService.save(SensorAggregationService.java:33)
    |_      Flux.reduce ⇢ at com.energisme.ds.reactive.aggregation.service.SensorAggregationService.save(SensorAggregationService.java:34)
    |_         Mono.zip ⇢ at com.energisme.ds.reactive.aggregation.service.AggregateSensorFlowService.nonIndexDifferenceAggregateSensorData(AggregateSensorFlowService.java:178)
    |_         Mono.map ⇢ at com.energisme.ds.reactive.aggregation.service.AggregateSensorFlowService.nonIndexDifferenceAggregateSensorData(AggregateSensorFlowService.java:179)
Stack trace:
        at org.springframework.core.io.buffer.LimitedDataBufferList.raiseLimitException(LimitedDataBufferList.java:101)
        at org.springframework.core.io.buffer.LimitedDataBufferList.updateCount(LimitedDataBufferList.java:94)
        at org.springframework.core.io.buffer.LimitedDataBufferList.add(LimitedDataBufferList.java:59)
        at reactor.core.publisher.MonoCollect$CollectSubscriber.onNext(MonoCollect.java:119)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
        at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218)
        at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351)
        at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
        at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:571)
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:89)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:313)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:427)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:281)
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:502)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407)
        at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:834)

有人能告诉我我做错了什么吗?

谢谢

EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2020-06-11 12:46:59

从SpringBoot2.3.0开始,现在有一个专用的配置属性。用于反应性弹性搜索REST客户端。

可以使用以下配置属性为客户端设置特定的内存限制。

代码语言:javascript
运行
复制
spring.data.elasticsearch.client.reactive.max-in-memory-size=

已经存在的spring.codec.max-in-memory-size属性是独立的,仅影响应用程序中的其他WebClient实例。

票数 5
EN

Stack Overflow用户

发布于 2019-12-27 22:10:41

几天前,我实现了定制WebClient、检查相应的Jira问题的可能性。这将在3.2.4中提供,并且已经在当前的主分支中了。

配置代码如下所示:

代码语言:javascript
运行
复制
@Configuration
public class ReactiveRestClientConfig extends AbstractReactiveElasticsearchConfiguration {
    @Override
    public ReactiveElasticsearchClient reactiveElasticsearchClient() {
        final ClientConfiguration clientConfiguration = ClientConfiguration.builder() //
                .connectedTo("localhost:9200") //
                .withWebClientConfigurer(webClient -> {
                    ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
                            .codecs(configurer -> configurer.defaultCodecs()
                                    .maxInMemorySize(-1))
                            .build();
                    return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
                })
                .build();
        return ReactiveRestClients.create(clientConfiguration);

    }
}
票数 5
EN

Stack Overflow用户

发布于 2020-06-24 22:26:25

或者:

代码语言:javascript
运行
复制
    final Consumer<ClientCodecConfigurer> consumer = configurer -> {
        final ClientCodecConfigurer.ClientDefaultCodecs codecs = configurer.defaultCodecs();
        codecs.maxInMemorySize(maxBufferMb * 1024 * 1024);
    };

    WebClient.builder().codecs(consumer).build();
票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59326351

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档