前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >多线程是剂猛药,别乱吃!

多线程是剂猛药,别乱吃!

原创
作者头像
用户11512967
修改2025-02-19 11:59:54
修改2025-02-19 11:59:54
8500
代码可运行
举报
运行总次数:0
代码可运行

一、案例场景

关于消息任务处理有多种方式:MQ、Redis、数据库等等,都是消息任务处理的首选良方。本案例采用了比较传统、老套的处理消息任务的方式:数据库。这种方式执行步骤相对简单:

(1)向数据库中插入一条数据并记录一个状态;

(2)读取这个状态的数据;

(3)根据数据处理消息任务;

(4)更改数据状态。

就这样分4步操作就解决了:数据库消息任务处理。

这种技术架构从代码层面还是很容易实现,开发量也不大,在并发量不是很高的场景下足以应对业务需求。当并发量高,一些问题就能暴漏出来。当然,任何一种技术架构设计都有它的优缺点,都不是完美的,当前这种技术架构在高并发的场景下我们看看有哪些方面会影响性能。

(1)任务消息入库,Insert受限于数据库性能、微服务应用性能的影响;

(2)定时任务_Job读取任务阻塞,大量任务消息积压在数据库表中;

(3)Application应用处理消息慢,不能及时响应并更改数据状态;

(4)与云端交互缓慢,影响Application应用消息数据处理;

我们对这4个影响性能的位置分析发现,1、3、4是不容易看出问题的,分析日志也只能看到异常,很难确定引起异常的原因。在第2步,是能看出问题的所在:当大量的消息任务数据被读取出,一次Job任务却处理不完,Job任务就会发生阻塞,第二次Job任务就会等待,直到第三次、第四次等等,数据库消息数据状态得不到及时更新,以至于大量消息任务被积压。由于数据库(比如:MySQL)的库表承载能力是巨大的,也就不会出现报警、异常等问题,大量数据积压充斥到业务系统中,这时业务系统如果没有阈值报警能力,当人为发现时势必已经积压很多业务数据,如果是订单业务逻辑,就有大量的订单状态得不到更新,更甚者:如果是购物车、秒杀、零点抢购则导致的线上事故是不言而喻的。

示例代码:

代码语言:javascript
代码运行次数:0
复制
// 定时任务Job执行
@Scheduled(cron = "0/35 * * * * ?")      
public void workJobExecute() {
    logger.info("work拉取订单定时任务----开始----{}", DateUtil.format(new Date(), DateUtil.YYYY_MM_DD_HH_MM_SS_SSS));
    if (TaskState.tmcState == 0) {
        TaskState.tmcState = 1;
        try {
            dealPushTmc();
        } finally {
            TaskState.tmcState = 0;
            logger.info("work拉取订单定时任务----结束----执行时间{}", System.currentTimeMillisEnd() - System.currentTimeMillisBegin());
        }
    } else {
        logger.info("-----work拉取订单定时任务------上次定时任务未执行完成,本次不执行操作!---------");
    }
}

