并发包下面的并发容器中,ConcurrentLinkedQueue 这种 Concurrent 类型和的集合才真正代表并发。
Java 集合中有个线程不安全的队列 LinkedList 是个 Deque。
能够理解 ConcurrentLinkedQueue 和 LinkedBlockingQueue 的主要区别。在常规队列操作基础上, Blocking意味着提供了特定的等待性操作。
适用阻塞队列的好处,多线程操作共同的队列时不需要额外的同步。
允许多线程访问一个共同集合,可以使用 ConcurrentLinedQueue Queue 中元素按 FIFO 原则进行排序。采用CAS 操作,来保证元素的一致性。
ConcurrentLinkedDeque 和 LinkedBlockingDeque。Deque 的侧重点是支持对队列的头尾都信息插入和删除:
ArrayBlockingQueue是最典型的的有界队列,其内部以final的数组保存数据,数组的大小就决定了队列的边界,所以我们在创建ArrayBlockingQueue时,都要指定容量,如:
public ArrayBlockingQueue(int capacity, boolean fair)
其行为和内部代码都是基于有界的逻辑实现的,只不过如果我们没有在创建队列时就指定容量,那么其容量限制就自动被设置为Integer.MAX_VALUE,成为了无界队列。
每个删除操作都要等待插入操作,反之每个插入操作也都要等待删除动作。这个队列的容量是0。
都是无边界队列,就是 put 操作永远不会发生其他 BlockingQueue 阻塞的情况。
/** Lock held by take, poll, etc */
private fnal ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private fnal Condition notEmpty = takeLock.newCondition();
/** Lock held by put, ofer, etc */
private fnal ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private fnal Condition notFull = putLock.newCondition();
LinkedBlockingQueue 改进了锁操作的力度,头,尾操作使用不同的锁,在通用的场景下,他的吞吐量相对要更好一些。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
ArrayBlockingQueue take 实现原理
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
队列的典型使用场景是 生产者 -消费者 场景。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ConsumerProducer {
public static final String EXIT_MSG = "Good bye!";
public static void main(String[] args) {
// 使用较小的队列,以更好地在输出中展示其影响
BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
new Thread(consumer).start();
}
static class Producer implements Runnable {
private BlockingQueue<String> queue;
public Producer(BlockingQueue<String> q) {
this.queue = q;
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
try {
Thread.sleep(5L);
String msg = "Message" + i;
System.out.println("Produced new item: " + msg);
queue.put(msg);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
System.out.println("Time to say good bye!");
queue.put(EXIT_MSG);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class Consumer implements Runnable {
private BlockingQueue<String> queue;
public Consumer(BlockingQueue<String> q) {
this.queue = q;
}
@Override
public void run() {
try {
String msg;
while (!EXIT_MSG.equalsIgnoreCase((msg = queue.take()))) {
System.out.println("Consumed item: " + msg);
Thread.sleep(10L);
}
System.out.println("Got exit message, bye!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}