针对6.18,11.11这种场景,平台一次性发布500w张优惠券,或者对于锁单用户统一发下100w张确认信息,同时我们平时有抢购茅台的场景,京东一次性发布10w个验证码,主要是针对高并发多线程大数据批处理任务的场景,一般用于二维码、优惠券、邮件、短信等场景。
java.util.concurrent.ThreadPoolExecutor
是 Java 中的一个线程池执行器,它允许你管理一组工作线程来执行异步任务。线程池是并发编程中的一个重要概念,它可以有效地管理线程资源,避免频繁创建和销毁线程所带来的开销。
具体参数详解,我之前有文章写到过:
Spring框架自带的线程池,注意和JUC里面原生的做对比
Spring:org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
ThreadPoolTaskExecutor是 Spring 提供的一个方便的线程池实现,用于异步执行任务或处理并发请求。在使用 ThreadPoolTaskExecutor作为 Spring Bean 注册到容器中后,Spring 会负责在应用程序关闭时自动关闭所有注册的线程池,所以不需要手动关闭。
这样不仅可以确保线程池中的线程正确地停止,还可以防止资源泄露和潜在的并发问题。
美团技术上有一篇还写的挺好的,大家也可参考学习一下:[Java线程池实现原理及其在美团业务中的实践](https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html)
1、编写config配置文件类
@Data
@Configuration
@ConfigurationProperties(prefix = "thread.pool")
public class ThreadPoolProperties {
/**
* 核心线程池大小
*/
private int corePoolSize;
/**
* 最大可创建的线程数
*/
private int maxPoolSize;
/**
* 队列最大长度
*/
private int queueCapacity;
/**
* 线程池维护线程所允许的空闲时间
*/
private int keepAliveSeconds;
}
2、读取配置返回线程池
@Configuration
public class ThreadPoolConfig
{
/*
@Value("${thread.pool.corePoolSize}")
private String corePoolSize;
@Value("${thread.pool.maxPoolSize}")
private String maxPoolSize;
@Value("${thread.pool.queueCapacity}")
private String queueCapacity;
@Value("${thread.pool.keepAliveSeconds}")
private String keepAliveSeconds;
*/
//线程池配置
@Resource
private ThreadPoolProperties threadPoolProperties;
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor()
{
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
// 核心线程池大小
threadPool.setCorePoolSize(threadPoolProperties.getCorePoolSize());
// 最大可创建的线程数
threadPool.setMaxPoolSize(threadPoolProperties.getMaxPoolSize());
// 等待队列最大长度
threadPool.setQueueCapacity(threadPoolProperties.getQueueCapacity());
// 线程池维护线程所允许的空闲时间
threadPool.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds());
//异步方法内部线程名称
threadPool.setThreadNamePrefix("spring默认线程池-");
// 线程池对拒绝任务(无线程可用)的处理策略
threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 任务都完成再关闭线程池
threadPool.setWaitForTasksToCompleteOnShutdown(true);
// 任务初始化
threadPool.initialize();
return threadPool;
}
}
thread.pool.corePoolSize=16
thread.pool.maxPoolSize=32
thread.pool.queueCapacity=50
thread.pool.keepAliveSeconds=2
注意配置文件的前缀@ConfigurationProperties(prefix ="thread.pool")
SpringBootApplication
//@EnableDiscoveryClient
@MapperScan("com.atguigu.interview2.mapper") //import tk.mybatis.spring.annotation.MapperScan;
public class Interview2Application
{
@Resource
private ThreadPoolTaskExecutor threadPool;
@PostConstruct
public void getThreadPoolConfig()
{
System.out.println("*******测试threadPool getCorePoolSize: "+threadPool.getCorePoolSize());
System.out.println("*******测试threadPool getMaxPoolSize: "+threadPool.getMaxPoolSize());
System.out.println("*******测试threadPool getQueueCapacity: "+threadPool.getQueueCapacity());
System.out.println("*******测试threadPool getKeepAliveSeconds: "+threadPool.getKeepAliveSeconds());
}
public static void main(String[] args)
{
SpringApplication.run(Interview2Application.class, args);
}
}
这时候可以启动 项目,看配置文件是否读取成功
这里主要是针对优惠券的业务进行编写
1、优惠券接口CouponService
public interface CouponService
{
public void batchTaskAction();
}
2、接口实现类CouponServiceImpl
@Service
public class CouponServiceImpl implements CouponService
{
//下发优惠卷数量
public static final Integer COUPON_NUMBER = 50;
@Resource
private ThreadPoolTaskExecutor threadPool;
/**
* 下发50条优惠卷
*/
@Override
public void batchTaskAction()
{
//1 模拟要下发的50条优惠卷,上游系统给我们的下发优惠卷源头
List<String> coupons = new ArrayList<>(COUPON_NUMBER);
for (int i = 1; i <= COUPON_NUMBER; i++)
{
coupons.add("优惠卷--"+i);
}
//2 创建CountDownLatch,构造器参数为任务数量
CountDownLatch countDownLatch = new CountDownLatch(coupons.size());
long startTime = System.currentTimeMillis();
try
{
//3 将优惠卷集合逐条发送进线程池高并发处理
for (String coupon : coupons)
{
threadPool.execute(() -> {
try
{
//4 交个线程池处理的下发业务逻辑,可以提出成一个方法
System.out.println(String.format("【%s】发送成功", coupon));
}finally {
//5 发送一个少一个任务,计数减少一个
countDownLatch.countDown();
}
});
}
//6 阻塞当前发送完毕后,方法才能继续向下走
countDownLatch.await();
}catch (Exception e){
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
System.out.println("----任务处理完毕costTime: "+(endTime - startTime) +" 毫秒");
}
}
3、访问业务接口
@RestController
@Slf4j
public class CouponController
{
@Resource
private CouponService couponService;
//http://localhost:24618/coupon/sendv1
@GetMapping(value = "/coupon/sendv1")
public void sendv1()
{
couponService.batchTaskAction();
}
}
4、结果
5、总结
本次其实就针对优惠券进行简单的打印,速度相比较来说还是挺快的,实际运用场景中会比这复杂的多,性能肯定也会有所下降的。
前面我们已经提到了其他的场景,比如二维码、优惠券、短信、邮件、理财产品收益等场景,那我们怎么才会坐到通用呢?能否做到通用的设计或工具类,给团队赋能,一开始我们肯定是一次性编写或思考不到,考虑不周的,需要我们先针对某个场景进行编写之后,后续再做到更完美!!!
TaskBatchSendUtils
public class TaskBatchSendUtils
{
public static <T> void send(List<T> taskList, Executor threadPool, Consumer<? super T> consumer) throws InterruptedException
{
if (taskList == null || taskList.size() == 0)
{
return;
}
if(Objects.isNull(consumer))
{
return;
}
CountDownLatch countDownLatch = new CountDownLatch(taskList.size());
//遍历消息、邮件等列表
for (T couponOrShortMsg : taskList)
{
threadPool.execute(() ->
{
try
{
consumer.accept(couponOrShortMsg);
} finally {
//数量减一
countDownLatch.countDown();
}
});
}
countDownLatch.await();
}
public static void disposeTask(String task)
{
System.out.println(String.format("【%s】disposeTask下发优惠卷或短信成功", task));
}
public static void disposeTaskV2(String task)
{
System.out.println(String.format("【%s】disposeTask下发邮件成功", task));
}
}
1、业务接口类
public interface CouponServiceV2
{
public void batchTaskActionV2();
}
2、业务接口实现类
@Service
public class CouponServiceImplV2 implements CouponServiceV2
{
//下发优惠卷数量
public static final Integer COUPON_NUMBER = 50;
@Resource
private ThreadPoolTaskExecutor threadPool;
@SneakyThrows
@Override
public void batchTaskActionV2()
{
//1 模拟要下发的50条优惠卷,上游系统给我们的下发优惠卷源头
List<String> coupons = getCoupons();
long startTime = System.currentTimeMillis();
//2 调用工具类批处理任务,这些优惠卷coupons,放入线程池threadPool,做什么业务disposeTask下发
TaskBatchSendUtils.send(coupons,threadPool,TaskBatchSendUtils::disposeTask);
//TaskBatchSendUtils.send(coupons,threadPool,TaskBatchSendUtils::disposeTaskV2);
long endTime = System.currentTimeMillis();
System.out.println("----costTime: "+(endTime - startTime) +" 毫秒");
}
private static List<String> getCoupons()
{
List<String> coupons = new ArrayList<>(COUPON_NUMBER);
for (int i = 1; i <= COUPON_NUMBER; i++)
{
coupons.add("优惠卷--"+i);
}
return coupons;
}
}
3、业务访问接口
@RestController
@Slf4j
public class CouponController
{
@Resource
private CouponService couponService;
//http://localhost:24618/coupon/sendv1
@GetMapping(value = "/coupon/sendv1")
public void sendv1()
{
couponService.batchTaskAction();
}
@Resource
private CouponServiceV2 couponServiceV2;
//http://localhost:24618/coupon/sendv2
@GetMapping(value = "/coupon/sendv2")
public void sendv2()
{
couponServiceV2.batchTaskActionV2();
}
}