场景
在一些流处理中,比如 kafka 消费等,我们需要不停的解析处理消息,然后进行入库。
有时候消息需要进行落库,每次都是单个落库,对数据库的压力比较大。
可不可以把单个操作变化为批量入库,来提升性能呢?
单个落库改为批量
目的
比如 100 条数据要落库,单个调用数据库 100 次,比一次批量入库耗时要多。
所以我们可以想办法把单个调用进行合并,然后调用入库。
单个同步落库
最基础的单个同步落库流程如下:
根据固定数量批量
我们可以固定一个内存大小,比如满足 100 个才进行入库,否则就放在内存中。
流程如下:
定时触发批量入库
固定数量适合触发比较多的情况。
如果命中的数据不多, 比如一些异常匹配处理等,但是对实时性要求又比较高。
可以通过定时任务来触发批量信息的入库,其他没变。
核心实现代码
单个数据的落库
每一次数据处理时,直接放入内存队列。
// 感觉这里可以直接替换为 COW,保持高并发。// 或者使用 concurrentHashMap,不过需要处理 keyprotected synchronized void addToList(final T object) { this.innerList.add(object); // 事后处理 this.addToListAfter(object);}
固定数量的场景
如果是固定数量的批量入库,我们可以在每一次加入内存队列之后,判断是否满足入库条件。
/** * 触发是否满足 fixed size? * @param object 对象 */protected void addToListAfter(final T object) { if(this.innerList.size() >= this.batchConfig.getBatchSize()) { log.debug("[Stream2Batch] addToListAfter fired save start..."); // 真正触发保存 fireBatchSave(); log.debug("[Stream2Batch] addToListAfter fired save end..."); }}
真正的批量保存逻辑
批量保存时,为了提升性能,也可以转换为异步入库。
protected void fireBatchSave() { try { log.info("[Stream2Batch] Fire batch save start..."); // 资源加锁 synchronized (innerList) { // 拷贝资源 final List<T> copyList = new ArrayList<>(innerList); // 执行保存 // 同步保存 if(batchConfig.isBatchSaveAsyncFlag()) { actualSaveThread.submit(new Runnable() { @Override public void run() { actualBatchSave(copyList); } }); } else { this.actualBatchSave(copyList); } // 资源清空 innerList.clear(); } log.info("[Stream2Batch] Fire batch save end..."); } catch (Exception e) { log.error("[Stream2Batch] FireBatchSave meet ex", e); batchConfig.getFireBatchSaveErrorHandler().onError(e); }}
定时调度触发
定时调度触发批量的方式,可以定时触发是否入库。
/** * 初始化保存调度线程池 */private void initSaveFireThread() { saveFireThread.scheduleAtFixedRate(new Runnable() { @Override public void run() { // 触发保存 fireBatchSave(); } }, batchConfig.getBatchFixedTimeInterval(), batchConfig.getBatchFixedTimeInterval(), TimeUnit.SECONDS);}
其他不变。
已有的实现
说明
当然,我们每次自己定义也比较麻烦。有一些已有的实现可以直接使用。
maven 依赖
<groupId>com.github.houbb</groupId> <artifactId>stream2batch-core</artifactId> <version>0.1.0</version>
固定时间间隔
适合场景:匹配的数量一般,对实时性有一定的要求。可以通过定时调度的方式驱动。
我们只需要定义好对应的 storeSingle/storeBatch 即可。
其他的很多属性也都可以自定义配置。
IStream2Batch<UserInfo> stream2Batch = Stream2BatchBs.<UserInfo>newInstance() // 单个保存策略 .storeSingle(new FakeStoreSingle<>()) // 批量保存策略 .storeBatch(new FakeStoreBatch<>()) .fixedTime();
for(int i = 0; i < 100; i++) { UserInfo userInfo = new UserInfo(); userInfo.setUsername("u-"+i); stream2Batch.execute(userInfo); }
TimeUnit.SECONDS.sleep(10);
// 资源关闭stream2Batch.shutdown();
固定大小
使用场景:匹配的数量会比较多,为了避免内存压力过大,采用固定数量的方式驱动。
public static void main(String[] args) throws InterruptedException { IStream2Batch<UserInfo> stream2Batch = Stream2BatchBs.<UserInfo>newInstance() // 单个保存策略 .storeSingle(new FakeStoreSingle<>()) // 批量保存策略 .storeBatch(new FakeStoreBatch<>()) .batchSize(10) .fixedSize();
for(int i = 0; i < 100; i++) { UserInfo userInfo = new UserInfo(); userInfo.setUsername("u-"+i); stream2Batch.execute(userInfo); }
TimeUnit.SECONDS.sleep(10);
// 资源关闭 stream2Batch.shutdown();}
小结
流式处理转批量处理是一个比较场景的优化方式。
异步入库也可以大幅度提升吞吐量,在一个实时链路场景可以考虑使用。
领取专属 10元无门槛券
私享最新 技术干货