在 Spark SQL 中有两种方式可以在 DataFrame 和 RDD 中进行转换:
DataFrame 中的数据结构信息,即为 Scheme
(使用条件)已知类的 Schema,使用这种基于反射的方法会让代码更加简洁而且效果也更好。在 Scala 中,使用 case class 类型导入 RDD 并转换为 DataFrame,通过 case class 创建 Schema,case class 的参数名称会被利用反射机制作为列名。case class 可以嵌套组合成 Sequences 或者 Array。这种 RDD 可以高效的转换为 DataFrame 并注册为表。
其次,如果需要 RDD 与 DFS 或者 DS 之间互相操作,那么需要引入 import sqlContext.implicits._
SparkSession 是 Spark 2.0 引入的概念,其封装了 SQLContext 和 HiveContext。
package sparksql
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
object DataFrametoRDDofReflection {
def main(args: Array[String]): Unit = {
}
def method1():Unit = {
val sparkConf = new SparkConf().setAppName("DataFrametoRDDofReflection").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
// 引入 sqlContext.implicits._
import sqlContext.implicits._
// 将 RDD 转成 DataFrame
/*val people = sc.textFile("people.txt").toDF()*/
val people = sc.textFile("people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt)).toDF()
people.show()
people.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name,age FROM people WHERE age >= 13 AND age <= 19")
teenagers.show()
// DataFrame 转成 RDD 进行操作:根据索引号取值
teenagers.map(t=>"Name:" + t(0)).collect().foreach(println)
// DataFrame 转成 RDD 进行操作:根据字段名称取值
teenagers.map(t=>"Name:" + t.getAs[String]("name")).collect().foreach(println)
// DataFrame 转成 RDD 进行操作:一次返回多列的值
teenagers.map(_.getValuesMap[Any](List("name","age"))).collect().foreach(println)
sc.stop()
}
/**
* 定义 Person 类
* @param name 姓名
* @param age 年龄
*/
case class Person(name:String,age:Int)
}
通过 Spark SQL 的接口创建 RDD 的 Schema,这种方式会让代码比较冗长。这种方法的好处是,在运行时才知道数据的列以及列的类型的情况下,可以动态生成 Schema。可以通过以下三步创建 DataFrame:
package sparksql
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
object DataFrametoRDDofInterface {
def main(args: Array[String]): Unit = {
method2()
}
def method2(): Unit = {
val sparkConf = new SparkConf().setAppName("DataFrametoRDDofInterface").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val people = sc.textFile("people.txt")
// 以字符串的方式定义 DataFrame 的 Schema 信息
val schemaString = "name age"
// 导入所需要的类
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType,StructField,StringType}
// 根据自定义的字符串 schema 信息产生 DataFrame 的 Schema
val schema = StructType(
schemaString.split(" ").map(fieldName=>StructField(fieldName,StringType,true)))
// 将 RDD 转换成 Row
val rowRDD = people.map(_.split(",")).map(p=>Row(p(0),p(1).trim))
// 将 Schema 作用到 RDD 上
val peopleDataFrame = sqlContext.createDataFrame(rowRDD,schema)
// 将 DataFrame 注册成临时表
peopleDataFrame.registerTempTable("people")
// 获取 name 字段的值
val results = sqlContext.sql("SELECT name FROM people")
results.map(t => "Name" + t(0)).collect().foreach(println)
sc.stop()
}
}