上次给大家讲了Spark local模式的启动安装和使用,现在给大家分享一下Standalone模式下的使用和安装。这个讲完以后,还有yarn和mesos下集群的安装和使用。
Spark on local Cluster伪分布式
即Spark Standalone模式。此时Spark会使用Standalone的集群管理器(Cluster Manager)启动Spark。
这种模式,也可以称为Spark的伪分布式。
Standalone集群管理器是Spark实现的资源调度框架,其主要的节点有Client节点、Master节点和Worker节点。其中Driver既可以运行在Master节点上中,也可以运行在本地Client端。当用spark-shell交互式工具提交Spark的Job时,Driver在Master节点上运行;当使用spark-submit工具提交Job或者在Eclipse、IDEA等开发平台上使用new SparkConf.setManager(“spark://master:7077”)方式运行Spark任务时,Driver是运行在本地Client端上的。
使用$SPARK_HOME/sbin下的start-all.sh可以启动集群,使用stop-all.sh可以停止集群。
我们可以在一台机器上模拟集群,也可以在多台机上上运行Spark Standalone集群。如果是在多台机器上,请保证Master(哪一台调用start-master.sh哪一台就是master)向worker节点的SSH免密码登录。(关于如何实现SSH免密码登录,请查看LInux相关教程)。同时,需要说明的是,如果Worker和master在同一台主机上,也必须要配置SSH向自己的免密码登录。
为了便于学习,我们先在一台机器上启动Spark Standalone模式。
1、单机Standalone步1:下载、解压Spark
请参考之前的步骤。
步2:配置Spark环境变量
可选。本人配置环境变量,一般习惯于创建一个独立的环境变量文件如spark.sh放到/etc/profile.d/目录下。
export SPARK_HOME=/spark/spark
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
生效:
$ source /etc/profile
步3:配置slaves
在$SPARK_HOME/conf目录下,将slaves.template重命名为slaves。配置只有一个Worker节点。
#配置一个主机名称,或是localhost或是ip地址。建议使用主机名称
hadoop201
步4:启动Spark集群
在$SPARK_HOME/sbin目录下,拥有启动和停止Spark集群的脚本:
start-slave.sh stop-master.sh stop-slaves.sh
start-master.sh start-slaves.sh stop-all.sh start-all.sh stop-slave.sh ....
使用start-all.sh即可以启动spark集群。
[wangjian@hadoop201 sbin]$ start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /spark/spark/logs/spark-wangjian-org.apache.spark.deploy.master.Master-1-hadoop201.out
hadoop201: starting org.apache.spark.deploy.worker.Worker, logging to /spark/spark/logs/spark-wangjian-org.apache.spark.deploy.worker.Worker-1-hadoop201.out
启动以后,通过jps会发现两个进程:Master和Worker。
[wangjian@hadoop201 sbin]$ jps
1206 Worker
1146 Master
1276 Jps
步5:访问MasterUI
在启动过程中,master会将启动过程的日志输出到mater-1-host.out文件中去,现在打开这个文件查看启动日志:
1.[wangjian@hadoop201 sbin]$ cat /spark/spark/logs/spark-wangjian-org.apache.spark.deploy.master.Master-1-hadoop201.out
2.Spark Command: /usr/jdk1.8.0_144/bin/java -cp /spark/spark/conf/:/spark/spark/jars/* -Xmx1g org.apache.spark.deploy.master.Master --host hadoop201 --port 7077 --webui-port 8080
3.========================================
4.Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
5.17/12/08 02:16:06 INFO Master: Started daemon with process name: 1146@hadoop201
6.17/12/08 02:16:06 INFO SignalUtils: Registered signal handler for TERM
7.17/12/08 02:16:06 INFO SignalUtils: Registered signal handler for HUP
8.17/12/08 02:16:06 INFO SignalUtils: Registered signal handler for INT
9.17/12/08 02:16:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
10.17/12/08 02:16:07 INFO SecurityManager: Changing view acls to: wangjian
11.17/12/08 02:16:07 INFO SecurityManager: Changing modify acls to: wangjian
12.17/12/08 02:16:07 INFO SecurityManager: Changing view acls groups to:
13.17/12/08 02:16:07 INFO SecurityManager: Changing modify acls groups to:
14.17/12/08 02:16:07 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(wangjian); groups with view permissions: Set(); users with modify permissions: Set(wangjian); groups with modify permissions: Set()
15.17/12/08 02:16:08 INFO Utils: Successfully started service 'sparkMaster' on port 7077.
16.17/12/08 02:16:09 INFO Master: Starting Spark master at spark://hadoop201:7077
17.17/12/08 02:16:09 INFO Master: Running Spark version 2.1.2
18.17/12/08 02:16:10 INFO Utils: Successfully started service 'MasterUI' on port 8080.
19.17/12/08 02:16:10 INFO MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at http://192.168.56.201:8080
20.17/12/08 02:16:10 INFO Utils: Successfully started service on port 6066.
21.17/12/08 02:16:10 INFO StandaloneRestServer: Started REST server for submitting applications on port 6066
22.17/12/08 02:16:11 INFO Master: I have been elected leader! New state: ALIVE
23.17/12/08 02:16:13 INFO Master: Registering worker 192.168.56.201:43874 with 1 cores, 1024.0 MB RAM
通过上面的日志,分析得到以下信息:
1:行2,可以知启动Spart就是通过java命令,启动一个Java类,这个类叫Master,且指定了最大使用的内存。及端口号。
2:行16,可知spark的端口号为7077。
3:行18,可知WebUI的端口号为8080.
4:行20,可知启动一个REST的service在6066端口。
5:行23,可知worker所使用的端口及服务器地址。
有兴趣的朋友,可以再去查看worker的日志文件,从中你会知道worker节点的端口为8081。
现在访问8080的WebUI:
步6:开启一个RDD
开启一个RDD会启动所有Worker上的Executor即:CoarseGrainedExecutorBackend。
现在我们可以执行一个spark-shell连接这个集群。通过输出以下的命令:
$ spark-shell - -master spark://hadoop201:7077
[wangjian@hadoop201 spark]$ spark-shell --master spark://hadoop201:7077
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/12/08 02:36:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://192.168.56.201:4040
Spark context available as 'sc' (master = spark://hadoop201:7077, app id = app-20171208023624-0000).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.2
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144)
Type in expressions to have them evaluated.
Type :help for more information.
启动以后,检查8080 WebUI你会了发现多了一个Application:
使用jps检查进程:
[wangjian@hadoop201 ~]$ jps
1472 SparkSubmit会在master节点上,启动一个SparkSubmit用于处理分配任务
1555 CoarseGrainedExecutorBackend #启动所有worker节点上的executor后台进程
1587 Jps
1206 Worker
1146 Master
再次加载一个本地或是hdfs上的文件,进行行统计,你会发现有一个计算的过程如下:
由于是集群运算,所以,会显示一个进度。
进行单词统计的示例:
scala> var rdd2 = sc.textFile("file:///spark/a.txt");
rdd2: org.apache.spark.rdd.RDD[String] = file:///spark/a.txt MapPartitionsRDD[6] at textFile at :24
scala> rdd2.flatMap(str=>str.split("\\s+")).map(word=>(word,1)).reduceByKey(_+_).collect().foreach(println);
(Hello,3)
(Alex,1)
(Mary,1)
(Jack,1)
或是使用以下表达式:
scala> rdd2.flatMap(str=>str.split("\\s+")).map(word=>(word,1)).reduceByKey(_+_).collect().foreach(kv=>(println(kv._1+","+kv._2)));
Hello,3
Alex,1
Mary,1
Jack,1
如果太长的话,回回车也是可以的:四车之前先输入.(点)
scala> rdd2.flatMap(str=>str.split("\\s+")).
map(word=>(word,1)).
reduceByKey(_+_).
collect().
foreach(kv=>(println(kv._1+" "+kv._2)));
Hello 3
Alex 1
Mary 1
Jack 1
步7:代码连接集群
通过Java或是通过Scala代码连接集群,只需要设置master为spart://ip:7077即可。
建议使用spark-submit方式来执行,在foreach中输出的数据会输出到stdout中。
代码:
packagecn.wang
importorg.apache.spark.rdd.RDD
importorg.apache.spark.
/**
*使用submit方式提交到集群
*读取完成文件以后,保存到stdout中
*/
objectSpark05_Standalone {
defmain(args: Array[String]): Unit = {
if(args.length ==) {
println("请输入参数");
return;
}
valpath = args.apply();
//声明config
valconf: SparkConf =newSparkConf();
conf.setAppName("Standalone");
varsc: SparkContext =newSparkContext(conf);
valtextFile: RDD[String] = sc.textFile(path);
vallineCount: Long = textFile.count();
println("行数据:"+ lineCount);
//单词统计
textFile.flatMap(_.split("\\s+"))//
.map((_,1)).reduceByKey(_ + _)
.collect()
.foreach(kv => {
println("结果:"+ kv._1 +","+ kv._2);
});
sc.stop();
}
}
开发一个Shell脚本文件,且添加执行权限,假设文件名称为:submit.sh
#!/bin/bash
if [ $# -eq 0 ]; then
echo "请输入读取的文件"
else
spark-submit \
--class cn.wang.Spark05_Standalone \
--master spark://192.168.56.201:7077 \
SparkDemo01.jar $1
fi
现在,就可以通过向shell脚本传递不同的文件方式,来执行这个程序:
如:
读取本地文件请执行:
$ submit.shfile:///spark/a.txt
读取hdfs上的文件:
$ submit.sh hdfs://hadoop201:8020/wangjian/a.txt
上面的脚本执行以下,都可以输出正确的结果:
结果:Hello,3
结果:Alex,1
结果:Mary,1
结果:Jack,1
当然也可以将结果,保存到HDFS上。
2、集群Standalone模式
Spark集群非常简单,只要修改$SPARK_HOME/conf目录下的slaves文件即可。建议在所有节点相同的目录下,安装Spark,同时配置环境变量。
步1:修改slaves文件
将spark_home目录下的的slavles.template文件,重命名为:slaves。并添加worker节点:
#this is master node of Spark
hadoop101
#this is slave Node of Spark
hadoop102
步2:配置ssh免密码登录
哪台机器上执行start-all.sh/start-master.sh即哪一台为master主机,将拥有master节。所有配置到slaves中的节点,都是worker节点。所以,需要配置从master到worker的免密码登录。
在master节点上执行:
$ ssh-keygen -t rsa
$ ssh-copy-id hadoop102
步3:scp拷贝文件
使用scp -r将文件拷贝到其他节点。
步4:使用start-all.sh启动集群
$ ./spark/sbin/start-all.sh
[wangjian@hadoop101 spark]$ jps
1204 Jps
1157 Worker
1086 Master
检查所在节点上的Worker是否已经启动。
步5:查看WebUI界面
通过http://yourMasterIp:8080查看Spark:
步6:开启一个Driver
每开启一个Driver在集群的环境下,所有的worker节点上的Executor都会启动,同时会在Master节点 上,开启SparkSubmit进程,如:
$ spart-shell --master spark://192.168.56.101:7077
然后检查Master节点上的进程为:
[wangjian@hadoop101 ~]$ jps
1157 Worker #因为当前节点也是worker所以拥有一个Worker进程
1382 Jps
1243 SparkSubmit #主节点,即Master会拥有一个SparkSubmit进程
1086 Master
1311 CoarseGrainedExecutorBackend #当启动一个Driver时,每一个Worker节点启动的进程
检查其他Worker节点
[wangjian@hadoop102 spark]$ jps
1104 Worker #worker节点的进程
1216 Jps
1165 CoarseGrainedExecutorBackend #当启动一个Driver每一个Worker节点启动的进程
步7:提交任务
注意,由于目前已经是在集群的环境下,所以,如果要读取本地文件,应该保证在所有节点的相同目录下,都拥有此文件。在这种情况下,读取hdfs中的文件,就变得比较方便。
现在我们启动hadoop集群,来测试spark:
1:使用Spark-Shell进行测试
scala> val textFile = sc.textFile("hdfs://192.168.56.101:8020/wang/a.txt");
textFile: org.apache.spark.rdd.RDD[String] = hdfs://192.168.56.101:8020/wang/a.txt MapPartitionsRDD[1] at textFile at :24
scala> textFile.count();
res0: Long = 3
scala> textFile.flatMap(_.split("\\s+")).map((_,1)).reduceByKey(_+_).
collect().foreach(println);
(Hello,3)
(Mary,1)
(Rose,1)
(Jack,1)
设置一个输出的格式:
scala> textFile.flatMap(_.split("\\s+")).map((_,1)).
reduceByKey(_+_).
collect().
map(kv=>{ //这儿用于设置输出的格式
val key=kv._1;
val v = kv._2;
key+"\t"+v;
}).foreach(println);
Hello 3
Mary 1
Rose 1
Jack 1
如果在读取文件中,没有输入hdfs://前缀,则默认也是读取hdfs文件系统中的数据,但这一点取决于您已经配置了HADOOP_CONF_DIR在$SPARK_HOME/conf/spark-env.sh文件中,如下:
#配置指定hadoop的配置目录,以便于让Spark使用yarn
HADOOP_CONF_DIR=/hadoop/hadoop-2.7.3/etc/hadoop
以下就可以省去hdfs://前缀了:
scala> val tf = sc.textFile("/wang/a.txt");
tf: org.apache.spark.rdd.RDD[String] = /wang/a.txt MapPartitionsRDD[1] at textFile at :24
scala> tf.count();
res0: Long = 3
2:在spark-shell中操作hdfs
可以读取hdfs上的文件,也可以直接将数据保存到hdfs上:
scala> val tf = sc.textFile("/wang/a.txt"); //读取hdfs上的文件
scala> tf.count();
res0: Long = 3
scala> tf.flatMap(_.split("\\s+")).saveAsTextFile("/out002"); //保存到hdfs上
scala> tf.flatMap(_.split("\\s+")). //保存到hdfs上
map((_,1)).reduceByKey(_+_).saveAsTextFile("/out003");
scala> tf.flatMap(_.split("\\s+")). //格式化以后保存到hdfs上
map((_,1)).reduceByKey(_+_).
val str=kv._1+"\t"+kv._2;
str;
}).saveAsTextFile("/out004");
上面种方式保存到hdfs上的格式依次是:
$ hdfs dfs -cat /out002/* #没有进行统计,直接保存
Hello
Jack
Hello
Mary
Hello
Rose
$ hdfs dfs -cat /out003/* #进行统计以后保存,带有括号
(Hello,3)
(Mary,1)
(Rose,1)
(Jack,1)
$ hdfs dfs -cat /out004/* #格式化以后保存 去除的括号
Hello 3
Mary 1
Rose 1
Jack 1
3:通过spark-submit提交任务
开发的Scala代码,还和以前一样。不过,为了不让大家到处乱找,我还是给出完整的代码:
packagecn.wang
importorg.apache.spark.rdd.RDD
importorg.apache.spark.
/**
*操作Spark Standalone Cluster模式下的提交
*/
objectSpark06_Cluster {
defmain(args: Array[String]): Unit = {
if(args.length
println("请输入参数1:hdfs file,2:输出hdfs目录")
return;
}
valconf: SparkConf =newSparkConf();
conf.setAppName("Spark_Standalone_Cluster");
valsc: SparkContext =newSparkContext(conf);
//读取hdfs文件
valtf: RDD[String] = sc.textFile(args.apply());
valcount: Long = tf.count();
println("文件中的行数为:"+ count);
valdata = tf.flatMap(_.split("\\s+")).map((_,1)).reduceByKey(_ + _);
//输出到控制台-将会输出到stdout文件日志文件中去
data.map(kv => {
valstr = kv._1 +"\t"+ kv._2;
str;
}).foreach(println);
//输出到hdfs
data.map(kv => {
valstr = kv._1 +""+ kv._2;
str;
}).saveAsTextFile(args.apply(1));
println("执行完毕")
sc.stop();
}
}
开发一个脚本(可选)并输入以下命令:
[wangjian@hadoop101 sparkjar]$ cat submit01.sh
#!/bin/bash
if [ $# -lt 2 ]; then
echo "参数1 hdfs地址,参数2输出的hdfs地址"
else
spark-submit \
--class cn.wang.Spark06_Cluster \
--master spark://192.168.56.101:7077 \
SparkDemo01.jar $1 $2
fi
启动这个脚本文件,检查hdfs文件系统上的输出:
$ hdfs dfs -cat /out001/*
Hello3
Mary1
Rose1
Jack1
1、小结
1:只要使用spark的start-all.sh启动的standalone集群,无论多少个worker操作方式都一样。
2:standalone模式下master的地址为:spark://ip:7077。
3:在开中,大量使用spart-submit方式提交,以便于真实环境的测试。
4:无论是何种模式,都可以操作hdfs上的文件,只要能访问到,都可以通过saveAsTextFile的方式保存数据到hdfs上。
5:在多个worker即cluster模式下,多个worker输出的数据不会在控制台出现,而是会出现在stdout的日志文件中。
领取专属 10元无门槛券
私享最新 技术干货