前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >Spark(RDD,CSV)创建DataFrame方式

Spark(RDD,CSV)创建DataFrame方式

作者头像
Tim在路上
发布于 2020-08-04 13:46:04
发布于 2020-08-04 13:46:04
1.5K00
代码可运行
举报
运行总次数:0
代码可运行

spark将RDD转换为DataFrame

  1. 方法一(不推荐)

spark将csv转换为DataFrame,可以先文件读取为RDD,然后再进行map操作,对每一行进行分割。 再将schema和rdd分割后的Rows回填,sparkSession创建的dataFrame

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 val spark = SparkSession
      .builder()
      .appName("sparkdf")
      .master("local[1]")
      .getOrCreate()

       //设置spark的上下文sparkContext
      val sc = spark.sparkContext
      val fileRDD = sc.textFile("/home/hadoop/Downloads/filesmall2.csv")
      //val rdd = fileRDD.filter(line => line.split("\t").length != 30)
      val df = spark.createDataFrame(fileRDD.map(line=>HttpSchema.parseLog(line)),HttpSchema.struct)
      df.show(3)

这里的RDD是通过读取文件创建的所以也可以看做是将RDD转换为DataFrame

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
object HttpSchema {

  def parseLog(x:String): Row = {

    var fields = x.split("\t")

    val _id = fields(0)
    val srcIp = fields(1)
    val srcPort = fields(2)
    //这种方法比较麻烦的地方是row里面的字段名要和struct中的字段对应上
    RowFactory.create(_id,srcIp,srcPort)
  }

  //设置schema描述
  val struct = StructType(
    Array( StructField("_id",StringType),
      StructField("srcIp",StringType),
      StructField("srcPort",StringType),
    )
  )
}

这也是这种方法不推荐使用的地方,因为返回的Row中的字段名要与schema中的字段名要一致,当字段多于22个这个需要集成一个

2.方法二 //使用隐式转换的方式来进行转换

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val spark = SparkSession
      .builder()
      .appName("sparkdf")
      .master("local[1]")
      .getOrCreate()
      
      //使用隐式转换必须导入这个才可以使用只有import spark.implicits._之后,RDD才有toDF、toDS功能


      import spark.implicits._
      
       //设置spark的上下文sparkContext
      val sc = spark.sparkContext

      val fileRDD = sc.textFile("/home/hadoop/Downloads/filesmall2.csv")

      case class HttpClass(id:String,srcIp:String,srcPort:Int)

      val df = fileRDD.map(_.split("\t")).map(line=>HttpClass(line(0),line(1),line(2).toInt)).toDF()

当然也可以不创建类对象

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
rdd.map{x=>val par=x.split(",");(par(0),par(1).toInt)}.toDF("name","age")

dataFrame转换为RDD只需要将collect就好,df.collect RDD[row]类型,就可以按row取出

spark读取csv转化为DataFrame

  1. 方法一
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 val conf = new SparkConf().setAppName("word count").setMaster("local[1]")

    val sc = new SparkContext(conf)
    println("spark version: " + sc.version)

    val spark = new SQLContext(sc)

    import spark.implicits._

    val df = spark.read.format("com.databricks.spark.csv")
      .option("header", "false")
      .option("inferSchema", "false") //是否自动推到内容的类型
      .option("delimiter",",")  //分隔符,默认为 ,
      .load("/home/hadoop/Downloads/Salary_Data.csv")
    df.show()
    
    
    //进行写数据
    data.repartition(1).write.format("com.databricks.spark.csv")
      .option("header", "false")//在csv第一行有属性"true",没有就是"false"
      .option("delimiter",",")//默认以","分割
      .save(outpath)
     sparkContext.stop()

sparkContext.sql()操作完成后直接返回的是DataFrame 当然可以间接采用将csv直接转换为RDD然后再将RDD转换为DataFrame

