定义
基于:消息推模式(驱动方式)、分布式(物理结构)、流(逻辑结构)、实时(性能特点)的计算引擎(本质属性)。
本质:消息队列 + 分布式进程。
抽象
(1) Nimbus:任务管理、监视。
(2) Supervisor:启动/关闭工作进程Worker,并监听任务。
(3) Topology:封装一个业务逻辑。是更高层次的抽象,节点表示一个Spout或Bolt,边表示Bolt订阅了哪些流(Stream)。(想起了布尔逻辑的或与非,类似CPU计算逻辑单元,能够从与或非逻辑角度刻画现实的大部分问题,微观内核逻辑会对宏观表象逻辑产生深远影响)
(4) Stream:消息元组流,是一个没有边界的tuple序列。流(Stream)可以理解为消息的渠道,每种类型的消息可以用一个流来表示。
(5) Tuple:消息元组,Topology处理的最小消息单位是Tuple(元组),它是一个Object的数组。数组中的每个value对象都有一个field,并且该value是可序列化的。
(6) Spout:是高频数据流的源头,负责发出原始Tuple。
(7) Bolt:可以随意订阅某个Spout或Bolt发出的Tuple,只要将这个流导向该Bolt。Spout和Bolt都统称为component(组件)。
(8) Worker:一个Worker就是一个JVM进程。Worker之间通过Netty传送数据,原来为ZMQ。
(9) Executor:一个Executor就是一个线程。Executor只对应单独的某个Spout或Bolt。它会生成若干该Spout或Bolt的实例,被称为Task。
(10) Task:每一个Spout和Bolt都会被当做很多task在整个集群里面运行,每一个task就是一个Spout或Bolt的实例,且对应到一个Executor线程。Topology的Task数量是固定的(由程序一开始的并行度设置决定的),但是可以动态调整线程数量,然后通过负载均衡,多的Task就分配到了空闲线程上。并且通过负载均衡,Storm尽可能的将任务平均分配到进程、线程中去。
(11) Stream groupings:消息分发策略,定义一个Stream应该如何分配给Bolt们。
(12) _acker:每个worker进程都会有一个_acker线程。
再次奉上此图:
Spark | Storm | |
---|---|---|
节点 | worker | supervisor、nimbus |
进程 | executor | worker |
线程 | core | executor |
任务 | task(对应spout/bolt实例) | task(对应spout/bolt实例) |
结构
组件结构
(1) 集群组件分布结构
(2) 节点内部组件关系
Nimbus(发布序列化任务实例) - Supervisor(启动Worker) - Worker(特定Topology进程) - Executor(特定Task线程) - Task(Spout/Bolt实例)
不会出现一个worker进程为多个topology服务。因此,一个运行中的topology就是由集群中若干台物理机上的多个worker进程组成的。
executor是worker进程启动的一个单独线程。每个executor只会运行1个topology的某个特定的spout或bolt的若干个实例,称作task。注意,storm默认是1个spout或bolt只能生成1个task,executor线程会在每次循环里顺序调用所有它包含的task实例。
task最终运行的是spout或bolt中代码的执行单元,一个task即为spout或bolt的一个实例,executor线程在执行期间会调用该task的nextTuple或excute方法。topology启动后,一个spout或bolt的task数目是不变的,但该spout或bolt使用的executor线程数是可以动态调整的。
默认情况下,一个supervisor节点最多可以启动4个worker进程,每一个topology默认占用1个worker进程,每个spout或者bolt任务会占用1个executor线程,每个executor启动1个task。
并行度
根据业务调整并行度
图中是一个包含有两个worker进程的拓扑。其中,蓝色的 BlueSpout 有两个executor,每个 executor 中有一个 task,并行度为 2;绿色的 GreenBolt 有两个 executor,每个 executor 有两个 task,并行度也为2;而黄色的YellowBolt 有 6 个 executor,每个 executor 中有一个 task,并行度为 6,因此,这个拓扑的总并行度就是 2 + 2 + 6 = 10。具体分配到每个 worker 就有 10 / 2 = 5个executor。
Storm可以随时增加或者减少worker或者executor的数量,而不需要重启集群或者拓扑。具体方式有:CLI、Storm UI,修改后会注销掉topology,并rebalance所有任务。
不建议为每个拓扑在每台机器上分配超过一个worker。而应该改为在一台机器上分配多个线程,而不是在一台分配多个进程来提高并行度。
深刻理解Storm的线程进程
在一个worker中的线程是运行在这个worker的JVM上的,所以生成的静态变量、class对象等是同一个。
例如,在Topology中定义一个静态变量,初始化一个对象。那么,在不同worker中打印这个对象的hashCode,是不同的;但是,在同一个worker中的executor打印这个对象的hashCode是相同的。这就要求我们慎用状态,因为Storm本来就是无状态编程范式,即使使用也要考虑清楚,是否需要worker级别的全局唯一,是加在组件的初始化方法里还是prepare、open里。
消息分发
(1) Shuffle Grouping:随机分组。随机派发stream里的tuple,保证bolt中的每个任务接收到的tuple数目基本均衡(能较好的实现负载均衡)
(2) Fields Grouping:按字段分组。比如按userid来分组,具有同样userid的tuple会被分到同一个任务,而不同userid的tuple会被分到不同的任务。
(3) All Grouping:广播发送。对于每一个tuple,bolt中的所有任务都会收到
(4) Global Grouping:全局分组。这个tuple被分配到storm中的一个bolt的其中一个task,在具体一点就是分配给id值最低的那个task,收集全部bolt的中间计算结果,最后进行聚合时用
两个逻辑
(1)
supervisor(Host)à
Worker(Process)à
Executor(Thread)à
Task(Spout/Bolt实例)
(2)
Stream à Tuple à List<field,value>
流程
总体流程
ZK集群内部有自己的通信机制,Storm借助其通讯机制,例如,任务下发等。在执行一个任务的时候,storm会把任务及相关执行的代码经过序列化之后发送到ZK节点供supervisor去下载,然后才会各自执行自己部分的代码或者任务。每个ZK节点收到的任务是一样的,而supervisor只需要下载属于自己的任务即可。
关于spout/bolt的生命周期,一般来说spout/bolt的生命周期如下:
(1) 在提交了一个topology之后(在nimbus所在的机器),创建spout/bolt实例并进行序列化;
(2) 将序列化的component发送给所有的任务所在的机器;
(3) 在每一个任务上反序列化component;
(4) 在开始执行任务之前,先执行component的初始化方法(spout是open,bolt是prepare);
(5) 因此component的初始化操作应该在prepare/open方法中进行,而不是在实例化component的时候进行。
元数据存储结构
用ZooKeeper来存储组件之间共享的元数据,这些模块在重启之后,可以通过对应的元数据进行恢复。因此Storm的模块是无状态的,这是保证其可靠性及伸缩性的基础。
树中的每一个节点代表ZooKeeper中的一个节点(znode),每一个叶子节点是Storm真正存储数据的地方。从根节点到叶子节点的全路径代表了该数据在ZooKeeper中的存储路径,该路径可被用来写入或获取数据。Storm zookeeper目录树含义:
(1) /storm/workerbeats/<topology-id>/node-port:(小组项目汇报书、工人工作汇报书)
它存储由node-port指定的Worker的运行状态和一些统计信息,主要包括Worker上所有Executor的统计信息(如发送/接收的消息数)、Worker的启动时间以及最后一次更新这些信息的时间。它的内容在运行过程中不断更新。
(2) /storm/storms/<topology-id>:(项目计划书)
存储Topology本身的信息,包括名字、启动时间、运行状态、要使用的Worker数目以及每个组件的并行度设置。它的内容在运行过程中不变。
(3) /storm/assignments/<topology-id>:(项目任务分配书)
存储Nimbus为每个Topology分配的任务信息,包括该Topology在Nimbus机器本地的存储目录、被分配到的Supervisor机器到主机名的映射关系、每个Executor运行在哪个Worker上以及每个Executor的启动时间。该节点的数据在运行过程中会被更新。
(4) /storm/supervisors/<supervisor-id>:(部门的人员架构图)
它存储Supervisor机器本身的运行统计信息,主要包括最近一次更新时间、主机名、supervisor-id、已经使用的端口列表、所有的端口列表以及运行时间。该节点的数据在运行过程中也会被更新。
(5) /storm/errors/<topology-id>/<component-id>/e<sequential-id>:
它存储运行过程中每个组件上发生的错误信息。<sequential-id>是一个递增的序列号,每一个组件最多只会保留最近的10条错误信息。它的内容在运行过程中是不变的(但是有可能被删除)。
算法流程
1. Nimbus
箭头1表示由Nimbus创建的路径:
(1) /storm/workerbeats/<topology-id>
(2) /storm/storms/<topology-id>
(3) /storm/assigments/<topology-id>
其中对于路径a,Nimbus只会创建路径,不会设置数据,数据是由Worker设置的。对于路径b和c,Nimbus在创建它们的时候就会设置数据。a和b只有在提交新Topology的时候才会创建,且b中的数据设置好后就不再变化,c则在第一次为该Topology进行任务分配的时候创建,若任务分配计划有变,Nimbus就会更新它的内容。
箭头2表示Nimbus需要获取数据的路径:
(1) /storm/workerbeats/<topology-id>/node-port
(2) /storm/supervisors/<supervisor-id>
(3) /storm/errors/<topology-id>/<component-id>/e<sequential-id>
Nimbus需要从路径a读取当前已被分配的Worker的运行状态。根据该信息,Nimbus可以得知哪些Worker状态正常,哪些需要被重新调度,同时还会获取到该Worker所有Executor统计信息,这些信息会通过UI呈现给用户。从路径b可以获取当前集群中所有Supervisor的状态,通过这些信息可以得知哪些Supervisor上还有空闲的资源可用,哪些Supervisor则已经不再活跃,需要将分配到它的任务分配到其他节点上。从路径c上可以获取当前所有的错误信息并通过UI呈现给用户。
集群可动态增减机器,这会引起ZooKeeper中元数据的变化,Nimbus通过不断获取这些元数据信息来调整任务分配,故Storm具有良好的可伸缩性。当Nimbus死掉时,其他节点是可以继续工作的,但是不能提交新的Topology,也不能重新进行任务分配和负载调整,因此目前Nimbus还是存在单点的问题。随后Storm可配多个Nimbus,就不存在单节点问题了。
2. Supervisor
同Nimbus类似,Supervisor也要通过ZooKeeper来创建和获取元数据。除此之外,Supervisor还通过监控指定的本地文件来检测由它启动的所有Worker的运行状态。
箭头3表示Supervisor在ZooKeeper中创建的路径是/storm/supervisors/<supervisor-id>。新节点加人时,会在该路径下创建一个节点。值得注意的是,该节点是一个临时节点(创建ZooKeeper节点的一种模式),即只要Supervisor与ZooKeeper的连接稳定存在,该节点就一直存在;一旦连接断开,该节点则会被自动删除。该目录下的节点列表代表了目前活跃的机器。这保证了Nimbus能及时得知当前集群中机器的状态,这是Nimbus可以进行任务分配的基础,也是Storm具有容错性以及可伸缩性的基础。
箭头4表示Supervisor需要获取数据的路径是/storm/assigments/<topology-id>。我们知道它是Nimbus对Topology的任务分配信息,Supervisor从该路径可以获取到Nimbus分配给它的所有任务。Supervisor在本地保存上次的分配信息,对比这两部分信息可以得知分配信息是否有变化。若发生变化,则需要关闭被移除任务所对应的Worker,并启动新的Worker执行新分配的任务。Nimbus会尽量保持任务分配的稳定性。
箭头9表示Supervisor会从LocalState中获取由它启动的所有Worker的心跳信息。Supervisor会每隔一段时间检査一次这些心跳信息,如果发现某个Worker在这段时间内没有更新心跳信息,表明该Worker当前的运行状态出了问题。这时Supervisor就会杀掉这个Worker,原本分配给这个Worker的任务也会被Nimbus重新分配。
3. Worker
Worker也需要利用ZooKeeper来创建和获取元数据,同时它还需要利用本地的文件来记录自己的心跳信息。
箭头5表示Worker在ZooKeeper中创建的路径是/storm/workerbeats/<topology-id>/node-port。在Worker启动时,将创建一个与其对应的节点,相当于对自身进行注册。需要注意的是,Nimbus在Topology被提交时只会创建路径/storm/workerbeats/<topology-id>,而不会设置数据,数据则留到Worker启动之后由Worker创建。这样安排的目的之一是为了避免多个Worker同时创建路径时所导致的冲突。
箭头6表示Worker需要获取数据的路径是/storm/assignments/<topology-id>,Worker会从这些任务分配信息中取出分配给它的任务并执行。
箭头8表示Worker在LocalState中保存心跳信息。LocalState实际上将这些信息保存在本地文件中,Worker用这些信息跟Supervisor保持心跳,每隔几秒钟需要更新一次心跳信息。Worker与Supervisor属于不同的进程,因而Storm采用本地文件的方式来传递心跳。(进程间通信:共享文件)
4. Executor
箭头7表示Executor在ZooKeeper中创建的路径是/storm/errors/<topology-id>/<component-id>/e<sequential-id>。每个Executor会在运行过程中记录发生的错误。
5. 总结
(1) Nimbus感知Supervisor:通过/storm/supervisors/<supervisor-id>路径对应的数据进行心跳保持。Supervisor创建这个路径时釆用的是临时节点模式,所以只要Supervisor死掉,对应路径的数据就会被删掉,Nimbus就会将原本分配给该Supervisor的任务重新分配。
(2) Nimbus感知Worker:之间通过/storm/workerbeats/<topology-id>/node-port中的数据进行心跳保持。Nimbus会每隔一定时间获取该路径下的数据,同时Nimbus还会在它的内存中保存上一次的信息。如果发现某个Worker的心跳信息有一段时间没更新,就认为该Worker已经死掉了,Nimbus会对任务进行重新分配,将分配至该Worker的任务分配给其他Worker。
(3) Supervisor感知Worker:之间通过本地文件(基于LocalState ) 进行心跳保持。
特点
(1) 性能:并行程度高,基于消息,很少磁盘读写,处理速度快。
(2) 可靠性:依靠组件无状态,状态信息(元信息)保存在zookeeper上这种机制来保证。
(3) 可伸缩性:线性增加资源来提高性能(分布式系统的特点)。Storm的模块是无状态的,这是保证其可靠性及可伸缩性的基础。
(4) 快速失败,无状态:Storm的两种组件Nimbus和Supervisor都是快速失败的,没有状态。任务状态和心跳信息等都保存在Zookeeper。计算单元的依赖的数据全部在接收的消息中可以找到。
(5) 可扩展性:并行编程框架,思路清晰,业务代码容易扩展。消息分组方式是可扩展性的基础。
(6) 基本性质:Storm是一种计算引擎,Hadoop是一种大数据平台,包含计算引擎和存储系统。
(7) 数据来源:Hadoop处理的是HDFS上TB级别的数据(历史数据);Storm处理的是实时新增的某一笔数据(实时数据);
(8) 处理过程:Hadoop是批处理,分Map阶段到Reduce阶段;Storm是用户定义的流处理,流程中每个步骤可以是数据源(Spout)或处理逻辑(Bolt);
(9) 是否结束:Hadoop的Job执行完毕后结束;Storm的Topology没有结束状态。
(10) 无数据丢失:Storm创新性提出的ACK消息追踪框架。