首页
学习
活动
专区
圈层
工具
发布

大数据NiFi(六):NiFi Processors(处理器)

NiFi Processors(处理器)为了创建高效的数据流处理流程,需要了解可用的处理器(Processors )类型,NiFi提供了大约近300个现成的处理器。...此处理器应将文件从一个位置移动到另一个位置,而不是用于复制数据。GetHDFS:监视HDFS中用户指定的目录。每当新文件进入HDFS时,它将被复制到NiFi并从HDFS中删除。...PutSQL:将FlowFile的内容作为SQL语句(INSERT,UPDATE或DELETE)执行,该处理器将执行sql语句,同时支持参数化的SQL语句。...ConvertJSONToSQL:将JSON文档转换为SQL INSERT或UPDATE命令,然后可以将其传递给PutSQL Processor。...ExtractText:用户提供一个或多个正则表达式,然后根据FlowFile的文本内容对其进行评估,然后将结果值提取到用户自己命名的Attribute中。

3.2K122

Apache NiFi安装及简单使用

3.数据库访问 ConvertJSONToSQL:将JSON文档转换为SQL INSERT或UPDATE命令,然后将其传递给PutSQL处理器 ExecuteSQL:执行用户定义的SQL SELECT命令...,将结果写入Avro格式的FlowFile PutSQL:通过执行FlowFile内容定义的SQL DDM语句来更新数据库 SelectHiveQL:针对Apache Hive数据库执行用户定义的HiveQL...EvaluateXQuery:用户提供XQuery查询,然后根据XML内容评估此查询,以替换FlowFile内容或将该值提取到用户命名的属性中。...每当一个新的文件进入HDFS,它被复制到NiFi中。该处理器仅在主节点上运行,如果在群集中运行。为了从HDFS中复制数据并保持原样,或者从集群中的多个节点流出数据,请参阅ListHDFS处理器。...PutSQL:作为SQL DDL语句(INSERT,UPDATE或DELETE)执行 FlowFile的内容。FlowFile的内容必须是有效的SQL语句。

