NiFi Connection是在两个已连接的NiFi处理器组件之间临时保存FlowFiles的位置。每个包含排队的NiFi FlowFiles的Connection在JVM堆中都会占一些空间。本文将对Connection进行分析,探究NiFi如何管理在该Connection中排队的FlowFiles和Connection对堆和性能的影响。
NiFi是美国国家安全局开发并使用了8年的可视化数据集成产品,2014年NAS将其贡献给了Apache社区,2015年成为Apache顶级项目
NIFI可以处理各种各样的数据源和不同格式的数据。你可以从一个源中获取数据,对其进行转换,然后将其推送到另一个目标存储地。
在组件工具栏下的NiFi屏幕顶部附近有一个条形,称为状态栏。它包含一些关于NiFi当前健康状况的重要统计数据:活动线程的数量可以指示NiFi当前的工作状态,排队统计数据表示当前在整个流程中排队的FlowFile数量以及这些FlowFiles的总大小。
这是疯狂的水流。就像您的应用程序处理疯狂的数据流一样。如果您独自完成所有工作,那么很难将数据从一个存储路由到另一个存储,应用验证规则并解决数据治理,大数据生态系统中的可靠性问题。
DataFlow Manager(DFM)是NiFi用户,具有添加,删除和修改NiFi数据流组件的权限。
一些处理器支持配置运行持续时间(Run Duration)。此设置告诉处理器在单个任务中继续使用同一task尽可能多地来处理来自传入队列的的FlowFiles(或成批的流文件)。 对于处理单个任务本身非常快并且FlowFile数量也很大的处理器来说,这是一个理想的选择。
nifi.properties文件中有三个属性涉及 NiFi 内容存储库中内容的存档。
Fayson在前面的文章介绍了什么是NiFi,参考《0622-什么是Apache NiFi》。同时对如何在CDH中使用Parcel安装CFM做了介绍,参考《0623-6.2.0-如何在CDH中安装CFM》。本文会首先对NiFi的使用做一下简单的介绍,然后对处理器(Processor)进行详细介绍。
系统正在积极处理的FlowFiles保存在JVM内存中的Hash Map中。这使它们的处理效率非常高,但是由于多种原因,例如断电,内核崩溃,系统升级和维护周期,因此需要一种辅助机制来在整个进程重新启动中提供数据的持久性。FlowFile存储库是系统中当前存在的每个FlowFiles的元数据的Write-Ahead Log(或数据记录)。该FlowFile元数据包括与FlowFile相关联的所有attributes,指向FlowFile实际内容的指针(该内容存在于内容存储库中)以及FlowFile的状态,例如FlowFile所属的Connection/Queue。预写日志为NiFi提供了处理重启和意外系统故障所需的弹性。
本教程涵盖了Apache NiFi的核心概念及其在其中流量管理,易用性,安全性,可扩展架构和灵活扩展模型非常重要的环境中所扮演的角色。
单独启动“GenerateFlowFile”处理器后,可以观察到对应的Connection连接队列中有数据,在Connection连接上右键“List Queue”可以查看队列中的FlowFile信息:
NiFi使用预写日志来跟踪FlowFiles(即数据记录)在系统中流动时的变化。该预写日志跟踪FlowFiles本身的更改,例如FlowFile的属性(组成元数据的键/值对)及其状态,再比如FlowFile所属的Connection /Queue。
通常我们在NIFI里最常见的使用场景就是读写关系型数据库,一些组件比如GenerateTableFetch、ExecuteSQL、PutSQL、ExecuteSQLRecord、PutDatabaseRecord等等,都会有一个属性配置大概叫Database Connection Pooling Service的,对应的接口是DBCPService,其实现类有:HiveConnectionPool DBCPConnectionPool DBCPConnectionPoolLookup。我们用的最多的就是DBCPConnectionPool。具体怎么配置这里就不赘述了,看对应的Controller Service文档就可以了。
为了创建高效的数据流处理流程,需要了解可用的处理器(Processors )类型,NiFi提供了大约近300个现成的处理器。这些处理器提供了可从不同系统中提取数据,路由,转换,处理,拆分和聚合数据以及将数据分发到多个系统的功能。如果还不能满足需求,还可以自定义处理器。
本文通过Groovy,Jython,Javascript(Nashorn)和JRuby中的代码示例,介绍了有关如何使用Apache NiFi处理器ExecuteScript完成某些任务的各种方法。本文中的内容包括:
鼠标双击处理器或者选择以上“Configure”,打开配置处理器选项,配置分为四个部分:SETTINGS,SCHEDULING,PROPERTIES,COMMENTS。
需求:随机生成一些测试数据集,对生成的数据进行正则匹配,对匹配后的数据进行输出到外部文件中。以上需要用到的“GenerateFlowFile”、“ReplaceText”、“PutFile”处理器。
案例:使用NiFi将某个目录下产生的json类型的日志文件导入到Hive。这里首先将数据通过NiFi将Json数据解析属性,然后手动设置数据格式,将数据导入到HDFS中,Hive建立外表映射此路径实现外部数据导入到Hive中。
内容存储库就是本地存储所有FlowFiles内容的地方,通常是三个存储库中最大的。该存储库利用不变性和写时复制来最大提升读写速度和保证线程安全性。Content Repo的核心设计是将FlowFile的内容保存在磁盘上,并仅在需要时才将其读入JVM内存。这使NiFi可以处理大量小的对象,而无需生产者和消费者处理器将完整的对象保存在内存中。因此,在不损害内存的情况下,非常容易执行诸如拆分,聚合和转换非常大的对象之类的操作。
简介:本文主要讲解Apache NIFI的调度策略,对象主要是针对Processor组件。本文假定读者已经对Apache NIFI有了一定的了解和使用经验,同时作者也尽可能的去讲解的更透彻,使得本文尽可能让对NIFI接触不深的读者也能够看懂。
以上案例需要用到的处理器有:“CaptureChangeMySQL”、“RouteOnAttribute”、“EvaluateJsonPath”、“ReplaceText”、“PutHiveQL”。
Max Timer Driven Thread Count 和 Max Event Driven Thread Count
该处理器执行SQL语句,返回avro格式数据。处理器使用流式处理,因此支持任意大的结果集。处理器可以使用标准调度方法将此处理器调度为在计时器或cron表达式上运行,也可以由传入的流文件触发。SQL语句来源可以来自该处理器属性SQL select query,也可以来自上一个处理器的输出流(UTF-8格式)(GenerateTableFetch,ConvertJsonToSql等等生成的流内容中的SQL语句,类似于insert into。。。value (?。。。),这个?的值是存在于流属性中的:sql.args.N.value sql.args.N.type ,ExecuteSQL会自动装配并执行)
该处理器使用随机数据或自定义内容创建流文件。GenerateFlowFile用于负载测试、配置和仿真。
Cloudera Data Flow(CDF)作为Cloudera一个独立的产品单元,围绕着实时数据采集,实时数据处理和实时数据分析有多个不同的功能模块,如下图所示:
本文主要研究一下nifi的AbstractBinlogTableEventWriter
本文简单的讨论一下Apache NIFI项目结构的类资源隔离机制,适合接触过源码的同学阅读。
PutDatabaseRecord处理器使用指定的RecordReader从传入的流文件中读取(可能是多个,说数组也成)记录。这些记录将转换为SQL语句,并作为一个批次执行。如果发生任何错误,则将流文件路由到failure或retry,如果执行成功,则将传入的流文件路由到success。处理器执行的SQL语句类型通过Statement Type属性指定,该属性接受一些硬编码的值,例如INSERT,UPDATE和DELETE,使用“Use statement.type Attribute”可以使处理器获取流文件属性中的语句类型。
NiFi的基本设计理念是基于数据流的编程Flow-Based Programming(FBP),应用是由处理器、连接器组成的网络。数据进入一个节点,由该节点对数据进行处理,根据不同的处理结果将数据路由到后续的其他节点进行处理。这是NiFi的流程比较容易可视化的一个原因。以下是NiFi的一些概念:
Apache NiFi是什么?NiFi官网给出如下解释:“一个易用、强大、可靠的数据处理与分发系统”。通俗的来说,即Apache NiFi 是一个易于使用、功能强大而且可靠的数据处理和分发系统,其为数据流设计,它支持高度可配置的指示图的数据路由、转换和系统中介逻辑。 为了对NiFi能够表述的更为清楚,下面通过NiFi的架构来做简要介绍,如下图所示。
前言:本文重点在于通过模拟事故来探索Apache NIFI集群的高可用,情景假定有一个3节点的NIFI集群,其中某个节点因为未知原因与集群失联,研究集群(两个在联节点集群)和失联的节点会发生什么,各个节点上的数据会怎样。(注意:节点因为未知原因与集群失联区别于系统管理员手动卸载节点)。除此之外,其他不做重点。
在过去的几周中,我进行了四个现场的NiFi演示会议,在不同地理区域有1000名与会者,向他们展示了如何使用NiFi连接器和处理器连接到各种系统。我要感谢大家参与和出席这些活动!如今,当在家中远程工作成为一种规范时,我们都需要交互式的演示会议和实时问答。如果您还没有看过我的现场演示会议,可以在这里观看,视频还没有过期。
NIFI中文文档地址:https://nifichina.gitee.io/ 更新日志 2020-05-21 新增TailFile 新增ExecuteScript 新增探索 Apache NIFI 集群的高可用 2020-05-18 The 4 V’s of Big Data 2020-05-18 新增AttributeRollingWindow 新增CompareFuzzyHash 新增Apache NIFI入门(读完即入门) 新增了解NiFi最大线程池和处理器并发任务设置 新增深入理解NIFI Conn
2006年NiFi由美国国家安全局(NSA)的Joe Witt创建。2015年7月20日,Apache 基金会宣布Apache NiFi顺利孵化成为Apache的顶级项目之一。NiFi初始的项目名称是Niagarafiles,当NiFi项目开源之后,一些早先在NSA的开发者们创立了初创公司Onyara,Onyara随之继续NiFi项目的开发并提供相关的支持。Hortonworks公司收购了Onyara并将其开发者整合到自己的团队中,形成HDF(Hortonworks Data Flow)平台。2018年Cloudera与Hortonworks合并后,新的CDH整合HDF,改名为Cloudera Data Flow(CDF),并且在最新的CDH6.2中直接打包,参考《0603-Cloudera Flow Management和Cloudera Edge Management正式发布》,而Apache NiFi就是CFM的核心组件。
提到Cloudera我们第一个想到的就是Hadoop,在Hadoop生态系统中,规模最大、知名度最高的公司就是Cloudera。
在本实验中,您将运行一个简单的 Python 脚本来模拟来自一些假设的机器的 IoT 传感器数据,并将数据发送到 MQTT 代理 ( mosquitto )。MQTT 代理扮演网关的角色,通过“mqtt”协议连接到许多不同类型的传感器。您的集群附带模拟脚本发布到的嵌入式 MQTT 代理。为方便起见,我们将使用 NiFi 来运行脚本而不是 Shell 命令。
Apache NiFi 最新版本中内置的 Python 处理器可以简化数据处理任务,增强灵活性并加快开发速度。
GEOMETRY POINT LINESTRING POLYGON MULTIPOINT MULTILINESTRING MULTIPOLYGON GEOMETRYCOLLECTION
NiFi DataFlow Manager(DFM)用户可能会发现在单个服务器上使用一个NiFi实例不足以处理他们拥有的数据量。因此,一种解决方案是在多个NiFi服务器上运行相同的数据流。但是,这会产生管理问题,因为每次DFM想要更改或更新数据流时,他们必须在每个服务器上进行这些更改,然后单独监视每个服务器。通过集群NiFi服务器,可以增加处理能力以及单个接口,通过该接口可以更改数据流并监控数据流。集群允许DFM仅进行一次更改,然后将更改复制到集群的所有节点。通过单一接口,DFM还可以监视所有节点的健康状况和状态。
NiFi在大数据生态中的定位是成为一个统一的,与数据源无关的大数据集成平台。Cloudera将NiFi作为其新产品Cloudera Flow Management和Cloudera Edge Management的核心组件推出,可以方便地使用Cloudera Manager进行Parcel安装和集成。于此同时,Flume被移出了Cloudera Runtime,可见NiFi替换Flume的意图已经十分明显。
Apache NiFi 是一个易于使用、功能强大而且可靠的数据处理和分发系统,在大数据生态中的定位是成为一个统一的,与数据源无关的大数据集成平台。Apache NiFi 是为数据流设计,它支持高度可配置的指示图,来指示数据路由、转换和系统中流转关系,支持从多种数据源动态拉取数据。简单地说,NiFi是为自动化系统之间的数据流而生。 这里的数据流表示系统之间的自动化和受管理的信息流。 基于WEB图形界面,通过拖拽、连接、配置完成基于流程的编程,实现数据采集、处理等功能。未来NiFi有可能替换Flume、Sqoop等大数据导数据的工具。
Apache NiFi 1.14.0 版是一个增加了重要的功能、改进和bug修复的版本,发布日期2021年7月14日。
前面写了flink的文章,其实流处理不止有flink、storm、spark streaming,说实话这些其实都是比较传统的流处理框架。今天介绍一个大家不一定用得很多,但是却很有特点的东西,NiFi NiFi的来源 Apache NiFi项目,它是一种实时数据流处理 系统,在去年由美国安全局(NSA)开源并进入Apache社区,NiFi初始的项目名称是Niagarafiles。当NiFi项目开源之后,一些早先在NSA的开发者们创立了初创公司Onyara,Onyara随之继续NiFi项目的开发并提供相关的支
在RunNiFi.java源码解读中有提到,最终RunNiFi进程在主程序中启动了新的进程NiFi,并循环监听NIFI进程的状态,直到NIFI进程不在运行,RunNiFi主程序才结束。
现在我们要自定义一个Processor,假设它叫MyProcessor.java,那么这个Java文件写在哪里呢?
整个脚本分为三部分,第一部分是确定NIFI各个路径 目录的确定,设置环境变量,第二部分是方法区。第三部分是脚本逻辑代码的入口,粗略的根据不同的参数去执行不同的方法。以下脚本有详细注释:
简单地说,NiFi就是为了实现系统间数据流的自动化而构建的。虽然术语“数据流”用于各种上下文,但我们在此处使用它来表示系统之间的自动和管理信息流。这个问题空间一直存在,因为企业有多个系统,其中一些系统创建数据,一些系统消耗数据。已经讨论并广泛阐述了出现的问题和解决方案模式。企业集成模式[eip]中提供了一个全面且易于使用的表单。
在本系列的前一篇博客《将流转化为数据产品》中,我们谈到了减少数据生成/摄取之间的延迟以及从这些数据中产生分析结果和洞察力的日益增长的需求。我们讨论了如何使用带有 Apache Kafka 和 Apache Flink 的Cloudera 流处理(CSA) 来实时和大规模地处理这些数据。在这篇博客中,我们将展示一个真实的例子来说明如何做到这一点,看看我们如何使用 CSP 来执行实时欺诈检测。
领取专属 10元无门槛券
手把手带您无忧上云