很多粉丝在问如何分析定位spark streaming性能瓶颈。
貌似以前,浪尖发过一篇文章,讲的是从spark streaming的web ui的角度去分析。这其实,是根据现象去分析定位问题的很方便的手段,大家可以去翻翻,星球的球友也可以去精华帖子里看看。
今天,主要是从数据流的角度去分析问题,做大数据,其实想要最大限度的优化你的应用,就要求自己对数据流在各个环节流动的情况,做到了如执掌。
我这里为了方便讲解,画了一个简单的流程图!
架构图
1.生产者->topic
生产者发送消息到kafka的topic,topic往往有很多分区,那么每条消息该发往哪个分区呢?
a.指定分区生产。消息就会落到kafka topic的指定分区。
b.指定消息的key。客户端会对key,按照指定的分区器(默认是key的hash值对topic分区数取余确定分区)。当然,生产中往往需要自定义分区器,除非你能确保key是均匀分布的。key不均匀会导致topic的分区间消息不均衡,不利于后面消费者消费处理。生产者在生产中往往会使用随机分区器或者轮训分区器,尽量使得发往topic数据均匀。
c.不指定key。就是随机送往topic的分区,数据大致均匀。
不知道你是否能了解这块数据是否均匀?如何去定量了解呢?
除了在生产者客户端加统计数据,还有什么方式吗?
要确保生产者发往的topic分区数据尽量均匀哦!
2.kafkardd
现在基本上都是使用spark streaming的direct stream api,这种api会按照批次生成kafkardd,kafkardd的每个分区内有个消费者,消费一定范围的offset。这个我在星球分享过源码视频了!
那么,kafka topic的每个分区是否均匀,就决定着spark streaming生成kafkardd的每个分区的数据是否均匀,也就决定着第一个stage的task处理的数据是否均匀,不均匀就是数据倾斜,会导致task执行时间快慢不均匀,机器计算利用不合理,任务性能不佳。
那么,假如topic数据已经不均匀如何做呢?
repartition+去掉数据本地性,可以稍微优化。原理自己可以想想,浪尖也分享过。
还是尽量保正生产者发生数据尽量均匀。
3.shuffle
浪尖一般不用窗口的话,会把逻辑写到foreachrdd内,这样同一段代码,离线/实时都能用。
shuffle算子我们就分为groupbykey/reducebykey这类。key不均匀对于spark streaming来说数据量一般很少,所以多给点内存就好了。假如确实数据量大,可以采样key,倾斜的key单独处理,然后也可以老套路,加后缀,分层多次聚合。
spark sql的group by也是类似。
distinct算子也是基于reducebykey实现的。
join 在spark streaming都是小表,做好缓存即可。
与外部存储,比如redis或者alluxio的join稍微有点麻烦。
思路也是一样:
a.小表广播,缓存。也要看表的使用频率,别临时表就用了一次,你还在那缓存/广播,那就不值得了。
b.严重倾斜的key单独处理。
c.稍微倾斜的key,多给点内存。
总之,要权衡每个操作的代价,要了解数据的特征,和数据在spark dag内如何流动的。
这些都是要结合你自己的数据特征多次测试,多多观察。
shuffle的文章可以看前几天发的文章:
https://mp.weixin.qq.com/s/EC3zUgmFnNmwIu2vUMyYew
那么,有人问shuffle的数据倾斜能不能通过增加并行度来解决呢?
看情形,假如是单个key特大引起的,那么增加并行度不行。否则可以。
不确定的话,可以尝试增加分区试一下。
4.消息顺序性
spark streaming+kafka不适合处理顺序性的消息。
有些场景勉强可以实现,比如同一个用户会话,发生的行为数据,想要顺序处理,那么可以加时间戳或者递增唯一id。然后spark streaming处理的时候,再排序即可,当然可能需要借助第三方存储来存储中间状态。
假如是全局一致性,还是要有全局递增id,时间戳都不行。
5.数据仅一次处理
spark streaming+kafka大部分用户实现的也都是至少一次处理。
假如只在foreach/foreachpartition中与外部系统交互,或者中间算子只读外部系统数据。那么,这种情况下保证结果输出仅一次,有几种种方法:
a.保证仅一次输出,也即是repartition成1,而且以一次事务的方式完成写操作。
b.外部存储系统支持密等性。比如,利用redis/hbase。
c.用户自己去重,或者本身重复处理也不影响结果。
d.offset+结果一次事务输出。看业务特点,大部分业务比较难实现。
e.delta lake,支持acid事务,可以认为是spark一个存储层,其实事务信息和数据写到同一个目录,支持数据格式是parquet,但是spark版本要求2.4+。
假如,在过程算子,如map中与外部交互,且是更新操作,那就无法保证仅一次更新了。需要在存储端保证密等性或者读的时候来去重判断。