首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >谈谈线程

谈谈线程

作者头像
MiChong
发布2020-09-24 16:03:31
发布2020-09-24 16:03:31
5300
举报
文章被收录于专栏:米虫的家米虫的家
前言

关于Java 的线程问题,我们上大学的时候,计算机专业的学生肯定会遇到这两个名词–线程和进程,老师和我们说一个进程里面可以有多个线程,这里也引出了多线程的概念。

提到Java中的线程,我们就会想到Thread以及Runnable,也了解了Thread的生命周期

相信大家也写过关于线程的小程序,比如卖火车票啊,生产者消费者模式等。我们使用到了synchronized加锁技术(或者使用Lock方法),也用到了关于线程通知,等待任务的notifyAll()和wait()方法。更多的使用大家找一本大书去看吧,比如《Java核心技术》什么的,Java基础不是本文的重点。下面我们谈谈线程池和多线程的原理和使用,以及整合springboot框架。

一、线程池

在多线程任务环境中,每次开启一个任务,我们把它提交到线程池中,交给线程池来管理,由线程池来调度任务。如果每次开启任务都要创建一个线程,那么对于大量任务的环境下,服务器资源将会很快耗尽,不仅对服务是一个灾难,对于用户体验来说也是一个灾难。此时,我们来谈谈Java中线程池技术。

1、线程池种类
(1)newSingleThreadExecutor

单线程的线程池,在这个线程池中的所有任务只有一个线程去执行,如果当前的线程因为异常结束,将会有新的线程来执行,保证任务按照顺序执行下去。

java

代码语言:javascript
复制
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPool {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            pool.execute(() -> {
                System.out.println(Thread.currentThread().getName());
            });
        }
    }
}
(2)newFixedThreadPool

创建一个固定线程数目的线程池,每次执行一个新的任务都会创建一个新的线程,直到达到我们设定的线程数。同样的,如果某个线程因为异常而结束,就会补充一个新的线程。

java

代码语言:javascript
复制
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPool {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            pool.execute(() -> {
                System.out.println(Thread.currentThread().getName());
            });
        }
    }
}
(3)newCachedThreadPool

创建一个可缓存的线程池,如果线程池大小超过了任务所需的线程数目,线程池将会回收多余的线程,相反,任务所需线程增加,线程池也会增加线程,弹性修改线程池中的线程数目。

java

代码语言:javascript
复制
ExecutorService pool = Executors.newCachedThreadPool();
(4)newScheduledThreadPool

此线程支持周期性和定时任务。

java

代码语言:javascript
复制
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ThreadPool {
    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
        for (int i = 0; i < 10; i++) {
            pool.schedule(() -> {
                System.out.println(Thread.currentThread().getName() + "\t开始发车啦....");
            }, 10, TimeUnit.SECONDS);//延迟10秒执行任务
        }
    }
}

当我们有定时和周期性任务需求的时候,下面是每隔1秒执行一次

java

代码语言:javascript
复制
//pool.scheduleWithFixedDelay也可以
pool.scheduleAtFixedRate(() -> {
                System.out.println(Thread.currentThread().getName() + "\t开始发车啦....");
}, 1, 1, TimeUnit.SECONDS);
(5)newWorkStealingPool

jdk8才有的,会根据所给的并行层次来动态地开启关闭线程,通过使用多个队列减少竞争,底层使用ForkJoinPool来实现的。充分利用多CPU多核,将一个任务拆分位多个小任务,放到多个处理器中执行,等这些小任务完成之后,再次合并为这个完整的任务。

源码:参数parallelism应该是并行层次吧,参考无参的那个方法,这个parallelism应该是可用处理器数目

java

代码语言:javascript
复制
/**
     * Creates a thread pool that maintains enough threads to support
     * the given parallelism level, and may use multiple queues to
     * reduce contention. The parallelism level corresponds to the
     * maximum number of threads actively engaged in, or available to
     * engage in, task processing. The actual number of threads may
     * grow and shrink dynamically. A work-stealing pool makes no
     * guarantees about the order in which submitted tasks are
     * executed.
     *
     * @param parallelism the targeted parallelism level
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code parallelism <= 0}
     * @since 1.8
     */
    public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

