我有一个java类来处理多线程订阅服务。通过实现可订阅接口,可以将任务提交给服务并定期执行。下面是代码的草图:
import java.util.concurrent.*;
public class Subscribtions {
private ConcurrentMap<Subscribable, Future<?>> futures = new ConcurrentHashMap<Subscribable, Future<?>>();
private ConcurrentMap<Subscribable, Integer> cacheFutures = new ConcurrentHashMap<Subscribable, Integer>();
private ScheduledExecutorService threads;
public Subscribtions() {
threads = Executors.newScheduledThreadPool(16);
}
public void subscribe(Subscribable subscription) {
Runnable runnable = getThread(subscription);
Future<?> future = threads.scheduleAtFixedRate(runnable, subscription.getInitialDelay(), subscription.getPeriod(), TimeUnit.SECONDS);
futures.put(subscription, future);
}
/*
* Only called from controller thread
*/
public void unsubscribe(Subscribable subscription) {
Future<?> future = futures.remove(subscription); //1. Might be removed by worker thread
if (future != null)
future.cancel(false);
else {
//3. Worker-thread view := cacheFutures.put() -> futures.remove()
//4. Controller-thread has seen futures.remove(), but has it seen cacheFutures.put()?
}
}
/*
* Only called from worker threads
*/
private void delay(Runnable runnable, Subscribable subscription, long delay) {
cacheFutures.put(subscription, 0); //2. Which is why it is cached first
Future<?> currentFuture = futures.remove(subscription);
if (currentFuture != null) {
currentFuture.cancel(false);
Future<?> future = threads.scheduleAtFixedRate(runnable, delay, subscription.getPeriod(), TimeUnit.SECONDS);
futures.put(subscription, future);
}
}
private Runnable getThread(Subscribable subscription) {
return new Runnable() {
public void run() {
//Do work...
boolean someCondition = true;
long someDelay = 100;
if (someCondition) {
delay(this, subscription, someDelay);
}
}
};
}
public interface Subscribable {
long getInitialDelay();
long getPeriod();
}
}
因此,该课程允许:
订阅由外部控制线程添加/删除,但延迟仅由内部工作线程引起。例如,如果工作线程没有从上次执行中找到更新,或者例如,如果线程只需要在00.00 - 23.00之间执行,则可能发生这种情况。
我的问题是,一个工作线程可以调用delay()
并从ConcurrentMap中删除它的未来,而控制器线程可以同时调用unsubscribe()
。然后,如果控制器线程在工作线程进入新的未来之前检查ConcurrentMap,则unsubscribe()
调用将丢失。
有一些(也许不是详尽无遗的)解决方案:
delay()
和unsubscribe()
方法之间的锁delay()
方法中使用“缓存”删除期货至于第三种解决方案,由于工作线程已经建立了在关系cacheFutures.put() -> futures.remove()
之前发生的事件,而ConcurrentMap的原子性使控制器线程看到了futures.remove()
,那么它是否也看到了与工作线程相同的关系?即cacheFutures.put() -> futures.remove()
?或者原子性只适用于稍后传播到其他变量的更新的futures
映射吗?
任何其他评论也是欢迎的,尤指。考虑使用易失性关键字。是否应该声明缓存映射是不稳定的?谢谢!
发布于 2015-11-05 14:43:26
每次订阅一个锁将要求您维护另一个映射,并可能因此引入额外的并发问题。我认为最好避免这样做。同样的情况也适用于缓存已删除的订阅,另外,这也增加了不必要的资源保留的风险(请注意,您需要缓存的不是Future
本身,而是与它们关联的Subscribable
)。
无论如何,您都需要某种同步/锁定。例如,在您的选项(3)中,您需要避免对给定订阅在delay()
缓存订阅和删除订阅Future
之间发生delay()
。在没有某种形式的锁定的情况下,避免这种情况的唯一方法是,每个订阅只使用一个Future
,从subscribe()
注册时一直保持到unsubscribe()
删除为止。这样做不符合延迟预定订阅的能力。
至于第三种解决方案,由于工作线程已经建立了在关系cacheFutures.put() -> futures.remove()之前发生的事件,而ConcurrentMap的原子性使控制器线程看到futures.remove(),那么它是否也看到了与工作线程之前的关系相同的情况?
发生--以前是程序执行过程中动作之间的关系。它不特定于任何一个线程对执行的视图。
或者原子性只适用于期货地图,其他变量的更新将在以后传播?
控制器线程将始终在同一调用执行的cacheFutures.put()
之前看到delay()
调用所执行的futures.remove()
。不过,我觉得这帮不了你。
是否应该声明缓存映射是不稳定的?
不是的。这没有任何用处,因为尽管映射的内容发生了变化,但映射本身始终是相同的对象,而且对它的引用不会改变。
您可以考虑让subscribe()
、delay()
和unsubscribe()
分别在Subscribable
上同步。这并不是我所理解的每个订阅都有一个锁的意思,但这是相似的。这将避免需要单独的数据结构来维护这些锁。如果您希望避免显式同步,也可以将锁定方法构建到Subscribable
接口中。
发布于 2015-11-05 17:23:48
您有一个ConcurrentMap
,但您没有使用它。考虑一下以下几点:
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
final class SO33555545
{
public static void main(String... argv)
throws InterruptedException
{
ScheduledExecutorService workers = Executors.newScheduledThreadPool(16);
Subscriptions sub = new Subscriptions(workers);
sub.subscribe(() -> System.out.println("Message received: A"));
sub.subscribe(() -> System.out.println("Message received: B"));
Thread.sleep(TimeUnit.SECONDS.toMillis(30));
workers.shutdown();
}
}
final class Subscriptions
{
private final ConcurrentMap<Subscribable, Task> tasks = new ConcurrentHashMap<>();
private final ScheduledExecutorService workers;
public Subscriptions(ScheduledExecutorService workers)
{
this.workers = workers;
}
void subscribe(Subscribable sub)
{
Task task = new Task(sub);
Task current = tasks.putIfAbsent(sub, task);
if (current != null)
throw new IllegalStateException("Already subscribed");
task.activate();
}
private Future<?> schedule(Subscribable sub)
{
Runnable task = () -> {
sub.invoke();
if (Math.random() < 0.25) {
System.out.println("Delaying...");
delay(sub, 5);
}
};
return workers.scheduleAtFixedRate(task, sub.getPeriod(), sub.getPeriod(), TimeUnit.SECONDS);
}
void unsubscribe(Subscribable sub)
{
Task task = tasks.remove(sub);
if (task != null)
task.cancel();
}
private void delay(Subscribable sub, long delay)
{
Task task = new Task(sub);
Task obsolete = tasks.replace(sub, task);
if (obsolete != null) {
obsolete.cancel();
task.activate();
}
}
private final class Task
{
private final FutureTask<Future<?>> future;
Task(Subscribable sub)
{
this.future = new FutureTask<>(() -> schedule(sub));
}
void activate()
{
future.run();
}
void cancel()
{
boolean interrupted = false;
while (true) {
try {
future.get().cancel(false);
break;
}
catch (ExecutionException ignore) {
ignore.printStackTrace(); /* Cancellation is unnecessary. */
break;
}
catch (InterruptedException ex) {
interrupted = true; /* Keep waiting... */
}
}
if (interrupted)
Thread.currentThread().interrupt(); /* Reset interrupt state. */
}
}
}
@FunctionalInterface
interface Subscribable
{
default long getPeriod()
{
return 4;
}
void invoke();
}
https://stackoverflow.com/questions/33555545
复制相似问题