Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >Java发生-在两个ConcurrentMaps上一致的线程视图之前

Java发生-在两个ConcurrentMaps上一致的线程视图之前
EN

Stack Overflow用户
提问于 2015-11-05 13:56:07
回答 2查看 159关注 0票数 1

我有一个java类来处理多线程订阅服务。通过实现可订阅接口,可以将任务提交给服务并定期执行。下面是代码的草图:

代码语言:javascript
运行
AI代码解释
复制
import java.util.concurrent.*;

public class Subscribtions {

    private ConcurrentMap<Subscribable, Future<?>> futures = new ConcurrentHashMap<Subscribable, Future<?>>();
    private ConcurrentMap<Subscribable, Integer> cacheFutures = new ConcurrentHashMap<Subscribable, Integer>();
    private ScheduledExecutorService threads;

    public Subscribtions() {
        threads = Executors.newScheduledThreadPool(16);
    }

    public void subscribe(Subscribable subscription) {
        Runnable runnable = getThread(subscription);
        Future<?> future = threads.scheduleAtFixedRate(runnable, subscription.getInitialDelay(), subscription.getPeriod(), TimeUnit.SECONDS);
        futures.put(subscription, future);
    }

    /*
     * Only called from controller thread
     */
    public void unsubscribe(Subscribable subscription) {
        Future<?> future = futures.remove(subscription);    //1. Might be removed by worker thread 
        if (future != null)
            future.cancel(false);
        else {
            //3. Worker-thread view     := cacheFutures.put() -> futures.remove()
            //4. Controller-thread has seen futures.remove(), but has it seen cacheFutures.put()?
        }
    }

    /*
     * Only called from worker threads
     */
    private void delay(Runnable runnable, Subscribable subscription, long delay) {
        cacheFutures.put(subscription, 0);                  //2. Which is why it is cached first
        Future<?> currentFuture = futures.remove(subscription);
        if (currentFuture != null) {
            currentFuture.cancel(false);
            Future<?> future = threads.scheduleAtFixedRate(runnable, delay, subscription.getPeriod(), TimeUnit.SECONDS);
            futures.put(subscription, future);
        }
    }

    private Runnable getThread(Subscribable subscription) {
        return new Runnable() {
            public void run() {
                //Do work...
                boolean someCondition = true;
                long someDelay = 100;
                if (someCondition) {
                    delay(this, subscription, someDelay);
                }
            }
        };
    }

    public interface Subscribable {
        long getInitialDelay();
        long getPeriod();
    }
}

因此,该课程允许:

  • 订阅新任务
  • 取消订阅现有任务
  • 延迟周期性执行的任务

订阅由外部控制线程添加/删除,但延迟仅由内部工作线程引起。例如,如果工作线程没有从上次执行中找到更新,或者例如,如果线程只需要在00.00 - 23.00之间执行,则可能发生这种情况。

我的问题是,一个工作线程可以调用delay()并从ConcurrentMap中删除它的未来,而控制器线程可以同时调用unsubscribe()。然后,如果控制器线程在工作线程进入新的未来之前检查ConcurrentMap,则unsubscribe()调用将丢失。

有一些(也许不是详尽无遗的)解决方案:

  • 使用delay()unsubscribe()方法之间的锁
  • 和上面一样,但是每次订阅一个锁。
  • (首选?)不使用锁,但在delay()方法中使用“缓存”删除期货

至于第三种解决方案,由于工作线程已经建立了在关系cacheFutures.put() -> futures.remove()之前发生的事件,而ConcurrentMap的原子性使控制器线程看到了futures.remove(),那么它是否也看到了与工作线程相同的关系?即cacheFutures.put() -> futures.remove()?或者原子性只适用于稍后传播到其他变量的更新的futures映射吗?

任何其他评论也是欢迎的,尤指。考虑使用易失性关键字。是否应该声明缓存映射是不稳定的?谢谢!

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2015-11-05 14:43:26

