首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Akka(26): Stream:异常处理-Exception handling

akka-stream是基于Actor模式的,所以也继承了Actor模式的“坚韧性(resilient)”特点,在任何异常情况下都有某种整体统一的异常处理策略和具体实施方式。...Throwable, Graph[SourceShape[T], NotUsed]]): Repr[T] = via(new RecoverWith(attempts, pf)) attempts代表发生异常过程中尝试恢复次数...,0代表不尝试恢复,直接异常中断。...、清除任何内部状态 akka-stream的默认异常处理方式是Stop,即立即终止数据流,返回异常。...从下面的运算结果中我们确定了Restart在重启过程中清除了内部状态,也就是说从发生异常的位置开始重新进行计算了: 0 1 4 0 5 12 好了,下面是这次示范涉及的完整源代码: import akka.actor

1.3K80
  • 您找到你想要的搜索结果了吗?
    是的
    没有找到

    alpakka-kafka(1)-producer

    或者从另外一个角度讲:alpakka-kafka就是一个用akka-streams实现kafka功能的scala开发工具。...alpakka-kafka提供了kafka的核心功能:producer、consumer,分别负责把akka-streams里的数据写入kafka及从kafka中读出数据并输入到akka-streams...用akka-streams集成kafka的应用场景通常出现在业务集成方面:在一项业务A中产生一些业务操作指令写入kafka,然后通过kafka把指令传送给另一项业务B,业务B从kafka中获取操作指令并进行相应的业务操作...另一头库存管理从kafka中读取收货记录并更新相关库存数量记录。注意,这两项业务是分别操作的。...在alpakka中,实际的业务操作基本就是在akka-streams里的数据处理(transform),其实是典型的CQRS模式:读写两方互不关联,写时不管受众是谁,如何使用、读者不关心谁是写方。

    97820

    akka-streams - 从应用角度学习:basic stream parts

    实际上很早就写了一系列关于akka-streams的博客。但那个时候纯粹是为了了解akka而去学习的,主要是从了解akka-streams的原理为出发点。...因为akka-streams是akka系列工具的基础,如:akka-http, persistence-query等都是基于akka-streams的,其实没有真正把akka-streams用起来。...这段时间所遇到的一些需求也是通过集合来解决的。不过,现在所处的环境还是逼迫着去真正了解akka-streams的应用场景。...所以流处理应该是分布式数据处理的理想方式了。这是这次写akka-streams的初衷:希望能通过akka-streams来实现分布式数据处理编程。...用基础流组件Source,Flow,Sink构成的流是直线型的。也就是说从Source流出的元素会一个不漏的经过Flow进入Sink,不能多也不能少。

    1.1K10

    Java编码指南:Java 8 Lambda-Streams中的异常如何优雅处理

    ---- 现象 ---- Java 8 Lambda-Streams让我们一步迈入了函数式编程的世界,使用它可以写出更简洁、更灵活的代码。...但是Java 8 Lambda-Streams遇到异常时,会终止后续程序运行,而且当我们碰到受检异常时,我们不得不try、catch处理,这样会破坏函数式编程的可阅读性和美观度。...,后续流程不再执行): 优雅处理Java 8 Lambda-Streams中的异常 ---- 当Java 8 Lambda-Streams中抛出受检异常必须处理或者我们批处理任务,不受单个业务的失败而继续执行时...当然我们有很多自己处理异常的方式,详细可参考:https://javadevcentral.com/throw-checked-exceptions-in-java-streams。...,返回默认值 System.out.println(aTry); } } 运行结果: 小结 ---- Java 8 新增的Lambda-Streams遇到异常的情况,目前

    37020

    Akka(21): Stream:实时操控:人为中断-KillSwitch

    akka-stream是多线程non-blocking模式的,一般来说,运算任务提交到另外线程后这个线程就会在当前程序控制之外自由运行了。...任何时候如果需要终止运行中的数据流就必须采用一种任务柄(handler)方式来控制在其它线程内运行的任务。这个handler可以在提交运算任务时获取。...这可以从KillSwitch的构建器代码里可以看得到: object KillSwitches { /** * Creates a new [[SharedKillSwitch]] with...实例就像immutable对象,我们可以在多个数据流中插入SharedKillSwitch,然后用这一个共享的handler去终止使用了这个SharedKillSwitch的数据流运算。...下面是本次示范的源代码: import akka.stream.scaladsl._ import akka.stream._ import akka.actor._ import scala.concurrent.duration

    83760

    Akka(23): Stream:自定义流构件功能-Custom defined stream processing stages

    从总体上看:akka-stream是由数据源头Source,流通节点Flow和数据流终点Sink三个框架性的流构件(stream components)组成的。...一个完整的数据流(可运行数据流)必须是一个闭合的数据流,即:从外表上看,数据流两头必须连接一个Source和一个Sink。...我们可以直接把一个Sink连接到一个Source来获取一个最简单的可运行数据流,如下: Source(1 to 10).runWith(Sink.foreach(println)) 从另一个角度说明...这个API中的函数包括下面这些: 1、emit(out,elem):临时替换OutHandler,向端口发送elem,然后再恢复OutHandler 2、emitMultiple(out,Iterable...:临时替换OutHandler,向端口发送一串数据,然后再恢复OutHandler 3、read(in)(andThen):临时替换InHandler,从端口读取一个数据元素,然后再恢复InHandler

    1.7K80

    Akka(27): Stream:Use case-Connecting Slick-dbStream & Scalaz-stream-fs2

    所以我们只能从小众心态来探讨如何改善Slick现状,希望通过与某些Stream库集成,在Slick FRM的基础上恢复一些人们熟悉的Recordset数据库光标(cursor)操作方式,希望如此可以降低...刚好,在这篇讨论里我们希望能介绍一些Akka-Stream和外部系统集成对接的实际用例,把Slick数据库数据载入连接到Akka-Stream形成streaming-dataset应该是一个挺好的想法。...Slick和Akka-Stream可以说是自然匹配的一对,它们都是同一个公司产品,都支持Reactive-Specification。...现在我们有了Reactive stream source,它是个akka-stream,该如何对接处于下游的scalaz-stream-fs2呢?...enqueue代表akka-stream向scalaz-stream-fs2发送数据,可以用akka-stream的Sink构件来实现: class FS2Gate[T](q: fs2.async.mutable.Queue

    87350

    Flink Data Source

    需要注意的是自定义迭代器除了要实现 Iterator 接口外,还必须要实现序列化接口 Serializable ,否则会抛出序列化失败的异常: import java.io.Serializable;...,第一个参数 SplittableIterator 是迭代器的抽象基类,它用于将原始迭代器的值拆分到多个不相交的迭代器中。...,即不支持在得到的 DataStream 上调用 setParallelism(n) 方法,此时会抛出如下的异常: Exception in thread "main" java.lang.IllegalArgumentException...当前内置连接器的支持情况如下: Apache Kafka (支持 source 和 sink) Apache Cassandra (sink) Amazon Kinesis Streams (source...) Apache Flume (sink) Redis (sink) Akka (sink) Netty (source) 随着 Flink 的不断发展,可以预见到其会支持越来越多类型的连接器,关于连接器的后续发展情况

    1.1K20

    akka-grpc - 基于akka-http和akka-streams的scala gRPC开发工具

    在http/1应用中对二进制文件的传输交换有诸多限制和不便,特别是效率方面的问题。在protobuf这种序列化模式中对任何类型的数据格式都一视同仁,可以很方便的实现图片等文件的上传下载。...至于akka-grpc基于akka-streams的特性,我并没有感到太大的兴趣。如上所述,我们的目标是实现一种开放数据平台的终端接入接口。...akka-streams恰恰把总体方案限制在了内部系统集成模式,因为服务端客户端就是akka-streams的两端,是内部系统集成的场景。...也许,在开发一套内部IT系统的过程中akka-grpc可以很趁手。...上面提到,虽然http/2推出已经不短时间了,但尚未得到普及性的认可。即使是低点版本的websocket,也只能在一小撮专业的应用中得到使用。

    2K20

    面向流的设计思想

    只要规划好我们的流程,思考组成这些流程的步骤的输入和输出,就可以分别将这些步骤分别建模为Source、Sink、Flow以及Fan-in、Fan-out和BidiFlow,如下图所示: ?...) 获得这些交易后对交易进行验证 验证后的数据分别用于用于审计和计算净值 我们对该流程进行领域建模时,实则可以绘制一个可以表达Akka Streams中Graph的可视化图: ?...通过这样的可视化图,我们就可以针对这些图中的节点建模为Akka Streams中的Graph Shape。...除了入口的accountNos是Source,以及用于最后的审计与净值计算作为Sink外,其余节点都是Flow类型。...例如代码中的~>符号非常清晰地表达出了数据流动的方向,流经什么样的节点。

    1.6K30

    异步编程 - 14 异步、分布式、基于消息驱动的框架 Akka

    它提供了监督策略,允许在 Actor 发生故障时采取自定义的恢复操作。这有助于系统在故障时继续运行,提高了系统的可用性。...插件和扩展:Akka 提供了丰富的插件和扩展机制,可以轻松集成其他库和框架,如 Akka HTTP、Akka Streams 等,以构建全栈应用程序。...下面看下Akka的特性: 可以更简单地构建并发和分布式系统 Akka基于Actor模型和Streams,让我们可以构建可伸缩的,并且可以高效使用服务器资源,使用多个服务器进行扩展的系统。...对调用堆栈的误解 传统的调用堆栈模型不适用于并发编程,因为异步任务无法通过调用堆栈传递异常或通知主线程。 异步任务执行失败时,任务状态可能丢失,需要引入新的错误信令机制以及从故障中恢复的方法。...Actor模型中采用树状层次结构的监督机制,父Actor可以对子Actor的故障进行监控和处理。 监督程序可以决定是否重新启动子Actor或停止子Actor,确保系统的可恢复性和健壮性。

    1.4K40

    计算机程序的思维逻辑 (6) - 如何从乱码中恢复 (上)?

    本节主要介绍各种编码,乱码产生的原因,以及简单乱码的恢复。下节我们介绍复杂乱码的恢复,以及Java中对字符和文本的处理。...在四字节编码中,第一个字节的值从0x81到0xFE,第二个字节的值从0x30到0x39,第三个字节的值从0x81到0xFE,第四个字节的值从0x30到0x39。...解析二进制时,如何知道是两个字节还是四个字节表示一个字符呢?看第二个字节的范围,如果是0x30到0x39就是四个字节表示,因为两个字节编码中第二字节都比这个大。...首先将其看做整数,转化为二进制形式(去掉高位的0),然后将二进制位从右向左依次填入到对应的二进制格式x中,填完后,如果对应的二进制格式还有没填的x,则设为0。...这种情况下,无论怎么切换查看编码方式,都是不行的。 那有没有办法恢复呢?如果有,怎么恢复呢?

    1.3K50

    Akka(25): Stream:对接外部系统-Integration

    在现实应用中akka-stream往往需要集成其它的外部系统形成完整的应用。这些外部系统可能是akka系列系统或者其它类型的系统。...akka-stream提供了mapAsync+ask模式可以从一个运算中的数据流向外连接某个Actor来进行数据交换。这是一种akka-stream与Actor集成的应用。...说到与Actor集成,联想到如果能把akka-stream中复杂又消耗资源的运算任务交付给Actor,那么我们就可以充分利用actor模式的routing,cluster,supervison等等特殊功能来实现分布式高效安全的运算...() sys.terminate() } 在这个例子里parallelism=3,我们在StorageActor里把当前运算中的实例返回并显示出来: akka://demoSys/user/dbWriter...那么所谓的并行运算parallelism=3的意思就只能代表在多个Future线程中同时运算了。为了实现对Actor模式特点的充分利用,我们可以通过router来实现在多个actor上并行运算。

    2.1K80

    Akka(20): Stream:异步运算,压力缓冲-Async, batching backpressure and buffering

    akka-stream原则上是一种推式(push-model)的数据流。...对于akka-stream这种push模式的数据流,因为超速推送数据会造成数据丢失,所以必须想办法控制publisher产生数据的速度。...因为akka-stream已经在上下游环节全部实现了Reactive-Streams-Specification,所以上下游之间可以进行互动,这样就可以在akka-stream里由下游通知上游自身可接收数据的状态来控制上游数据流速...akka-stream可以通过以下几种方式来设定异步运算使用的缓冲大小: 1、在配置文件中设定默认buffer: akka.stream.materializer.max-input-buffer-size...如果下游能及时读取则Seq(Item)中的Item正是上游推送的数据元素,否则Seq(i1,i2,i3...)就代表上游在下游再次读取时间段内产生的数据。

    89270

    计算机程序的思维逻辑 (7) - 如何从乱码中恢复 (下)?

    乱码恢复 "乱"主要是因为发生了一次错误的编码转换,恢复是要恢复两个关键信息,一个是原来的二进制编码方式A,另一个是错误解读的编码方式B。...这四种编码是常见编码,在大部分实际应用中应该够了,但如果你的情况有其他编码,可以增加一些尝试。 不是所有的乱码形式都是可以恢复的,如果形式中有很多不能识别的字符如�?...,则很难恢复,另外,如果乱码是由于进行了多次解析和转换错误造成的,也很难恢复。...接下来,是时候看看在Java中如何表示和处理字符了,我们知道Java中用char类型表示一个字符,但在第三节我们提到了一个问题,即"字符类型怎么也可以进行算术运算和比较?"。...我们需要对Java中的字符类型有一个更为清晰和深刻的理解。

    1.1K80

    2022年最新版 | Flink经典线上问题小盘点

    下游的节点接受速率较慢,通过反压机制限制了该节点的发送速率。 如果是第一种状况,那么该节点则为反压的根源节点,它是从 Source Task 到Sink Task 的第一个出现反压的节点。...该异常在 Flink AM 向 YARN NM 申请启动 token 已超时的 Container 时抛出,通常原因是 Flink AM 从 YARN RM 收到这个 Container 很久之后(超过了...值得注意的是,Flink使用RocksDB状态后端也有可能会抛出这个异常,此时需修改flink-conf.yaml中的state.backend.rocksdb.files.open参数,如果不限制,可以改为...的schema,恢复作业时会抛出此异常,表示不支持更改schema。...,CDC source 同步到了 ALTER DDL 语句,但是解析失败抛出的异常。

    4.7K30

    如何从失焦的图像中恢复景深并将图像变清晰?

    是的,我们今天就来看看另外一种图像模糊——即失焦导致的图像模糊——应该怎么样处理。 我今天将要介绍的技术,不仅能够从单张图像中同时获取到全焦图像(全焦图像的定义请参考33....之前介绍的模糊对画面中的每个点都是均匀的,即每个像素对应的PSF都一致。而现在这种由于失焦带来的模糊则是对画面中每个点都不一致的,这是它们的第一个不同。...那么,如何解决上面这两个问题呢?我们现在才进入今天文章的核心?...2.3 完整的过程 有了前面所讲的两点作为基础,作者就进一步解释了如何来获取全焦图像。 提前标定好不同尺度的编码光圈卷积核 ? 对每个像素i,选择一个局部窗口 ? ,对应的图像为 ?...因此,不管是从肉眼上观察,还是通过振铃效应导致的过大的卷积误差,我们都很容易判断哪个是正确尺度的卷积核。

    3.5K30
    领券