/**
     * Creates a work-stealing thread pool using all
     * {@link Runtime#availableProcessors available processors}
     * as its target parallelism level.
     * @return the newly created thread pool
     * @see #newWorkStealingPool(int)
     * @since 1.8
     */
    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            //Runtime.getRuntime().availableProcessors()返回的是可用处理器数目
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
使用场景
  • newSingleThreadExecutor:一个单线程的线程池,可以用于需要保证顺序执行的场景,并且只有一个线程在执行。
  • newFixedThreadPool:一个固定大小的线程池,可以用于已知并发压力的情况下,对线程数做限制。
  • newCachedThreadPool:一个可以无限扩大的线程池,比较适合处理执行时间比较小的任务。
  • newScheduledThreadPool:可以延时启动,定时启动的线程池,适用于需要多个后台线程执行周期任务的场景。
  • newWorkStealingPool:一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用cpu数量的线程来并行执行。
2、线程池的拒绝策略

当任务过多,而且处理器无法处理额外的任务,我们所要要做的就是拒绝服务,java中提供了RejectedExecutionHandler接口,通过实现这个接口的rejectedExecution()方法实现不同的拒绝策略。下面是源码查看一波。

(1)AbortPolicy策略

该策略直接抛出异常,阻止系统正常运行

java

代码语言:javascript
复制
public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }

        /**
     	 * 总是抛出异常
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
(2)CallerRunsPolicy 策略

只要线程池未关闭,该策略直接在调用者线程中,运行当前的被丢弃的任务。

java

代码语言:javascript
复制
/**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the {@code execute} method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() { }

        /**
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
(3)DiscardOldestPolicy策略

该策略将丢弃最老的一个请求,也就是即将被执行的任务,并尝试再次提交当前任务。

java

代码语言:javascript
复制
/**
  * A handler for rejected tasks that discards the oldest unhandled
  * request and then retries {@code execute}, unless the executor
  * is shut down, in which case the task is discarded.
  */
 public static class DiscardOldestPolicy implements RejectedExecutionHandler {
 
     public DiscardOldestPolicy() { }

     /**
      * Obtains and ignores the next task that the executor
      * would otherwise execute, if one is immediately available,
      * and then retries execution of task r, unless the executor
      * is shut down, in which case task r is instead discarded.
      *
      * @param r the runnable task requested to be executed
      * @param e the executor attempting to execute this task
      */
     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
         if (!e.isShutdown()) {
             //获取队列,并且将e插入到队列中,等待合适的时间去执行
             e.getQueue().poll();
             //执行任务
             e.execute(r);
         }
     }
 }
(4)DiscardPolicy 策略

这个策略默默丢弃无法处理的任务,并且不做任何处理

java

代码语言:javascript
复制
/**
 * A handler for rejected tasks that silently discards the
 * rejected task.
 */
public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
   
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        //什么也不做
    }
}
3、线程池关闭

shutdownNow:对未开始的任务全部取消执行,并且返回未执行任务的list集合。

shutdown:不接受新提交的任务,不影响已经提交的任务执行。

二、生产活动中使用
1、CountDownLatch 方式

java

代码语言:javascript
复制
public class StatsDemo {
    final static SimpleDateFormat sdf = new SimpleDateFormat(
            "yyyy-MM-dd HH:mm:ss");
    
    final static String startTime = sdf.format(new Date());
    
