Storm是Twitter开源的分布式实时大数据处理框架,最早开源于github. 2013年,Storm进入Apache社区进行孵化. 2014年9月,晋级成为了Apache顶级项目. 国内外各大网站使用,例如雅虎、阿里、度 官网 http://storm.apache.org/
特点
注意:
组件说明
拓扑图
架构
详细说明见第四章第一节
数据结构
有向无环图(DAG,directed acyclic graph): 起始点一定是spout, 终点一定是 bolt, 拓扑有方向, 如下图
1.Topology(译为拓扑结构) – DAG有向无环图的实现
2.Tuple – 元组
3.Stream – 数据流
4.Spout – 数据源
5.Bolt – 数据流处理组件
6.Stream Grouping – 数据流分组(即数据分发策略)
注意: 1,4,5,6 在Storm开发中经常用到
环境准备, 案例用到的jar在底部分享, 下载后在项目下创建一个lib目录, 然后右击bulild path全部即可
用于数据的推送 这里是将每个i 的值推送给 bolt 进行处理
package ah.szxy.storm.bolt;
import java.util.List;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
/**
* 手动继承 BaseRichSpout, 实现它的未实现方法
* @author chy
*/
public class WsSpout extends BaseRichSpout{
private Map map;
private TopologyContext context;
private SpoutOutputCollector collector;
int i=0;//nextTuple方法会被循环调用,因此i应该是成员变量
/**
* 1.配置初始化spout
* 将局部变量赋值给成员变量, 目的是提升局部变量的作用域
*/
@Override
public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
this.map=map;
this.context=context;
this.collector=collector;
}
/**
* 2.采集并且向后推送数据
*/
@Override
public void nextTuple() {
/**
* 这里体现了面向接口的核心思想
* 如果声明直接使用Values, 接收数据的类型就会被限制死了
*/
List list = new Values(i++);
this.collector.emit(list);
System.err.println("num==========="+list);
Utils.sleep(1000);//和线程休眠效果一样,storm包提供
}
/**
* 3.向接收的数据的逻辑处理单元声明发送数据的字段名称
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declare) {
declare.declare(new Fields("num"));
}
}
用于对spout的数据进行逻辑处理 这里是对数据进行求和
package ah.szxy.storm.spout;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
/**
* 继承BaseRichBolt, 实现相关方法
* @author chy
*
*/
public class WsBolt extends BaseRichBolt{
//成员变量
private Map stormConf;
private TopologyContext comtext;
private OutputCollector collector;
//求和
int sum=0;
/**
* 准备阶段(提供逻辑运算的环境)
* 将局部变量赋值给成员变量, 目的是提升局部变量的作用域
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.stormConf=stormConf;
this.comtext=context;
this.collector=collector;
}
/**
* 获取数据 ( 有必要的话, 向后继续发送数据 )
*/
@Override
public void execute(Tuple input) {
// input.getInteger(0);
int num=input.getIntegerByField("num");//接收的是spout类中declareOutputFields方法声明的字段名称
sum+=num;
System.err.println("sum========================="+sum);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
构建拓扑结构模型 测试程序是否正常运行
package ah.szxy.storm.test;
import ah.szxy.storm.bolt.WsSpout;
import ah.szxy.storm.spout.WsBolt;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
public class TestWs {
/**
* 建立拓扑结构, 加入集群中运行
* @param args 命令行参数
*/
public static void main(String[] args) {
//构建storm拓扑结构( Topology: 拓扑结构)
TopologyBuilder tb=new TopologyBuilder();
tb.setSpout("wsspout", new WsSpout());
tb.setBolt("wsbolt", new WsBolt()).shuffleGrouping("wsspout");
//创建本地storm集群
LocalCluster lc=new LocalCluster();
//将任务布置到集群中运行
lc.submitTopology("wordsum", new Config(), tb.createTopology());
}
}
注意:
需要注意的是这里采取了随机的方式推送数据 因此下面在结果打印时, 打印的数据可能相同
/**
* spout数据推送
* @author chy
*
*/
public class WcSpout extends BaseRichSpout{
private Map conf;
private TopologyContext context;
private SpoutOutputCollector collector;
//定义需要被统计字符串数据
String[] text= {
"I am a walker",
"I like play computer and comic",
"I like study and sing",
"My nickname is TimePause",
"TimePause is not simple history"
};
//定义一个随机数变量r
Random r=new Random();
/**
* 初始化方法
*/
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.conf=conf;
this.context=context;
this.collector=collector;
}
/**
* 采集并向后推送数据
*/
@Override
public void nextTuple() {
//从数组中随机取出一行,放到list集合中
List line=new Values(text[r.nextInt(text.length)]);
//推送数据
this.collector.emit(line);
System.err.println("spout emit line========"+line);
Utils.sleep(1000);
}
/**
* 向接收的数据的逻辑处理单元声明发送数据的字段名称
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
}
}
/**
* 第一个Bolt---进行分词
* @author chy
*
*/
public class WcSplitBolt extends BaseRichBolt{
Map stormConf;
TopologyContext context;
OutputCollector collector;
/**
* 准备阶段(提供逻辑运算的环境)
* 将局部变量赋值给成员变量, 目的是提升局部变量的作用域
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.stormConf=stormConf;
this.context=context;
this.collector=collector;
}
/**
* 获取tuple元祖中每一行数据并切割
* @param input
*/
@Override
public void execute(Tuple input) {
//input.getString(0);//通过偏移量获取
String line=input.getStringByField("line");
//切割
String[] words = line.split(" ");
for (String word : words) {
List wordList=new Values(word);
this.collector.emit(wordList);//发送数据
}
}
/**
* 向接收的数据的逻辑处理单元声明发送数据的字段名称
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("wordList"));
}
}
/**
* 第二个Bolt---分词后的统计
* @author chy
*
*/
public class WcCountBolt extends BaseRichBolt{
//用来存放,单词,以及单词出现的个数
Map<String, Integer> map=new HashMap<String, Integer>();
/**
* 准备阶段(提供逻辑运算的环境)
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
}
/**
* 获取tuple元祖中每一个单词, 并且按照单词统计出出现的次数
* @param input
*/
@Override
public void execute(Tuple input) {
String word=input.getStringByField("wordList");//到这里获取的方式时一个一个的获取
//存放单词数量,之所以不设置为全局是因为每次key的值都不一样
int count=1;
if (map.containsKey(word)) {//如果出现,则count+1
count=(int)map.get(word)+1;//map.get(key)获取map的值
}
map.put(word, count);
System.err.println("WcCountBolt emit===key:"+word+"==count:"+count);
}
/**
* 向接收的数据的逻辑处理单元声明发送数据的字段名称
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
/**
* 测试类
* @author chy
*
*/
public class TestWc {
public static void main(String[] args) {
//创建拓扑结构
TopologyBuilder tb=new TopologyBuilder();
tb.setSpout("WcSpout", new WcSpout());
tb.setBolt("WcSplitBolt",new WcSplitBolt()).shuffleGrouping("WcSpout");
//fieldsGrouping:根据单词属性名称进行分组
tb.setBolt("WcCountBolt", new WcCountBolt(), 3).fieldsGrouping("WcSplitBolt", new Fields("wordList"));
//创建本地集群
LocalCluster lc=new LocalCluster();
//发布任务到集群
lc.submitTopology("WordCount", new Config(), tb.createTopology());
}
}
结果展示
因为spout采取随机推送, 因此数据重复的可能性非常大
由上面两个案例的test方法中我们可以看到Storm Grouping的作用,下面我们来具体学习一下它吧~~~
随机分组,随机派发stream里面的tuple,保证每个bolt task接收到的tuple数目大致相同。
轮询,平均分配
按字段分组,比如,按"user-id"这个字段来分组,那么具有同样"user-id"的 tuple 会被分到相同的Bolt里的一个task, 而不同的"user-id"则可能会被分配到不同的task。
广播发送,对于每一个tuple,所有的bolts都会收到
全局分组,把tuple分配给task id最低的task 。
不分组,这个分组的意思是说stream不关心到底怎样分组。目前这种分组和Shuffle grouping是一样的效果。 有一点不同的是storm会把使用none grouping的这个bolt放到这个bolt的订阅者同一个线程里面去执行(未来Storm如果可能的话会这样设计)。
指向型分组, 这是一种比较特别的分组方法,用这种分组意味着消息(tuple)的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为 Direct Stream 的消息流可以声明这种分组方法。而且这种消息tuple必须使用 emitDirect 方法来发射。消息处理者可以通过 TopologyContext 来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)
本地或随机分组。如果目标bolt有一个或者多个task与源bolt的task在同一个工作进程中,tuple将会被随机发送给这些同进程中的tasks。否则,和普通的Shuffle Grouping行为一致
自定义,相当于mapreduce 自己去实现一个partition一样。
单一节点安装, 但是具备分布式所具备的所有组件
## 单机模式
## 上传解压,资料分享至末尾
$ tar xf apache-storm-0.10.0.tar.gz
$ cd apache-storm-0.10.0
$ storm安装目录下创建log: mkdir logs
$ ./bin/storm --help
下面分别启动ZooKeeper、Nimbus、UI、supervisor、logviewer
##错误信息放到标准输入中,
$ ./bin/storm dev-zookeeper >> ./logs/zk.out 2>&1 &
$ ./bin/storm nimbus >> ./logs/nimbus.out 2>&1 &
$ ./bin/storm ui >> ./logs/ui.out 2>&1 &
$ ./bin/storm supervisor >> ./logs/supervisor.out 2>&1 &
$ ./bin/storm logviewer >> ./logs/logviewer.out 2>&1 &
# 需要等一会儿
$ jps
6966 Jps
6684 logviewer
6680 dev_zookeeper
6681 nimbus
6682 core
6683 supervisor
# 访问图形化界面( 图1 )
http://nodex:8080
# 提交任务到Storm集群当中运行:
## 首先将WrodCount程序打包成 WrodCount.jar 放到/root/chy/software ,需要阅读下方的注意事项
## 在Strom根目录下运如下命令 ./bin/storm jar jar全路径 主类/启动类的全路径( 图2 )
./bin/storm jar /root/chy/software/WrodCount.jar ah.szxy.storm.tesTestWc wc
注意: 在将项目打包放到伪分布式环境中时, 修改了主类如下的代码, 使其能够依靠集群环境下运行
//提交任务,输入了额外的参数,在集群环境下运行;否则在项目自身的环境下运行
Config config = new Config();
if (args.length>0) {
try {
StormSubmitter.submitTopology(args[0], config, tb.createTopology());
} catch (Exception e) {
}
}else {
//创建本地集群
LocalCluster lc=new LocalCluster();
//发布任务到集群
lc.submitTopology("WordCount", config, tb.createTopology());
}
图1
图2
环境要求
java -version
JDK 1.6+
python -V (系统内置)
Python 2.6.6+
ZooKeeper3.4.5+
storm 0.9.4+
各节点分配情况 | Nimbus | Supervisor | Zookeeper |
---|---|---|---|
node2 | * | | * |
node3 | | * | * |
node4 | | * | * |
具体步骤
思路: 首先在node2配置storm, 配置完成后分发给node3,node4
node1作为nimbus,
# 1. 开始配置storm.yaml
$ vim conf/storm.yaml
--------------------------------------
storm.zookeeper.servers:
- "node2"
- "node3"
- "node4"
# 任务的存储目录
storm.local.dir: "/tmp/storm"
# 声明主节点在哪里
nimbus.host: "node2"
# 指定从节点的槽位,一个从节点对应四个槽位,一个槽位对应一个worker,一个worker对应一个端口
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
-------------------------------
# 2.在storm目录中创建logs目录
$ mkdir logs
# 3. (分发)集群其他服务器node3,node4
## 启动ZooKeeper集群(node2,3,4)
zkServer.sh start
# 4. node1上启动Nimbus
## 2>&1的意思就是将标准错误重定向到标准输出, & 为后台输出
$ ./bin/storm nimbus >> ./logs/nimbus.out 2>&1 &
$ tail -f logs/nimbus.log
$ ./bin/storm ui >> ./logs/ui.out 2>&1 &
$ tail -f logs/ui.log
# 5. 节点node2和node3启动supervisor,按照配置,每启动一个supervisor就有了4个slots
$ ./bin/storm supervisor >> ./logs/supervisor.out 2>&1 &
$ tail -f logs/supervisor.log (当然node1也可以启动supervisor)
# 6.访问图形化界面(图1),至此安装完成
http://node2:8080/
# 集群测试
## 上传jar任务到Storm集群当中运行(可以从Supervisor节点提交,但是会汇总到nimbus的/tmp/storm目录下, 图2,图3):
$ ./bin/storm jar /root/chy/software/WrodCount2.jar ah.szxy.storm.test.TestWc wc
## 观察关闭一个supervisor后,nimbus的重新调度
## 再次启动一个新的supervisor后,观察,并rebalance, 可以通过图形化页面来操作
注意: 在打包前, 修改了主类的相关代码 , 设置了相关的进程和线程数, 以及worker的数目
public class TestWc {
/**
* 建立拓扑结构, 加入集群中运行
* @param args 命令行参数
*/
public static void main(String[] args) {
//创建拓扑结构
TopologyBuilder tb=new TopologyBuilder();
tb.setSpout("WcSpout", new WcSpout(),2);
tb.setBolt("WcSplitBolt",new WcSplitBolt(),4).shuffleGrouping("WcSpout");
//fieldsGrouping:根据单词属性名称进行分组
tb.setBolt("WcCountBolt", new WcCountBolt(),2).setNumTasks(4).fieldsGrouping("WcSplitBolt", new Fields("wordList"));
//提交任务,输入了额外的参数,在集群环境下运行;否则在项目自身的环境下运行
Config config = new Config();
config.setNumWorkers(2);
if (args.length>0) {
try {
StormSubmitter.submitTopology(args[0], config, tb.createTopology());
} catch (Exception e) {
}
}else {
//创建本地集群
LocalCluster lc=new LocalCluster();
//发布任务到集群
lc.submitTopology("WordCount", config, tb.createTopology());
}
}
}
图1
图2
图3
Worker – 进程
Executor – 线程
Task
设置Worker进程数
Config.setNumWorkers(int workers)
设置Executor线程数
TopologyBuilder.setSpout(String id, IRichSpout spout, Number parallelism_hint)
TopologyBuilder.setBolt(String id, IRichBolt bolt, Number parallelism_hint)
:其中, parallelism_hint即为executor线程数
设置Task数量
ComponentConfigurationDeclarer.setNumTasks(Number val)
例:
Config conf = new Config() ;
conf.setNumWorkers(2);
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("spout", new MySpout(), 1);
topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
.setNumTasks(4)
.shuffleGrouping("blue-spout);
复杂情况下的配置图与代码截图
该图5进程6任务的原因是: 有一个进程分配了两个任务(GreenBolt)
配置图
代码截图
因为有两个worker, 因此进程数是原来的两倍, 可知原来进程为5个
Rebalance – 再平衡
即,动态调整Topology拓扑的Worker进程数量、以及Executor线程数量
支持两种调整方式:
1、通过Storm UI
2、通过Storm CLI
通过Storm CLI动态调整:
例:
storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
可以通过 help rebalance
将mytopology拓扑worker进程数量调整为5个
“ blue-spout ” 所使用的线程数量调整为3个
“ yellow-bolt ”所使用的线程数量调整为10个
Worker进程间的数据通信
Worker内部的数据通信
1、集群节点宕机
2、进程挂掉
3、消息的完整性
Acker – 消息完整性的实现机制
DRPC设计目的:
为了充分利用Storm的计算能力实现高密度的并行实时计算。
(Storm接收若干个数据流输入,数据在Topology当中运行完成,然后通过DRPC将结果进行输出。)
Drpc 流程介绍
定义DRPC拓扑:
1、本地模式
2、远程模式(集群模式)
# 1. 修改配置文件conf/storm.yaml(指定为当前主节点nimbus即可)
----------将该更改分发到集群的其他节点-----------------
drpc.servers:
- "node2“
----------------------------------------------------
# 2. 启动DRPC Server
bin/storm drpc &
# 3. 通过StormSubmitter.submitTopology提交拓扑
事务性拓扑(Transactional Topologies):保证消息(tuple)被且仅被处理一次 官网介绍
强顺序流(强有序)
两种情况:
1、当前transaction id与数据库中的transaction id不一致( 表示新的事务, 往里面存)
2、两个transaction id相同( 覆盖或者让新的变量指向原来的数据库)
缺点:
一次只能处理一个tuple,无法实现分布式计算
强顺序的Batch流
一个关键的认识是,并非所有处理批处理元组的工作都需要有序地进行。例如,在计算全局计数时,计算分为两个部分:
#2的计算需要在批之间进行严格排序,但是没有理由您不应该通过为多个批并行计算#1 来流水线化批的计算。因此,当批次1正在更新数据库时,批次2至10可以计算其部分计数。
Storm通过将批处理的计算分为两个阶段来实现这一区别:
这两个阶段一起称为“交易”。在给定的时刻,许多批次可以处于处理阶段,但是只有一个批次可以处于提交阶段。如果批处理或提交阶段发生任何故障,则将重播整个事务(两个阶段)。
Design details(设计细节)
三种事务:
前提: 安装了Flume,Kafka,以及Storm Flume介绍以及安装 Kafka介绍以及安装
该过程实现了数据的清洗
1.启动zk集群,kafka集群,flume
启动zk
zkServer.sh start
启动kafka
kafka-server-start.sh /opt/kafka/config/server.properties
启动flume( flume-kafka.conf为flume的启动脚本,见本人Kafka博文介绍第三章 )
flume-ng agent -n a1 -c conf -f /opt/flume/conf/flume-kafka.conf -Dflume.root.logger=DEBUG,console
2.启动kafka的消费者端进程
监听testflume 数据流转
kafka-console-consumer.sh --zookeeper node2:2181,node3:2181,node4:2181 --from-beginning --topic testflume
监听LogError数据流转
kafka-console-consumer.sh --zookeeper node2:2181,node3:2181,node4:2181 --from-beginning --topic LogError
3.运行代码测试
a.运行RpcClientDemo , 查看testflume监听的数据流转情况(图1)
/**
* Flume官网案例
* http://flume.apache.org/FlumeDeveloperGuide.html
* @author root
*/
public class RpcClientDemo {
public static void main(String[] args) {
MyRpcClientFacade client = new MyRpcClientFacade();
// Initialize client with the remote Flume agent's host and port
client.init("node2", 41414);
// Send 10 events to the remote Flume agent. That agent should be
// configured to listen with an AvroSource.
for (int i = 100; i < 150; i++) {
String sampleData = "Hello Flume!ERROR" + i;
client.sendDataToFlume(sampleData);
System.out.println("发送数据:" + sampleData);
}
client.cleanUp();
}
}
class MyRpcClientFacade {
private RpcClient client;
private String hostname;
private int port;
public void init(String hostname, int port) {
// Setup the RPC connection
this.hostname = hostname;
this.port = port;
this.client = RpcClientFactory.getDefaultInstance(hostname, port);
// Use the following method to create a thrift client (instead of the
// above line):
// this.client = RpcClientFactory.getThriftInstance(hostname, port);
}
public void sendDataToFlume(String data) {
// Create a Flume Event object that encapsulates the sample data
Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
// Send the event
try {
client.append(event);
} catch (EventDeliveryException e) {
// clean up and recreate the client
client.close();
client = null;
client = RpcClientFactory.getDefaultInstance(hostname, port);
// Use the following method to create a thrift client (instead of
// the above line):
// this.client = RpcClientFactory.getThriftInstance(hostname, port);
}
}
public void cleanUp() {
// Close the RPC connection
client.close();
}
}
b.运行LogFilterTopology ,过滤数据,并将数据发送给kafka集群中的 LogError主题,效果如图2
/**
* This topology demonstrates Storm's stream groupings and multilang
* capabilities.
*/
public class LogFilterTopology {
public static class FilterBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String line = tuple.getString(0);
System.err.println("Accept: " + line);
// 包含ERROR的行留下
if (line.contains("ERROR")) {
System.err.println("Filter: " + line);
collector.emit(new Values(line));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// 定义message提供给后面FieldNameBasedTupleToKafkaMapper使用
declarer.declare(new Fields("message"));
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
// https://github.com/apache/storm/tree/master/external/storm-kafka
// config kafka spout,话题
String topic = "testflume";
ZkHosts zkHosts = new ZkHosts("node2:2181,node3:2181,node4:2181");
// /MyKafka,偏移量offset的根目录,记录队列取到了哪里
SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, "/MyKafka", "MyTrack");// 对应一个应用
List<String> zkServers = new ArrayList<String>();
System.out.println(zkHosts.brokerZkStr);
for (String host : zkHosts.brokerZkStr.split(",")) {
zkServers.add(host.split(":")[0]);
}
spoutConfig.zkServers = zkServers;
spoutConfig.zkPort = 2181;
// 是否从头开始消费
spoutConfig.forceFromStart = true;
spoutConfig.socketTimeoutMs = 60 * 1000;
// StringScheme将字节流转解码成某种编码的字符串
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
// set kafka spout
builder.setSpout("kafka_spout", kafkaSpout, 3);
// set bolt
builder.setBolt("filter", new FilterBolt(), 8).shuffleGrouping("kafka_spout");
// 数据写出
// set kafka bolt
// withTopicSelector使用缺省的选择器指定写入的topic: LogError
// withTupleToKafkaMapper tuple==>kafka的key和message
KafkaBolt kafka_bolt = new KafkaBolt().withTopicSelector(new DefaultTopicSelector("LogError"))
.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
builder.setBolt("kafka_bolt", kafka_bolt, 2).shuffleGrouping("filter");
Config conf = new Config();
// set producer properties.
Properties props = new Properties();
props.put("metadata.broker.list", "node2:9092,node3:9092,node4:9092");
/**
* Kafka生产者ACK机制 0 : 生产者不等待Kafka broker完成确认,继续发送下一条数据 1 :
* 生产者等待消息在leader接收成功确认之后,继续发送下一条数据 -1 :
* 生产者等待消息在follower副本接收到数据确认之后,继续发送下一条数据
*/
props.put("request.required.acks", "1");
props.put("serializer.class", "kafka.serializer.StringEncoder");
conf.put("kafka.broker.properties", props);
conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(new String[] { "node2", "node3", "node4" }));
// 本地方式运行
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("mytopology", conf, builder.createTopology());
}
}
c.修改 RpcClientDemo 中的循环语句,验证 FilterBolt是否起到了过滤的作用
查看testflume, 图3; 查看LogError, 图4
可以看到数据流转到了testflume主题, 而没有流转到LogError,由此可以看出 FilterBolt起到了过滤的作用
// Send 10 events to the remote Flume agent. That agent should be
// configured to listen with an AvroSource.
for (int i = 200; i < 250; i++) {
String sampleData = "Hello Flume!" + i;
client.sendDataToFlume(sampleData);
System.out.println("发送数据:" + sampleData);
}
图1
图2
图3
如果自己想应聘大公司, 一定要去别人技术分享网站看一看,就像美团技术团队官网
链接:https://pan.baidu.com/s/1wu9qYQZPxkqOdiY5QGR2cg
点赞私聊获取资料~~~
提取码:m8kh