今天为大家带来的是并发设计模式实战系列,第十一章两阶段终止(Two-Phase Termination),废话不多说直接开始~
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ 发出终止信号 │───>│ 处理未完成请求 │───>│ 释放资源并退出 │
└───────────────┘ └───────────────┘ └───────────────┘
while (!Thread.currentThread().isInterrupted()) {
try {
// 正常任务处理...
} catch (InterruptedException e) {
// 1. 重新设置中断标志(保持中断状态)
Thread.currentThread().interrupt();
// 2. 执行资源清理
cleanup();
break;
}
}
系统组件 | 现实类比 | 核心行为 |
---|---|---|
阶段1通知 | 门口挂"停止营业"牌 | 不再接待新顾客 |
阶段2清理 | 服务员处理现有顾客 | 完成已点餐品,收拾桌椅 |
资源释放 | 关闭厨房设备 | 断电、锁门、清理食材 |
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class TwoPhaseTermination {
// 终止标志(volatile保证可见性)
private volatile boolean shutdownRequested = false;
// 工作线程池
private final ExecutorService workers = Executors.newFixedThreadPool(4);
// 监控线程
private Thread monitorThread;
public void start() {
monitorThread = new Thread(() -> {
while (!shutdownRequested && !Thread.currentThread().isInterrupted()) {
try {
// 模拟监控任务
System.out.println("[Monitor] 检查系统状态...");
Thread.sleep(1000);
} catch (InterruptedException e) {
// 收到中断信号,准备终止
Thread.currentThread().interrupt();
System.out.println("[Monitor] 收到终止信号");
}
}
System.out.println("[Monitor] 执行清理工作...");
});
monitorThread.start();
}
// 优雅终止方法
public void shutdownGracefully() {
// 阶段1:设置终止标志
shutdownRequested = true;
// 阶段2:中断所有线程
monitorThread.interrupt();
workers.shutdown(); // 停止接收新任务
try {
// 等待现有任务完成(带超时)
if (!workers.awaitTermination(5, TimeUnit.SECONDS)) {
workers.shutdownNow(); // 强制终止
}
} catch (InterruptedException e) {
workers.shutdownNow();
Thread.currentThread().interrupt();
}
System.out.println("系统已安全关闭");
}
// 提交任务方法
public void submitTask(Runnable task) {
if (!shutdownRequested) {
workers.execute(() -> {
try {
task.run();
} catch (Exception e) {
if (shutdownRequested) {
System.out.println("任务被终止: " + e.getMessage());
}
}
});
}
}
public static void main(String[] args) throws InterruptedException {
TwoPhaseTermination system = new TwoPhaseTermination();
system.start();
// 模拟提交任务
for (int i = 0; i < 10; i++) {
final int taskId = i;
system.submitTask(() -> {
try {
Thread.sleep(500);
System.out.println("执行任务: " + taskId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 5秒后触发终止
Thread.sleep(5000);
system.shutdownGracefully();
}
}
// 双重终止检查(提高响应速度)
while (!shutdownRequested && !Thread.currentThread().isInterrupted()) {
// ...
}
// 资源清理模板
try {
// 正常业务代码...
} finally {
cleanupResources(); // 保证无论如何都会执行
}
终止方式 | 是否优雅 | 资源安全性 | 响应速度 | 实现复杂度 |
---|---|---|---|---|
System.exit() | ❌ | ❌ | ⚡️立即 | 低 |
暴力kill -9 | ❌ | ❌ | ⚡️立即 | 低 |
两阶段终止 | ✅ | ✅ | ⏳可控 | 中 |
超时强制终止 | ⚠️部分 | ⚠️可能泄漏 | ⏳可配置 | 中高 |
方案 | 适用场景 | 优点 | 缺点 |
---|---|---|---|
标志位检查 | 简单循环任务 | 实现简单 | 阻塞操作无法响应 |
Thread.interrupt() | 含阻塞操作的任务 | 能唤醒阻塞 | 需处理InterruptedException |
Future.cancel() | 线程池任务 | 与线程池集成好 | 无法自定义清理逻辑 |
Poison Pill | 生产者-消费者模式 | 精确控制 | 需要特殊消息设计 |
public void shutdownAll(ExecutorService... services) {
// 阶段1:发送关闭信号
for (ExecutorService service : services) {
service.shutdown();
}
// 阶段2:等待终止
for (ExecutorService service : services) {
try {
if (!service.awaitTermination(10, TimeUnit.SECONDS)) {
service.shutdownNow();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
service.shutdownNow();
}
}
}
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("执行JVM退出前的清理...");
// 记录最后状态、关闭外部连接等
}));
┌───────────────┐ ┌──────────────────┐ ┌───────────────┐
│ 停止负载均衡 │───>│ 完成进行中请求 │───>│ 下线服务实例 │
└───────────────┘ └──────────────────┘ └───────────────┘
通过这种 分层解析+生产级代码示例 的方式,可以全面掌握两阶段终止模式的实现要点,并能在实际系统中实现安全、可控的服务终止。
好的!我将延续原有结构,从 第六部分 开始扩展两阶段终止模式的进阶内容,保持技术深度和实用性的统一。
┌───────────────┐ ┌──────────────────┐ ┌───────────────┐
│ 协调者广播 │───>│ 各节点执行终止 │───>│ 全局状态确认 │
│ TERMINATE信号 │ │ (本地两阶段) │ │ (ACK汇总) │
└───────────────┘ └──────────────────┘ └───────────────┘
public class DistributedTermination {
private final CuratorFramework zkClient;
private final String servicePath;
private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
public DistributedTermination(String zkAddress, String serviceName) {
this.zkClient = CuratorFrameworkFactory.newClient(zkAddress, new ExponentialBackoffRetry(1000, 3));
this.servicePath = "/services/" + serviceName;
zkClient.start();
}
// 注册当前节点
public void registerNode(String nodeId) throws Exception {
zkClient.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(servicePath + "/" + nodeId);
}
// 分布式终止入口
public void shutdownCluster() throws Exception {
if (isShuttingDown.compareAndSet(false, true)) {
// 阶段1:创建终止标记节点
zkClient.create()
.withMode(CreateMode.PERSISTENT)
.forPath(servicePath + "/TERMINATE");
// 阶段2:监听所有节点消失(确认终止完成)
awaitTermination();
}
}
// 节点自身的终止逻辑
public void startShutdownListener() {
PathChildrenCache watcher = new PathChildrenCache(zkClient, servicePath, true);
watcher.getListenable().addListener((client, event) -> {
if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED &&
"TERMINATE".equals(event.getData().getPath())) {
// 执行本地两阶段终止
localShutdown();
}
});
}
private void awaitTermination() throws Exception {
while (zkClient.getChildren().forPath(servicePath).size() > 1) {
Thread.sleep(500);
}
zkClient.delete().forPath(servicePath + "/TERMINATE");
System.out.println("集群终止完成");
}
}
指标 | 监控方式 | 健康阈值 |
---|---|---|
终止延迟 | 阶段1到阶段2的耗时统计 | 90%请求 < 2秒 |
资源释放率 | 文件句柄/连接池关闭验证 | 释放率 >= 99.9% |
中断响应时间 | 从发送中断到线程停止的延迟 | 95%线程 < 500ms |
// 陷阱1:忘记恢复中断状态
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// 错误做法:仅打印日志
// log.error("Interrupted", e);
// 正确做法:恢复中断状态
Thread.currentThread().interrupt();
}
// 陷阱2:阻塞队列无法唤醒
BlockingQueue<Task> queue = new LinkedBlockingQueue<>();
// 需要特殊唤醒方式
queue.put(POISON_PILL); // 投递毒丸对象
// 陷阱3:第三方库不响应中断
Future<?> future = executor.submit(() -> {
// 使用非中断阻塞的JNI调用
nativeBlockingCall();
});
future.cancel(true); // 可能无法真正终止
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ 终止信号触发 │───>│ 断路器打开状态 │───>│ 拒绝新请求 │
│ (Phase 1) │ │ (快速失败) │ │ (Phase 2前置) │
└───────────────┘ └───────────────┘ └───────────────┘
// Akka示例:优雅终止Actor
actorSystem.registerOnTermination(() -> {
// 阶段2的清理逻辑
database.close();
});
// 发送终止命令
Patterns.gracefulStop(actorRef, Duration.ofSeconds(5), Shutdown.getInstance());
kill -9
后验证资源泄漏// 阶段1日志标记
log.info("TERMINATION PHASE1 STARTED | Pending tasks: {}", queue.size());
// 阶段2关键操作
log.info("Releasing DB connections | Active: {}", pool.getActiveCount());
// 最终确认
log.info("TERMINATION COMPLETED | Time elapsed: {}ms", System.currentTimeMillis() - startTime);
维度 | 单机两阶段终止 | 分布式两阶段终止 |
---|---|---|
信号传播方式 | 内存可见性/线程中断 | 集群广播/协调服务 |
完成确认机制 | 线程池awaitTermination | 集群状态共识算法 |
典型耗时 | 毫秒~秒级 | 秒~分钟级 |
资源清理保证 | 进程内可控 | 依赖各节点实现 |
终止超时时间 = Max(平均任务处理时间 × 3, 网络延迟 × 10)
Max(600ms, 500ms) = 600ms
通过这十个维度的系统化解析,两阶段终止模式从单机实现到分布式协同,从基础原理到生产实践的全貌已完整呈现。建议结合具体业务场景,灵活应用这些模式变体。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有