前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Java和Node.js实战 MongoDB 4.x 新特性:Change Streams 变化流

Java和Node.js实战 MongoDB 4.x 新特性:Change Streams 变化流

作者头像
MongoDB中文社区
发布于 2019-03-21 08:55:44
发布于 2019-03-21 08:55:44
1.1K00
代码可运行
举报
文章被收录于专栏:MongoDB中文社区MongoDB中文社区
运行总次数:0
代码可运行

监控数据库发生的变化是MongoDB同步数据服务的关键。我们不需要去定期轮训查询集合中的更改文档,我们就可以可以更轻松地过滤Change Streams 变化流,并立即采取处理错误。这是一种Reactive反应式编程风格,可以非常强大。如今,获取这些变更信息流非常简单。

先介绍点历史知识。在MongoDB 3.6之前,如果我们要监听MongoDB中正在发生的变化,必须“tail the oplog”,跟踪操作日志,这是一个用于复制记录变更的集合。 “tail the oplog”的过程往往最终会出现复杂的问题,不受支持的,脆弱的代码,而这些代码在生产中存在风险,难以控制,并不是我们想要的。这意味着人们会避免使用Reactive反应式编程风格。

变更流和集合

Change Streams

and Collections

这种问题情况在MongoDB 3.6 Change Streams新功能出现后开始发生变化。变更流使其变得简单并且支持监听集合中的数据变化,而不在需要跟踪Oplog。是不是非常简单方便?让我们看一下Java和Node.js示例中movieDetails集合中发生的一些变化。

Java实现 Change Streams的代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
MongoClient mongoClient = new MongoClient( new MongoClientURI("mongodb://host1:port1,host2:port2..."));

// Select the MongoDB database and collection to open the change stream against

MongoDatabase db = mongoClient.getDatabase("myTargetDatabase");

MongoCollection<Document> collection = db.getCollection("myTargetCollection");

// Create $match pipeline stage.
List<Bson> pipeline = singletonList(Aggregates.match(Filters.or(
    Document.parse("{'fullDocument.username': 'alice'}"),
    Filters.in("operationType", asList("delete")))));

// Create the change stream cursor, passing the pipeline to the
// collection.watch() method

MongoCursor<Document> cursor = collection.watch(pipeline).iterator();

Node实现 Change Streams的代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
const MongoClient = require("mongodb").MongoClient;

const uri = "MONGODBURL";

const client = new MongoClient(uri, { useNewUrlParser: true });
client.connect().then(db => {
 const changeStream = client.db("video").collection("movieDetails").watch();
 changeStream.on("change", next => {
   console.log(next);
 });
});

此Node代码连接到Mongodb数据库。然后,它选择数据库video和movieDetails集合,并使用watch()函数创建变化流。我们使用.on添加一个事件触发器(“change”,...然后代码将在变化流changeStream中获取changeStream事件,随后它将调用一个函数,执行处理代码。在这种情况下,它只是在文档更改时打印出Change Streams 变化流事件如果我运行此代码,然后使用MongoDB Compass查看movieDetail对象细节,下面是详细例子信息:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
{ _id:
   { _data:
      '825C51D03F0000000129295A1004E515B4338C574BA2B9603CB1C7FB3B0446645F696400645C0EC4B74B052F9E2EF0C3810004' },
  operationType: 'replace',
  clusterTime:
   Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1548865599 },
  fullDocument:
   { _id: 5c0ec4b74b052f9e2ef0c381,
     title: 'PS I Love You',
     year: 2007,
     ...
     awards: { wins: 2, nominations: 4, text: '2 wins & 4 nominations.' },
     type: 'movie' },
  ns: { db: 'video', coll: 'movieDetails' },
  documentKey: { _id: 5c0ec4b74b052f9e2ef0c381 } 
}

我们可以在Change Events变化事件文档中阅读更多Change Events内容https://docs.mongodb.com/manual/reference/change-events/,但快速方法是可以在operationType字段中找到Change Events重要信息,即更改类型。当我们观察集合时,它可以具有插入,更新,替换,删除或无效(insert, update, replace, delete or invalidate)的值。前四种类型代表了他们的名字。我们在上面的文档中看到的是Compass通过支付替换集合中的文档进行编辑的结果。

