前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >【多线程】阻塞队列,线程池,定时器

【多线程】阻塞队列,线程池,定时器

作者头像
用户11369558
发布2025-02-12 09:56:03
发布2025-02-12 09:56:03
10400
代码可运行
举报
文章被收录于专栏:JavaJava
运行总次数:0
代码可运行

阻塞队列

阻塞队列是什么?

阻塞队列(BlockingQueue)是一种特殊的队列,它也是遵循“先进先出”的原则;

其中,它有俩个重要的特性:

  • 当队列满的时候,继续入队列会阻塞,直到其他线程从队列中取走元素
  • 当队列空的时候,继续出队列会阻塞,直到其他线程从队列中插入元素

阻塞队列一个典型的应用场景就是“生产者消费者模型”,这是一种典型的开发模型;

生产者消费者模型

生产者和消费者通过一个容器(这个容器可以是很多种数据结构)可以解决消费者和生产者之间的强耦合关系;

生产者和消费者不需要直接联系,而是通过一个阻塞队列在他们之间进行联系;

这样生产者生产完数据就可以不用发送给消费者,而是丢进容器里面,等待消费者进行处理;

而消费者也不用找生产者要数据,直接通过拿阻塞队列里面的数据即可;

好处:

1.可以进行“削峰填谷”

将阻塞队列充当一个缓存区,去平衡生产者和消费者的处理能力; 假设在一个大型的活动秒杀现场,服务器同一时候接入到大量的请求,如果直接处理这些请求的话,那么服务器可能会抵挡不住,导致宕机;这个时候将这些请求加入到阻塞队列中,可以让消费者慢慢的去处理阻塞队列中的请求,以防服务器被这一波大的请求直接冲垮~

2.进行解耦合

在以下场景: 过年一个人进行包饺子,一般会有明确的分工,比如一个人负责擀饺子皮(生产者),其他人负责包饺子(消费者); 那么擀饺子皮的人(生产者)不需要关心包饺子的人(消费者)是谁,能包就行,包饺子的人也不需要知道擀饺子皮的人是谁,只要有饺子皮就行~ 那么它们之间相互就不存在耦合


Java标准库的阻塞队列: 

可以看出,BlockingQueue是一个接口,实现它的类有3个。

其中put方法和take方式是带有阻塞行为的入队列和出队列,而之前像offer方法或poll方法是不带阻塞行为的方法~

代码语言:javascript
代码运行次数:0
复制
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Demo1 {
    public static void main(String[] args) {
        BlockingQueue<Integer> bq = new ArrayBlockingQueue<>(100);
        Thread produce = new Thread(() -> {
            int count = 1;
            while (true) {
                try {
                    bq.put(count);
                    System.out.println("produce put: " + count);
                    count++;
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }

            }
        });

        Thread consume = new Thread(()->{
            while (true) {
                try {
                    int num =  bq.take();
                    System.out.println("consume take: " + num);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }

            }
        });
        produce.start();
        consume.start();
    }
}

实现一个简单阻塞队列: 

1.通过一个队列

2.保证线程安全进行线程加锁

3.引入阻塞功能

代码语言:javascript
代码运行次数:0
复制
class MyBlockingQueue {
    private String[] elems = null;
    private volatile int head = 0; //指向队首
    private volatile int tail = 0; //指向队尾
    private volatile int count = 0;//当前队列中元素的个数
    public MyBlockingQueue(int capacity) {
        elems = new String[capacity];
    }
    public void put(String elem) throws InterruptedException {
        synchronized (this) {
            if(count >= elem.length()) {
                //队列已满,等待
                this.wait();
            }
            elems[tail] = elem;
            tail++;
            if(tail == elems.length) {
                tail = 0; // 循环队列
            }
            //更新count;
            count++;
            this.notify();
        }
    }
    public String take() throws InterruptedException {
        String elem = null;
        synchronized(this) {
            if(count == 0) {
                //队列为空,等待
                this.wait();
            }
             elem = elems[head];
            head++;
            if(head == elems.length) {
                head = 0;
            }
            count--;
            this.notify();
            return elem;
        }
    }
}


