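A DataFrame is a distributed dataset built on top of RDDs, similar to a two-dimensional table in a traditional database. A DataFrame knows the type of each column but not the type of a row as a whole: whatever a row contains, it is represented as a generic Row.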
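Create the SparkSession
val sparkSession = SparkSession.builder().master("local[4]").appName("test").getOrCreate()
Create the case class
case class Student(id: Int, name: String, age: Int, sex: String)
Calling toDF requires importing the implicit conversions
import sparkSession.implicits._
To make testing easier, sparkSession is pulled out into a shared field and each example is run as a JUnit test.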
@Test
def demo01(): Unit = {
// Prepare the data
val list=List(
Student(1,"张三",18,"男"),
Student(2,"绣花",16,"女"),
Student(3,"李四",18,"男"),
Student(4,"王五",18,"男"),
Student(5,"翠花",19,"女"),
Student(6,"张鹏",17,"男")
)
// toDF requires the implicit conversions
import sparkSession.implicits._
val df: DataFrame = list.toDF()
// Run it; similar to select * from table;
df.show()
}
API reference: https://blog.csdn.net/dabokele/article/details/52802150
show: display the data
val df: DataFrame = list.toDF()
df.show()
+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
| 1|张三| 18| 男|
| 2|绣花| 16| 女|
| 3|李四| 18| 男|
| 4|王五| 18| 男|
| 5|翠花| 19| 女|
| 6|张鹏| 17| 男|
+---+----+---+---+
show(numRows: Int): display only the first numRows rows
val df: DataFrame = list.toDF()
df.show(2)
+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
| 1|张三| 18| 男|
| 2|绣花| 16| 女|
+---+----+---+---+
collect: fetch all data into an array
Unlike show, collect fetches every row of the DataFrame to the driver and returns them as an Array[Row].
val df: DataFrame = list.toDF()
val rows: Array[Row] = df.collect()
rows.foreach(println(_))
[1,张三,18,男]
[2,绣花,16,女]
[3,李四,18,男]
[4,王五,18,男]
[5,翠花,19,女]
[6,张鹏,17,男]
collectAsList: fetch all data into a List
Works like collect, except that the result is returned as a java.util.List[Row]. Usage:
val df: DataFrame = list.toDF()
val rows: util.List[Row] = df.collectAsList()
rows.forEach(println(_))
[1,张三,18,男]
[2,绣花,16,女]
[3,李四,18,男]
[4,王五,18,男]
[5,翠花,19,女]
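[6,张鹏,17,男]
describe(cols: String*): summary statistics for the given columns
This method accepts one or more column names as Strings and returns a DataFrame of summary statistics (count, mean, stddev, min, max). It is mainly meant for numeric columns; for a string column such as name, mean and stddev come back as null and min/max are compared as strings.
val df: DataFrame = list.toDF()
val frame: DataFrame = df.describe("name", "age")
frame.show()
+-------+----+------------------+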
|summary|name| age|
+-------+----+------------------+
| count| 6| 6|
| mean|null|17.666666666666668|
| stddev|null|1.0327955589886446|
| min|张三| 16|
| max|翠花| 19|
+-------+----+------------------+
first, head, take, takeAsList: fetch the first few rows
These four methods are closely related.
first returns the first row
val df: DataFrame = list.toDF()
val row: Row = df.first()
println(row)
[1,张三,18,男]
head returns the first row; head(n: Int) returns the first n rows
val df: DataFrame = list.toDF()
val row: Row = df.head()
println(row)
[1,张三,18,男]
Fetch the first n rows
val df: DataFrame = list.toDF()
val rows: Array[Row] = df.head(3)
rows.foreach(println(_))
[1,张三,18,男]
[2,绣花,16,女]
[3,李四,18,男]
take(n: Int) returns the first n rows
val df: DataFrame = list.toDF()
val rows: Array[Row] = df.take(3)
rows.foreach(println(_))
[1,张三,18,男]
[2,绣花,16,女]
[3,李四,18,男]
takeAsList(n: Int) returns the first n rows as a List
val df: DataFrame = list.toDF()
val rows: util.List[Row] = df.takeAsList(3)
rows.forEach(println(_))
[1,张三,18,男]
[2,绣花,16,女]
[3,李四,18,男]
These methods return one row as a Row, or several rows as an Array[Row] (a java.util.List[Row] for takeAsList). first and head do the same thing.
take and takeAsList return the data to the driver, so keep n small enough to avoid an OutOfMemoryError on the driver.
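To make the relationship between these methods concrete, here is a minimal sketch. The object name TakeSketch, the local[2] master, and the three sample rows are assumptions made for illustration; they are not part of the original examples.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object TakeSketch {
  // Local session for illustration only; master and appName are arbitrary choices.
  val spark: SparkSession =
    SparkSession.builder().master("local[2]").appName("take-sketch").getOrCreate()
  import spark.implicits._

  // Hypothetical sample rows mirroring the Student columns used in this article.
  val df: DataFrame = Seq(
    (1, "张三", 18, "男"),
    (2, "绣花", 16, "女"),
    (3, "李四", 18, "男")
  ).toDF("id", "name", "age", "sex")

  // head(n) and take(n) both bring the first n rows to the driver as Array[Row].
  val viaHead: Array[Row] = df.head(2)
  val viaTake: Array[Row] = df.take(2)

  // limit(n) only describes a smaller DataFrame; data moves when collect() runs.
  val viaLimit: Array[Row] = df.limit(2).collect()

  def main(args: Array[String]): Unit = {
    viaTake.foreach(println)
    spark.stop()
  }
}
```

Whichever method you pick, the rows end up in driver memory, so the size of n matters more than the choice of method.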
Add some more data
val list=List(
Student(1,"张三",18,"男"),
Student(2,"绣花",16,"女"),
Student(3,"李四",18,"男"),
Student(4,"王五",18,"男"),
Student(5,"翠花",19,"女"),
Student(7,"张鹏",14,"男"),
Student(8,"刘秀",13,"男"),
Student(9,"王菲菲",20,"女"),
Student(10,"乐乐",21,"男"),
Student(11,"小惠",23,"女"),
Student(12,"梦雅",25,"女")
)
where(conditionExpr: String): the condition that follows the WHERE keyword in SQL
Pass in a filter expression, which may use and and or. The result is a DataFrame.
Query the male students
import sparkSession.implicits._
val df: DataFrame = list.toDF()
df.where("sex='男'").show()
+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
| 1|张三| 18| 男|
| 3|李四| 18| 男|
| 4|王五| 18| 男|
| 7|张鹏| 14| 男|
| 8|刘秀| 13| 男|
| 10|乐乐| 21| 男|
+---+----+---+---+
and
Query the female students younger than 18
val df: DataFrame = list.toDF()
df.where("age<18 and sex='女'").show()
+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
| 2|绣花| 16| 女|
+---+----+---+---+
or
Query the students who are older than 18 or female
val df: DataFrame = list.toDF()
df.where("age>18 or sex='女'").show()
+---+------+---+---+
| id| name|age|sex|
+---+------+---+---+
| 2| 绣花| 16| 女|
| 5| 翠花| 19| 女|
| 9|王菲菲| 20| 女|
| 10| 乐乐| 21| 男|
| 11| 小惠| 23| 女|
| 12| 梦雅| 25| 女|
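+---+------+---+---+
filter: filter rows by a condition
Pass in a filter expression; the result is a DataFrame. filter accepts the same conditions as where.
Query the students whose sex is not male
val df: DataFrame = list.toDF()
df.filter("sex!='男'").show()
+---+------+---+---+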
| id| name|age|sex|
+---+------+---+---+
| 2| 绣花| 16| 女|
| 5| 翠花| 19| 女|
| 9|王菲菲| 20| 女|
| 11| 小惠| 23| 女|
| 12| 梦雅| 25| 女|
+---+------+---+---+
filter also supports or and and.
val df: DataFrame = list.toDF()
df.filter("sex!='男' and age >20").show()
+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
| 11|小惠| 23| 女|
| 12|梦雅| 25| 女|
+---+----+---+---+
select: fetch the given columns
val df: DataFrame = list.toDF()
df.select("name","age").show()
+------+---+
| name|age|
+------+---+
| 张三| 18|
| 绣花| 16|
| 李四| 18|
| 王五| 18|
| 翠花| 19|
| 张鹏| 14|
| 刘秀| 13|
|王菲菲| 20|
| 乐乐| 21|
| 小惠| 23|
| 梦雅| 25|
+------+---+
There is also an overloaded select that takes Column arguments instead of Strings. It can express logic such as select id, id+1 from test.
val df: DataFrame = list.toDF()
df.select(df("id"),df("id")+1).show()
+---+--------+
| id|(id + 1)|
+---+--------+
| 1| 2|
| 2| 3|
| 3| 4|
| 4| 5|
| 5| 6|
| 7| 8|
| 8| 9|
| 9| 10|
| 10| 11|
| 11| 12|
| 12| 13|
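+---+--------+
selectExpr: apply expressions to the given columns
You can call a UDF on a column directly, assign an alias, and so on. It takes String arguments and returns a DataFrame.
Get the maximum age among the students
val df: DataFrame = list.toDF()
df.selectExpr(
"""
|max(age) max_age
|""".stripMargin).show()
+-------+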
|max_age|
+-------+
| 25|
+-------+
col: get a single column
Returns exactly one column, as a Column object.
val df: DataFrame = list.toDF()
val ageColumn: Column = df.col("age")
val nameColumn: Column = df.col("name")
println(ageColumn)
println(nameColumn)
age
name
apply: get a single column
Returns exactly one column, as a Column object.
val df: DataFrame = list.toDF()
val ageColumn: Column = df.apply("age")
val nameColumn: Column = df.apply("name")
println(ageColumn)
println(nameColumn)
age
name
drop: remove the given columns and keep the rest
Returns a new DataFrame without the dropped columns. You can drop a single column:
val df: DataFrame = list.toDF()
val newDf: DataFrame = df.drop("sex")
newDf.show()
+---+------+---+
| id| name|age|
+---+------+---+
| 1| 张三| 18|
| 2| 绣花| 16|
| 3| 李四| 18|
| 4| 王五| 18|
| 5| 翠花| 19|
| 7| 张鹏| 14|
| 8| 刘秀| 13|
| 9|王菲菲| 20|
| 10| 乐乐| 21|
| 11| 小惠| 23|
| 12| 梦雅| 25|
+---+------+---+
You can also drop several columns at once
val df: DataFrame = list.toDF()
val newDf: DataFrame = df.drop("sex","id")
newDf.show()
+------+---+
| name|age|
+------+---+
| 张三| 18|
| 绣花| 16|
| 李四| 18|
| 王五| 18|
| 翠花| 19|
| 张鹏| 14|
| 刘秀| 13|
|王菲菲| 20|
| 乐乐| 21|
| 小惠| 23|
| 梦雅| 25|
+------+---+
limit
limit returns the first n rows of a DataFrame as a new DataFrame. Unlike take and head, limit is not an action.
val df: DataFrame = list.toDF()
val newDf: DataFrame = df.limit(3)
newDf.show()
+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
| 1|张三| 18| 男|
| 2|绣花| 16| 女|
| 3|李四| 18| 男|
+---+----+---+---+
orderBy and sort: sort by the given columns, ascending by default
orderBy, sorting by age (asc)
val df: DataFrame = list.toDF()
df.orderBy("age").show()
+---+------+---+---+
| id| name|age|sex|
+---+------+---+---+
| 8| 刘秀| 13| 男|
| 7| 张鹏| 14| 男|
| 2| 绣花| 16| 女|
| 4| 王五| 18| 男|
| 3| 李四| 18| 男|
| 1| 张三| 18| 男|
| 5| 翠花| 19| 女|
| 9|王菲菲| 20| 女|
| 10| 乐乐| 21| 男|
| 11| 小惠| 23| 女|
| 12| 梦雅| 25| 女|
+---+------+---+---+
orderBy, sorting by age (desc)
val df: DataFrame = list.toDF()
df.orderBy(df("age").desc).show()
+---+------+---+---+
| id| name|age|sex|
+---+------+---+---+
| 12| 梦雅| 25| 女|
| 11| 小惠| 23| 女|
| 10| 乐乐| 21| 男|
| 9|王菲菲| 20| 女|
| 5| 翠花| 19| 女|
| 4| 王五| 18| 男|
| 1| 张三| 18| 男|
| 3| 李四| 18| 男|
| 2| 绣花| 16| 女|
| 7| 张鹏| 14| 男|
| 8| 刘秀| 13| 男|
+---+------+---+---+
sort, sorting by age (asc)
val df: DataFrame = list.toDF()
df.sort(df("age").asc).show()
sort, sorting by age (desc)
val df: DataFrame = list.toDF()
df.sort(df("age").desc).show()
sortWithinPartitions
Similar to sort, except that sortWithinPartitions sorts the rows inside each partition only; the returned DataFrame is not globally ordered.
sortWithinPartitions, sorting by age (asc)
val df: DataFrame = list.toDF()
df.sortWithinPartitions(df("age").asc).show()
+---+------+---+---+
| id| name|age|sex|
+---+------+---+---+
| 2| 绣花| 16| 女|
| 1| 张三| 18| 男|
| 3| 李四| 18| 男|
| 4| 王五| 18| 男|
| 5| 翠花| 19| 女|
------------------------------ (the rows fall into two partitions, each sorted on its own)
| 8| 刘秀| 13| 男|
| 7| 张鹏| 14| 男|
| 9|王菲菲| 20| 女|
| 10| 乐乐| 21| 男|
| 11| 小惠| 23| 女|
| 12| 梦雅| 25| 女|
+---+------+---+---+
groupBy: group rows by one or more columns
groupBy can be called in two ways: with column names as Strings, or with Column objects.
Example: group by sex and count the students of each sex
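A minimal sketch of that grouping, showing both calling styles. The object name GroupBySketch, the local[2] master, and the three sample rows are assumptions for illustration only.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object GroupBySketch {
  // Local session for illustration only.
  val spark: SparkSession =
    SparkSession.builder().master("local[2]").appName("groupby-sketch").getOrCreate()
  import spark.implicits._

  // Hypothetical sample rows mirroring the Student columns used in this article.
  val df: DataFrame = Seq(
    (1, "张三", 18, "男"),
    (2, "绣花", 16, "女"),
    (3, "李四", 18, "男")
  ).toDF("id", "name", "age", "sex")

  // Style 1: group by column name.
  val byName: DataFrame = df.groupBy("sex").count()
  // Style 2: group by Column object; this produces the same grouping.
  val byColumn: DataFrame = df.groupBy(df("sex")).count()

  // Bring the small result to the driver as sex -> count (the count column is a Long).
  def toMap(result: DataFrame): Map[String, Long] =
    result.collect().map(r => r.getString(0) -> r.getLong(1)).toMap

  val counts: Map[String, Long] = toMap(byName)
  val countsViaColumn: Map[String, Long] = toMap(byColumn)

  def main(args: Array[String]): Unit = {
    byName.show()
    spark.stop()
  }
}
```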
cube and rollup: extensions of group by
These work like group by cube/rollup in SQL.
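Since no example is given for these two, here is a small sketch of how they differ. The object name CubeRollupSketch and the three sample rows are assumptions for illustration. rollup adds subtotals along the column order (per sex, then a grand total), while cube adds subtotals for every combination of the grouping columns.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object CubeRollupSketch {
  // Local session for illustration only.
  val spark: SparkSession =
    SparkSession.builder().master("local[2]").appName("cube-rollup-sketch").getOrCreate()
  import spark.implicits._

  // Hypothetical sample rows: two boys in classes 1 and 2, one girl in class 1.
  val df: DataFrame = Seq(
    ("张鹏", "男", 1),
    ("李四", "男", 2),
    ("绣花", "女", 1)
  ).toDF("name", "sex", "classId")

  // rollup: (sex, classId) groups + one subtotal per sex + one grand total.
  val rolled: DataFrame = df.rollup("sex", "classId").count()

  // cube: everything rollup produces + one subtotal per classId.
  val cubed: DataFrame = df.cube("sex", "classId").count()

  val rollupRows: Long = rolled.count()
  val cubeRows: Long = cubed.count()

  def main(args: Array[String]): Unit = {
    rolled.show()
    cubed.show()
    spark.stop()
  }
}
```

In the output, a null in a grouping column marks a subtotal row for that column.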
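The GroupedData object
groupBy returns a GroupedData object (named RelationalGroupedDataset since Spark 2.0), whose API provides the operations that follow a group by, for example:
max(colNames: String*) returns the maximum of the given columns, or of all numeric columns, within each group; it only works on numeric columns
val df: DataFrame = list.toDF()
val newDF: DataFrame = df.groupBy("sex").max("age")
newDF.show()
+---+--------+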
|sex|max(age)|
+---+--------+
| 男| 21|
| 女| 25|
+---+--------+
min(colNames: String*) returns the minimum of the given columns, or of all numeric columns, within each group; it only works on numeric columns
val df: DataFrame = list.toDF()
val newDF: DataFrame = df.groupBy("sex").min("age")
newDF.show()
+---+--------+
|sex|min(age)|
+---+--------+
| 男| 13|
| 女| 16|
+---+--------+
mean(colNames: String*) returns the average of the given columns, or of all numeric columns, within each group; it only works on numeric columns
val df: DataFrame = list.toDF()
val newDF: DataFrame = df.groupBy("sex").mean("age")
newDF.show()
+---+--------+
|sex|avg(age)|
+---+--------+
| 男| 17.0|
| 女| 20.6|
+---+--------+
sum(colNames: String*) returns the sum of the given columns, or of all numeric columns, within each group; it only works on numeric columns
val df: DataFrame = list.toDF()
val newDF: DataFrame = df.groupBy("sex").sum("age")
newDF.show()
+---+--------+
|sex|sum(age)|
+---+--------+
| 男| 102|
| 女| 103|
+---+--------+
count() returns the number of rows in each group
val df: DataFrame = list.toDF()
val newDF: DataFrame = df.groupBy("sex").count()
newDF.show()
+---+-----+
|sex|count|
+---+-----+
| 男| 6|
| 女| 5|
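+---+-----+
distinct: return a DataFrame without duplicate rows
Returns the distinct Rows of the current DataFrame. It gives the same result as dropDuplicates() called without any column names.
val df: DataFrame = list.toDF()
val newDF: DataFrame = df.distinct()
newDF.show()
The sample data contains no duplicate rows, so the output is omitted.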
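To see deduplication actually remove something, here is a sketch on data that does contain a repeated row. The object name DistinctSketch and the sample rows are assumptions for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object DistinctSketch {
  // Local session for illustration only.
  val spark: SparkSession =
    SparkSession.builder().master("local[2]").appName("distinct-sketch").getOrCreate()
  import spark.implicits._

  // Hypothetical sample rows: the first row appears twice, and all ages are equal.
  val df: DataFrame = Seq(
    (1, "张三", 18),
    (1, "张三", 18),
    (2, "李四", 18)
  ).toDF("id", "name", "age")

  // Whole-row deduplication: the repeated (1, 张三, 18) row collapses into one.
  val distinctRows: Long = df.distinct().count()
  // dropDuplicates() without arguments compares whole rows as well.
  val dropAllRows: Long = df.dropDuplicates().count()
  // Deduplicating on age alone keeps a single row, because every age is 18.
  val dropByAgeRows: Long = df.dropDuplicates("age").count()

  def main(args: Array[String]): Unit = {
    df.distinct().show()
    spark.stop()
  }
}
```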
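dropDuplicates: deduplicate by the given columns
Deduplicates on the given columns. It resembles select distinct a, b, except that every column of the surviving row is kept; which of the duplicate rows survives is not guaranteed.
Deduplicate by age
val df: DataFrame = list.toDF()
val newDF: DataFrame = df.dropDuplicates("age")
newDF.show()
+---+------+---+---+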
| id| name|age|sex|
+---+------+---+---+
| 8| 刘秀| 13| 男|
| 2| 绣花| 16| 女|
| 9|王菲菲| 20| 女|
| 5| 翠花| 19| 女|
| 11| 小惠| 23| 女|
| 12| 梦雅| 25| 女|
| 10| 乐乐| 21| 男|
| 7| 张鹏| 14| 男|
| 1| 张三| 18| 男|
+---+------+---+---+
Aggregations go through the agg method, which can be called in several ways. It is usually combined with groupBy.
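As a sketch of the groupBy pairing, the following computes several aggregates per group using the functions from org.apache.spark.sql.functions. The object name AggSketch, the aliases max_age, avg_age, and n, and the sample rows are assumptions for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{avg, count, max}

object AggSketch {
  // Local session for illustration only.
  val spark: SparkSession =
    SparkSession.builder().master("local[2]").appName("agg-sketch").getOrCreate()
  import spark.implicits._

  // Hypothetical sample rows mirroring the Student columns used in this article.
  val df: DataFrame = Seq(
    (1, "张三", 18, "男"),
    (2, "绣花", 16, "女"),
    (3, "李四", 20, "男")
  ).toDF("id", "name", "age", "sex")

  // One pass over each group, several aggregates, each with a readable alias.
  val grouped: DataFrame = df.groupBy("sex").agg(
    max("age").as("max_age"),
    avg("age").as("avg_age"),
    count("id").as("n")
  )

  // sex -> (max age, average age, number of students)
  val stats: Map[String, (Int, Double, Long)] = grouped.collect().map { r =>
    r.getAs[String]("sex") ->
      ((r.getAs[Int]("max_age"), r.getAs[Double]("avg_age"), r.getAs[Long]("n")))
  }.toMap

  def main(args: Array[String]): Unit = {
    grouped.show()
    spark.stop()
  }
}
```

Compared with the pair form ("age" -> "max"), the Column form lets you alias each result column, which avoids names like max(age) in downstream code.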
val df: DataFrame = list.toDF()
val newDF: DataFrame = df.agg(
"age" -> "max",
"age" -> "avg",
"age" -> "min",
"age" -> "sum",
"age" -> "count"
)
newDF.show()
+--------+------------------+--------+--------+----------+
|max(age)| avg(age)|min(age)|sum(age)|count(age)|
+--------+------------------+--------+--------+----------+
| 25|18.636363636363637| 13| 205| 11|
+--------+------------------+--------+--------+----------+
Reorganize the data
val female=List(
Student(2,"绣花",16,"女",1),
Student(5,"翠花",19,"女",2),
Student(9,"王菲菲",20,"女",1),
Student(11,"小惠",23,"女",1),
Student(12,"梦雅",25,"女",3)
)
val boys=List(
Student(1,"张三",18,"男",3),
Student(3,"李四",18,"男",2),
Student(4,"王五",18,"男",2),
Student(7,"张鹏",14,"男",1),
Student(8,"刘秀",13,"男",2),
Student(10,"乐乐",21,"男",1)
)
Add a classId field to the case class
case class Student(id: Int, name: String, age: Int, sex: String, classId: Int)
Example: union combines the rows of two DataFrames
val femaleDF: DataFrame = female.toDF()
val boysDF: DataFrame = boys.toDF()
val value: Dataset[Row] = femaleDF.union(boysDF)
value.foreach(println(_))
[2,绣花,16,女,1]
[5,翠花,19,女,2]
[11,小惠,23,女,1]
[9,王菲菲,20,女,1]
[12,梦雅,25,女,3]
[1,张三,18,男,3]
[3,李四,18,男,2]
[4,王五,18,男,2]
[8,刘秀,13,男,2]
[7,张鹏,14,男,1]
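[10,乐乐,21,男,1]
unionAll: combine two DataFrames
Like UNION ALL in SQL. In Spark 2.x unionAll is deprecated in favor of union, which does the same thing; neither removes duplicate rows.
val femaleDF: DataFrame = female.toDF()
val boysDF: DataFrame = boys.toDF()
val value: Dataset[Row] = femaleDF.unionAll(boysDF)
value.foreach(println(_))
[5,翠花,19,女,2]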
[2,绣花,16,女,1]
[11,小惠,23,女,1]
[9,王菲菲,20,女,1]
[12,梦雅,25,女,3]
[1,张三,18,男,3]
[3,李四,18,男,2]
[4,王五,18,男,2]
[7,张鹏,14,男,1]
[8,刘秀,13,男,2]
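[10,乐乐,21,男,1]
Now for the important part. join is one of the most heavily used operations in SQL, and DataFrame offers it too, through six overloaded join methods.
Cartesian product
Depending on the Spark version, a join without a condition may be rejected unless spark.sql.crossJoin.enabled is set to true; crossJoin expresses the same intent explicitly.
val femaleDF: DataFrame = female.toDF()
val boysDF: DataFrame = boys.toDF()
val value: Dataset[Row] = femaleDF.join(boysDF)
value.foreach(println(_))
[2,绣花,16,女,1,3,李四,18,男,2]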
[2,绣花,16,女,1,7,张鹏,14,男,1]
[2,绣花,16,女,1,8,刘秀,13,男,2]
[2,绣花,16,女,1,1,张三,18,男,3]
[5,翠花,19,女,2,3,李四,18,男,2]
[5,翠花,19,女,2,7,张鹏,14,男,1]
[5,翠花,19,女,2,8,刘秀,13,男,2]
[5,翠花,19,女,2,1,张三,18,男,3]
[9,王菲菲,20,女,1,3,李四,18,男,2]
[9,王菲菲,20,女,1,7,张鹏,14,男,1]
[9,王菲菲,20,女,1,8,刘秀,13,男,2]
[11,小惠,23,女,1,3,李四,18,男,2]
[9,王菲菲,20,女,1,1,张三,18,男,3]
[11,小惠,23,女,1,7,张鹏,14,男,1]
[11,小惠,23,女,1,8,刘秀,13,男,2]
[12,梦雅,25,女,3,3,李四,18,男,2]
[12,梦雅,25,女,3,7,张鹏,14,男,1]
[11,小惠,23,女,1,1,张三,18,男,3]
[12,梦雅,25,女,3,8,刘秀,13,男,2]
[2,绣花,16,女,1,4,王五,18,男,2]
[12,梦雅,25,女,3,1,张三,18,男,3]
[2,绣花,16,女,1,10,乐乐,21,男,1]
[5,翠花,19,女,2,4,王五,18,男,2]
[5,翠花,19,女,2,10,乐乐,21,男,1]
[9,王菲菲,20,女,1,4,王五,18,男,2]
[9,王菲菲,20,女,1,10,乐乐,21,男,1]
[11,小惠,23,女,1,4,王五,18,男,2]
[11,小惠,23,女,1,10,乐乐,21,男,1]
[12,梦雅,25,女,3,4,王五,18,男,2]
[12,梦雅,25,女,3,10,乐乐,21,男,1]
Join using one column
This join resembles a join b using column1; both DataFrames must have a column with the same name.
import sparkSession.implicits._
val femaleDF: DataFrame = female.toDF()
val boysDF: DataFrame = boys.toDF()
val value: Dataset[Row] = femaleDF.join(boysDF,"classId")
value.foreach(println(_))
[2,5,翠花,19,女,8,刘秀,13,男]
[1,11,小惠,23,女,7,张鹏,14,男]
[2,5,翠花,19,女,3,李四,18,男]
[3,12,梦雅,25,女,1,张三,18,男]
[1,11,小惠,23,女,10,乐乐,21,男]
[1,9,王菲菲,20,女,7,张鹏,14,男]
[2,5,翠花,19,女,4,王五,18,男]
[1,2,绣花,16,女,7,张鹏,14,男]
[1,9,王菲菲,20,女,10,乐乐,21,男]
[1,2,绣花,16,女,10,乐乐,21,男]
Join using several columns
Besides a single column, you can also join using several columns. With this data the result is empty, because no row in female shares both classId and id with a row in boys.
val femaleDF: DataFrame = female.toDF()
val boysDF: DataFrame = boys.toDF()
val value: Dataset[Row] = femaleDF.join(boysDF,Seq("classId","id"))
value.foreach(println(_))
Specifying the join type
The join types for two DataFrames include inner, outer, left_outer, right_outer, and leftsemi. With the Seq-of-columns form of join shown above, a third String argument selects the join type:
left_outer
val femaleDF: DataFrame = female.toDF()
val boysDF: DataFrame = boys.toDF()
val value: Dataset[Row] = femaleDF.join(boysDF,Seq("classId"),"left_outer")
value.foreach(println(_))
[1,11,小惠,23,女,10,乐乐,21,男]
[2,5,翠花,19,女,8,刘秀,13,男]
[1,9,王菲菲,20,女,10,乐乐,21,男]
[1,2,绣花,16,女,10,乐乐,21,男]
[2,5,翠花,19,女,4,王五,18,男]
[1,2,绣花,16,女,7,张鹏,14,男]
[1,11,小惠,23,女,7,张鹏,14,男]
[2,5,翠花,19,女,3,李四,18,男]
[1,9,王菲菲,20,女,7,张鹏,14,男]
[3,12,梦雅,25,女,1,张三,18,男]
right_outer
val femaleDF: DataFrame = female.toDF()
val boysDF: DataFrame = boys.toDF()
val value: Dataset[Row] = femaleDF.join(boysDF,Seq("classId"),"right_outer")
value.foreach(println(_))
[2,5,翠花,19,女,8,刘秀,13,男]
[2,5,翠花,19,女,3,李四,18,男]
[3,12,梦雅,25,女,1,张三,18,男]
[1,11,小惠,23,女,7,张鹏,14,男]
[1,11,小惠,23,女,10,乐乐,21,男]
[1,9,王菲菲,20,女,7,张鹏,14,男]
[1,9,王菲菲,20,女,10,乐乐,21,男]
[2,5,翠花,19,女,4,王五,18,男]
[1,2,绣花,16,女,10,乐乐,21,男]
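[1,2,绣花,16,女,7,张鹏,14,男]
The remaining join types are not demonstrated here. The examples above are adapted from https://blog.csdn.net/dabokele/article/details/52802150; for more APIs, see the official Spark documentation.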
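In the sample data every classId exists on both sides, so the left_outer and right_outer results contain the same rows. The sketch below uses deliberately mismatched keys so the join types give different row counts. The object name JoinTypeSketch, the column names girl and boy, and the sample rows are assumptions for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object JoinTypeSketch {
  // Local session for illustration only.
  val spark: SparkSession =
    SparkSession.builder().master("local[2]").appName("join-sketch").getOrCreate()
  import spark.implicits._

  // Hypothetical data: class 1 exists on both sides, class 4 only on the left,
  // class 2 only on the right.
  val girls: DataFrame = Seq(("绣花", 1), ("梦雅", 4)).toDF("girl", "classId")
  val boys: DataFrame = Seq(("张鹏", 1), ("李四", 2)).toDF("boy", "classId")

  // Same using-column join every time; only the join type string changes.
  def joined(joinType: String): DataFrame =
    girls.join(boys, Seq("classId"), joinType)

  val innerRows: Long = joined("inner").count()            // matching keys only
  val leftOuterRows: Long = joined("left_outer").count()   // every girl, boy may be null
  val rightOuterRows: Long = joined("right_outer").count() // every boy, girl may be null
  val fullOuterRows: Long = joined("outer").count()        // every row from both sides
  val semi: DataFrame = joined("leftsemi")                 // girls that have a match

  def main(args: Array[String]): Unit = {
    joined("outer").show()
    semi.show()
    spark.stop()
  }
}
```

Note that leftsemi returns only the left-hand columns: it answers "which left rows have a match" without pulling in any right-hand data.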
The examples above use a case class, so the field names and types are mapped automatically to the columns of the table.
case class Student(id: Int, name: String, age: Int, sex: String, classId: Int)
@Test
def demo02(): Unit = {
val female=List(
Student(2,"绣花",16,"女",1),
Student(5,"翠花",19,"女",2),
Student(9,"王菲菲",20,"女",1),
Student(11,"小惠",23,"女",1),
Student(12,"梦雅",25,"女",3)
)
val boys=List(
Student(1,"张三",18,"男",3),
Student(3,"李四",18,"男",2),
Student(4,"王五",18,"男",2),
Student(7,"张鹏",14,"男",1),
Student(8,"刘秀",13,"男",2),
Student(10,"乐乐",21,"男",1)
)
import sparkSession.implicits._
//val femaleDF: DataFrame = female.toDF()
val boysDF: DataFrame = boys.toDF()
boysDF.show()
}
+---+----+---+---+-------+
| id|name|age|sex|classId|
+---+----+---+---+-------+
| 1|张三| 18| 男| 3|
| 3|李四| 18| 男| 2|
| 4|王五| 18| 男| 2|
| 7|张鹏| 14| 男| 1|
| 8|刘秀| 13| 男| 2|
| 10|乐乐| 21| 男| 1|
+---+----+---+---+-------+
Use printSchema (boysDF.printSchema()) to inspect the column types
root
|-- id: integer (nullable = false)
|-- name: string (nullable = true)
|-- age: integer (nullable = false)
|-- sex: string (nullable = true)
|-- classId: integer (nullable = false)
Besides a case class, you can also use tuples
@Test
def demo03(): Unit ={
val list=List(
(1,"张三",18,"男",3),
(3,"李四",18,"男",2),
(4,"王五",18,"男",2),
(7,"张鹏",14,"男",1),
(8,"刘秀",13,"男",2),
(10,"乐乐",21,"男",1)
)
import sparkSession.implicits._
val boysDF: DataFrame = list.toDF()
boysDF.show()
}
The columns are named after the tuple index
+---+----+---+---+---+
| _1| _2| _3| _4| _5|
+---+----+---+---+---+
| 1|张三| 18| 男| 3|
| 3|李四| 18| 男| 2|
| 4|王五| 18| 男| 2|
| 7|张鹏| 14| 男| 1|
| 8|刘秀| 13| 男| 2|
| 10|乐乐| 21| 男| 1|
+---+----+---+---+---+
Use printSchema (boysDF.printSchema()) to inspect the column types
root
|-- _1: integer (nullable = false)
|-- _2: string (nullable = true)
|-- _3: integer (nullable = false)
|-- _4: string (nullable = true)
|-- _5: integer (nullable = false)
toDF(colNames: String*)
Rename the columns
Column names of the form _N are not very friendly; you can supply your own
@Test
def demo03b(): Unit = {
val list=List(
(1,"张三",18,"男",3),
(3,"李四",18,"男",2),
(4,"王五",18,"男",2),
(7,"张鹏",14,"男",1),
(8,"刘秀",13,"男",2),
(10,"乐乐",21,"男",1)
)
import sparkSession.implicits._
val boysDF: DataFrame = list.toDF("id","name","age","sex","classId")
boysDF.show()
}
+---+----+---+---+-------+
| id|name|age|sex|classId|
+---+----+---+---+-------+
| 1|张三| 18| 男| 3|
| 3|李四| 18| 男| 2|
| 4|王五| 18| 男| 2|
| 7|张鹏| 14| 男| 1|
| 8|刘秀| 13| 男| 2|
| 10|乐乐| 21| 男| 1|
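+---+----+---+---+-------+
To summarize:
With tuples, the columns are named after the tuple index (_1, _2, ...).
(1,"张三",18,"男",3),
("3","李四",18,"男",2),
(1,"张三",18,"男",3),
(3,"李四",18),
The toDF(colNames: String*) overload assigns the column names; the number of names must match the number of tuple elements.
Besides calling toDF on a collection, you can also call rdd.toDF to convert an RDD into a DataFrame.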
@Test
def demo04(): Unit ={
val list=List(
(1,"张三",18,"男",3),
(3,"李四",18,"男",2),
(4,"王五",18,"男",2),
(7,"张鹏",14,"男",1),
(8,"刘秀",13,"男",2),
(10,"乐乐",21,"男",1)
)
// Get the SparkContext
val sc: SparkContext = sparkSession.sparkContext
val rdd: RDD[(Int, String, Int, String, Int)] = sc.parallelize(list, 2)
// toDF requires the implicit conversions
import sparkSession.implicits._
// Convert the RDD into a DataFrame
val df: DataFrame = rdd.toDF
df.show()
}
toDF requires the implicit conversions
import sparkSession.implicits._
All of the examples above create the DataFrame with toDF. In total there are four ways to create a DataFrame.
createDataFrame[A <: Product : TypeTag](rdd: RDD[A])
val list=List(
(1,"张三",18,"男",3),
(3,"李四",18,"男",2),
(4,"王五",18,"男",2),
(7,"张鹏",14,"男",1),
(8,"刘秀",13,"男",2),
(10,"乐乐",21,"男",1)
)
val sc: SparkContext = sparkSession.sparkContext
val rdd: RDD[(Int, String, Int, String, Int)] = sc.parallelize(list, 2)
val df: DataFrame = sparkSession.createDataFrame(rdd)
df.show()+---+----+---+---+---+
| _1| _2| _3| _4| _5|
+---+----+---+---+---+
| 1|张三| 18| 男| 3|
| 3|李四| 18| 男| 2|
| 4|王五| 18| 男| 2|
| 7|张鹏| 14| 男| 1|
| 8|刘秀| 13| 男| 2|
| 10|乐乐| 21| 男| 1|
+---+----+---+---+---+
createDataFrame(rowRDD: RDD[Row], schema: StructType)
@Test
def demo06(): Unit ={
val list=List(
Row(1,"张三",18,"男",3),
Row(3,"李四",18,"男",2),
Row(4,"王五",18,"男",2),
Row(7,"张鹏",14,"男",1),
Row(8,"刘秀",13,"男",2),
Row(10,"乐乐",21,"男",1)
)
val sc: SparkContext = sparkSession.sparkContext
val rdd: RDD[Row] = sc.parallelize(list)
// Define the StructType schema
val fields=Array(
StructField("id",IntegerType),
StructField("name",StringType),
StructField("age",IntegerType),
StructField("sex",StringType),
StructField("classId",IntegerType)
)
val schema =StructType(fields)
val df = sparkSession.createDataFrame(rdd, schema)
df.show()
}
Required imports
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.Row
The remaining overloads are not needed for now:
createDataFrame[A <: Product : TypeTag](data: Seq[A])
createDataFrame(rowRDD: JavaRDD[Row], schema: StructType)
createDataFrame(rows: java.util.List[Row], schema: StructType)
createDataFrame(rdd: RDD[_], beanClass: Class[_])
createDataFrame(rdd: JavaRDD[_], beanClass: Class[_])
createDataFrame(data: java.util.List[_], beanClass: Class[_])
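For reference, here is a hedged sketch of two of the overloads listed above: the Seq[A] form and the java.util.List[Row] plus schema form. The object name CreateSketch, the column names, and the sample rows are assumptions for illustration.

```scala
import java.util.Arrays

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object CreateSketch {
  // Local session for illustration only.
  val spark: SparkSession =
    SparkSession.builder().master("local[2]").appName("create-sketch").getOrCreate()

  // createDataFrame[A <: Product : TypeTag](data: Seq[A]):
  // column types come from the tuple; toDF then replaces the _1, _2, _3 names.
  val fromSeq: DataFrame = spark
    .createDataFrame(Seq((1, "张三", 18), (3, "李四", 18)))
    .toDF("id", "name", "age")

  // createDataFrame(rows: java.util.List[Row], schema: StructType):
  // Rows are untyped, so the schema has to be supplied explicitly.
  val schema: StructType = StructType(Seq(
    StructField("id", IntegerType),
    StructField("name", StringType)
  ))
  val fromRows: DataFrame =
    spark.createDataFrame(Arrays.asList(Row(1, "张三"), Row(3, "李四")), schema)

  def main(args: Array[String]): Unit = {
    fromSeq.show()
    fromRows.printSchema()
    spark.stop()
  }
}
```

Neither form needs import sparkSession.implicits._, since the conversion is an explicit method call rather than an implicit one.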
Example 1:
val list=List(
(1,"张三",18,"男",3),
(3,"李四",18,"男",2),
(4,"王五",18,"男",2),
(7,"张鹏",14,"男",1),
(8,"刘秀",13,"男",2),
(10,"乐乐",21,"男",1)
)
val sc: SparkContext = sparkSession.sparkContext
val rdd: RDD[(Int, String, Int, String, Int)] = sc.parallelize(list, 2)
val df: DataFrame = sparkSession.createDataFrame(rdd)
df.show()
An operation on a DataFrame returns a new DataFrame
@Test
def demo08(): Unit = {
val female=List(
Student(2,"绣花",16,"女",1),
Student(5,"翠花",19,"女",2),
Student(9,"王菲菲",20,"女",1),
Student(11,"小惠",23,"女",1),
Student(12,"梦雅",25,"女",3)
)
val boys=List(
Student(1,"张三",18,"男",3),
Student(3,"李四",18,"男",2),
Student(4,"王五",18,"男",2),
Student(7,"张鹏",14,"男",1),
Student(8,"刘秀",13,"男",2),
Student(10,"乐乐",21,"男",1)
)
// Import the implicit conversions
import sparkSession.implicits._
val femaleDf: DataFrame = female.toDF()
val boysDf: DataFrame = boys.toDF()
val unionAllDf: DataFrame = femaleDf.unionAll(boysDf)
val group: RelationalGroupedDataset = unionAllDf.groupBy("sex")
val resultDf: DataFrame = group.max("age")
resultDf.show()
}
+---+--------+
|sex|max(age)|
+---+--------+
| 男| 21|
| 女| 25|
+---+--------+
This article is a repost. If it infringes your rights, contact cloudcommunity@tencent.com to have it removed.