首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

如何突然停止akka流的Runnable Graph?

在Akka流中,要突然停止一个Runnable Graph,可以使用killSwitch来实现。killSwitch是一个特殊的流操作符,可以用于控制流的启动和停止。

下面是停止Akka流的Runnable Graph的步骤:

  1. 首先,创建一个KillSwitch对象,可以使用KillSwitches类的静态方法来创建。例如,可以使用KillSwitches.single()创建一个单个的KillSwitch对象。
  2. KillSwitch对象与要停止的流进行关联。可以使用viaMat操作符将KillSwitch对象添加到流中。例如,可以使用Flow.viaMat(killSwitch.flow, Keep.right)KillSwitch对象添加到流中,并保留原始流的引用。
  3. 在需要停止流的地方,调用shutdown()方法来停止流。例如,可以在某个条件满足时调用killSwitch.shutdown()来停止流。

下面是一个示例代码,演示如何使用KillSwitch来停止Akka流的Runnable Graph:

代码语言:scala
复制
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.KillSwitches
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

object StopAkkaStreamExample extends App {
  implicit val system: ActorSystem = ActorSystem("StopAkkaStreamExample")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // 创建一个KillSwitch对象
  val killSwitch = KillSwitches.single[Int]

  // 创建一个简单的流,每秒生成一个递增的数字
  val source = Source.tick(0, 1, 0)
    .viaMat(killSwitch.flow, Keep.right) // 将KillSwitch对象添加到流中
    .map { i =>
      println(i)
      i
    }

  // 创建一个Sink,用于消费流中的元素
  val sink = Sink.ignore

  // 运行流,并保留原始流的引用
  val graph = source.to(sink).run()

  // 模拟停止流的条件
  Thread.sleep(5000)

  // 停止流
  killSwitch.shutdown()

  // 等待流完成
  graph.onComplete(_ => system.terminate())
}

在上面的示例中,我们创建了一个简单的流,每秒生成一个递增的数字,并通过KillSwitch对象控制流的启动和停止。在模拟停止流的条件满足后,调用killSwitch.shutdown()方法停止流的执行。

请注意,上述示例中的代码是使用Scala编写的,但是Akka流也支持Java编程语言。你可以根据自己的需求选择适合的编程语言来实现相同的功能。

推荐的腾讯云相关产品:腾讯云云服务器(CVM)和腾讯云云数据库MySQL。这些产品提供了可靠的云计算基础设施和数据库服务,适用于各种规模的应用场景。你可以在腾讯云官网上找到更多关于这些产品的详细信息和介绍。

腾讯云云服务器(CVM)产品介绍链接:https://cloud.tencent.com/product/cvm

腾讯云云数据库MySQL产品介绍链接:https://cloud.tencent.com/product/cdb_mysql

页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

Akka(19): Stream:组合数据,组合共用-Graph modular composition

akka-streamGraph是一种运算方案,它可能代表某种简单线性数据图如:Source/Flow/Sink,也可能是由更基础图组合而成相对复杂点某种复合流图,而这个复合流图本身又可以被当作组件来组合更大...因为Graph只是对数据运算描述,所以它是可以被重复利用。所以我们应该尽量地按照业务流程需要来设计构建Graph。在更高功能层面上实现Graph模块化(modular)。...下面是akka-stream预设一些基础数据图: ? 上面Source,Sink,Flow代表具备线性步骤linear-stage图,属于最基础组件,可以用来构建数据处理链条。...b.addEdge(importAndGetPort(b), to) 以上过程显示:通过akkaGraphDSL,对复合型Graph构建可以实现形象化,大部分工作都在如何对组件之间端口进行连接...akka-stream运算是在actor上进行,除了大家都能对数据元素进行处理之外,akka-stream还可以通过actor内部状态来维护和返回运算结果。