2.方法二

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 读取数据并分割每个样本点的属性值 形成一个Array[String]类型的RDD 
val rdd = sc.textFile("file:///home/xuqm/ML_Data/input/synthetic_control.data").map(_.split("\\s+")) 
// 将rdd转换成LabeledPoint类型的RDD 
val LabeledPointRdd = rdd.map(x=>LabeledPoint(0,Vectors.dense(x.map(_.toDouble))))
// 转成DataFrame并只取"features"列 
val data = spark.createDataFrame(LabeledPointRdd).select("features")
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
适合小白入门的IDEA开发SparkSQL详细教程
之前博主利用业余时间,梳理了一份《SparkSQL编程系列》,奈何当时考虑不周,写的不是很详细。于是在正式开始学习了之后,决定整理一篇适合像我一样的小白级别都能看得懂的IDEA操作SparkSQL教程,于是就有了下文…
大数据梦想家
2021/01/27
2K0
适合小白入门的IDEA开发SparkSQL详细教程
2021年大数据Spark(二十五):SparkSQL的RDD、DF、DS相关操作
Spark 2.0开始,SparkSQL应用程序入口为SparkSession,加载不同数据源的数据,封装到DataFrame/Dataset集合数据结构中,使得编程更加简单,程序运行更加快速高效。
Lansonli
2021/10/09
1.4K0
RDD和DataFrame转换
在利用反射机制推断RDD模式时,需要首先定义一个case class,因为,只有case class才能被Spark隐式地转换为DataFrame。
羊羽shine
2019/08/23
1.3K0
SparkSQL快速入门系列(6)
上一篇《SparkCore快速入门系列(5)》,下面给大家更新一篇SparkSQL入门级的讲解。
刘浩的BigDataPath
2021/04/13
2.4K0
SparkSQL快速入门系列(6)
数据分析EPHS(2)-SparkSQL中的DataFrame创建
本文的开头,咱们正式给该系列取个名字了,就叫数据分析EPHS系列,EPHS分别是Excel、Python、Hive和SparkSQL的简称。本篇是该系列的第二篇,我们来讲一讲SparkSQL中DataFrame创建的相关知识。
石晓文
2019/07/09
1.6K0
数据分析EPHS(2)-SparkSQL中的DataFrame创建
Spark之【SparkSQL编程】系列(No1)——《SparkSession与DataFrame》
上一篇博客已经为大家介绍完了SparkSQL的基本概念以及其提供的两个编程抽象:DataFrame和DataSet,本篇博客,博主要为大家介绍的是关于SparkSQL编程的内容。考虑到内容比较繁琐,故分成了一个系列博客。本篇作为该系列的第一篇博客,为大家介绍的是SparkSession与DataFrame。
大数据梦想家
2021/01/27
1.6K0
Spark之【SparkSQL编程】系列(No1)——《SparkSession与DataFrame》
Spark SQL 快速入门系列(2) | SparkSession与DataFrame的简单介绍
  在老的版本中,SparkSQL 提供两种 SQL 查询起始点:一个叫SQLContext,用于Spark 自己提供的 SQL 查询;一个叫 HiveContext,用于连接 Hive 的查询。
不温卜火
2020/10/28
2.3K0
Spark SQL 快速入门系列(2) | SparkSession与DataFrame的简单介绍
大数据随记 —— DataFrame 与 RDD 之间的相互转换
在 Spark SQL 中有两种方式可以在 DataFrame 和 RDD 中进行转换:
繁依Fanyi
2023/05/07
1.2K0
大数据随记 —— DataFrame 与 RDD 之间的相互转换
DataFrame与RDD的互操作
DataFrame Interoperating with RDDs 参考官网 http://spark.apache.org/docs/2.2.0/sql-programming-guide.html#interoperating-with-rdds DataFrame和RDD互操作的两种方式比较: 1)反射推导式:case class 前提:事先需要知道字段、字段类型 2)编程式:Row 如果第一种情况不能满足要求(事先不知道列等schema信息) 选型:优先考虑第一种,使用
sparkle123
2018/04/26
8900
DataFrame与RDD的互操作
【Spark篇】---SparkSQL初始和创建DataFrame的几种方式
          Hive是Shark的前身,Shark是SparkSQL的前身,SparkSQL产生的根本原因是其完全脱离了Hive的限制。
