这篇文章是我介绍Eclipse Vert.x系列的第五篇文章。在上一篇文章中,我们看到了Vert.x如何与数据库交互。我们使用Future
对象来驯服Vert.x的异步特性。在这篇文章中,我们将看到另一种管理异步代码的方式:反应式编程。我们将看到Vert.x如何与Reactive eXtensions结合来为您提供巨大的能量。
让我们先用以前的帖子刷新我们的记忆:
vertx-web
,并开发了一个集合管理应用程序。此应用程序公开了HTML / JavaScript前端可调用的REST API。Future
编排我们的异步操作。在这篇文章中,我们不会添加新功能。相反,我们将探索另一种编程模式:反应式编程。
这篇文章的代码可以在GitHub仓库的post-5
目录中找到。
请忘记你对代码的所有认知并抬头看看。用代码来建模这个世界是极具挑战的。作为开发人员,我们倾向于使用反直觉方法。自20世纪80年代以来,面向对象计算被视为高招。来自我们世界的每个实体都由一个包含字段和曝光法的对象来表示。大多数情况下,我们使用阻塞和同步协议完成与这些对象的交互。调用一个方法并等待响应。但是......我们生活的世界是异步的。交互是通过使用事件,消息和刺激来完成的。为了克服面向对象的局限性,出现了许多模式和范例。最近,函数式编程正在卷土重来,但它不是为了取代面向对象编程,而是为了补充它。反应式编程 是一种功能性的事件驱动的编程方法,与常规的面向对象的范例结合使用。
几年前,微软创建了一个名为Reactive eXtensions(也称为ReactiveX或RX)的.NET反应式编程框架。RX是一个用可观察流进行异步编程的API 。该API已被移植到多种语言,如JavaScript,Python,C ++和Java。
让我们停下来静静地观察我们的世界。观察运动中的实体:交通拥堵,天气,谈话和金融市场。事情在并发演化着。多件事情同时发生,有时是独立的,有时是以精心安排的方式发生。每个对象创建流活动。例如,您的鼠标光标位置正在移动。位置序列是一个流。房间里的人数可能是固定的,但有人会进进出出,产生新的价值。所以我们有另一个价值流。反应式编程的背后有一个基本的原则:事件即是数据,数据即是事件。
关于RX和异步编程的重要理解是流的异步特性。您会观察到一个流,并在流发出某个项目时通知您。你不知道什么时候会发生,但你正在观察。这个观察是通过一个操作完成的。subscribe
RxJava是Java编程语言RX的直接实现。它是用于Java中的反应式编程的非常流行的库,具有联网数据处理应用程序和JavaFX和Android的图形用户界面。RxJava是Java中反应式库的通用语言,它提供以下五种类型来描述发布者:
流中的项目数 | RxJava 2种类型 | RX签名 | 回调签名 | 未来的签名 | |
---|---|---|---|---|---|
通知,数据流 | 0..N | 可观察,可流动 | Observable stream()可流式流() | ReadStream方法() | N / A |
异步操作产生结果 | 1 | 单 | Single get() | void get(Handler <AsyncResult> handler) | 未来获得() |
异步操作不产生或一个结果 | 0..1 | 也许 | 也许findById(String id) | void get(String id,Handler <AsyncResult> handler) | 未来获得(字符串id) |
异步操作不产生任何结果 | 0 | Completable | 可完全冲洗() | void flush(Handler <AsyncResult> handler) | 未来flush() |
之间的差和是处理背压(实施反应性流协议),而没有。更适合来自支持背压源(例如,TCP连接)的大量数据流,而更适合处理无法应用背压的“热”可观测数据(例如,GUI事件)。ObservableFlowableFlowableObservableFlowableObservable
这篇文章不是反应式编程或RX的介绍。如果您需要关于反应式编程和RX的介绍级课程,请查看本教程。
在之前的文章中,我们曾经撰写过异步操作。在这篇文章中,我们将使用流和RxJava。怎么样?感谢Vert.x和RxJava 2 API。事实上,Vert.x提供了一组接收 API。但是,不要忘记:Future
将它们结合起来可以为您提供超级用户,因为它利用RxJava流和运算符的强大功能将异步执行模型从Vert.x扩展到了Vert.x。
它始终始于Maven依赖项。在你的文件中添加这个:pom.xml
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rx-java2</artifactId>
<version>${vertx.version}</version>
</dependency>
然后,打开这个类并用这个替换import语句:io.vertx.intro.first.MyFirstVerticle
import io.reactivex.Completable;
import io.reactivex.Single;
import io.vertx.core.Future;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.sql.SQLOptions;
import io.vertx.reactivex.CompletableHelper;
import io.vertx.reactivex.config.ConfigRetriever;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.http.HttpServerResponse;
import io.vertx.reactivex.ext.jdbc.JDBCClient;
import io.vertx.reactivex.ext.sql.SQLConnection;
import io.vertx.reactivex.ext.web.Router;
import io.vertx.reactivex.ext.web.RoutingContext;
import io.vertx.reactivex.ext.web.handler.BodyHandler;
import io.vertx.reactivex.ext.web.handler.StaticHandler;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
注意包装。这是Vert.x RX API的实现。因此,我们现在正在扩展,而不是延长。注入的实例提出了以前缀开头的新方法,如或。以前缀为前缀的方法返回RxJava 2类型,如or 。io.vertx.reactivexio.vertx.core.AbstractVerticleio.vertx.reactivex.core.AbstractVerticlevertxrxrxDeployVerticlerxCloserxSingleCompletable
为了受益于RX API并能够使用RX运营商,我们需要使用RX类型。例如,以前我们有这样的:
private Future createHttpServer(JsonObject config,
Router router) {
Future future = Future.future();
vertx
.createHttpServer()
.requestHandler(router::accept)
.listen(
config.getInteger("HTTP_PORT", 8080),
res -> future.handle(res.mapEmpty())
);
return future;
}
Future
被映射到RX中,也就是只是表示其完成的流。因此,对于RX,此代码变为以下内容:Completable
private Completable createHttpServer(JsonObject config,
Router router) {
return vertx
.createHttpServer()
.requestHandler(router::accept)
.rxListen(config.getInteger("HTTP_PORT", 8080))
.toCompletable();
}
你发现差异吗?我们使用返回a 的方法。因为我们不需要服务器,所以我们使用该方法将其转换为一个。这是可用的,因为我们使用了rx-ified实例。rxListenSingleCompletabletoCompletablerxListenvertx
现在我们来重写这个方法。正在返回一个。这被翻译成:connectconnectFutureSingle
private Single connect() {
return jdbc.rxGetConnection()
.map(c -> c.setOptions(
new SQLOptions().setAutoGeneratedKeys(true)));
}
该客户端还提供了一个API。返回一个。要启用密钥生成,我们使用该方法。从观察结果中获取结果并使用映射函数对其进行转换。这里我们只是调整选项。jdbcrxrxGetConnectionSinglemapmapSingle
遵循相同的原则,该方法改写如下:insert
private Single<Article> insert(SQLConnection connection,
Article article, boolean closeConnection) {
String sql = "INSERT INTO Articles (title, url) VALUES (?, ?)";
return connection
.rxUpdateWithParams(sql,
new JsonArray().add(article.getTitle()).add(article.getUrl()))
.map(res -> new Article(res.getKeys().getLong(0),
article.getTitle(), article.getUrl()))
.doFinally(() -> {
if (closeConnection) {
connection.close();
}
});
}
在这里,我们使用执行语句。结果转化为一个。注意。当操作完成或失败时调用此方法。在这两种情况下,如果要求,我们关闭连接。INSERTrxUpdateWithParamsArticledoFinally
同样的方法适用于使用该方法的方法:queryrxQuery
private Single<Article> query(SQLConnection connection) {
return connection.rxQuery("SELECT * FROM articles")
.map(rs -> rs.getRows().stream()
.map(Article::new)
.collect(Collectors.toList())
)
.doFinally(connection::close);
}
queryOne
如果搜索到的文章不存在,则需要引发错误:
private Single<Article> queryOne(SQLConnection connection, String id) {
String sql = "SELECT * FROM articles WHERE id = ?";
return connection.rxQueryWithParams(sql,
new JsonArray().add(Integer.valueOf(id))
)
.doFinally(connection::close)
.map(rs -> {
List rows = rs.getRows();
if (rows.size() == 0) {
throw new NoSuchElementException(
"No article with id " + id);
} else {
JsonObject row = rows.get(0);
return new Article(row);
}
});
}
映射器函数抛出的异常被传播到流中。所以观察者可以对它做出反应并恢复。
我们已经看到上面的方法丢弃了结果并仅通知用户成功完成或操作失败。在和方法中,我们需要做几乎相同的事情。我们执行SQL语句,如果我们发现这些语句没有更改行,我们会报告错误。为了实现这一点,我们正在使用。这种方法是家庭的一部分,是一个非常强大的接收运营商。该方法将参数作为函数。为观察流发出的每个项目调用此函数。如果流是a ,那么它将被称为零(错误情况)或一个(操作成功并带有结果)次。与运营商不同,toCompletableSingleupdatedeleteflatMapCompletableflatMapSinglemapflatMap
函数返回一个流。例如,在我们的上下文中,函数被调用并返回a :flatMapCompletableUpdateResultCompletable
private Completable update(SQLConnection connection, String id,
Article article) {
String sql = "UPDATE articles SET title = ?,
url = ? WHERE id = ?";
JsonArray params = new JsonArray().add(article.getTitle())
.add(article.getUrl())
.add(Integer.valueOf(id));
return connection.rxUpdateWithParams(sql, params)
.flatMapCompletable(ur ->
ur.getUpdated() == 0 ?
Completable
.error(new NoSuchElementException(
"No article with id " + id))
: Completable.complete()
)
.doFinally(connection::close);
}
private Completable delete(SQLConnection connection, String id) {
String sql = "DELETE FROM Articles WHERE id = ?";
JsonArray params = new JsonArray().add(Integer.valueOf(id));
return connection.rxUpdateWithParams(sql, params)
.doFinally(connection::close)
.flatMapCompletable(ur ->
ur.getUpdated() == 0 ?
Completable
.error(new NoSuchElementException(
"No article with id " + id))
: Completable.complete()
);
}
在这两种情况下,我们检查更新行的数量,如果为0,则产生失败。所以用户收到成功()或错误()。请注意,此代码也可以使用以前的方法:使用操作符,抛出异常并使用结果放弃。CompletableCompletable.completeCompletable.errormaptoCompletable
显然,我们也可以将a 转换为:CompletableSingle
private Single createTableIfNeeded(
SQLConnection connection) {
return vertx.fileSystem().rxReadFile("tables.sql")
.map(Buffer::toString)
.flatMapCompletable(connection::rxExecute)
.toSingleDefault(connection);
}
rxExecute
返回一个。但在这里我们需要转发。幸运的是,运营商将其转换为发射给定值。CompletableSQLConnectiontoSingleDefaultCompletableSingle
到目前为止,我们正在使用方法并调整结果。但是我们如何处理顺序组合呢?执行第一个操作,然后执行第二个操作并返回第一个操作的结果?这可以使用操作员完成。如上所述,是一个非常强大的运营商。它接受一个函数作为参数,不同的是运营商,这个函数返回一个流(所以,,...)。从观察到的流中为每个项目调用此函数,并将返回的流展平,以便项目序列化为单个流。由于流是异步构造,调用会创建一个顺序组合。我们来看看这个方法。最初的实施如下:rxflatMapflatMapmapSingleMaybeCompletableflatMapcreateSomeDataIfNone
private Future createSomeDataIfNone(
SQLConnection connection) {
Future future = Future.future();
connection.query("SELECT * FROM Articles", select -> {
if (select.failed()) {
future.fail(select.cause());
} else {
if (select.result().getResults().isEmpty()) {
Article article1 = new Article("Fallacies of distributed computing", "<a class="vglnk" href="https://en.wikipedia.org/wiki/Fallacies_of_distributed_computing" rel="nofollow"><span>https</span><span>://</span><span>en</span><span>.</span><span>wikipedia</span><span>.</span><span>org</span><span>/</span><span>wiki</span><span>/</span><span>Fallacies</span><span>_</span><span>of</span><span>_</span><span>distributed</span><span>_</span><span>computing</span></a>");
Article article2 = new Article("Reactive Manifesto",
"<a class="vglnk" href="https://www.reactivemanifesto.org/" rel="nofollow"><span>https</span><span>://</span><span>www</span><span>.</span><span>reactivemanifesto</span><span>.</span><span>org</span><span>/</span></a>");
Future<Article> insertion1 = insert(connection, article1, false);
Future<Article> insertion2 = insert(connection, article2, false);
CompositeFuture.all(insertion1, insertion2)
.setHandler(r -> future.handle(r.map(connection)));
} else {
future.complete(connection);
}
}
});
return future;
}
在这个方法中,我们执行查询并根据结果插入文章。RX的实现如下:
private Single createSomeDataIfNone(
SQLConnection c) {
return c.rxQuery("SELECT * FROM Articles")
.flatMap(rs -> {
if (rs.getResults().isEmpty()) {
Article article1 = new Article("Fallacies of distributed computing",
"<a class="vglnk" href="https://en.wikipedia.org/wiki/Fallacies_of_distributed_computing" rel="nofollow"><span>https</span><span>://</span><span>en</span><span>.</span><span>wikipedia</span><span>.</span><span>org</span><span>/</span><span>wiki</span><span>/</span><span>Fallacies</span><span>_</span><span>of</span><span>_</span><span>distributed</span><span>_</span><span>computing</span></a>");
Article article2 = new Article("Reactive Manifesto",
"<a class="vglnk" href="https://www.reactivemanifesto.org/" rel="nofollow"><span>https</span><span>://</span><span>www</span><span>.</span><span>reactivemanifesto</span><span>.</span><span>org</span><span>/</span></a>");
return Single.zip(
insert(connection, article1, false),
insert(connection, article2, false),
(a1, a2) -> c
);
} else {
return Single.just(c);
}
});
}
首先,我们执行查询。然后,当我们得到结果时,调用传递给该方法的函数,实现顺序组合。您可能想知道错误情况。我们不需要处理它,因为错误会传播到流中,并且最终的观察者会收到它。发生错误时不会调用该函数。flatMap
异步操作可以同时发生。但有时你需要知道他们什么时候完成。这被称为并行组合。该运营商可以让你做到这一点。在,我们插入两篇文章。该操作使用(返回a )完成。该操作者观察的两个给定的事件,并呼吁作为最后一个参数传递时都已经完成了方法。在这种情况下,我们只是转发。zipcreateSomeDataIfNoneinsertSinglezipSingleSQLConnection
我们已经改写了我们大部分的功能,但我们需要调整方法。记住我们需要实现的开始顺序:start
//开始序列:// 1 - 检索配置// | - 2 - 创建JDBC客户端// | - 3 - 连接到数据库(检索连接)// | - 4 - - 5 - 如果需要添加一些数据// | - 6 - 完成后关闭连接// | - 7 - 启动HTTP服务器// | - 9 - 我们完成了!
这个组合可以使用运算符来实现:flatMap
retriever.rxGetConfig()
.doOnSuccess(config ->
jdbc = JDBCClient.createShared(vertx, config,
"My-Reading-List"))
.flatMap(config ->
connect()
.flatMap(connection ->
this.createTableIfNeeded(connection)
.flatMap(this::createSomeDataIfNone)
.doAfterTerminate(connection::close)
)
.map(x -> config)
)
.flatMapCompletable(c -> createHttpServer(c, router))
.subscribe(CompletableHelper.toObserver(fut));
这是一个动作操作符,它从观察到的流中接收项目并让您实现副作用。在这里我们分配字段。doOnSuccessjdbc
然后,我们只是使用操作员编排我们不同的操作。看看。这个操作符让我们在完整流被使用时关闭连接,这对于清理非常有用。flatMapdoAfterTerminate
这个代码中有一个重要的部分。到目前为止,我们返回了RX类型,但从未调用过。如果您不订阅,则不会发生任何事情:流是懒惰的。所以不要忘记订阅。订阅实现了管道并触发排放。在我们的代码中,它会触发启动序列。传递给方法的参数只是报告传递给方法的对象的失败和成功。基本上,它将a映射到a 。subscribesubscribeFuturestartFutureSubscriber
我们差不多完成了。我们只需要更新我们的HTTP动作,即HTTP请求所调用的方法。为了简化代码,我们来修改这个类。这个类提供返回的方法。但是这种类型对于需要用户的RX API来说并不是很好。让我们用返回更适合类型的方法替换这些方法:ActionHelperHandler<AsyncResult>
private static BiConsumer writeJsonResponse(
RoutingContext context, int status) {
return (res, err) -> {
if (err != null) {
if (err instanceof NoSuchElementException) {
context.response().setStatusCode(404)
.end(err.getMessage());
} else {
context.fail(err);
}
} else {
context.response().setStatusCode(status)
.putHeader("content-type",
"application/json; charset=utf-8")
.end(Json.encodePrettily(res));
}
};
}
static BiConsumer; ok(RoutingContext rc) {
return writeJsonResponse(rc, 200);
}
static BiConsumer created(RoutingContext rc) {
return writeJsonResponse(rc, 201);
}
static Action noContent(RoutingContext rc) {
return () -> rc.response().setStatusCode(204).end();
}
static Consumer onError(RoutingContext rc) {
return err -> {
if (err instanceof NoSuchElementException) {
rc.response().setStatusCode(404)
.end(err.getMessage());
} else {
rc.fail(err);
}
};
}
现在我们准备实施我们的HTTP操作方法。回到课堂:用这个替换动作方法:MyFirstVerticle
private void getAll(RoutingContext rc) {
connect()
.flatMap(this::query)
.subscribe(ok(rc));
}
private void addOne(RoutingContext rc) {
Article article = rc.getBodyAsJson()
.mapTo(Article.class);
connect()
.flatMap(c -> insert(c, article, true))
.subscribe(created(rc));
}
private void deleteOne(RoutingContext rc) {
String id = rc.pathParam("id");
connect()
.flatMapCompletable(c -> delete(c, id))
.subscribe(noContent(rc), onError(rc));
}
private void getOne(RoutingContext rc) {
String id = rc.pathParam("id");
connect()
.flatMap(connection -> queryOne(connection, id))
.subscribe(ok(rc));
}
private void updateOne(RoutingContext rc) {
String id = rc.request().getParam("id");
Article article = rc.getBodyAsJson()
.mapTo(Article.class);
connect()
.flatMapCompletable(c -> update(c, id, article))
.subscribe(noContent(rc), onError(rc));
}
正如你所看到的,这些方法是使用我们之前看到的操作符来实现的。它们包含写入HTTP响应的调用。就这么简单...subscribe
我们完了!在这篇文章中,我们调整了我们的代码,使用反应式编程和RxJava 2. Vert.x和RxJava的组合将您的反应性带到了另一个层次。您可以非常轻松地编写和处理异步操作和流。
现在,不要忘记没有什么是免费的。RX可能很难理解。它可能看起来很奇怪。根据你的背景,你可能更喜欢和回调。Vert.x为您提供选择,并且您可以自由选择您喜欢的模型。Future
如果你想进一步,这里有一些资源:
本系列的下一篇文章将介绍在Kubernetes和OpenShift上部署我们的应用程序。
请继续关注,快乐的编码!