无效的operationType在变化流中出现,其中正在监控的集合被删除或重命名,或者集合所在的数据库被删除。这是关闭change Stream变更流的信号。本文档的其余部分是有关变更内容的信息;哪个命名空间、文档结构、以及变化发生的时间。

顺便说一句,上面的示例中更改文档是在MongoDB 4.x数据库上测试的,在以前的版本_data上添加了一个字段。这是一个恢复标志字段,允许对其进行记录的应用程序使用它们在流中的该点重新开始执行未完成的任务。

深入集合Collection

MongoDB 3.6版本Change Streams变化流已经做的很好,可以跟踪集合中的数据变化。但是之前很多人被迫使用oplog来跟踪全局变化,想要对整个数据库中所有变化跟踪并处理,这种情况就比较痛苦。监控整库变化这个功能在MongoDB 4.0添加进来了。它可以在数据库或整个部署上创建Change Streams变化流的功能 - 高可用副本集或分片集群。 4.0不仅允许对集合执行watch()监控,还可以允许对数据库或整个部署集群执行watch()。例子代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
const MongoClient = require("mongodb").MongoClient;

const uri ="MONGODBURL";

const client = new MongoClient(uri, { useNewUrlParser: true });
client.connect().then(db => {
 const changeStream = client.watch();
 changeStream.on("change", next => {
   console.log(next);
});

现在,只要任何数据库中的任何集合任何数据被修改,就会打印日志到控制台。 这些不是我们要获得的唯一变化事件,还可以监控更多事件。 由于Change Streams监控已经可以监控到最广泛的范围,现在我们将看到删除集合时的drop事件,删除数据库时的dropDatabase事件以及重命名集合时rename重命名事件,都会被监控到。

如果我们只对特定数据库中发生的事件感兴趣,可以打开数据库并对其执行watch()。 我们可以获得该数据库中collection集合的所有更新,以及删除和重命名事件。 但是不会得到dropDatabase事件; 如果我们的数据库被删除,那么当数据库已经删除时,返回的结果是invalidate ,表示无效操作。

扩展学习

有了MongoDB 4.0 Change Streams增强新特性,我们可以跟踪单个集合Colletion、数据库或部署集群的数据库和集合中的所有变化。有些变化我们不会明确看到信息;必须通过在集合中创建文档来推断新集合和数据库的创建过程。

当复制到另一个MongoDB时,这些都不是大问题,因为数据库和集合创建是在新文档生成时创建的,可以推测出来。复制集合的困难点在于,检查新集合是否影响以前的集合,还有就是我们监控不到创建索引和其他操作,这些操作不会反映在为更改文档的日志中,不能通过变更流监控。

MongoDB4.0 Change Streams增强新特性 意味着我们现在更容易监控MongoDB数据库和集群活动,该功能提供了一种全新的方式将MongoDB呈现给另一个系统 - 实时监控MongoDB数据的变化。建议大家自己动手实战一下MongoDB 4.0 Change Streams。可以参考官方文档:https://docs.mongodb.com/manual/changeStreams

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-02-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Mongoing中文社区 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
【五分钟了解MongoDB】Change Stream 和MongoDB 4.x
充分获知数据库的数据变动是从MongoDB向其他数据服务进行数据同步的关键点。与直接查询collection来获取数据变动相比,通过流式的方式进行监听会有效并及时的多。这是一种非常强大的“响应式编程”模式。随着MongoDB的版本更新,流式的获取方式将变得原来越易用。
MongoDB中文社区
2019/07/08
1.2K0
【五分钟了解MongoDB】Change Stream 和MongoDB 4.x
MongoDB Change Stream初体验
Change Stream是MongoDB从3.6开始支持的新特性。这个新特性有哪些奇妙之处,会给我们带来什么便利?本次的文章将就这个主题进行初步讨论。
MongoDB中文社区
2019/07/08
1K0
MongoDB Change Stream之三——应用场景及实践
change streams从本质上来说是提供了一种基于mongoDB的CDC(Change Data Capture)的解决方案。所谓的CDC就是变化数据捕获,简单理解为监听数据库系统的变更就好。下面的图中描述了CDC的典型场景,左边的是主数据库,不同的客户端可以向其中插入数据(有前后关系);中间是一个队列,这些数据变化都会被放到里面;右边是派生数据系统,消费队列里的变化,然后用作搜索和数据仓库等应用。市场上也不乏这种专门做CDC的产品,比如:HEVO,其宣称的优势包括:1)简单易上手,无需代码;2)良好的交互式用户界面;3)支持多种数据源;4)可容错的安全架构等。
phoenix、
2021/02/24
3.2K1
MongoDB Change Stream之三——应用场景及实践
MongoDB Change Stream之一——上手及初体验
Change Stream可以直译为"变更流",也就是说会将数据库中的所有变更以流式的方式呈现出来。用户可以很方便地对数据库建立一个监听(订阅)进程,一旦数据库发生变更,使用change stream的客户端都可以收到相应的通知。使用场景可以包括但不限于以下几种:
phoenix、
2020/10/10
10.5K0
MongoDB Change Stream之一——上手及初体验
技术干货 | 如何利用 MongoDB Change Streams 实现数据实时同步?
当前实时数据同步的应用场景较多,实现方式主要有两种,一是数据库厂家本身提供了实时数据捕获工具,如 Oracle 的 OGG 等;另外一种是实时解析数据库的事务日志,获取到实时变化的数据后进行同步,如 Flink CDC 等。
MongoDB中文社区
2023/01/04
3.9K0
技术干货 | 如何利用 MongoDB Change Streams 实现数据实时同步?
Mongo之ChangeStream详解
Change Stream可以直译为"变更流",也就是说会将数据库中的所有变更以流式的方式呈现出来。用户可以很方便地对数据库建立一个监听(订阅)进程,一旦数据库发生变更,使用change stream的客户端都可以收到相应的通知。使用场景可以包括但不限于以下几种:
tunsuy
2023/08/19
5350
Mongo之ChangeStream详解
Change Stream源码解读
MongoDB从3.6开始推出了Change Stream功能,提供实时的增量数据流功能,为同步、分析、监控、推送等多种场景使用带来福音。4.0中引入的混合逻辑时钟,可以支持分片集群在不关闭balancer的情况下,吐出的增量数据在即使发生move chunk发生的情况下,还能够保证数据的因果一致性。不但如此,随着4.0.7开始推出的High Water Mark功能,使得返回的change stream cursor包括Post Batch Resume Token,更好的解决Change Stream中ResumeToken推进的问题。关于Change Stream的功能解读,网上可以找到比较多的资料,比如张友东的这篇解读介绍了Change Stream与oplog拉取的对比以及基本的使用。本文将主要侧重从内核源码层面进行解读,主要介绍分片集群版下Change Stream在mongos和mongod上都执行了哪些操作。此外,由于4.0开始MongoDB使用了混合逻辑时钟,从而保证了move chunk的因果一致性,所以本文还会先简单介绍一下MongoDB中混合逻辑时钟的原理。
MongoDB中文社区
2020/11/02
2.5K0
Change Stream源码解读
MongoDB 新功能介绍-Change Streams
MongoDB 3.6已经GA有一段时间,网络上对于该版本新特性的详细介绍文章比较少为此借机会对部分新特性做一个相对详细的介绍。基于早期MongoDB版本实现如跨平台数据同步、消息通知、ETL及oplog备份等服务时大多依赖于 Tailable Cursors 的方式。当然这样的实现一来相对复杂同时也存在着一些风险(如不同版本oplog兼容性及过滤特定操作类型等)。
MongoDB中文社区
2018/08/14
2.2K0
MongoDB 新功能介绍-Change Streams
MongoDB Change Stream简介
MongoDB的Change Stream有点类似关系型数据库中的触发器,但是原理不完全相同。
AsiaYe
2021/10/14
1.1K0
MongoDB Change Stream之二——自顶向下流程剖析
事实上,所有的query基本也是这样一个流程,只是不同的命令会获得不同类型的cursor罢了。这里如果暂时不好理解的话,不妨把第一章内容浏览完再回过头来看看。
phoenix、
2021/02/24
3.4K2
MongoDB Change Stream之二——自顶向下流程剖析
快速阅读:MongoDB 3.6 新特性
Jesse是MongoDB的一名开发工程师,他在博文中详细讲述了MongoDB 3.6的一些新特性,博文内容小结如下,详细请参考原文。 1 协议压缩(Wire Protocol Compression)     在3.6中Client和Server之间的通信协议增加了压缩功能,使得MongoDB可以在带宽受限的主机上工作的很好,例如远程的云服务器主机。 2 OP_MSG消息格式 在3.6之前,MongoDB的通信协议扩展性很差,并且存在性能问题。在3.6中,Mathias Stearn 重新设计了一套通
joymufeng
2018/06/19
9930
完美数据迁移-MongoDB Stream的应用
尽管如此,目前还是有许多企业踏上了服务化改造的道路,这其中则免不了”旧改”的各种繁杂事。
MongoDB中文社区
2018/08/14
1.2K0
完美数据迁移-MongoDB Stream的应用
Flink CDC MongoDB Connector 的实现原理和使用实践
摘要:本文整理自 XTransfer 资深 Java 开发工程师、Flink CDC Maintainer 孙家宝在 Flink CDC Meetup 的演讲。主要内容包括:
从大数据到人工智能
2022/09/09
2.8K0
Flink CDC MongoDB Connector 的实现原理和使用实践
数据实时同步之MongoDB
随着传统企业的发展,企业数据呈现多样化,海量化,难以实现数据快速分析。MongoDB是当前很多企业使用的,当日积月累数据很大时,就可能会忽略历史数据的价值,可以把数据实时同步到其他储存:HBASE、HIVE、HDFS文件等等。在当前大数据、云计算的时代潮流下,实现数据价值,对企业决策力、洞察发现力极其有益。
yuanyi928
2020/07/14
2.9K0
MongoDB从0开始到实践,整的很明白!
MongoDB是一个以JSON为数据模型的文档数据库,所谓“文档”,就是“JSON Document”,并不是我们一般理解的pdf,word,excel文档。
行百里er
2021/05/11
1.6K0
MongoDB从0开始到实践,整的很明白!
Node.js封装对mongodb操作的模块
张培跃 ID:laozhangsishu 不止于前 关注 增删改查: var mongodb=require("mongodb"); var MongoClient=mongodb.MongoClient; var connStr="mongodb://127.0.0.1:27017/"; //连接数据库 function _connect(cb){ MongoClient.connect(connStr,function(err,client){ if(err){
用户1272076
2019/03/26
2K0
MongoDB系列10:Change Streams构建实时同步数据流
本文是第10篇,主要讲述Change Streams构建实时同步数据流的实战经验,非常值得一看。
大数据和云计算技术
2018/07/26
2.4K0
MongoDB系列10:Change Streams构建实时同步数据流
Java与MongoDB 4.0多文档事务新特性体验
MongoDB 4.0增加了对多文档ACID事务的支持。但等等......这是否意味着MongoDB直到现在才支持事务?不,实际上MongoDB已经提供了对单个文档事务的支持。 MongoDB 4.0跨多文档、多语句、多集合和多数据库扩展了事务保证。 如果没有任何形式的事务数据完整性保证,数据库还有什么用呢?
MongoDB中文社区
2018/12/27
2.7K0
MongoDB 基础浅谈
作者:hazenweng,腾讯 QQ 音乐后台开发工程师 MongoDB 作为一款优秀的基于分布式文件存储的 NoSQL 数据库,在业界有着广泛的应用。下文对 MongoDB 的一些基础概念进行简单介绍。 1 MongoDB 特点 面向集合存储:MongoDB 是面向集合的,数据以 collection 分组存储。每个 collection 在数据库中都有唯一的名称。 模式自由:集合的概念类似 MySQL 里的表,但它不需要定义任何模式。 结构松散:对于存储在数据库中的文档,不需要设置相同的字段,并且
腾讯技术工程官方号
2021/09/18
1.5K0
完美数据迁移-MongoDB Stream的应用
最近微服务架构火的不行,但本质上也只是风口上的一个热点词汇。 作为笔者的经验来说,想要应用一个新的架构需要带来的变革成本是非常高的。
美码师
2018/08/27
1.5K0
完美数据迁移-MongoDB Stream的应用
推荐阅读
相关推荐
【五分钟了解MongoDB】Change Stream 和MongoDB 4.x
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验