11.2K21
  • 您找到你想要的搜索结果了吗?
    是的
    没有找到

    sql中select into的用法_sql语句insert into用法

    大家好,我是架构君,一个会写代码吟诗的架构师。今天说一说sql中select into的用法_sql语句insert into用法,希望能够帮助大家进步!!!...1.select into from语句: 注意内容:要求目标表A不存在,因为在插入时会自动创建表A,并将B中指定字段数据复制到A中。...示例如下: select * into A from B 2.insert into select 语句: 注意: (1)要求目标表B必须存在,并且字段field,field1...也必须存在 (2)注意...B的主键约束,如果B有主键而且不为空,则 field1, field2...中必须包括主键 (3)注意语法,不要加values,和插入一条数据的sql混了,不要写成:insert into B (field...,... from A 或 insert into B select * from A 今天文章到此就结束了,感谢您的阅读,Java架构师必看祝您升职加薪,年年好运。

    3.5K30

    MySQL中插入语句(Insert)的几种使用方式

    注意:insert这种简写的方式虽然非常简单,但是Values后面的值必须和表中的类顺序对应,且类型要保持一直,即使表中某一个列不需要值也必须赋值为null,比如我们的主键id设置的是递增实际上是不用设置值的...3.REPLACE INSERT语句 此语句的作用是当我们在插入一条数据时,如果此条已经存在,那么先删除原来存在的数据再添加插入的数据,如果不存在那么直接插入新的数据。...4.INSERT IGNORE INTO 语句 此语句的作用是如果插入的数据已经存在那么就忽略插入的数据(也就是不改变原来的数据),如果不存在则插入新的数据。...6.INSERT SELECT语句 1.此语句的作用是将SELECT语句的结果插入表中,可实现数据迁移。...查看被插入的表的所有数据 ? 执行INSERT SELECT语句并查看结果 ?

    3.2K30

    MySQL实战中,Insert语句的使用心得总结

    1-3.插入或替换 1-4.插入或忽略 二、大量数据插入 2-1、三种处理方式 2-1-1、单条循环插入 2-1-2、修改SQL语句批量插入 2-1-3、分批量多次循环插入 2-2、插入速度慢的其他几种优化途径...三、REPLACE INTO语法的“坑” 一、Insert的几种语法 1-1.普通插入语句 INSERT INTO table (`a`, `b`, `c`, ……) VALUES ('a', 'b'...,INSERT语句将插入新记录,否则,当前username='chenhaha'的记录将被更新,更新的字段由UPDATE指定。...但没办法,就像用WPS写的xlsx用Office无法打开一样。 1-3.插入或替换 如果我们想插入一条新记录(INSERT),但如果记录已经存在,就先删除原记录,再插入新记录。...Mapper中的sql写法: insert id="insertListUser" parameterType="java.util.List"> INSERT INTO `db`.

    2.3K20

    如何在 Python 中搜索和替换文件中的文本?

    在本文中,我将给大家演示如何在 python 中使用四种方法替换文件中的文本。 方法一:不使用任何外部模块搜索和替换文本 让我们看看如何在文本文件中搜索和替换文本。...然后我们将 t=read 并使用 read() 和 replace() 函数替换文本文件中的内容。...语法:路径(文件) 参数: file:要打开的文件的位置 在下面的代码中,我们将文本文件中的“获取更多学习资料”替换为“找群主领取一本实体书”。使用 pathlib2 模块。...= "Java" # 调用replacetext函数并打印返回的语句 print(replacetext(search_text, replace_text)) 输出: 文本已替换 方法 3:使用正则表达式模块搜索和替换文本...= "replaced" # 调用replacetext函数并打印返回的语句 print(replacetext(search_text, replace_text)) 输出: 文本已替换

    22.3K42

    NIFI里的数据库连接池

    通常我们在NIFI里最常见的使用场景就是读写关系型数据库,一些组件比如GenerateTableFetch、ExecuteSQL、PutSQL、ExecuteSQLRecord、PutDatabaseRecord...然后PutSQL PutDatabaseRecord之类的Rollback On Failure,设置为true的时候,执行SQL报错抛出的SQLExeception也会NIFI回滚事务。...最好是建流程的时候,衡量处理器和线程的数量与此连接池的最大连接数,在数据库连接的时候,让处理器处理数据的时候总是可以获取到一个连接,毕竟阻塞在那里,还是耗服务器的资源的。...,key是我们自己命名的,value是我们选择的当前流程可用的DBCPConnectionPool,然后在流程运行过程中,DBCPConnectionPoolLookup根据FlowFile中一个叫database.name...使用DBCPConnectionPoolLookup的最大优点是什么?灵活啊!组件不绑定于一个数据库,根据流文件中的属性动态去查找对应的数据库。 ? 文章有帮助的话,小手一抖点击在看,并转发吧。

    3.3K10

    NIFI里你用过PutDatabaseRecord嘛?

    描述 PutDatabaseRecord处理器使用指定的RecordReader从传入的流文件中读取(可能是多个,说数组也成)记录。这些记录将转换为SQL语句,并作为一个批次执行。...处理器执行的SQL语句类型通过Statement Type属性指定,该属性接受一些硬编码的值,例如INSERT,UPDATE和DELETE,使用“Use statement.type Attribute...”可以使处理器获取流文件属性中的语句类型。...可以从record中的某个字段读取值,此值应该是一个可以执行的SQL语句,该处理器就执行这个SQL就可以了。...然后得说一下这个Translate Field Names,这个功能点其实非常好,其实就是将列名转大写替换下划线(Record中的列和指定表的列都做此转换,指定表的列信息会做成一个Map映射,转换的列名

    4.3K20

    mysql查询字段中带空格的值的sql语句,并替换

    set col=rtrim(col); (1)mysql replace 函数 语法:replace(object,search,replace) 意思:把object中出现search的全部替换为...replace 代码如下 复制代码 update `news` set `content`=replace(`content`,’ ‘,”);//清除news表中content字段中的空格 这样就可以直接用...,如果数据库中的这个字段的值含有空格(字符串内部,非首尾),或者我们查询的字符串中间有空格,而字段中没有空格。...这样就可以正确的进行匹配了,如果不希望给mysql太多压力,条件部分的对空格的处理我们可以在程序中实现。...语句、mysql修改字段sql语句、mysql删除字段sql语句、mysql加字段sql语句、mysql添加字段语句,以便于您获取更多的相关知识。

    13.4K20

    大数据NiFi(二十):实时同步MySQL数据到Hive

    ”转换后FlowFile中的属性来替换原有数据组成一个“insert into ... values (... ...)”语句,方便后续将数据插入到Hive中。...文件发送到各个NiFi节点对应的路径/root/test下替换原有的core-site.xml文件。...之后重启NiFi集群,各个NiFi节点上执行命令: service nifi restart 七、配置“PutHiveQL”处理器 “PutHiveQL”主要执行HiveQL的DDL/DML命令,传入给该处理器的...Statement Delimiter (语句分隔符) ; 语句分隔符,用于分隔多个语句脚本中的SQL语句。...处理器的状态,单独启动“CaptureChangeMySQL”处理器,清空重新消费的数据(以上主要就是避免此版本NiFi bug问题),启动当前案例中其他NiFi处理器。

    4.7K121

    Apache NiFi 组件使用介绍 -- Funnel

    概述 官方介绍 Apache NiFi User Guide Funnel: A funnel is a NiFi component that is used to combine the data...漏斗是 NiFi 组件,用于将来自多个连接的数据合并到单个连接中 使用场景 用来组织复杂流程内的众多处理器. 1 减少处理器多对一之间的复杂连接 如下如.想象一下有 20 个这样的生成 UpdateAttribute...处理器,希望后续处理器分隔文本。...现在,您需要将 SplitText 处理器替换为其他处理器。这样做将是一项困难的工作,因为它直接连接到 SplitText 处理器。...但是,如果它们之间有一个漏斗,则只需替换漏斗的目标,而不是更换所有处理器 [funnel-1.png] 2 对多个连接内的流文件进行统一的背压,优先级设置 [funnel-2.png]

    2.5K00

    MySQL中insert语句没有响应的问题分析(r11笔记第21天)

    今天开发的一个同学问我一个MySQL的问题,说在测试数据库中执行一条Insert语句之后很久没有响应。我一看语句是一个很常规的insert into xxx values形式的语句。...可以看到大量的线程是Waiting for table level lock ,开发同学提交的SQL语句也被锁住了,也是同样的锁。...ptp_jgg(sub_type) values(9999)这类表级锁好像在MyISAM中还是看到过,结果查看表的存储引擎,发现都是InnoDB, 对于这类问题的一种解决方法,就是使用kill的方式杀掉线程...查看MySQL的error log也没有发现什么明显的错误,使用ps -ef|grep mysql查看进程的信息,突然发现系统中是设置了一个定时任务去备份数据,不过开始没有引起我的注意,但是这些线索都逐一排除之后...打开备份脚本,我就明白问题的原委了。 备份的核心语句是通过变量的方式调用mysqldump的。

    1.5K120

    大数据NiFi(十九):实时Json日志数据导入到Hive

    一、配置“TailFile”处理器 “TailFile”处理器作用是"Tails"一个文件或文件列表,在文件写入文件时从文件中摄取数据。监控的文件为文本格式,当写入新行时会接收数据。...处理器会替换正则表达式匹配到的FlowFile中的内容,生成新的FlowFile内容。...这里我们使用“ReplaceText”处理器将上个处理器“EvaluateJsonPath”处理后的每个FlowFile内容替换成自定义的内容,这里自定义内容都是从FlowFile的属性中获取的值,按照...当数据流向下游“ReplaceText”处理器时,由于设置每行替换成指定格式的行,这时会出现将本批次所有行数据都替换成了第一行的json格式数据。...json格式转换成自定义文本格式数据,再传递到“PutHDFS”处理器即可,所以解决以上问题,我们这里复用之前的“TailFile”和“PutHDFS”处理器即可,下面只需要配置“ConvertRecord

    3.6K91

    有特点的流处理引擎NiFi

    今天介绍一个大家不一定用得很多,但是却很有特点的东西,NiFi NiFi的来源 Apache NiFi项目,它是一种实时数据流处理 系统,在去年由美国安全局(NSA)开源并进入Apache社区,NiFi...当NiFi项目开源之后,一些早先在NSA的开发者们创立了初创公司Onyara,Onyara随之继续NiFi项目的开发并提供相关的支持。...Hortonworks公司收购了Onyara并将其开发者整合到自己的团队中,形成HDF(Hortonworks Data Flow)平台。...HDF中的数据流动可以是多个方向,甚至是点对点的,用户可以同收集到的数据流进行交互,这种交互甚至可以延伸到数据源,比如一些传感器或是设备。...按照Hortonworks公司的说法,HDF产品是对HDP产品的补充,前者主要处理移动中的数据,而后者基于Hadoop技术,主要负责从静止的数据中获取洞察。

    2.3K80

    大数据流处理平台的技术选型参考

    在做技术选型时,需要选择适合需求、适合项目类型、适合团队的技术。这是实用主义的判断,而非理想主义的追捧。若是在实用的技术选型中,再能点燃一些些技术上的情怀,那就perfect了!...我针对Flume、Flink、Storm、Apex以及NiFi的数据流模型作了一个简单的总结。 Flume Flume的数据流模型是在Agent中由Source、Channel与Sink组成。 ?...Apex Apex将数据流模型称之为Operators,并将其分离出来,放到单独的Apex Malhar中。...NiFi NiFi对流模型的主要抽象为Processor,并且提供了非常丰富的数据源与数据目标的支持。 ?...PutSQL PutKafka PutMongo Nifi也支持用户自定义Processor,例如通过继承NiFi定义的AbstractProcessor类。

    1.7K50

    大数据NiFi(十五):NiFi入门案例二

    ​NiFi入门案例二需求:随机生成一些测试数据集,对生成的数据进行正则匹配,对匹配后的数据进行输出到外部文件中。...二、配置“ReplaceText”处理器“ReplaceText”处理器会替换正则表达式匹配到的FlowFile中的内容,生成新的FlowFile内容。...,替换其中的“world”为“nifi”。...三、配置“PutFile”处理器关于“PutFile”处理器的创建及配置参数参照案例一,这里直接给出“PutFile”处理器的配置,将替换后的FlowFile写入外部路径中“/root/test/matchFile...”中数据如下: 启动“ReplaceText”处理器,查看处理的数据:启动“PutFile”处理器,NiFi集群对应的每个节点上都生成对应的数据:查看数据结果:

    2.2K121

    Edge2AI之NiFi 和流处理

    实验 4 - 使用 NiFi 调用 CDSW 模型端点并保存到 Kudu 在本实验中,您将使用 NiFi 消费包含我们在上一个实验中摄取的 IoT 数据的 Kafka 消息,调用 CDSW 模型 API....nip.io/model Record Reader: JsonTreeReader Record Path: /response 笔记 以上必须替换为您的集群的公共...创建 Kudu 表 在下一部分中,您将在 NiFi 中配置PutKudu处理器以将数据写入 Kudu 表。在配置该处理器之前,让我们创建 Kudu 表。...登录到 Hue,然后在Impala 查询编辑器中,运行以下语句: CREATE TABLE sensors ( sensor_id INT, sensor_ts BIGINT, sensor_0 DOUBLE...请按照以下步骤操作: 启动流程中的所有处理器。 刷新您的 NiFi 页面,您应该会看到消息通过您的流程。失败队列应该没有排队的记录。

    3.5K30
    领券