// 消息定时任务处理
public void dealPushOrder() {
    // 查询参数
    ThirdPushMessage thirdPushMessage = new ThirdPushMessage();
    thirdPushMessage.setIsDeal(0);
    thirdPushMessage.setRetryTimes(3);             
    thirdPushMessage.setSender(4);
    thirdPushMessage.setCreateTime(createTime);
    thirdPushMessage.setUsePage(1);
    thirdPushMessage.setStart(0);
    thirdPushMessage.setPageSize(100);

    // 查询Job中符合条件的消息任务
    List<ThirdPushMessageVo> tpms = thirdPushMessageServiceBiz.getThirdpushmessegeList(thirdPushMessage).getList();

    int failed = 0, successed = 0, lockNum = 0, total = tpms.size();
    for (ThirdPushMessageVo tpm : tpms) {
        long startTime = System.nanoTime();
        logger.info("处理的第三方推送信息为:{}", JSON.toJSONString(tpm));
        ThirdPushMessage thirdPushMessageSave = new ThirdPushMessage();
        thirdPushMessageSave.setId(tpm.getId());
        int deal = 0;
        try {
            // 订单拉取
            Long lock = null;
            try {
                lock = tmcLockService.getLock();
                if (null != lock) {
                    // 调去云端微服务数据(跨越到网络层,性能可控率降低)
                    deal = thirdPushMessageServiceBiz.tmcOrderPush(tpm);
                } else {
                    logger.info("该单正在执行中:{}", tpm.getId());
                    lockNum++;
                    continue;
                }
            } catch (Exception e) {
                LogUtil.encError(logger, "拉取订单失败", e);
                throw e;
            } finally {
                tmcLockService.unlock(lock);
            }
            successed++;
            thirdPushMessageSave.setIsDeal(deal);
            logger.info("执行成功-{}", tpm.getId());
        } catch (Exception e) {
            // 订单拉取失败处理
            LogUtil.encError(logger, "处理定时任务出错", e);
            failed++;
            thirdPushMessageSave.setRetryTimes(tpm.getRetryTimes() + 1);
            String sb = noteException(e);
            thirdPushMessageSave.setNote(sb.toString());
        } finally {
            // 数据库消息任务状态更新
            long endTime = System.nanoTime();
            logger.info("任务【{}】的执行时间:{}", tpm.getId(), endTime - startTime);
            thirdPushMessageSave.setUpdateTime(new Date());
            thirdPushMessageServiceBiz.updateThirdpushmessege(thirdPushMessageSave);
        }
    }
}

二、多线程处理

提升一个技术架构的性能是一个综合的处理过程,仅去处理某一点或者解决掉一个慢SQL对综合性能是有限的。通过对上面技术架构及代码的分析,该性能处理的关键点是:解决数据库消息任务数据的积压,对于云端微服务不能及时响应的情况做规避处理。如何快速处理掉消息任务积压,首选就是多线程,多个消息任务被快速处理掉,不能快速响应的任务超时后暂做舍弃,等待下一次定时任务Job的处理。这样首先会解决掉消息任务积压,其次对任务漏执行也做了查缺补漏操作。

示例代码:

代码语言:javascript
代码运行次数:0
复制
################################################################################
## 加入开关对原业务逻辑做保护
//redisTemplate.boundValueOps(TMC_ORDER_PUSH_REDIS_SWITCH_KEY).set(TMC_ORDER_PUSH_REDIS_SWITCH_VALUE);
if (getTmcOrderPushRedisSwitchValue()) {
    // 加入多线程执行逻辑
    orderPullComplesionHandler.tmcOrderPullComplesionInvokeNew(tpms);
    return;
}
################################################################################

################################################################################
## OrderPullComplesionHandler.class 做线程池任务触发
## OrderPullComplesionHandler.class#tmcOrderPullComplesionInvokeNew()
/**
 * 订单拉取异步执行多线程任务方法(新)
 * (1) 订单拉取消息 tpms 去重,方式同一个订单多次拉取
 * (2) 去重后 tpms 数据提交到线程任务池
 * (3) 获取执行任务,不断通过提交的任务数量拉取,注意:那个任务先执行完哪个任务先返回,不以循环标识为准
 * (4) 处理拉取结束结果,包括成功、失败(处理拉取次数和重试次数)
 * (5) 处理拉取超时任务,仅处理拉取次数
 * @param tpms
 */