public class Demo2 {
    public static void main(String[] args) {
        MyBlockingQueue bq = new MyBlockingQueue(100);
        Thread produce = new Thread(() -> {
            int count = 1;
            while (true) {
                try {
                    bq.put(count+"");
                    System.out.println("produce " + count);
                    count++;
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

        });
        Thread consume = new Thread(() -> {
            while (true) {
                try {
                    String num = bq.take();
                    System.out.println("consume " + num);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

        });
        produce.start();
        consume.start();
    }

}

线程池 

线程池是什么?

频繁创建线程的效率是低下的,如果直接在一个池子中创建好线程,那么想用的时候直接去池子里面取即可~

线程池最大的好处就是可以减小每次启动,销毁线程的损耗,提高了资源的利用率~

Java标准款中的线程池:

1.corePoolSize & maximumPoolSize

核心线程数和最大线程数 可以理解成corePoolSize就是正式工 maximumPoolSize 就是  corePoolSize + 非核心线程数 非核心线程的作用就是临时工,当任务量很大的情况下,才会启动临时工帮忙完成工作,当没啥任务时,就不会启动非核心线程

2. keepAliveTime & unit

存活时间和存活时间的单位 非核心线程允许最大存活的时候  unit 单位可以是秒,分,日,周.... 相当于临时工如果没了任务,再将他停留一段时间,万一这期间任务又上来了,可以重新召回他

3.workQueue 

任务队列 存放任务的阻塞队列,后续大家都在忙自己的事,此时还有任务的话先存放在这个队列中,等后续忙完再进行处理队列里面的任务

4.threadFactor

创建线程的工厂, 参与具体的创建线程工作. 通过不同线程工厂创建出的线程相当于 对一些属性进行了不同的初始化设置.

5.handler

拒绝策略 当任务超出线程池能够承受的范围时候,就会有以下几种拒绝策略:

  • AbortPolicy():超过负荷,直接抛出异常.
  • CallerRunsPolicy():调用者负责处理多出来的任务.
  • DiscardOldestPolicy():丢弃队列中最老的任务.
  • DiscardPolicy():丢弃新来的任务

线程池在生活中应用到的地方很多,假设以下场景 银行办理业务:

3个核心线程数,最大线程数为5 任务队列大小为3

代码语言:javascript
代码运行次数:0
复制
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor
                (3, 5, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3),
                        Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

此时如果只有6个任务的话

那么将刚刚好,不需要通过临时工的加入也能完成任务

但这是又新增了一个任务 7个任务

此时临时工也加入了工作

但如果一下子任务太多的话

那么将根据拒绝策略做出相应的选择~

由于上述使用起来比较复杂,参数很多,可以直接使用它提供的工厂类

其中 newFixedThreadPool(int nTread):创建固定线程数的线程池 newCachedThreadPool():创建线程数目动态变化的线程池.(随任务数量变化) newSingleThreadExectuor():创建单线程的线程池. newScheduleThreadPool():设定延迟时间后执行指令,或者定期执行指令,是进阶版的Timer.

代码语言:javascript
代码运行次数:0
复制
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i = 0; i < 1000; i++) {
            int id = i;
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + "办理业务" + id);
                }
            });
        }
        pool.shutdown();
    }

 实现一个简单的线程池:

  • 核心操作为submit,将任务添加到线程池的队列中
  • 使用Worker类描述一个工作线程.使用Runnable描述一个任务
  • 使用BlockingQueue组织所有的任务
  • 每个Worker线程需要做的事情就是不断从BlockingQueue中获取任务并执行
  • 指定线程池的最大线程数当当前线程超过这个最大值时,就不能新增线程了

定时器 

定时器是什么?

定时器类似闹钟效果~

指定一个任务(running),并且指定一个时间为5s,那么它将会在5s之后执行~;

第一个参数指定即将要执行的任务代码, 第二个参数指定多长时间之后执行

实现一个简单的定时器

代码语言:javascript
代码运行次数:0
复制
import java.util.PriorityQueue;

//MyTimerTask 类⽤于描述⼀个任务(作为 MyTimer 的内部类). ⾥⾯包含⼀个 Runnable 对象和⼀个 time
class MyTimerTask{
    private Runnable runnable; // 任务
    private long time; // 此刻时间是绝对时间

    public MyTimerTask(Runnable runnable , long time) {
        this.runnable = runnable;
        this.time = System.currentTimeMillis()+time;
    }
    void run() {
        runnable.run();
    }
    long getTime() {
        return time;
    }

}

class MyTimer{
    // 通过 PriorityQueue 来组织若⼲个 Task 对象.并且按照 time 排序
    private PriorityQueue<MyTimerTask> queue = new PriorityQueue<MyTimerTask>((a,b)->{
        return (int)(a.getTime()-b.getTime());
    });
    // 创建一个锁,用于控制线程同步
    private Object locker = new Object();

    // 创建一个线程,线程中不断检查任务队列,如果任务队列中有任务,就执行任务
    public MyTimer() {
        Thread t = new Thread(() -> {
            synchronized (locker){
                while(true) {
                    if(queue.size() == 0) {
                        try {
                            locker.wait();
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    }
                    MyTimerTask task = queue.peek();
                    long curTime = System.currentTimeMillis();
                    if(task.getTime()<=curTime) {
                        //时间到了,该执行了
                        task.run();
                        queue.poll();
                    }else {
                        //时间没到,继续等待
                        try {
                            locker.wait(task.getTime()-curTime);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    }
                }
            }
        });
        t.start();

    }
    //核⼼接⼝为 schedule, ⽤于注册⼀个任务, 并指定这个任务多⻓时间后执⾏
    void schedule(Runnable runnable , long time) {
        synchronized (locker) {
            MyTimerTask task = new MyTimerTask(runnable, time);
            queue.add(task);
            locker.notify();
        }
    }



}
public class Demo7 {
    public static void main(String[] args) {
        MyTimer timer = new MyTimer();
        timer.schedule(()->{
            System.out.println("Hello World 1000");
        },1000);
        timer.schedule(()->{
            System.out.println("Hello World 2000");
        },2000);
        timer.schedule(()->{
            System.out.println("Hello World 3000");
        },3000);
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2025-02-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 阻塞队列
    • 阻塞队列是什么?
    • 生产者消费者模型
    • Java标准库的阻塞队列: 
    • 实现一个简单阻塞队列: 
  • 线程池 
    • 线程池是什么?
    • Java标准款中的线程池:
    •  实现一个简单的线程池:
  • 定时器 
    • 定时器是什么?
    • 实现一个简单的定时器
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档