LhWorld哥陪你聊算法
2018/09/13
2.7K0
【Spark篇】---SparkSQL初始和创建DataFrame的几种方式
SparkSQL如何实现多数据源交互?这篇博客或许能告诉你答案!
学了一段时间的SparkSQL,相信大家都已经知道了SparkSQL是一个相当强大的存在,它在一个项目的架构中扮演着离线数据处理的"角色",相较于前面学过的HQL,SparkSQL能明显提高数据的处理效率。正因为如此,SparkSQL就会涉及到与多种的数据源进行一个交互的过程。那到底是如何交互的呢,下文或许能给你带来答案…
大数据梦想家
2021/01/27
7820
SparkSQL如何实现多数据源交互?这篇博客或许能告诉你答案!
Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))
在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。 DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。
Maynor
2022/05/08
2.6K0
Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))
Note_Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))
使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行针对性的优化,最终达到大幅提升运行时效率
Maynor
2021/12/07
2.3K0
Note_Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))
Spark SQL DataFrame与RDD交互
Spark SQL 支持自动将 JavaBeans 的 RDD 转换为 DataFrame。使用反射获取的 BeanInfo 定义了表的 schema。目前为止,Spark SQL 还不支持包含 Map 字段的 JavaBean。但是支持嵌套的 JavaBeans,List 以及 Array 字段。你可以通过创建一个实现 Serializable 的类并为其所有字段设置 getter 和 setter 方法来创建一个 JavaBean。
smartsi
2019/08/07
1.8K0
第三天:SparkSQL
Spark SQL是Spark用来处理结构化数据的一个模块,它提供了2个编程抽象:DataFrame和DataSet,并且作为分布式SQL查询引擎的作用。 我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduc的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所有Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!
sowhat1412
2020/11/05
13.3K0
第三天:SparkSQL
Spark SQL 快速入门系列(5) | 一文教你如何使用 IDEA 创建 SparkSQL 程序(小白入门!)
一. 添加 SparkSQL 依赖 <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.1.1</version> </dependency> 二. 具体代码 2.1 DataFrameDemo 1. 源码 package com.buwenbuhuo.spark.sql.day01 import org.apache.spark.
不温卜火
2020/10/28
1.2K0
Spark SQL 快速入门系列(5) | 一文教你如何使用 IDEA 创建 SparkSQL 程序(小白入门!)
RDD转换为DataFrame
为什么要将RDD转换为DataFrame?因为这样的话,我们就可以直接针对HDFS等任何可以构建为RDD的数据,使用Spark SQL进行SQL查询了。这个功能是无比强大的。想象一下,针对HDFS中的数据,直接就可以使用SQL进行查询。
编程那点事
2023/02/25
8000
spark2 sql编程样例:sql操作
问题导读 1.DataFrame中本文使用了row哪些方法? 2.操作DataFrame row需要导入什么包? 3.teenagersDF.map(teenager => "Name: " + te
用户1410343
2018/03/26
3.4K0
spark2 sql编程样例:sql操作
2021年大数据Spark(三十二):SparkSQL的External DataSource
在SparkSQL模块,提供一套完成API接口,用于方便读写外部数据源的的数据(从Spark 1.4版本提供),框架本身内置外部数据源:
Lansonli
2021/10/09
2.4K0
Spark入门指南:从基础概念到实践应用全解析
在这个数据驱动的时代,信息的处理和分析变得越来越重要。而在众多的大数据处理框架中,「Apache Spark」以其独特的优势脱颖而出。
BookSea
2023/10/20
7390
Spark入门指南:从基础概念到实践应用全解析
推荐阅读
相关推荐
适合小白入门的IDEA开发SparkSQL详细教程
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验