一.阻塞队列
1.阻塞队列是⼀种特殊的队列. 也遵守 "先进先出" 的原则
阻塞队列能是⼀种线程安全的数据结构, 并且具有以下特性:
1.1.当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素
1.2.当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插入元素
阻塞队列最主要的应用场景--->生产者消费者模型
2.生产者消费者模型的优点:
解耦合:
两个线程或者两个服务器之间如果直接访问,那么他们的耦合度就会很高,这个时候如果加入阻塞队列,让这两个服务器分别和这个阻塞队列交互,就会达到解耦合的目的
削峰填谷:
阻塞队列的重要程度不小,所以也会直接会搞成几个服务器
缺点:
(1).效率会降低
(2).部署更多服务器生产环境复杂,管理起来比较麻烦
3.生产者消费者模型的实现:
自己实现一个阻塞队列,实现生产者消费者模型
代码:
class MyBlockingQueue{
private String[] data = null;
private int head = 0;//队首
private int tail = 0;//队尾
private int size = 0;//元素个数
MyBlockingQueue(int capacity){
data = new String[capacity];
}
//入队列
public void put(String elem) throws InterruptedException {
synchronized (this) {
while (size >= data.length) {
//阻塞
this.wait();//队列不满时唤醒,其他线程take
}
data[tail] = elem;
// tail = (tail+1) % data.length;
tail++;
if (size >= data.length) {
tail = 0;
}
size++;
this.notify();
}
}
public String take() throws InterruptedException {
synchronized (this) {
while (size == 0) {
//阻塞
this.wait();//队列不空时唤醒,其他线程put
}
String ret = data[head];
head++;
if (size >= data.length) {
head = 0;
}
// head = (head+1) % data.length;
size--;
this.notify();
return ret;
}
}
}
使用: 至少一个生产者线程,一个消费者线程
public static void main(String[] args) {
MyBlockingQueue queue = new MyBlockingQueue(100);
Thread producer = new Thread(()->{
int n = 0;
while (true){
try {
queue.put(n + "");
System.out.println("生产元素 " + n);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
n++;
}
},"producer ");
Thread consumer = new Thread(()->{
while (true){
try {
String n = queue.take();
System.out.println("消费元素 " + n);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"consumer ");
producer.start();
consumer.start();
}
现象:这里Sleep加在生产者哪,导致产生的慢,队列可能为空阻塞,导致生产一个消费一个
4.标准库中的阻塞队列:
在 Java 标准库中内置了阻塞队列. 如果我们需要在⼀些程序中使⽤阻塞队列, 直接使⽤标准库中的即可.
BlockingQueue 是⼀个接口. 真正实现的类是 有链表类型LinkedBlockingQueue.
也有循环数组类型ArrayBlockingQueue
注意:
put 方法用于阻塞式的⼊队列, take ⽤于阻塞式的出队列.
BlockingQueue 也有 offer, poll, peek 等⽅法, 但是这些方法不带有阻塞特性
public static void main(String[] args) {
BlockingQueue<Integer> queue = new LinkedBlockingDeque<>();
Thread producer = new Thread(()->{
int n = 0;
while (true){
try {
queue.put(n);
System.out.println("生产元素 " + n);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
n++;
}
},"producer ");
Thread consumer = new Thread(()->{
while (true){
try {
Integer n = queue.take();
System.out.println("消费元素 " + n);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"consumer ");
producer.start();
consumer.start();
}
二.线程池 :
1.线程是什么? 里面有大量的线程可以直接给我们使用,不用系统去创建让我们高效的创建销毁线程,就和常量池类似,在Java程序构建时候准备好,等程序运行时直接加载到内存中使用
2.为什么要有线程池? 就是让我们高效的创建销毁线程,就像创建进程太慢了引入线程一样的道理。
3. 为什么直接从线程池取线程比创建线程快? 因为操作中唯一,一份内核要为很多应用程序提供服务,相当于创建线程交给了操作系统,这是不可控的 (如果要创建线程A,而操作系统有很多线程,创建分配资源是不可控的,可能很晚才轮到A,所以还不如,通过代码的方式创建好线程这样就可控且高效)
4.Java标准库中的线程池:
ThreadPoolExecutor里有一些线程可以让这些线程执行任务主要涉及的方法有:
核心方法submit(Runnable),
Runnable描述一段要执行的任务
submit把任务放到线程池中执行
构造ThreadPoolExecutor这个类构造方法最多有7个
接下来我们来分析这7个参数
注:Java线程池中有几个线程任务多时自动创建出多个线程,任务少时会销毁
corePoolSize:为核心线程数,核心线程在线程池创建时就创建,一直到线程池销毁才会跟着销毁
maximumPoolSize: 为最大线程数 (核心线程+非核心线程)非核心线程就是上面说的线程任务多时自动创建出多个线程,任务少时会销毁
keepAliveTime: 非核心线程允许存活的最大时间,(非核心线程不是任务少时立即销毁,会有一段存活时间)
unit: 定义一些枚举,比如上面线程存活时间,的时间
workQueue:工作队列(阻塞队列)(线程池的本质也是,生产者消费者模型,调用submit就是在生产任务,线程池则消费任务,线程池相当于消费者)
threadFactory: 工厂模式(工厂模式也属于一种设计模式),弥补构造方法的缺陷,但是在构造方法上有时候无法重载,如果必须要系统类型的参数时
这个时候就可以单独封装一个工厂类去实现构造,用静态的方法,把构造对象过程和各种属性初始化封装起来。
class PointFactory {
public static Point makePointByXY(double x, double y) {
Point p = new Point();
// 通过 x 和 y 给 p 进行属性设置
return p;
}
public static Point makePointByRA(double r, double a) {
Point p = new Point();
// 通过 r 和 a 给 p 进行属性设置
return p;
}
}
handler: 拒绝策略,submit方法把任务添加到阻塞队列中不会阻塞(阻塞会影响客户端的体验),而是使用拒绝策略 ;具体怎么拒绝还要引入四个类说明:
(1).AbortPolicy: 线程池直接抛出异常,可能导致线程池无法继续工作。
(2).CallerRunsPolicy: 让调用submit方法的线程自己去执行任务
(submit方法里可能先判断队列是否满,如果满了再判断是否执行CallerRunsPolicy策略,如果要执行,就调用里面的Runnable.run;让调用者线程自己去执行任务)
(3).DiscardOldesPolicy: 丢弃队列中最老的任务
(4).DiscardPolicy: 丢弃队列中最新的任务(当前submit这个任务)
5.Java标准库也提供了另外一组类Executors,对ThreadPoolExecutor这个类进一步封装,来简化线程池的使用,由于被进一步封装线程数目和拒绝策略是隐式的,规模大业务多的公司不好控制,所以阿里巴巴编程规范手册里,不推荐使用,但是大多数公司还是支持的具体看公司规范。
newFixedThreadPool: 创建固定线程数的线程池
newCachedThreadPool: 创建线程数⽬动态增⻓的线程池.
newSingleThreadExecutor: 创建只包含单个线程的线程池
6.自己实现线程池:
线程池相当于消费者,消费阻塞队列里的任务,线程池的实现也是基于生产者消费者模型的:
package demo;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* Created with IntelliJ IDEA.
* Description:
* User: 苏李涛
* Date: 2024-10-22
* Time: 16:48
*/
// 实现一个固定线程个数的线程池
class MyThreadPool{
BlockingQueue<Runnable> queue = null;
MyThreadPool(int n){
// 初始化线程池,创建固定个数的线程
// 这里使用ArrayBlockingQueue作为任务队列, 容量为1000
queue = new ArrayBlockingQueue<>(1000);
// 创建 N 个线程
for(int i = 0; i < n; i++) {
Thread t = new Thread(()->{
try {
while (true){
Runnable task = queue.take();
task.run();
}
}catch (InterruptedException e){
e.printStackTrace();
}
});
t.setDaemon(true);//把线程设置为后台线程,方便结束,注意:此方法必须在线程启动之前调用
t.start();
}
}
public void submit(Runnable task){
task.run();
}
}
public class Demo1 {
public static void main(String[] args) {
MyThreadPool pool = new MyThreadPool(10);
// 向线程池提交任务
for (int i = 0; i < 100; i++) {
int id = i;
pool.submit(()->{
System.out.println(Thread.currentThread().getName() +" "+ "id=" + id);
});
}
}
}