首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

如何将单次落库合并为批量落库,提升处理性能?

场景

在一些流处理中,比如 kafka 消费等,我们需要不停的解析处理消息,然后进行入库。

有时候消息需要进行落库,每次都是单个落库,对数据库的压力比较大。

可不可以把单个操作变化为批量入库,来提升性能呢?

单个落库改为批量

目的

比如 100 条数据要落库,单个调用数据库 100 次,比一次批量入库耗时要多。

所以我们可以想办法把单个调用进行合并,然后调用入库。

单个同步落库

最基础的单个同步落库流程如下:

根据固定数量批量

我们可以固定一个内存大小,比如满足 100 个才进行入库,否则就放在内存中。

流程如下:

定时触发批量入库

固定数量适合触发比较多的情况。

如果命中的数据不多, 比如一些异常匹配处理等,但是对实时性要求又比较高。

可以通过定时任务来触发批量信息的入库,其他没变。

核心实现代码

单个数据的落库

每一次数据处理时,直接放入内存队列。

// 感觉这里可以直接替换为 COW,保持高并发。// 或者使用 concurrentHashMap,不过需要处理 keyprotected synchronized void addToList(final T object) { this.innerList.add(object); // 事后处理 this.addToListAfter(object);}

固定数量的场景

如果是固定数量的批量入库,我们可以在每一次加入内存队列之后,判断是否满足入库条件。

/** * 触发是否满足 fixed size? * @param object 对象 */protected void addToListAfter(final T object) { if(this.innerList.size() >= this.batchConfig.getBatchSize()) { log.debug("[Stream2Batch] addToListAfter fired save start..."); // 真正触发保存 fireBatchSave(); log.debug("[Stream2Batch] addToListAfter fired save end..."); }}

真正的批量保存逻辑

批量保存时,为了提升性能,也可以转换为异步入库。

protected void fireBatchSave() { try { log.info("[Stream2Batch] Fire batch save start..."); // 资源加锁 synchronized (innerList) { // 拷贝资源 final List<T> copyList = new ArrayList<>(innerList); // 执行保存 // 同步保存 if(batchConfig.isBatchSaveAsyncFlag()) { actualSaveThread.submit(new Runnable() { @Override public void run() { actualBatchSave(copyList); } }); } else { this.actualBatchSave(copyList); } // 资源清空 innerList.clear(); } log.info("[Stream2Batch] Fire batch save end..."); } catch (Exception e) { log.error("[Stream2Batch] FireBatchSave meet ex", e); batchConfig.getFireBatchSaveErrorHandler().onError(e); }}

定时调度触发

定时调度触发批量的方式,可以定时触发是否入库。

/** * 初始化保存调度线程池 */private void initSaveFireThread() { saveFireThread.scheduleAtFixedRate(new Runnable() { @Override public void run() { // 触发保存 fireBatchSave(); } }, batchConfig.getBatchFixedTimeInterval(), batchConfig.getBatchFixedTimeInterval(), TimeUnit.SECONDS);}

其他不变。

已有的实现

说明

当然,我们每次自己定义也比较麻烦。有一些已有的实现可以直接使用。

maven 依赖

<groupId>com.github.houbb</groupId> <artifactId>stream2batch-core</artifactId> <version>0.1.0</version>

固定时间间隔

适合场景:匹配的数量一般,对实时性有一定的要求。可以通过定时调度的方式驱动。

我们只需要定义好对应的 storeSingle/storeBatch 即可。

其他的很多属性也都可以自定义配置。

IStream2Batch<UserInfo> stream2Batch = Stream2BatchBs.<UserInfo>newInstance() // 单个保存策略 .storeSingle(new FakeStoreSingle<>()) // 批量保存策略 .storeBatch(new FakeStoreBatch<>()) .fixedTime();

for(int i = 0; i < 100; i++) { UserInfo userInfo = new UserInfo(); userInfo.setUsername("u-"+i); stream2Batch.execute(userInfo); }

TimeUnit.SECONDS.sleep(10);

// 资源关闭stream2Batch.shutdown();

固定大小

使用场景:匹配的数量会比较多,为了避免内存压力过大,采用固定数量的方式驱动。

public static void main(String[] args) throws InterruptedException { IStream2Batch<UserInfo> stream2Batch = Stream2BatchBs.<UserInfo>newInstance() // 单个保存策略 .storeSingle(new FakeStoreSingle<>()) // 批量保存策略 .storeBatch(new FakeStoreBatch<>()) .batchSize(10) .fixedSize();

for(int i = 0; i < 100; i++) { UserInfo userInfo = new UserInfo(); userInfo.setUsername("u-"+i); stream2Batch.execute(userInfo); }

TimeUnit.SECONDS.sleep(10);

// 资源关闭 stream2Batch.shutdown();}

小结

流式处理转批量处理是一个比较场景的优化方式。

异步入库也可以大幅度提升吞吐量,在一个实时链路场景可以考虑使用。

  • 发表于:
  • 原文链接https://page.om.qq.com/page/Okycc5HcW6ekDgJsGuqhW3TA0
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券