在Red5服务器中,我们无法控制实现者如何处理他们的应用程序。因此,我们已经尝试实现代码,以防止它们导致不好的事情发生。
当来自客户端的消息传入时,该类将显示任务的创建:
ReceivedMessageTask task = new ReceivedMessageTask(sessionId, message, handler, this);
task.setMaxHandlingTimeout(maxHandlingTimeout);
ListenableFuture<Boolean> future = (ListenableFuture<Boolean>) executor.submitListenable(new ListenableFutureTask<Boolean>(task));以下类的设置方式可以防止“处理程序”(即实现者应用程序)使用所有周期或服务器崩溃。总之,具有允许运行时间期间的死锁保护会中断超过给定时间限制的操作。
我想回顾一下下面的代码,这些代码在运行服务器时的表现似乎与预期的一样。
public final class ReceivedMessageTask implements Callable<Boolean> {
private final static Logger log = LoggerFactory.getLogger(ReceivedMessageTask.class);
private final RTMPConnection conn;
private final IRTMPHandler handler;
private final String sessionId;
private Packet message;
private AtomicBoolean done = new AtomicBoolean(false);
private DeadlockGuard guard;
private long maxHandlingTime = 500L;
public ReceivedMessageTask(String sessionId, Packet message, IRTMPHandler handler) {
this(sessionId, message, handler, (RTMPConnection) RTMPConnManager.getInstance().getConnectionBySessionId(sessionId));
}
public ReceivedMessageTask(String sessionId, Packet message, IRTMPHandler handler, RTMPConnection conn) {
this.sessionId = sessionId;
this.message = message;
this.handler = handler;
this.conn = conn;
}
public Boolean call() throws Exception {
Red5.setConnectionLocal(conn);
try {
if (maxHandlingTime <= 0) {
if (!Red5.isDebug()) {
guard = new DeadlockGuard();
new Thread(guard, String.format("DeadlockGuard#%s", sessionId)).start();
}
}
handler.messageReceived(conn, message);
} catch (Exception e) {
log.error("Error processing received message {} on {}", message, sessionId, e);
} finally {
Red5.setConnectionLocal(null);
done.set(true);
if (guard != null) {
guard.join();
}
}
return Boolean.valueOf(done.get());
}
public void setMaxHandlingTimeout(long maxHandlingTimeout) {
this.maxHandlingTime = maxHandlingTimeout;
}
private class DeadlockGuard implements Runnable {
private Thread taskThread;
private Thread guardThread = null;
AtomicBoolean sleeping = new AtomicBoolean(false);
DeadlockGuard() {
this.taskThread = Thread.currentThread();
}
public void run() {
try {
this.guardThread = Thread.currentThread();
if (log.isDebugEnabled()) {
log.debug("Threads - task: {} guard: {}", taskThread.getName(), guardThread.getName());
}
sleeping.compareAndSet(false, true);
Thread.sleep(maxHandlingTime);
} catch (InterruptedException e) {
log.debug("Deadlock guard interrupted on {} during sleep", sessionId);
} finally {
sleeping.set(false);
}
if (!done.get()) {
if (!taskThread.isInterrupted()) {
if (taskThread.isAlive()) {
log.warn("Interrupting unfinished active task on {}", sessionId);
taskThread.interrupt();
}
} else {
log.debug("Unfinished active task on {} already interrupted", sessionId);
}
}
}
public void join() {
if (sleeping.get()) {
guardThread.interrupt();
}
}
}
}发布于 2015-10-24 13:11:26
代码看起来很好--从实际的角度来看,我有几点评论:
不要用新的Thread()来在单独的线程中运行某些东西--使用为其分配的具有固定线程数的线程池。
您可能遇到的问题是,如果调用API的客户端线程没有被线程池控制--您可能会耗尽资源。因此,您需要控制submitListenable调用,以便在池中为相同大小的DeadlockGuard任务拥有单独的池。
另外,我建议使用等待/通知机制,因为从开发人员的角度来看,这是更可控的,而不是中断--但是如果它有效的话,您的方法也许也是好的。
Nit - private:
private AtomicBoolean sleeping = new AtomicBoolean(false);Nit -良好的实践是设置超时的加入以及。
https://codereview.stackexchange.com/questions/74400
复制相似问题