摘 要
在自定义的程序中编写Spark SQL查询程序
package com.itunic.sql
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by itunic.com on 2017/1/2.
* Spark SQL
* 通过反射推断Schema
* by me:
* 我本沉默是关注互联网以及分享IT相关工作经验的博客,
* 主要涵盖了操作系统运维、计算机编程、项目开发以及系统架构等经验。
* 博客宗旨:把最实用的经验,分享给最需要的你,
* 希望每一位来访的朋友都能有所收获!
*
*/
object InferringSchema {
def main(args: Array[String]): Unit = {
//创建SparkConf()并设置App名称
val conf = new SparkConf().setAppName("InferringSchema").setMaster("local")
//SQLContext要依赖SparkContext
val sc = new SparkContext(conf)
//创建SQLContext
val sqlContext = new SQLContext(sc)
//从指定的地址创建RDD
val lineRdd = sc.textFile("F:\\test\\input\\wc.txt").map(f => {
val fields = f.split("\t")
//将RDD和case class关联
Person(fields(0).toLong, fields(1), fields(2).toInt)
})
//导入隐式转换,如果不导入无法将RDD转换成DataFrame
//将RDD转换成DataFrame
import sqlContext.implicits._
val personDF = lineRdd.toDF
//注册表
personDF.registerTempTable("t_person")
//传入SQL
val df = sqlContext.sql("select * from t_person order by age desc limit 2")
//显示
df.show()
//以json方式写入hdfs
//df.write.json("hdfs://ns1:9000/wc")
sc.stop()
}
}
//定义样例类
case class Person(id: Long, userName: String, age: Int)
package com.itunic.sql
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._
/**
* Created by itunic.com on 2017/1/2.
* Spark SQL
* 通过StructType直接指定Schema
* by me:
* 我本沉默是关注互联网以及分享IT相关工作经验的博客,
* 主要涵盖了操作系统运维、计算机编程、项目开发以及系统架构等经验。
* 博客宗旨:把最实用的经验,分享给最需要的你,
* 希望每一位来访的朋友都能有所收获!
*
*/
object SpecifyingSchema {
def main(args: Array[String]): Unit = {
//创建SparkConf()并设置App名称
val conf = new SparkConf().setAppName("SpecifyingSchema").setMaster("local")
//SQLContext要依赖SparkContext
val sc = new SparkContext(conf)
//创建SQLContext
val sqlContext = new SQLContext(sc)
//通过StructType直接指定每个字段的schema
val schema = StructType(
List(
StructField("id", LongType, true),
StructField("name", StringType, true),
StructField("age", IntegerType, true)
)
)
val lineRdd = sc.textFile("F:\\test\\input\\wc.txt").map(f => {
val fields = f.split("\t")
Row(fields(0).toLong, fields(1), fields(2).toInt)
})
//将schema信息应用到lineRdd上
val personDF = sqlContext.createDataFrame(lineRdd, schema)
personDF.registerTempTable("t_person")
//传入SQL
val df = sqlContext.sql("select * from t_person order by age desc limit 2")
//显示
df.show()
//以json方式写入hdfs
//df.write.json("hdfs://ns1:9000/wc")
sc.stop()
}
}
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有