你好,这里是codetrend专栏“高并发编程基础”。
与网络通信等进程间通信方式不同,线程间通信是指在同一个进程内的多个线程之间进行的通信。
在多线程编程中,当多个线程需要互斥地访问共享资源时,它们会相互之间发送信号或等待信号的通知。
这些通信方式包括线程等待数据到达的通知、线程收到变量改变的信号等。
本文将探讨Java提供的原生通信API,以及这些通信机制背后的原理和实现细节。
同步阻塞消息处理机制:
优点:
缺点:
异步非阻塞消息处理机制:
优点:
缺点:
同步阻塞消息处理适合简单场景和顺序处理的需求,而异步非阻塞消息处理适合高并发、高效率和容错性要求较高的场景。
在Java中,使用wait和notify/notifyAll来实现同步阻塞和异步非阻塞模型通信是常见的做法。
wait & notify 两个函数均是java.lang.Object
对象的借口,也就是说所有对象都有这两个函数。
使当前线程等待,直到被唤醒,通常是通过被通知或中断来实现。在所有方面,该方法的行为就像调用了wait(0L, 0)一样。
唤醒一个正在等待此对象监视器(monitor)的单个线程。如果有任何线程在等待此对象,则选择其中一个线程进行唤醒。选择是任意的,并由实现自行决定。线程通过调用wait方法之一来等待对象的监视器。被唤醒的线程在当前线程释放此对象上的锁之前,无法继续执行。被唤醒的线程将以通常的方式与任何其他正在积极竞争同步此对象的线程进行竞争;例如,被唤醒的线程在成为下一个锁定此对象的线程方面没有可靠的优势或劣势。一次只能有一个线程拥有对象的监视器。此方法应该仅由拥有此对象监视器的线程调用。
线程以以下三种方式成为对象监视器的所有者:
通过使用wait & notify函数设计一个EventQueue先进先出(FIFO)队列来演示单线程间通信。
package engineer.concurrent.battle.fcontact;
import java.util.LinkedList;
public class EventQueueSingle {
private final int max;
static class Event {}
private final LinkedList<Event> eventQueue = new LinkedList<>();
private final static int DEFAULT_MAX = 20;
public EventQueueSingle(int max) {
this.max = max;
}
public EventQueueSingle() {
this(DEFAULT_MAX);
}
public void enqueue(Event event) {
synchronized (eventQueue) {
if (eventQueue.size() >= max) {
try {
System.out.println("EventQueue is full, waiting for dequeue...");
eventQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
eventQueue.addLast(event);
System.out.println(Thread.currentThread().getName() + ": enqueues success. and size is "+eventQueue.size());
eventQueue.notify();
}
}
public Event dequeue() {
synchronized (eventQueue) {
if (eventQueue.isEmpty()) {
try {
System.out.println("EventQueue is empty, waiting for enqueue...");
eventQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + ": dequeues success. and size is "+eventQueue.size());
Event event = eventQueue.removeFirst();
eventQueue.notify();
return event;
}
}
}
EventQueueSingle有三个状态:
EventQueueSingle提供了两个接口进行生产消费,EventQueueSingle使用了synchronized
和 wait & notify
来实现生产消费的顺序和状态校验。
synchronized
锁定队列,队列满的状态则调用wait
函数进行等待,直到队列消费notify
后进行对象的添加,并且通知可能的消费wait
。synchronized
锁定队列,队列空的状态则调用wait
函数进行等待,直到队列生产notify
后进行对象的添加,并且通知可能的生产wait
。测试代码如下:
package engineer.concurrent.battle.fcontact;
import java.util.concurrent.TimeUnit;
public class EventQueueSingleTest {
public static void main(String[] args) {
final EventQueueSingle queue = new EventQueueSingle();
new Thread(()->{
for (;;) {
queue.enqueue(new EventQueueSingle.Event());
}
},"producer").start();
new Thread(()->{
for (;;) {
queue.dequeue();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"consumer1").start();
new Thread(()->{
for (;;) {
queue.dequeue();
queue.dequeue();
try {
TimeUnit.MILLISECONDS.sleep(1500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"consumer2").start();
}
}
可能的输出结果如下:
producer: enqueues success. and size is 20
EventQueue is full, waiting for dequeue...
consumer2: dequeues success. and size is 20
consumer2: dequeues success. and size is 19
producer: enqueues success. and size is 19
producer: enqueues success. and size is 20
EventQueue is full, waiting for dequeue...
多线程间通信需要用到Object的notifyAll函数,可以同时唤醒全部阻塞的线程,同样被唤醒的线程仍然需要争抢monitor的所有权。
以下是优化后修改使用notifyAll的EventQueue。
package engineer.concurrent.battle.fcontact;
import java.util.LinkedList;
public class EventQueue {
private final int max;
static class Event {}
private final LinkedList<Event> eventQueue = new LinkedList<>();
private final static int DEFAULT_MAX = 20;
public EventQueue(int max) {
this.max = max;
}
public EventQueue() {
this(DEFAULT_MAX);
}
public void enqueue(Event event) {
synchronized (eventQueue) {
while (eventQueue.size() >= max) {
try {
System.out.println("EventQueue is full, waiting for dequeue...");
eventQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
eventQueue.addLast(event);
System.out.println(Thread.currentThread().getName() + ": enqueues success. and size is "+eventQueue.size());
eventQueue.notifyAll();
}
}
public Event dequeue() {
synchronized (eventQueue) {
while (eventQueue.isEmpty()) {
try {
System.out.println("EventQueue is empty, waiting for enqueue...");
eventQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + ": dequeues success. and size is "+eventQueue.size());
Event event = eventQueue.removeFirst();
eventQueue.notifyAll();
return event;
}
}
}
测试代码如下:
package engineer.concurrent.battle.fcontact;
import java.util.concurrent.TimeUnit;
public class EventQueueTest {
public static void main(String[] args) {
final EventQueue queue = new EventQueue();
new Thread(()->{
for (;;) {
queue.enqueue(new EventQueue.Event());
}
},"producer").start();
new Thread(()->{
for (;;) {
queue.dequeue();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"consumer1").start();
new Thread(()->{
for (;;) {
queue.dequeue();
queue.dequeue();
try {
TimeUnit.MILLISECONDS.sleep(1500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"consumer2").start();
}
}
输出结果如下:
EventQueue is full, waiting for dequeue...
consumer1: dequeues success. and size is 20
producer: enqueues success. and size is 20
EventQueue is full, waiting for dequeue...
consumer2: dequeues success. and size is 20
consumer2: dequeues success. and size is 19
producer: enqueues success. and size is 19
producer: enqueues success. and size is 20
EventQueue is full, waiting for dequeue...
通过上面提到的wait和notify、notifyAll或方法可以简单的实现一个显示锁,这里命名为 MyLock 。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
public class MyLock implements Lock {
private boolean isLocked = false;
@Override
public synchronized void lock() {
while (isLocked) {
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
isLocked = true;
}
@Override
public synchronized void unlock() {
isLocked = false;
notifyAll();
}
@Override
public synchronized void lockInterruptibly() throws InterruptedException {
while (isLocked) {
wait();
}
isLocked = true;
}
@Override
public synchronized boolean tryLock() {
if (!isLocked) {
isLocked = true;
return true;
}
return false;
}
@Override
public synchronized boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
long startTime = System.currentTimeMillis();
long timeoutInMillis = unit.toMillis(time);
while (isLocked) {
long elapsedTime = System.currentTimeMillis() - startTime;
long remainingTime = timeoutInMillis - elapsedTime;
if (remainingTime <= 0) {
return false;
}
wait(remainingTime);
}
isLocked = true;
return true;
}
@Override
public synchronized Condition newCondition() {
throw new UnsupportedOperationException("newCondition method not supported");
}
}
测试代码如下:
package engineer.concurrent.battle.fcontact;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import static java.util.concurrent.ThreadLocalRandom.current;
public class MyLockTest {
private final MyLock myLock = new MyLock();
public void syncMethod(){
//加锁
myLock.lock();
try {
int randomInt = current().nextInt(10);
System.out.println(Thread.currentThread().getName()+" gets the lock");
TimeUnit.SECONDS.sleep(randomInt);
}catch (InterruptedException e){
e.printStackTrace();
}finally {
// 释放锁
myLock.unlock();
System.out.println(Thread.currentThread().getName()+" releases the lock");
}
}
public static void main(String[] args) {
MyLockTest mlt = new MyLockTest();
for (int i = 0; i < 10; i++) {
new Thread(mlt::syncMethod).start();
}
}
}
输出结果如下:
Thread-0 gets the lock
Thread-2 gets the lock
Thread-0 releases the lock
Thread-2 releases the lock
Thread-9 gets the lock
Thread-9 releases the lock
Thread-1 gets the lock
Thread-1 releases the lock
Thread-8 gets the lock
Thread-8 releases the lock
Thread-3 gets the lock
Thread-3 releases the lock
Thread-7 gets the lock
Thread-7 releases the lock
Thread-4 gets the lock
Thread-4 releases the lock
Thread-6 gets the lock
Thread-6 releases the lock
Thread-5 gets the lock
Thread-5 releases the lock
通过测试发现MyLock能够达到和synchronized一致的效果。
来自一线全栈程序员nine的探索与实践,持续迭代中。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。