public void tmcOrderPullComplesionInvokeNew(List<ThirdPushMessageVo> tpms) {
    // tpm去重(防止多消息同时被拉去,导致订单重复)
    Map<String,ThirdPushMessageVo> mapDisDuplication = new HashMap<>();
    Map<Integer,ThirdPushMessageVo> mapDisDupNotPushOrder = new HashMap<>();
    mapDisDuplication = opMapDisDuplication(tpms,mapDisDupNotPushOrder);
    if (mapDisDuplication == null || mapDisDuplication.isEmpty()) {
        return ;
    }
    log.info("dealPushTmc#tmcOrderPullComplesionInvokeA1:{}:{}",tpms.size(),mapDisDuplication.size());
    log.info("dealPushTmc#tmcOrderPullComplesionInvokeA2:{}:{}",tpms.size(),mapDisDupNotPushOrder.size());

    // 根据ThirdPushMessage拉取任务提交
    CompletionService<ThirdPushMessageVo> csTmcOrderPull = new ExecutorCompletionService<>(orderPullThreadPool);
    // 任务执行返回值(注意:返回任务与临时变量顺序不一致,返回任务根据执行完成情况做返回)
    Map<Integer, Future<ThirdPushMessageVo>> futuresMapResult = new HashMap<>(mapDisDuplication.size());
    // 临时变量存储(注意:返回任务与临时变量顺序不一致,返回任务根据执行完成情况做返回)
    Map<Integer,ThirdPushMessageVo> mapMessageInitTempage = new HashMap<>();
    Map<Integer,ThirdPushMessageVo> mapMessageSuccessOrFailureTempage = new HashMap<>();
    Map<Integer,ThirdPushMessageVo> mapMessageTimeoutTempage = new HashMap<>();

    // 拉取任务分配
    for (Map.Entry<String,ThirdPushMessageVo> pushMessageVoEntry : mapDisDuplication.entrySet()) {
        // 多线程异步任务(拉取和标记拉过)
        ThirdPushMessageVo pushMessageVo = pushMessageVoEntry.getValue();
        mapMessageInitTempage.put(pushMessageVo.getId(),pushMessageVoEntry.getValue());
        // 提交任务
        futuresMapResult.put(pushMessageVo.getId(), csTmcOrderPull.submit(new TmcOrderTask(pushMessageVo, thirdPushMessageServiceBiz)));
    }

    // 拉取任务结果处理
    for (Map.Entry<Integer,Future<ThirdPushMessageVo>> entry : futuresMapResult.entrySet()) {
        try {
            // 任务执行超时
            Future<ThirdPushMessageVo> futureResult = csTmcOrderPull.poll(ORDER_PULL_THREAD_TIMEOUT_VALUE, TimeUnit.SECONDS);       // 任务6s后标记超时
            if (Objects.isNull(futureResult)) {
                continue;
            }

            // 获取任务处理结果
            ThirdPushMessageVo thirdPushMessageVoResult = futureResult.get();
            mapMessageSuccessOrFailureTempage.put(thirdPushMessageVoResult.getId(),thirdPushMessageVoResult);
        } catch (Exception e) {
             log.error("tmcOrderPullComplesionInvoke订单拉取任务异常:{},{}",e.getMessage(),e);
        }
    }

    // 执行数据库状态更新
    for (Map.Entry<Integer,ThirdPushMessageVo> mapInitEntry : mapMessageInitTempage.entrySet()) {
        // 拉取超时
        if (!mapMessageSuccessOrFailureTempage.containsKey(mapInitEntry.getKey())) {
            mapMessageTimeoutTempage.put(mapInitEntry.getKey(),mapInitEntry.getValue());
            continue;
        }
        // 拉取成功、失败处理
        ThirdPushMessageVo thirdPushMessageVoResult = mapMessageSuccessOrFailureTempage.get(mapInitEntry.getKey());
        if (thirdPushMessageVoResult.getIsDeal() != 1) {    // 拉取失败
            thirdPushMessageVoResult.setIsDeal(0);
            thirdPushMessageVoResult.setRetryTimes(thirdPushMessageVoResult.getRetryTimes() + 1);
        }
        finallyThridPushMessageModified(thirdPushMessageVoResult);
        log.info("tmcOrderPullComplesionInvoke订单最终拉取完成:orderId:{},isDeal:{},retryTimes:{},mesageId:{},message:{}"
                        ,getCtripOrderStatusDTO(thirdPushMessageVoResult).getOrderId()
                        ,thirdPushMessageVoResult.getIsDeal()
                        ,thirdPushMessageVoResult.getRetryTimes()
                        ,thirdPushMessageVoResult.getId()
                        ,JSON.toJSONString(thirdPushMessageVoResult));
    }

    // 拉取超时处理
    mapMessageTimeoutUpdateStatus(mapMessageTimeoutTempage);

    // 去重后的消息不需要多次拉取
    mapDisDupNotPushOrderUpdateStatus(mapDisDupNotPushOrder);
}
################################################################################