1.1K100
  • Akka(17): Stream:数据基础组件-Source,Flow,Sink简介

    2、scalaz-sstream和akka-stream数据都是一种申明式数据处理流程描述,属于一种运算方案,最终都需要某种运算器来对数据按运算方案进行具体运算,得出运算结果和产生副作用。...akka-stream数据是由三类基础组件组合而成,不同组合方式代表不同数据处理及表达功能。三类组件分别是: 1、Source:数据源。...对通过输入端口输入数据元素进行转变处理(transform)后经过输出端口输出。FlowShape有一个输入端和一个输出端。 在akka-stream里数据组件一般被称为数据图(graph)。...我们可以用许多数据图组成更大stream-graphakka-stream最简单完整(或者闭合)线性数据(linear-stream)就是直接把一个Source和一个Sink相接。...意思是选择左边数据运算结果。我们上面提过akka-stream是在actor系统里处理数据元素。在这个过程中同时可以用actor内部状态来产生运算结果。

    1.6K60

    Akka(21): Stream:实时操控:人为中断-KillSwitch

    akka-stream是多线程non-blocking模式,一般来说,运算任务提交到另外线程后这个线程就会在当前程序控制之外自由运行了。...任何时候如果需要终止运行中数据就必须采用一种任务柄(handler)方式来控制在其它线程内运行任务。这个handler可以在提交运算任务时获取。...T1, T2, T2], UniqueKillSwitch]] ...} akka-stream提供了single,shared,singleBidi三种KillSwitch构建方式,它们形状都是FlowShape...对象,我们可以在多个数据中插入SharedKillSwitch,然后用这一个共享handler去终止使用了这个SharedKillSwitch数据运算。...下面是本次示范源代码: import akka.stream.scaladsl._ import akka.stream._ import akka.actor._ import scala.concurrent.duration

    81860

    Akka(23): Stream:自定义构件功能-Custom defined stream processing stages

    从总体上看:akka-stream是由数据源头Source,流通节点Flow和数据终点Sink三个框架性构件(stream components)组成。...一个完整数据(可运行数据)必须是一个闭合数据,即:从外表上看,数据两头必须连接一个Source和一个Sink。...:akka-stream又包括数据Graph及运算器Materializer两个部分。...所以:akka-stream必须有一个Graph描述功能和流程。每个Graph又可以由一些代表更细小功能Graph组成。...一个可运行数据必须由一个闭合数据图(closed graph)来代表,而这个ClosedGraph又是由代表不同数据转化处理功能子图(sub-graph)组成。

    1.7K80

    akka-streams - 从应用角度学习:basic stream parts

    实际上很早就写了一系列关于akka-streams博客。但那个时候纯粹是为了了解akka而去学习,主要是从了解akka-streams原理为出发点。...因为akka-streams是akka系列工具基础,如:akka-http, persistence-query等都是基于akka-streams,其实没有真正把akka-streams用起来。...所以处理应该是分布式数据处理理想方式了。这是这次写akka-streams初衷:希望能通过akka-streams来实现分布式数据处理编程。...虽然运算值不能像元素一样流动,但akka-streams提供了机制让用户选择是否返回某个节点运算值M。...akka-streams提供了简便一点运算方式runWith:指定runWith参数组件M为最终运算值。

    1.1K10

    Akka(24): Stream:从外部系统控制数据-control live stream from external system

    在数据应用现实场景中常常会遇到与外界系统对接需求。这些外部系统可能是Actor系统又或者是一些其它类型系统。...与这些外界系统对接意思是在另一个线程中运行数据可以接收外部系统推送事件及做出行为改变响应。...akka-stream是多线程异步模式程序,所以这个函数只能是一个异步运行回调callback。...插入了一个正在运行中数据中并在最后终止了这个数据。 另外,一个GraphStage也可以被外界当作一种Actor来进行交流。...下面是本次示范源代码: GetAsyncCallBack.scala import akka.actor._ import akka.stream._ import akka.stream.scaladsl

    686100

    响应式编程实践

    理解Source本质 Akka Stream将数据源定义为Source,RxJava则定义为Observable或Flowable。这些响应式编程框架都为Source提供了丰富operator。...Akka Stream拓扑图 Akka Stream对流处理抽象被建模为图。这一设计思想使得处理变得更加直观,处理变成了“搭积木”游戏。...我们可以将Akka StreamGraph(完整Graph,称为ClosedShape,是可以运行,又称之为RunnableShape)看做是处理”模具“,至于那些由Inlet与Outlet端口组成基础...一旦处理模具打造完毕,打开数据”水龙头“,让数据源源不断地流入Graph中,处理就可以”自动“运行。只要Source没有发出complete或error信号,它就将一直运行下去。...Akka Stream之所以将Graph运行器称之为materializer,大约也是源于这样隐喻吧。 使用Akka Stream进行响应式处理,我建议参考这样思维。

    1.4K80

    面向设计思想

    这带来设计思想上根本变化,包括: 以作为建模元素 存在松耦合上下游关系 以为重用单位 对流进行转换、运算、合并与拆分 在Rx框架中,一个就是一个Observable或者Flowable。...例如我们要统计网页字数,则源头就是对网页内容获取,而就是Observable类型网页内容。...无论哪个发射了数据,它都会将这两个最近发射数据组合起来,并按照指定函数进行运算。 Akka Stream提出来Graph更能体现作为建模元素思想。...) 获得这些交易后对交易进行验证 验证后数据分别用于用于审计和计算净值 我们对该流程进行领域建模时,实则可以绘制一个可以表达Akka Streams中Graph可视化图: ?...通过这样可视化图,我们就可以针对这些图中节点建模为Akka Streams中Graph Shape。

    1.6K30

    你有必要了解一下Flink底层RPC使用框架和原理

    Akka介绍 由于Flink底层Rpc是基于Akka实现,我们先了解下Akka基本使用。 Akka是一个开发并发、容错和可伸缩应用框架。...创建Akka系统 Akka系统核心ActorSystem和Actor,若需构建一个Akka系统,首先需要创建ActorSystem,创建完ActorSystem后,可通过其创建Actor(注意:Akka...如下代码展示了如何配置一个Akka系统。 // 1....; 延迟/立刻调度Runnable、Callable; 停止RpcServer(Actor)或自身服务; 在Flink中其实现类为AkkaRpcService。...(注意此时Actor处于停止状态)和动态代理对象,需要调用RpcEndpoint#start启动启动Actor,此时启动RpcEndpoint流程如下(以非FencedRpcEndpoint为例):

    2.3K30

    Akka(26): Stream:异常处理-Exception handling

    akka-stream是基于Actor模式,所以也继承了Actor模式“坚韧性(resilient)”特点,在任何异常情况下都有某种整体统一异常处理策略和具体实施方式。...在akka-stream官方文件中都有详细说明和示范例子。我们在这篇讨论里也没有什么更好想法和范例,也只能略做一些字面翻译和分析理解事了。...下面列出了akka-stream处理异常一些实用方法: 1、recover:这是一个函数,发出数据最后一个元素然后根据上游发生异常终止当前数据 2、recoverWithRetries:也是个函数...为它们提供“逐步延迟重启策略” 4、Supervision strategy:是数据构件“异常监管策略”属性。...、清除任何内部状态 akka-stream默认异常处理方式是Stop,即立即终止数据,返回异常。

    1.2K80

    Java开发中如何正确停掉线程?

    本篇文章将为您讲解如何正确地停掉线程。 在 Java 中,停掉线程最简单方法就是使用 Thread 类提供 stop() 方法。stop() 方法可以直接停掉一个正在运行线程。...但是,尽管这种方法很简单,但由于进程突然结束可能会引发一些问题,因此不能够滥用这个方法。 除了 stop() 方法外,Java 还提供了一些其他停止线程方法,这些方法需要程序员自己实现。...常见有以下几种: 1、通过设置标志位来停止线程 这是一种通用停止线程方式。我们可以在程序中定义一个布尔型变量,用来表示线程是否需要继续执行。...在需要停止线程时,我们可以调用这个对象 notifyAll() 方法来通知所有线程停止运行。...2、确保正确地释放资源,关闭等操作,避免资源泄漏。 3、不要在 stop() 方法中执行过多操作,否则容易导致死锁、阻塞等问题。 总之,正确地停掉一个线程并没有一个“万能”方法。

    15310

    为什么用 Java:一个 Python 程序员告诉你

    晋级一开发环境先贤努力成果。...用最新Java来编程 现在你心情可能已经从恶心变成好奇了,那么我们在2015年该如何写Java呢?从哪儿开始呢?首先,让我们回顾一些在Java 7和Java 8涌现核心语言概念。...// Lambda Runnable Runnable r2 = () -> System.out.println("Hello world two!")... Java 8引入了(stream)概念,这为Java提供了很多现代函数式语言特性。是一种对集合上一系列转换延迟执行机制。比如我们来数一下以’A’开头名字。...分布式系统 Akka 提供类似Erlang型Actor模型抽象层来编写分布式系统。Akka可以从容应对许多种不同故障,为编写可靠分布式系统提供了更高层次抽象。

    1.1K90

    Akka 指南 之「什么是 Actor?」

    关于「Actor Systems」前一节解释了 Actor 如何形成层次结构,以及在构建应用程序时是最小单元。本节将孤立地研究一个这样 Actor,解释在实现它时遇到概念。...由于该策略是如何构建 Actor 系统基础,因此一旦创建了 Actor,就不能更改它。...当 Actor 终止 一旦一个 Actor 终止,即以一种不被重启处理方式失败、自行停止或被其监督者停止,它将释放其资源,将其邮箱中所有剩余邮件排入系统“死信邮箱(dead letter mailbox...)”,该邮箱将它们作为死信(DeadLetters)转发到事件(EventStream)。...然后在 Actor 引用中用系统邮箱替换原 Actor 邮箱,将所有新消息作为死信重定向到事件。但是,这是在尽最大努力基础上完成,因此不要依赖它来构建“有保证交付”。

    90820

    【你问我答】这些Java并发问题,专家是这么回答

    ——Absurd “ 答: 只简单用过AkkaAkka cluster没用过。 1. actor里面最好不要有阻塞操作,如果有的话一定要设置下dispatcher。 2....” 六 ---- 实际工作当中不太接触并发知识,但是面试时候问最多是并发,如何破解? ——凌亂 “ 答: 面试过程中更多是对知识考察,并没有太多深入实际使用,所以只需要去学习就OK了。...提交任务: ① 无返回值任务使用execute(Runnable) ② 有返回值任务使用submit(Runnable) 关闭线程池: 调用shutdown或者shutdownNow,两者都不会接受新任务...,而且通过调用要停止线程interrupt方法来中断线程,有可能线程永远不会被中断,不同之处在于shutdownNow会首先将线程池状态设置为STOP,然后尝试停止所有线程(有可能导致部分任务没有执行完...在并发世界中,有一件顶重要事情就是执行协调,在Java世界中就是线程间协调和通信,基本包括这几种:原子性管理、线程阻塞和解除阻塞以及排队,这些功能统称同步器。

    97090

    Akka(20): Stream:异步运算,压力缓冲-Async, batching backpressure and buffering

    akka-stream原则上是一种推式(push-model)数据。...对于akka-stream这种push模式数据,因为超速推送数据会造成数据丢失,所以必须想办法控制publisher产生数据速度。...另外,如果用async进行数据并行运算的话上游就不必理会下游反应,可以把数据推进buffer然后立即继续处理下一个数据元素。所以async运算模式buffering就不可或缺了。...对此akka-stream提供了具体解决方法:如果外界系统是在上游过快产生数据可以用conflate函数用Seq这样集合把数据传到下游。...GraphDSL.create() { implicit b => import GraphDSL.Implicits._ // this is the asynchronous stage in this graph

    87370

    Akka(22): Stream:实时操控:动态管道连接-MergeHub,BroadcastHub and PartitionHub

    在现实中我们会经常遇到这样场景:有一个固定数据源Source,我们希望按照程序运行状态来接驳任意数量下游接收方subscriber、又或者我需要在程序运行时(runtime)把多个数据流向某个固定数据终端...从akka-stream技术文档得知:一对多,多对一或多对多类型复杂数据组件必须用GraphDSL来设计,产生Graph类型结果。...前面我们提到过:Graph就是一种运算预案,要求所有的运算环节都必须是预先明确指定,如此应该是无法实现动态管道连接。...但akka-stream提供了MergeHub,BroadcastHub和PartitionHub来支持这样功能需求。 1、MergeHub:多对一合并类型。...下面是以上示范中MergeHub及BroadcastHub示范源代码: import akka.NotUsed import akka.stream.scaladsl._ import akka.stream

    93380

    【JavaEE初阶】多线程(一)

    在同一个进程里PCB之间,PID是一样。内存指针和文件描述符表也是一样。 再举个例子~ 什么时候会出现这种安全问题呢? 多个执行访问同一个共享资源时候。...: start是真正创建了一个线程,线程是独立执行。...可以使用jdk自带工具jconsole查看当前java进程中所有线程。 PCB不是“简称”是一个数据结构,体现是进程/线程是如何实现,如何被描述。...调用start方法,才是真的在操作系统底层创建出一个线程。 线程终止 线程终止,不是让线程立即停止。而是通知线程要停止了。但是至于线程是否停止了,取决于代码具体写法。...举个列子: 我在打游戏,麻麻突然告诉我家里酱油没了。让我去打酱油~我有以下几个选择: 停止游戏,立即去 打完这把,再去 假装没听见,不去 类比如下: 使用标志位来控制线程是否要停止

    21710

    Akka 指南 之「第 1 部分: Actor 体系结构」

    接下来,我们来考虑一下,这些知识如何帮助我们编写更好代码。 Actor 生命周期 Actor 在被创建时就会出现,然后在用户请求时被停止。...从技术上讲,通过调用getContext().stop(actorRef)是可以停止另一个 Actor ,但通过这种方式停止任意 Actor 被认为是一种糟糕做法:停止 Actor 一个比较好方法是...你可以尝试重写这些附加方法,并查看输出是如何变化。 对于没有耐心的人,我们还是建议查看「监督参考页」,了解更深入细节。...总结 我们已经了解了 Akka如何管理层级结构中 Actor ,在层级结构中,父 Actor 会监督他们子 Actor 并处理异常情况。...我们看到了如何创造一个非常简单 Actor 和其子 Actor。接下来,我们将会把这些知识应该到我们示例中,获取设备 Actor 信息。稍后,我们将讨论如何管理小组中 Actor。

    99120
    领券