    /**
     * IO密集型任务  = 一般为2*CPU核心数(常出现于线程中:数据库数据交互、文件上传下载、网络数据传输等等)
     * CPU密集型任务 = 一般为CPU核心数+1(常出现于线程中:复杂算法)
     * 混合型任务  = 视机器配置和复杂度自测而定
     */
    private static int corePoolSize = Runtime.getRuntime().availableProcessors();
    /**
     * public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
     *                           TimeUnit unit,BlockingQueue<Runnable> workQueue)
     * corePoolSize用于指定核心线程数量
     * maximumPoolSize指定最大线程数
     * keepAliveTime和TimeUnit指定线程空闲后的最大存活时间
     * workQueue则是线程池的缓冲队列,还未执行的线程会在队列中等待
     * 监控队列长度,确保队列有界
     * 不当的线程池大小会使得处理速度变慢,稳定性下降,并且导致内存泄露。如果配置的线程过少,则队列会持续变大,消耗过多内存。
     * 而过多的线程又会 由于频繁的上下文切换导致整个系统的速度变缓——殊途而同归。队列的长度至关重要,它必须得是有界的,这样如果线程池不堪重负了它可以暂时拒绝掉新的请求。
     * ExecutorService 默认的实现是一个无界的 LinkedBlockingQueue。
     */
    private static ThreadPoolExecutor executor  = new ThreadPoolExecutor(corePoolSize, corePoolSize+1, 10l, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(1000));
    
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(5);
        //使用execute方法
          executor.execute(new Stats("任务A", 1000, latch));
          executor.execute(new Stats("任务B", 1000, latch));
          executor.execute(new Stats("任务C", 1000, latch));
          executor.execute(new Stats("任务D", 1000, latch));
          executor.execute(new Stats("任务E", 1000, latch));
        latch.await();// 等待所有人任务结束
        System.out.println("所有的统计任务执行完成:" + sdf.format(new Date()));
    }

    static class Stats implements Runnable  {
        String statsName;
        int runTime;
        CountDownLatch latch;

        public Stats(String statsName, int runTime, CountDownLatch latch) {
            this.statsName = statsName;
            this.runTime = runTime;
            this.latch = latch;
        }

        public void run() {
            try {
                System.out.println(statsName+ " do stats begin at "+ startTime);
                //模拟任务执行时间
                Thread.sleep(runTime);
                System.out.println(statsName + " do stats complete at "+ sdf.format(new Date()));
                latch.countDown();//单次任务结束,计数器减一
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
2、Future 方式

重点是和springboot整合,采用注解bean方式生成ThreadPoolTaskExecutor

@Bean

java

代码语言:javascript
复制
//spring依赖包
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class GlobalConfig {

    /**
     * 默认线程池线程池
     *
     * @return Executor
     */
    @Bean
    public ThreadPoolTaskExecutor defaultThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心线程数目
        executor.setCorePoolSize(16);
        //指定最大线程数
        executor.setMaxPoolSize(64);
        //队列中最大的数目
        executor.setQueueCapacity(16);
        //线程名称前缀
        executor.setThreadNamePrefix("defaultThreadPool_");
        //rejection-policy:当pool已经达到max size的时候,如何处理新任务
        //CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
        //对拒绝task的处理策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //线程空闲后的最大存活时间
        executor.setKeepAliveSeconds(60);
        //加载
        executor.initialize();
        return executor;
    }
}
使用

java

代码语言:javascript
复制
//通过注解引入配置
@Resource(name = "defaultThreadPool")
private ThreadPoolTaskExecutor executor;

java

代码语言:javascript
复制
//使用Future方式执行多任务
//生成一个集合
List<Future> futures = new ArrayList<>();

//获取后台全部有效运营人员的集合
List<AdminUserMsgResponse> adminUserDOList = adminManagerService.GetUserToSentMsg(null);

    for (AdminUserMsgResponse response : adminUserDOList) {
           //并发处理
           if (response.getMobile() != null) {
           Future<?> future = executor.submit(() -> {
           //发送短信
           mobileMessageFacade.sendCustomerMessage(response.getMobile(), msgConfigById.getContent());
           });
          futures.add(future);
       }
}

//查询任务执行的结果
for (Future<?> future : futureList) {
     while (true) {
     //CPU高速轮询:每个future都并发轮循,判断完成状态然后获取结果,这一行,是本实现方案的精髓所在。即有10个future在高速轮询,完成一个future的获取结果,就关闭一个轮询
     if (future.isDone()&& !future.isCancelled()) {
     //获取future成功完成状态,如果想要限制每个任务的超时时间,取消本行的状态判断+future.get(1000*1, TimeUnit.MILLISECONDS)+catch超时异常使用即可。
     
     break;//当前future获取结果完毕,跳出while
     } else {
       Thread.sleep(1);//每次轮询休息1毫秒(CPU纳秒级),避免CPU高速轮循耗空CPU---》新手别忘记这个
    }
  }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018-05-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 一、线程池
    • 1、线程池种类
      • (1)newSingleThreadExecutor
      • (2)newFixedThreadPool
      • (3)newCachedThreadPool
      • (4)newScheduledThreadPool
      • (5)newWorkStealingPool
    • 使用场景
    • 2、线程池的拒绝策略
      • (1)AbortPolicy策略
      • (2)CallerRunsPolicy 策略
      • (3)DiscardOldestPolicy策略
      • (4)DiscardPolicy 策略
    • 3、线程池关闭
  • 二、生产活动中使用
    • 1、CountDownLatch 方式
    • 2、Future 方式
      • 使用
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档