################################################################################
## TmcOrderTask.class 订单处理任务
public class TmcOrderTask implements Callable<ThirdPushMessageVo> {

    private ThirdPushMessageVo thirdPushMessageVo;                     // 请求参数(RPC)参数
    private ThirdPushMessageServiceBiz thirdPushMessageServiceBiz;     // RPC 服务 Bean

    @Override
    public ThirdPushMessageVo call() {
        thirdPushMessageVo.setIsDeal(tmcOrderPushRpcToCtrip(thirdPushMessageVo));
        return thirdPushMessageVo;
    }

    /**
     * TMC 订单拉取task任务执行
     * @param thirdPushMessageVo
     * @return
     */
    private int tmcOrderPushRpcToCtrip(ThirdPushMessageVo thirdPushMessageVo) {
        int result;
        CtripOrderStatusDTO statusDTO = JSON.parseObject(thirdPushMessageVo.getMessage(), CtripOrderStatusDTO.class);

        try {
            if (thirdPushMessageServiceBiz.tmcOrderPush(thirdPushMessageVo) > 0) {
                result = 1;
                printLogInfo("tmcOrderPushRpcToCtrip_拉取成功线程执行",statusDTO,thirdPushMessageVo);
            } else {
                result = -1;
                printLogInfo("tmcOrderPushRpcToCtrip_拉取失败线程执行",statusDTO,thirdPushMessageVo);
            }
        } catch (Exception e) {
            result = -1;
            printLogError("tmcOrderPushRpcToCtrip_拉取异常执行",statusDTO,thirdPushMessageVo);
        }
        return result;
    }
}
################################################################################

三、多线程辅助处理

上文中对多线程解决消息任务处理引发的阻塞问题做了阐述,包括:架构图以及示例代码。其中,示例代码中除了线程池、线程池任务,还加入了很多辅助处理:重复订单的排他、异常线程任务收集、异常线程任务的处理等。为什么会有这么多的辅助操作呢,仔细观察下这些辅助操作,其实都是为了更好的保护多线程的执行而做出的兼容调整。

(1)重复订单数据处理

当任务数据进入线程池通过多线程任务执行时,有一种场景是非常危险的,我们拿订单来举例:

订单号:TA00001001,从下单到结束中间有多个状态,如:A、B、C、D,如果这时发出订单动作的订单消息会出现多次,但系统肯定希望能得到最新一次的订单状态。这样的操作放到串行化代码中是很正常的,如果放到并发处理中就会出现特殊场景,这种场景产生步骤是: (1)数据库中没有任何该订单数据; (2)A状态消息被线程1拿到,B状态消息被线程2拿到; (3)两个线程同时被执行,都拿到了最新订单状态B,两个线程同时执行了数据库save操作; (4)数据库中就存在了2条订单号:TA00001001的数据,而且状态相同。

这就是多线程执行引起的并发问题。有同学可能会说:比如是MySQL数据库,在表中用订单号做个唯一主键就解决掉了,文者推荐这篇文章:《https://time.geekbang.org/column/article/70848》详细阐述了关于普通主键和唯一主键的区别和使用场景。

订单号:TA00001001,从下单到结束中间有多个状态,如:A、B、C、D,如果这时发出订单动作的订单消息会出现多次,但系统肯定希望能得到最新一次的订单状态。这样的操作放到串行化代码中是很正常的,如果放到并发处理中就会出现特殊场景,这种场景产生步骤是: (1)数据库中没有任何该订单数据; (2)A状态消息被线程1拿到,B状态消息被线程2拿到; (3)两个线程同时被执行,都拿到了最新订单状态B,两个线程同时执行了数据库save操作; (4)数据库中就存在了2条订单号:TA00001001的数据,而且状态相同。 这就是多线程执行引起的并发问题。有同学可能会说:比如是MySQL数据库,在表中用订单号做个唯一主键就解决掉了,文者推荐这篇文章:《https://time.geekbang.org/column/article/70848》详细阐述了关于普通主键和唯一主键的区别和使用场景。