每次订阅一个锁将要求您维护另一个映射,并可能因此引入额外的并发问题。我认为最好避免这样做。同样的情况也适用于缓存已删除的订阅,另外,这也增加了不必要的资源保留的风险(请注意,您需要缓存的不是Future本身,而是与它们关联的Subscribable)。

无论如何,您都需要某种同步/锁定。例如,在您的选项(3)中,您需要避免对给定订阅在delay()缓存订阅和删除订阅Future之间发生delay()。在没有某种形式的锁定的情况下,避免这种情况的唯一方法是,每个订阅只使用一个Future,从subscribe()注册时一直保持到unsubscribe()删除为止。这样做不符合延迟预定订阅的能力。

至于第三种解决方案,由于工作线程已经建立了在关系cacheFutures.put() -> futures.remove()之前发生的事件,而ConcurrentMap的原子性使控制器线程看到futures.remove(),那么它是否也看到了与工作线程之前的关系相同的情况?

发生--以前是程序执行过程中动作之间的关系。它不特定于任何一个线程对执行的视图。

或者原子性只适用于期货地图,其他变量的更新将在以后传播?

控制器线程将始终在同一调用执行的cacheFutures.put()之前看到delay()调用所执行的futures.remove()。不过,我觉得这帮不了你。

是否应该声明缓存映射是不稳定的?

不是的。这没有任何用处,因为尽管映射的内容发生了变化,但映射本身始终是相同的对象,而且对它的引用不会改变。

您可以考虑让subscribe()delay()unsubscribe()分别在Subscribable上同步。这并不是我所理解的每个订阅都有一个锁的意思,但这是相似的。这将避免需要单独的数据结构来维护这些锁。如果您希望避免显式同步,也可以将锁定方法构建到Subscribable接口中。

票数 0
EN

Stack Overflow用户

发布于 2015-11-05 17:23:48

您有一个ConcurrentMap,但您没有使用它。考虑一下以下几点:

代码语言:javascript
运行
AI代码解释
复制
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class SO33555545
{

  public static void main(String... argv)
    throws InterruptedException
  {
    ScheduledExecutorService workers = Executors.newScheduledThreadPool(16);
    Subscriptions sub = new Subscriptions(workers);
    sub.subscribe(() -> System.out.println("Message received: A"));
    sub.subscribe(() -> System.out.println("Message received: B"));
    Thread.sleep(TimeUnit.SECONDS.toMillis(30));
    workers.shutdown();
  }

}

final class Subscriptions
{

  private final ConcurrentMap<Subscribable, Task> tasks = new ConcurrentHashMap<>();

  private final ScheduledExecutorService workers;

  public Subscriptions(ScheduledExecutorService workers)
  {
    this.workers = workers;
  }

  void subscribe(Subscribable sub)
  {
    Task task = new Task(sub);
    Task current = tasks.putIfAbsent(sub, task);
    if (current != null)
      throw new IllegalStateException("Already subscribed");
    task.activate();
  }

  private Future<?> schedule(Subscribable sub)
  {
    Runnable task = () -> {
      sub.invoke();
      if (Math.random() < 0.25) {
        System.out.println("Delaying...");
        delay(sub, 5);
      }
    };
    return workers.scheduleAtFixedRate(task, sub.getPeriod(), sub.getPeriod(), TimeUnit.SECONDS);
  }

  void unsubscribe(Subscribable sub)
  {
    Task task = tasks.remove(sub);
    if (task != null)
      task.cancel();
  }

  private void delay(Subscribable sub, long delay)
  {
    Task task = new Task(sub);
    Task obsolete = tasks.replace(sub, task);
    if (obsolete != null) {
      obsolete.cancel();
      task.activate();
    }
  }

  private final class Task
  {

    private final FutureTask<Future<?>> future;

    Task(Subscribable sub)
    {
      this.future = new FutureTask<>(() -> schedule(sub));
    }

    void activate()
    {
      future.run();
    }

    void cancel()
    {
      boolean interrupted = false;
      while (true) {
        try {
          future.get().cancel(false);
          break;
        }
        catch (ExecutionException ignore) {
          ignore.printStackTrace(); /* Cancellation is unnecessary. */
          break;
        }
        catch (InterruptedException ex) {
          interrupted = true; /* Keep waiting... */
        }
      }
      if (interrupted)
        Thread.currentThread().interrupt(); /* Reset interrupt state. */
    }

  }

}

