前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >行业客户现场SparkStreaming实时计算使用案例问题总结

行业客户现场SparkStreaming实时计算使用案例问题总结

作者头像
用户9421738
发布2024-09-12 19:26:48
1390
发布2024-09-12 19:26:48
举报
文章被收录于专栏:大数据从业者

背景

虽然当前实时计算领域所有厂商都推荐Flink框架,但是某些传统行业客户因为多年固化的业务场景仍然坚持使用SparkStreaming框架。本文主要记录Spark概念架构、SparkStreaming性能问题处理、SparkStreaming 7*24作业在Kerberos Hadoop集群HDFS_DELEGATION_TOKEN问题处理。

Spark概念架构

Spark applications以进程集合(Executors)的方式运行在集群上,通过main方法(也称Driver程序)中的SparkContext对象管理调度Executors。集群提供Executors运行所需的资源,集群类型分为Spark standalone、Mesos、YARN、Kubernetes。

架构关键点说明:

代码语言:javascript
复制
1.每个application都有自己的Executors进程,进程以多线程的方式运行task。不同application的Driver和Executors相互隔离,如果不通过外部系统,无法共享数据。

2.application生命周期内,Driver需要与Executors通信,比如:调度task到Executors执行、接收Executors心跳、接收Executors blocks信息等等。所以,Driver与Executors建议局域网内通信。

关键名词说明:

名称

说明

Application

用户开发的Spark程序,包括Driver端和Executors端

Driver

运行main()方法、创建SparkContext等

Executor

运行tasks、保存数据在内存或磁盘

Task

Driver发送给Executors的执行单元

Job

多个Tasks组成的并行计算,由action算子生成

Stage

Job划分不同的Tasks集合为Stage,由shuffle算子生成

Spark是基于RDD进行计算的,而RDD是一种抽象的、分布式的、弹性的、能并行操作的数据集。两种创建RDD的方式:加载Driver程序内的数据集合或者加载外部数据源,如Kafka、HDFS、HBase、Hive、文件系统等等。

RDD支持两种operation类型:transformations、actions。transformations就是基于已有RDD创建新的RDD,比如map。而actions就是触发RDD的计算,将结果返回给Driver,每个action操作会生成一个Job,比如reduce。所有的transformations都是惰性的,并不会立即触发计算,只是记录相应的计算逻辑。action需要计算结果的时候才触发计算。这种设计使得Spark更加高效。

默认情况下,多次action需要对同一个RDD进行transformations操作时候,都会重新RDD的重复计算。建议使用persisit(或者cache)将RDDD持久化到内存或者磁盘,以提高多次使用的效率。

除了RDD以外,Spark中还有一个抽象是可用于并行操作的共享变量。Spark在多个Executors节点之间并行执行Tasks时候,一个变量需要在Tasks之间或者Driver与Tasks之间共享使用。Spark支持两种类型共享变量:广播变量、计数器。

SparkStreaming性能问题

数据源使用Kafka支持两种模式:KafkaReceiver、DirectKafka。从实现上来看,DirectKafka的性能更优,数据一致性更强。推荐使用DirectKafka的API实现接收器。性能调优策略其实很成熟、很有效,包括:批量Duration间隔、kafka消费速率、RDD持久化、Driver与Executor内存、并行度、外部shuffle服务等等。但是,客户疑问的现场如下:

如上图所以模拟客户线上作业的现象:为什么Output Op Duration耗时(42秒)比Job Duration耗时(4秒+3秒)很长?Output Op Duration耗时长就导致批Duration任务出现排队的现象。

那么,要解释上述现象,就要回到前面章节提到的Application名词解释,即:用户开发的Spark程序,包括Driver端和Executors端。换句话说,app程序是分为两部分的,一部分在Driver端执行、另一部分在Executor端执行。对应到UI上也就是:Oupt Op Duration是Driver端执行的耗时、Job Duration是Executor端执行的耗时。相关原理可以查看Spark源码:

代码语言:javascript
复制
org.apache.spark.streaming.scheduler. JobScheduler

而Driver端耗时较长通常原因是:程序使用了往Driver端拉取数据的算子或者shuffler数据量大或者SparkSession存在耗时较长的sql操作等等。比如:

代码语言:javascript
复制
df.collect();

sparkSession.sql("insert into a select * from b");

完整示例详见github:

代码语言:javascript
复制
https://github.com/felixzh2020/felixzh-java/blob/main/SparkStreaming/src/main/java/com/felixzh/Kafka2Hdfs.java#L60        

HDFS_DELEGATION_TOKEN问题

我们知道SparkStreaming作业属于7*24长时间运行的流作业,客户反馈说任务每7天就报错退出,异常日志提示:HDFS_DELEGATION_TOKEN is expired。

这里先简单说明下原理就是开启Kerberos认证的Hadoop集群中HDFS的namenode会生成HDFS_DELEGATION_TOKEN,同时给token设置相关生命周期管理参数。

需要访问HDFS的应用需要申请token,然后使用token才能正常操作HDFS。而token是有生命周期的,也就是说会过期。当然,这个过期是正常行为。

那么,对于流任务怎么办?先否定一种方式就是将过期时间调大,这个不现实。回到正路上来,既然token过期,那只需要在token过期之前、重新申请token不就行了。事实正是如此,常用的流计算框架Flink、SparkStreaming都是这么做的。

SparkStreaming通过HadoopDelegationTokenManager类实现周期性地登录KDC、周期性地申请delegation token。也就是在delegation token过期前,Driver会重新申请新token,然后通过IPC发送给Execuors,从而确保SparkStreaming能够长时间运行。

具体方式有两种:一种是使用keytab;一种是使用ticket cache。

很显然,推荐使用Keytab方式,就是使用spark-submit --principal --keytab 即可。当然,使用--conf spark.kerberos.keytab --conf spark.kerberos.principal效果相同。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-09-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据从业者 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • Spark概念架构
  • SparkStreaming性能问题
  • HDFS_DELEGATION_TOKEN问题
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档