这时候可以做个简单处理,当全部消息任务数据准备进入线程池时,可以先对消息任务数据通过订单号做重复筛选处理,保证唯一一个订单号进入到线程池中,这样就避免了多线程并发情况。

(2)多线程任务的简单性

通过上文中的示例代码能看到,Callable的多线程任务操作很单一,仅仅就是一个RPC微服务的执行动作,没有额外的:参数准备、辅助功能处理、数据库信息处理等等。因为,在多线程任务中,每多一个功能点,线程隔离范围就会放大,原子性、可见性的保证就会变的更加复杂,变得会更加的不可控,甚至每个线程的执行时长会放大,当线程并发量足够大时,影响业务系统性能可能是指数级的。尽量保证多线程任务的简单性,通俗的讲就是:锁-该锁的部分,不需要锁的就不用-锁。

代码语言:javascript
代码运行次数:0
复制
## 类似Netty源码中对输出流操作的锁范围
## PcapWriteHandler.class#writePacket(ByteBuf,ByteBuf)
void writePacket(ByteBuf packetHeaderBuf, ByteBuf packet) throws IOException {
    ......
    if (pcapWriteHandler.sharedOutputStream()) {
        // 当输出流是共享的时候加锁,防止并发问题
        synchronized (outputStream) {  
            packetHeaderBuf.readBytes(outputStream, packetHeaderBuf.readableBytes());
            packet.readBytes(outputStream, packet.readableBytes());
        }
    } else {
        // 非共享输出流时,输出即可
        packetHeaderBuf.readBytes(outputStream, packetHeaderBuf.readableBytes());
        packet.readBytes(outputStream, packet.readableBytes());
    }
    ......
}

(3)微服务对多线程的支持

在上文中的架构图中还有一个性能的瓶颈点,如图:

瓶颈点就是:多线程任务与RPC微服务的交互。这个瓶颈点是要保证线程池中线程数量与RPC微服务数量保持一个平衡点,根据每个RPC微服务的执行时间来寻求线程池线程数量的均衡,理想情况(线程数量 : RPC微服务数量)是:>1:1,如果RPC微服务执行时间在200MS,也可以做:>2:1的比例,如果时间更短可以根据这个比例继续寻找平衡。当然,线上环境是不允许做大范围的实验和调试的,需要通过监控平台来完成这项比例的调整。最理想、合理的还是:>1:1。

(4)任务消息安全落库处理

在(2)中已经提到多线程的任务尽量简单化,不要掺杂太多的额外操作,比如:数据库信息处理。在多线程任务中如果存在异常处理情况尽量以标识符返回值的方式处理。在获取到多线程任务的返回值后,将消息任务信息保存到临时的缓存中,通过这个缓存再做相应的逻辑处理,其中包括:数据库数据状态处理。这也是业务代码的逻辑解耦。

代码语言:javascript
代码运行次数:0
复制
// 获取任务处理结果
ThirdPushMessageVo thirdPushMessageVoResult = futureResult.get();
mapMessageSuccessOrFailureTempage.put(thirdPushMessageVoResult.getId(),thirdPushMessageVoResult);

// 当线程池本次任务执行全部完成后,在遍历 mapMessageSuccessOrFailureTempage,做数据库信息
// 处理或其他异常打印等。

四、案例总结

很多人在使用多线程时候会根据网上文章或其他示例将多线程代码Copy下来,把自己的业务逻辑像填空一样补充进去,这样就是一个完美的多线程使用方法吗?曾经也听朋友谈起对多线程的使用:多线程的使用高手并不是把多线程的代码用的多么熟练,而是能通过非多线程的方式来解决多线程并发的问题。就是讲:每个方法、每个功能都不是银弹,能在综合业务系统中发挥最大的潜能才是王道。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、案例场景
  • 二、多线程处理
  • 三、多线程辅助处理
    • (1)重复订单数据处理
    • (2)多线程任务的简单性
    • (3)微服务对多线程的支持
    • (4)任务消息安全落库处理
  • 四、案例总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档