@FunctionalInterface
interface Subscribable
{

  default long getPeriod()
  {
    return 4;
  }

  void invoke();

}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/33555545

复制
相关文章
邮箱安全服务专题 | 发现邮箱风险,在发生安全事件之前
邮件服务占据互联网应用的“半壁江山",境外攻击者通过大范围针对邮箱系统扫描攻击,来窃取资料,从邮件系统诞生针对邮件系统的安全攻击从来没有间断过。并伴随着攻击手法越来越高级,通过APT等攻击手法来持续化攻击,极大地困扰着企业,政府以及监管单位系,简单的邮件防护已经无法完全防御入侵事件的发生。就在上周召开的第四届世界互联网大会上安恒信息发布了邮箱安全综合解决方案,方案发布之后,收到了很多客户网友的微信和电话咨询,采用什么服务内容和分析手段解决邮箱安全问题。我们主要为用户提供四个持续的服务手段,帮助企事业单位用户
安恒信息
2018/04/10
1.4K0
邮箱安全服务专题 |  发现邮箱风险,在发生安全事件之前
java——推断日期是否在今天之前
这里说的日期是指字符串的日期格式,如“2014-10-15”,我们要推断这个日期是否在今天之前,网上看到好多推断的方法,都是拿这个日期转换成Date对象 然后与new Date()比較,使用comparetTo() 或者before()方法,事实上这样做都会有点小问题,这样做忽略了一个小时分钟的比較,会出现错误,得不到正确的结果。
全栈程序员站长
2022/07/08
2.2K0
Java 多线程(3)---- 线程的同步(上)
我们在前面两篇文章中分别看了一下 Java 线程的一些概念、用法和对于线程控制(开始、暂停、停止)等,并对其中的一些易错点进行了总结,如果你是对这些概念还是还不是太熟悉,建议先看一下前面的文章:Java 多线程(1)— 初识线程 和 Java 多线程(2) — 线程的控制。这篇文章我们来继续讨论 Java 多线程 — 线程的同步。
指点
2019/01/18
7360
Java 多线程(3)---- 线程的同步(上)
Java 中的几种线程池,你之前用对了吗
虽然大家应该都已经很清楚了,但还是说一下。其实归根结底最主要的一个原因就是为了提高性能。
古时的风筝
2018/07/31
4310
【译】在正确的线程上观察
尽管很多人了解RxJava的基本逻辑,但是在Observable链和操作符究竟运行在哪个线程,仍然会有许多困惑。
用户1740424
2018/07/23
5430
【译】在正确的线程上观察
比之前两个难?
之前给大家介绍了二叉树的前序遍历,中序遍历的迭代法和 Morris 方法,今天咱们来说一下二叉后序遍历的迭代法及 Morris 方法。
公众号袁厨的算法小屋
2022/09/20
2080
比之前两个难?
iOS 程序 main函数之前发生什么
一个iOS App 的 main函数位于main.m中,这是我们熟知的程序入口。但对objc了解更多之后发现,程序在进入我们的main函数前已经执行了很多代码,比如熟知的+load方法等。
且行且珍惜_iOS
2022/05/13
8840
iOS 程序 main函数之前发生什么
Java 多线程(6)----线程池(上)
在前面的系列文章中,我们介绍了一下 Java 中多线程的一些主要的知识点和多线程并发程序的设计和处理思想。包括线程的介绍、生命周期、线程的运行控制。之后介绍了如何确保 Java 多线程并发程序的正确性,即通过锁(ReentrantLock 、synchronized )的思想来实现多线程执行顺序的控制等。如果你对这些还不熟悉,建议看一下前面的文章。接下来我们来看一下 Java 多线程中另一个重要的知识:线程池,在此之前,我们需要了解一下 Java 中的阻塞队列:
指点
2019/01/18
4210
Java 多线程(6)----线程池(上)
Ceph 上容器之前的思考
首先必须了解Ceph里面的MON、OSD、MDS、MGR、RGW各种服务的软硬件需求,知道你规划的Ceph规模是多大,当前分配给对应容器的资源是否合适,不然到了后期你需要做各种硬件资源调整而不得不重启容器的时候,你的服务可用性会可能会大打折扣。总之就是一句话,硬件资源一步到位,不要瞎折腾。别让OOM成为常态!
用户1260683
2018/12/24
1.5K0
WebAssembly 出来之前发生了什么
当时的互联网也不太发达,应用也没那么复杂,所以很多浏览器厂商还没考虑到js执行的效率问题。
zz_jesse
2020/11/06
9210
BackgroundWorker在单独的线程上执行操作
直接使用多线程有时候会带来莫名其妙的错误,不定时的发生,有时候会让程序直接崩溃,其实BackgroundWorker 类允许您在单独的专用线程上运行操作。可以通过编程方式创建 BackgroundWorker,也可以将它从“工具箱”的“组件”选项卡中拖到窗体上。如果在 Windows 窗体设计器中创建 BackgroundWorker,则它会出现在组件栏中,而且它的属性会显示在“属性”窗口中。
张果
2022/05/09
1.3K0
BackgroundWorker在单独的线程上执行操作
Java 线程同步方式 wait/notify(两个线程交替执行的例子)
线程同步,就是线程之间互相协调,通过等待,唤醒等操作,避免线程之间同时操作同一个资源。简单的说就是线程之间互相协作,避免干扰了其他线程的工作。
水货程序员
2018/11/13
4.6K0
Java 多线程编程(上)
https://blog.csdn.net/weixin_44510615/article/details/102617286
润森
2019/11/04
4570
Linux上的的Java线程同步机制
现如今,一个服务端应用程序几乎都会使用到多线程来提升服务性能,而目前服务端还是以linux系统为主。一个多线程的java应用,不管使用了什么样的同步机制,最终都要用JVM执行同步处理,而JVM本身也是linux上的一个进程,那么java应用的线程同步机制,可以说是对操作系统层面的同步机制的上层封装。这里我说的操作系统,主要是的非实时抢占式内核(non-PREEMPT_RT),并不讨论实时抢占式内核(PREEMPT_RT) 的问题,二者由于使用场景不同,因此同步机制也会存在差异或出现变化。
TimGallin
2021/12/03
6390
ArrayList在Java多线程中的应用
开发中,存在这样的业务逻辑,类似倒金字塔结构,下层数据需要基于上层的数据进行逻辑计算。设计思路是:定义一个全局变量upLayerList,来保存上一层的数据。每一层计算仅需要知道upLayerList就可以,不用关心上一层数据怎么获取。当前层计算完毕后,把结果赋值给upLayerList,留给下一层使用。
用户2146693
2019/08/08
1.6K0
ArrayList在Java多线程中的应用
Confluence 6 在升级之前
在这个指南中,我们将会与你一同对最新的 Confluence 站点在 Windows 或者 Linux 平台进行安装和更新。
HoneyMoose
2019/04/20
6400
Confluence 6 在升级之前
Java的先行发生原则
先行发生原则(Happens-Before)是Java内存模型定义的一个等效判断原则。依赖这个原则,我们可以通过几条简单规则判断数据是否存在竞争,线程是否安全,而不需要陷入Java内存模型苦涩难懂的定义之中。
真正的飞鱼
2023/06/05
3140
最近线上发生的两个坑爹锅!
最近由于在技改,发生了不少问题,前文中说的缓存穿透只是其中之一,想了想,虽然都是比较简单的问题,但是应该实际中还是有不少人碰到过,这些问题看似很简单,但是你绝对应该踩过。
艾小仙
2021/01/14
3000
最近线上发生的两个坑爹锅!
点击加载更多

相似问题

Java在线程启动之前发生

31

内存一致性-在Java中的关系之前发生

42

Java内存模型发生-在线程池交互的保证之前

22

Java发生-在关系invokeAndWait之前

14

Java会发生-在关系之前?

15
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档