前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【二十七】springboot实现多线程事务处理

【二十七】springboot实现多线程事务处理

作者头像
小z666
发布2024-06-21 17:39:32
630
发布2024-06-21 17:39:32
举报
文章被收录于专栏:javajava

在前面二十四章做了一个springboot使用EasyExcel和线程池实现多线程导入Excel数据的demo,在写时忘了做事务处理,评论区有个大佬提出来了,这章就对二十四章的代码做一个改造,完善多线程的事务处理。

代码语言:txt
复制
    对于springboot的事务处理,前面在二十三章也做过[springboot整合spring事务详解以及实战](https://blog.csdn.net/weixin_56995925/article/details/125577851)的学习,但是在多线程时,这个东西并不适用,本章就通过手写事务处理(编程式事务处理)。
代码语言:txt
复制
     由于本章是针对二十四章的批量导入功能的扩展,所有不会再写事务处理不相关的(二十四章的内容)介绍了。

一、阐述目的与实现方式

代码语言:txt
复制
    前面章节实现的多线程处理excel导入功能,如果一个子线程出现错误,结果会是那个子线程的数据处理不了,而其他子线程的数据仍然正常处理保存,并不会存在事务处理的情况,本章改造代码实现事务处理,所有线程正常执行才会保存数据,否则就回滚。大致如下:

二、手动让子线程报错

代码语言:txt
复制
    为了后面测试事务回滚,**手动让某个子线程报错**,比如名为线程3的子线程,如下:


三、改造主线程

代码语言:txt
复制
    根据上面图的思路,首先改造**主线程**的代码,整体代码如下:
代码语言:javascript
复制
package com.swagger.demo.service;

import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.event.AnalysisEventListener;
import com.swagger.demo.config.SpringJobBeanFactory;
import com.swagger.demo.mapper.DeadManMapper;
import com.swagger.demo.model.entity.DeadManExcelData;
import com.swagger.demo.thread.DeadManThread;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @author zrc
 * @version 1.0
 * @description: TODO 最新入狱名单导入监听器
 *
 * @date 2022/5/30 15:56
 */
@Service
@Slf4j
@Component
public class DeadManExcelListener extends AnalysisEventListener<DeadManExcelData> {

    /**
     * 多线程保存集合,使用线程安全集合
     */
    private List<DeadManExcelData> list = Collections.synchronizedList(new ArrayList<>());

    /**
     * 创建线程池必要参数
     */
    private static final int CORE_POOL_SIZE = 10; // 核心线程数
    private static final int MAX_POOL_SIZE = 100; // 最大线程数
    private static final int QUEUE_CAPACITY = 100; // 队列大小
    private static final Long KEEP_ALIVE_TIME = 1L; // 存活时间

    public List<DeadManExcelData> getData(){
        return list;
    }

    public DeadManExcelListener(){

    }

    public void setData(List<DeadManExcelData> deadManExcelDataList){
        this.list = deadManExcelDataList;
    }

    @Override
    public void invoke(DeadManExcelData deadManExcelData, AnalysisContext analysisContext) {
        if(deadManExcelData!=null){
            list.add(deadManExcelData);
        }
    }

    /**
     * 多线程方式保存
     * @param analysisContext
     */
    @Override
    public void doAfterAllAnalysed(AnalysisContext analysisContext) {
        log.info("解析结束,开始插入数据");

        // 创建线程池
        ExecutorService executor = new ThreadPoolExecutor(CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy());

        // 指定每个线程需要处理的导入数量,假设每个线程处理15000条,注意配合上面线程池的大小
        int singleThreadDealCount = 15000;

        // 根据假设每个线程需要处理的数量以及总数,计算需要提交到线程池的线程数量
        int threadSize=(list.size()/singleThreadDealCount)+1;

        // 计算需要导入的数据总数,用于拆分时线程需要处理数据时使用
        int rowSize = list.size() + 1;

        // 测试开始时间
        long startTime = System.currentTimeMillis();

        // 申明该线程需要处理数据的开始位置
        int startPosition = 0;

        // 申明该线程需要处理数据的结束位置
        int endPosition = 0;

        // 为了让每个线程执行完后回到当前线程,使用CountDownLatch,值为线程数,每次线程执行完就会执行countDown方法减1,为0后回到主线程,也就是当前线程执行后续的代码
        CountDownLatch count = new CountDownLatch(threadSize);

        // 用来控制主线程回到子线程
        CountDownLatch mainCount = new CountDownLatch(1);

        // 用来控制最终回到主线程
        CountDownLatch endCount = new CountDownLatch(threadSize);

        // 用来存放子线程的处理结果,若出错就保存一个false
        CopyOnWriteArrayList<Boolean> sonResult = new CopyOnWriteArrayList<Boolean>();

        // 使用线程安全的对象存储,保存主线程最后总的判断结果,是提交还是回滚
        AtomicBoolean ifSubmit = new AtomicBoolean(true);

        // 计算每个线程要处理的数据
        for(int i=0;i<threadSize;i++){
            // 如果是最后一个线程,为保证程序不发生空指针异常,特殊判断结束位置
            if((i+1)==threadSize){
                // 计算开始位置
                startPosition = (i * singleThreadDealCount);
                // 当前线程为划分的最后一个线程,则取总数据的最后为此线程的结束位置
                endPosition = rowSize-1;
            }else{
                // 计算开始位置
                startPosition = (i * singleThreadDealCount);
                // 计算结束位置
                endPosition = (i + 1) * singleThreadDealCount;
            }

            DeadManMapper deadManMapper = SpringJobBeanFactory.getBean(DeadManMapper.class);

            DeadManThread thread = new DeadManThread(count,deadManMapper,list,startPosition,endPosition
            ,sonResult,mainCount,ifSubmit,endCount);
            executor.execute(thread);
        }
        try {
            count.await();
            for (Boolean resp : sonResult) {
                if (!resp) {
                    // 只要有一个子线程出异常,就设置最终结果为回滚
                    log.info("主线程:有线程执行失败,所有线程需要回滚");
                    ifSubmit.set(false);
                    break;
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            // 回到子线程处理回滚或者提交事务
            mainCount.countDown();
        }

        try {
            endCount.await();
            // 逻辑处理完,关闭线程池
            executor.shutdown();
            long endTime = System.currentTimeMillis();
            log.info("总耗时:"+(endTime-startTime));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}
代码语言:txt
复制
    新增如下4个参数(count是前面章节的已有的)。

PS:CountDownLatch类前面有讲过,通过await和countDown方法能够方便的实现多个线程之间的来回切换。 CopyOnWriteArrayList和AtomicBoolean是为了能够线程安全的保存多个线程共同使用的数据。

接着,**重写DeadManThread线程类的构造方法**,将上面新增的四个参数通过构造方法传给子线程。然后调用记录子线程第一次的**cout的await**方法,等待子线程第一次执行完毕,回到主线程继续执行。**回到主线程**后,主线程判断子线程第一次执行完后保存的返回集,判断是否存在false(子线程若报错,保存false,否则保存true)。若存在false就将idsubmit设置为false,意思是需要回滚数据,然后调用记录主线程执行的mainCount的countDown方法,让主线程执行完毕,**回到子线程**调用mainCount.countDown的位置继续子线程执行。

当子线程根据ifSubmit判断完进行回滚还是提交事务操作后,**回到主线程**,主线程关闭线程池。

四、改造子线程

代码语言:txt
复制
    接着改造子线程,整体代码如下:
代码语言:javascript
复制
package com.swagger.demo.thread;

import com.swagger.demo.config.SpringJobBeanFactory;
import com.swagger.demo.mapper.DeadManMapper;
import com.swagger.demo.model.entity.DeadMan;
import com.swagger.demo.model.entity.DeadManExcelData;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @author zrc
 * @version 1.0
 * @description TODO
 * @date 2022/7/22 15:40
 */
@Component
@Slf4j
public class DeadManThread implements Runnable{

    /**
     * 当前线程需要处理的总数据中的开始位置
     */
    private int startPosition;

    /**
     * 当前线程需要处理的总数据中的结束位置
     */
    private int endPosition;

    /**
     * 需要处理的未拆分之前的全部数据
     */
    private List<DeadManExcelData> list = Collections.synchronizedList(new ArrayList<>());

    /**
     * 记录子线程第一次执行是否完成
     */
    private CountDownLatch count;

    private DeadManMapper deadManMapper;

    /**
     * 保存每个线程的执行结果
     */
    private CopyOnWriteArrayList<Boolean> sonResult;

    /**
     * 记录主线程是否执行过判断每个线程的执行结果这个操作
     */
    private CountDownLatch mainCount;

    /**
     * 记录主线程对每个线程的执行结果的判断
     */
    private AtomicBoolean ifSubmit;

    /**
     * 声明该子线程的事务管理器
     */
    private DataSourceTransactionManager dataSourceTransactionManager;

    /**
     * 声明该线程事务的状态
     */
    private TransactionStatus status;

    /**
     * 记录子线程第二次执行是否完成
     */
    private CountDownLatch endCount;

    public DeadManThread() {

    }

    public DeadManThread(CountDownLatch count, DeadManMapper deadManMapper, List<DeadManExcelData> list
            , int startPosition, int endPosition, CopyOnWriteArrayList<Boolean> sonResult,CountDownLatch mainCount
    ,AtomicBoolean ifSubmit,CountDownLatch endCount) {
        this.startPosition = startPosition;
        this.endPosition = endPosition;
        this.deadManMapper = deadManMapper;
        this.list = list;
        this.count = count;
        this.sonResult = sonResult;
        this.mainCount = mainCount;
        this.ifSubmit = ifSubmit;
        this.endCount = endCount;
    }

    @Override
    public void run() {
        try{
            dataSourceTransactionManager = SpringJobBeanFactory.getBean(DataSourceTransactionManager.class);
            DefaultTransactionDefinition def = new DefaultTransactionDefinition();
            def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
            status = dataSourceTransactionManager.getTransaction(def);
            if(Thread.currentThread().getName().contains("3")){
                throw new RuntimeException("线程3出问题了");
            }
            List<DeadMan> deadManList = new ArrayList<>();
            List<DeadManExcelData> newList = list.subList(startPosition, endPosition);
            // 将EasyExcel对象和实体类对象进行一个转换
            for (DeadManExcelData deadManExcelData : newList) {
                DeadMan deadMan = new DeadMan();
                BeanUtils.copyProperties(deadManExcelData, deadMan);
                deadManList.add(deadMan);
            }
            // 批量新增
            deadManMapper.insertBatchSomeColumn(deadManList);
            sonResult.add(true);
            } catch (Exception e) {
            e.printStackTrace();
            sonResult.add(false);
            } finally {
            // 当一个线程执行完了计数要减一不然这个线程会被一直挂起
            count.countDown();
            try {
                log.info(Thread.currentThread().getName() + ":准备就绪,等待其他线程结果,判断是否事务提交");
                mainCount.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (ifSubmit.get()) {
                dataSourceTransactionManager.commit(status);
                log.info(Thread.currentThread().getName() + ":事务提交");
            } else {
                dataSourceTransactionManager.rollback(status);
                log.info(Thread.currentThread().getName() + ":事务回滚");
            }
            // 执行完所有逻辑,等待主线程执行
            endCount.countDown();
        }
    }
}

先改造构造方法,用于接受主线程传过来的参数,并设置到自己的内部私有。

然后改造run方法。

先**开启自己的事务**,并保存事务状态,用于后面执行提交或者回滚操作。数据处理完后,若正常结束将线程安全的返回值集合变量保存一个true,否则保存false,并执行记录第一线程执行的count的countDown方法,等待所有子线程执行完后,**返回主线程**执行刚才上面讲的判断sonResult的代码,等主线程执行完判断并设置ifSubmit的值后,**回到子线程**执行main.await之后的代码。

如果ifSubmit是false就回滚,否则就提交,执行完后执行记录子线程第二次执行的endCount的countDown方法,等待子线程全部执行完后,**回到主线程**,主线程执行关闭线程池的逻辑,**结束**。

五、测试

代码语言:txt
复制
    代码写完,测试一下。测试之前数据:

代码语言:txt
复制
    现在是线程3会报错,调用接口测试。

代码语言:txt
复制
    事务成功回滚,数据没有提交。若删除手动抛异常的代码,让程序正常执行,如下:

数据提交成功,事务正常处理。

补充: 关于可能出现的死锁问题(子线程数大于10时发生),可能是springboot默认的datasource: hikari: maximum-pool-size 这个属性导致的,maximum-pool-size:最大连接数,小于等于0会被重置为默认值10。 hikari是springboot默认使用的数据源连接池。 如果子线程是大于10个,但是最大连接数只有10,就会导致后面的子线程连不上数据库,10个连接上数据库的线程也无法释放,导致出现死锁。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-03-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、阐述目的与实现方式
  • 二、手动让子线程报错
  • 三、改造主线程
  • 四、改造子线程
  • 五、测试
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档