Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >逐行阅读Alternative Spark

逐行阅读Alternative Spark
EN

Stack Overflow用户
提问于 2021-05-05 20:37:55
回答 3查看 67关注 0票数 0

我想要逐行读取一个用例,我需要从header读取数据,并将数据发送到Header和Trailer之间的所有记录Spark不允许逐行读取-我们如何实现这样的用例。

代码语言:javascript
运行
AI代码解释
复制
*H,TextStart,1244
I,000000001,GOOD,-000000001,DMGD,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU       ,
I,000000062,GOOD,+000000004,DMGD,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU       ,
*T,TextEnd
*H,TextStart,1235
I,000000002,GOOD,-000000001,DMGD,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU       ,
I,000000035,GOOD,+000000004,DMGD,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU       ,
*T,TextEnd
*H,TextStart,1244
I,000000004,GOOD,-000000001,CRT,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU       ,
I,000000062,GOOD,+000000004,DPT,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU       ,
*T,TextEnd
*H,TextStart,1236
I,000000005,GOOD,-000000001,ABCD,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU       ,
I,000000035,GOOD,+000000004,EFGF,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU       ,
*T,TextEnd

预期的输出是从头部获取数据,并附加到头部和尾部记录。我不知道该怎么做

代码语言:javascript
运行
AI代码解释
复制
1244,I,000000001,GOOD,-000000001,DMGD,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU,    
1244,I,000000062,GOOD,+000000004,DMGD,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU,    
1235,I,000000002,GOOD,-000000001,DMGD,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU,    
1236,I,000000035,GOOD,+000000004,DMGD,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU,    
1244,I,000000004,GOOD,-000000001,CRT,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU ,    
1244,I,000000062,GOOD,+000000004,DPT,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU ,    
1236,I,000000005,GOOD,-000000001,ABCD,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU,    
1236,I,000000035,GOOD,+000000004,EFGF,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU, 

我使用了zip索引并填充了行号。

代码语言:javascript
运行
AI代码解释
复制
df = spark.read.text('/hdfsData/file.csv')
df_1 = df.rdd.map(lambda r: r).zipWithIndex().toDF(['value', 'index'])
df_11 = spark.sql("select value.value ,index from linenumber where value.value like '*H,%' or value.value like '*T,%'")

现在,我计划连接df_11和df_1,并执行一些范围连接逻辑来选择值。但是,有没有其他有效的方法来达到同样的效果呢?

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2021-05-06 20:07:38

我已经使用zip索引和窗口滞后函数实现了同样的功能。然而,这将在单个分区中完成,因为数据的性质要求它。提供适当的执行器内存。

代码语言:javascript
运行
AI代码解释
复制
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.functions import *
df = spark.read.text('MQ_out.csv')
#Adding index column each row get its row numbers 
df_1 = df.rdd.map(lambda r: r).zipWithIndex().toDF(['value', 'index'])
df_1.createOrReplaceTempView("linenumber")
#zipindex creates array making back to string
df_2 = spark.sql("select value.value as value , index  from linenumber")
df_2.createOrReplaceTempView("linenumber1")
df_3 = spark.sql("select * from linenumber1 where value is not null or trim(value)!=''")
df_3.createOrReplaceTempView("linenumber2")
#Splitting and extracting the location value from header and assigning null
df_new = spark.sql("select value,case when value like '*H,%' then split(value,',')[2] else null end as location,index from linenumber2")
#Assign previous row value to next row if the current row is null
df_new=df_new.withColumn('newlocation',F.when(df_new.location>0,df_new.location).otherwise(F.last(df_new.location,ignorenulls=True).over(Window.orderBy("index"))))
#counting the number of , in data
df_new.withColumn('Comma_Count', size(split(col("value"), r",")) - 1)
df_new.createOrReplaceTempView("FinalData")
#remove Header and tralierrows which doesnt have needed number of commans
df_final=spark.sql("select value,newlocation as location from FinalData where Comma_Count = 11")
df_final.createOrReplaceTempView("FinalData_1")
#concat to bring the required data
df_write = spark.sql("select concat(value,newlocation) from FinalData_1")
df_write.text('Filename')
票数 0
EN

Stack Overflow用户

发布于 2021-05-13 21:14:28

我通过使用两个lag函数成功地做到了这一点

代码语言:javascript
运行
AI代码解释
复制
(df
    .withColumn('id', F.monotonically_increasing_id())            # make fake ID for window functions below
    .withColumn('lag_1', F.lag('value', 1).over(W.orderBy('id'))) # grab the previous row (offset 1)
    .withColumn('lag_2', F.lag('value', 2).over(W.orderBy('id'))) # grab the previous row (offset 2)
    .withColumn('heading', F                                      # make heading column based on availability of both lag_1 and lag_2
        .when(F.col('lag_1').startswith('*H,TextStart'), F.col('lag_1'))
        .when(F.col('lag_2').startswith('*H,TextStart'), F.col('lag_2'))
    )
    .where(F.col('heading').isNotNull())                          # remove header and trailer rows, which does not have proper heading
    .withColumn('value', F.concat(F.split(F.col('heading'), ',')[2], F.lit(','), F.col('value')))
                                                                  # concatenate heading number with current value
    .drop('id', 'lag_1', 'lag_2', 'heading')                      # remove temporary columns
    .show(20, False)
)

# +-------------------------------------------------------------------------------------------------------+
# |value                                                                                                  |
# +-------------------------------------------------------------------------------------------------------+
# |1244,I,000000001,GOOD,-000000001,DMGD,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU       ,|
# |1244,I,000000062,GOOD,+000000004,DMGD,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU       ,|
# |1235,I,000000002,GOOD,-000000001,DMGD,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU       ,|
# |1235,I,000000035,GOOD,+000000004,DMGD,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU       ,|
# |1244,I,000000004,GOOD,-000000001,CRT,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU       , |
# |1244,I,000000062,GOOD,+000000004,DPT,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU       , |
# |1236,I,000000005,GOOD,-000000001,ABCD,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU       ,|
# |1236,I,000000035,GOOD,+000000004,EFGF,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU       ,|
# +-------------------------------------------------------------------------------------------------------+
票数 1
EN

Stack Overflow用户

发布于 2021-05-06 07:58:03

将文件读入RDD

代码语言:javascript
运行
AI代码解释
复制
val rdd = sc.textFile("/FileStore/tables/sf0.txt")

然后用逗号分隔文件的每一行,并过滤掉头部和尾部。

代码语言:javascript
运行
AI代码解释
复制
val rdd2 = rdd.map(x => x.split(",")).filter(x => x(0) != "*H" && x(0) != "*T")

接下来,将数组转换为可转换为dataframe的元组。如果我们跳过这一步,dataframe将只有一列,其中包含数组中的所有内容。

代码语言:javascript
运行
AI代码解释
复制
val finalRDD = rdd2.map(x => (x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7), x(8), x(9), x(10)))

将RDD转换为dataframe

代码语言:javascript
运行
AI代码解释
复制
val myDF = finalRDD.toDF()

检查数据帧内容:

代码语言:javascript
运行
AI代码解释
复制
myDF.show()

+---+---------+----+----------+----+----------+----+-----+----------+--------------------+----------+
| _1|       _2|  _3|        _4|  _5|        _6|  _7|   _8|        _9|                 _10|       _11|
+---+---------+----+----------+----+----------+----+-----+----------+--------------------+----------+
|  I|000000001|GOOD|-000000001|DMGD|+000000000|SOM |00002|+000000000|ONHAND              |LCU       |
|  I|000000062|GOOD|+000000004|DMGD|+000000000|SOM |00001|+000000000|ONHAND              |LCU       |
|  I|000000002|GOOD|-000000001|DMGD|+000000000|SOM |00002|+000000000|ONHAND              |LCU       |
|  I|000000035|GOOD|+000000004|DMGD|+000000000|SOM |00001|+000000000|ONHAND              |LCU       |
|  I|000000004|GOOD|-000000001| CRT|+000000000|SOM |00002|+000000000|ONHAND              |LCU       |
|  I|000000062|GOOD|+000000004| DPT|+000000000|SOM |00001|+000000000|ONHAND              |LCU       |
|  I|000000005|GOOD|-000000001|ABCD|+000000000|SOM |00002|+000000000|ONHAND              |LCU       |
|  I|000000035|GOOD|+000000004|EFGF|+000000000|SOM |00001|+000000000|ONHAND              |LCU       |
+---+---------+----+----------+----+----------+----+-----+----------+--------------------+----------+
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67408448

复制
相关文章
逐行阅读Spring5.X源码(一) BeanDefinition,起点
本篇博客你讲学到: 1. 如何理解BeanDefinition 2. 准备环境 3. BeanDefinition接口讲解 4. BeanDefinition的类继承关系 5. IOC的引出 6. BeanFactory工厂的引出 7. 后置处理器的引出 8. spring扩展性初探
源码之路
2020/09/03
1.4K0
Spark RDD类源码阅读
每天进步一点点~开搞~ abstract class RDD[T: ClassTag]( //@transient 注解表示将字段标记为瞬态的 @transient private var _sc: SparkContext, // Seq是序列,元素有插入的先后顺序,可以有重复的元素。 @transient private var deps: Seq[Dependency[_]] ) extends Serializable with Logging { if (c
用户3003813
2018/09/06
6120
spark源码阅读基本思路
a.解决企业中bug。比如flink早期bug,就很多,如json序列化工具,在开启flink仅一次处理,json格式不符合要求,就会抛异常而挂掉,然后重试,挂掉。这明显不科学,要解决这个bug就要会读源码,改源码。
Spark学习技巧
2021/03/05
1.4K0
spark源码阅读基本思路
Spark常用函数(源码阅读六)
  源码层面整理下我们常用的操作RDD数据处理与分析的函数,从而能更好的应用于工作中。
用户3003813
2018/09/06
8370
Spark常用函数(源码阅读六)
逐行阅读Spring5.X源码(十一)AOP概念、应用、原理
与OOP对比,面向切面,传统的OOP开发中的代码逻辑是自上而下的,而这些过程会产生一些横切性问题,这些横切性的问题和我们的主业务逻辑关系不大,这些横切性问题不会影响到主逻辑实现的,但是会散落到代码的各个部分,难以维护。AOP是处理一些横切性问题,AOP的编程思想就是把这些问题和主业务逻辑分开,达到与主业务逻辑解耦的目的。使代码的重用性和开发效率更高。
源码之路
2020/09/04
8910
逐行阅读Spring5.X源码(十一)AOP概念、应用、原理
逐行阅读Spring5.X源码(六) ClassPathBeanDefinitionScanner扫描器
spring包扫描是通过ClassPathBeanDefinitionScanner类来完成的,它主要工作有两个:
源码之路
2020/09/04
6720
逐行阅读Spring5.X源码(六) ClassPathBeanDefinitionScanner扫描器
逐行阅读Spring5.X源码(十三)spring事务源码分析
spring并不直接管理事务,而是提供了多种事务管理器,他们将事务的管理职责委托给Hibernate或JTA等持久化机制所提供的相关平台框架的事务来实现。通过这个接口,spring为各个平台如JDBC、Hibernate等提供了对应的事务管理器,但是事务的实现就是各平台自己的事情了。
源码之路
2020/09/04
7290
逐行阅读Spring5.X源码(十三)spring事务源码分析
Alternative Process Injection
2 .WriteProcessMemory -> 将解密/解码的shellcode写入内存空间
黑白天安全
2021/12/29
9820
Alternative Process Injection
逐行阅读Spring5.X源码(番外篇)AnnotatedBeanDefinitionReader的作用
在上一篇文章里面我们讨论了一个beanDefintion对象的重要性,为了讨论spring当中的beanDefinition对象我们不得不牵扯出spring当中的bean工厂后置处理器也就是BeanFactoryPostProcessor这个类;继而讨论了BeanFactoryPostProcessor的大概执行时机(BeanFactoryPostProcessor的执行时机很重要而且spring内部做的稍微有点复杂,本文重点来讨论spring内部怎么保证这些执行时机得以严禁的执行,还有如何来扩展spring的bean工厂后置处理器);首先通过一张图简单的理解一下spring容器启动的时候执行调用BeanFactoryPostProcessor后置处理器的大概的方法执行顺序:
源码之路
2020/09/04
7950
逐行阅读Spring5.X源码(番外篇)AnnotatedBeanDefinitionReader的作用
UCF约束介绍_flcl alternative
(1)时序约束:主要用于规范设计的时序行为,表达设计者期望满足的时序条件,知道综合和布局布线阶段的优化算法等。
全栈程序员站长
2022/09/20
1.1K0
逐行阅读Spring5.X源码(四) BeanFactory——核心容器bean工厂
5.DefaultSingletonBeanRegistry。SingletionBean注册器的默认实现,同时继承SimpleAliasRegistry。因此这个类可以有别名注册的功能和单例bean注册的功能,并且他还支持注册DisposableBean实例;它依赖ObjectFactory接口和DisposableBean接口(关闭注册表时调用到了destroy方法)。侧重于Bean的注册,销毁,以及依赖关系(关联关系)的注册和销毁。
源码之路
2020/09/04
1.4K0
逐行阅读Spring5.X源码(四) BeanFactory——核心容器bean工厂
Spark之SQL解析(源码阅读十)
  如何能更好的运用与监控sparkSQL?或许我们改更深层次的了解它深层次的原理是什么。之前总结的已经写了传统数据库与Spark的sql解析之间的差别。那么我们下来直切主题~   如今的Spark已
用户3003813
2018/09/06
8720
Spark之SQL解析(源码阅读十)
【IDEA】阅读Spark源码,配置Scala SDK
很久没有看 Spark 的源码了,打开 IDEA,一堆报错,看了一下主要是 Scala 的问题,所以先删掉了 IDEA 里下载的 Scala SDK,然后重新安装,重新配置。
runzhliu
2020/08/02
5740
Spark Netty与Jetty (源码阅读十一)
  spark呢,对Netty API又做了一层封装,那么Netty是什么呢~是个鬼。它基于NIO的服务端客户端框架,具体不再说了,下面开始。
用户3003813
2018/09/06
1.1K0
Spark Netty与Jetty (源码阅读十一)
The Polar Code, an Alternative Perspective
最近在学习Polar code,是华为主推的被钦定为5G控制信道的一种信道编码方式。大概也是唯一没有经历过充分商用验证就被3GPP认可的吧。据说领导打电话给国内各个厂家,提升到民族大义层面,要求必须支持。领导都支持,我们更要支持了,还要好好学习贯彻! Polar code的构造很有意思,把信道复制N份后组合到一起,经过了规律的线性变换,最后产生了分裂split,出现了极化效果polarization:这复制的N份信道一部分容量趋于1,另一部分趋于0,所以叫做Polar码。简直是马太效应啊,穷人越来越穷,富人
用户1594945
2018/07/20
5550
Spark 源码(9)- Worker 启动 Executor 源码阅读
上一次阅读到 Master 调用 schedule() 方法,遍历 waitingApps,为每个程序决定启动多少 Executor,为每个 Executor 分配多少资源,有了这些信息之后,给 Worker 发送了一个 LaunchExecutor 消息,Worker 开始处理。
kk大数据
2021/11/16
5690
Spark 源码(9)- Worker 启动 Executor 源码阅读
阅读源码|Spark 与 Flink 的 RPC 实现
近日常有同学来问我如何阅读代码,关于这个问题的一般性答案我特别提了一个问题并自问自答。出于提供一个实际的例子的考量,正好此前综合地阅读 Spark 的 RPC 实现、Flink 基于 Akka 的 RPC 实现和 Actor Model 的通信模型,写成本文分享我阅读分布式计算系统 Spark 和 Flink 中的 RPC 实现的过程和思考。
王知无-import_bigdata
2020/04/15
1.3K0
逐行阅读Spring5.X源码(三) BeanDefinition的实现类详解,拔高
上回咱们讲了BeanDefinition的父接口及父接口实现类。本篇博文咱么继续讲BeanDefinition的实现类。包括AbstractBeanDefinition、RootBeanDefinition、ChildBeanDefinition、GenericBeanDefinition、AnnotateGenericBeanDefinition、ScannerGenericBeanDefinition。
源码之路
2020/09/04
1.7K0
逐行阅读Spring5.X源码(三) BeanDefinition的实现类详解,拔高
spark源码单步跟踪阅读-从毛片说起
想当年读大学时,那时毛片还叫毛片,现在有文明的叫法了,叫小电影或者爱情动作片。那时宿舍有位大神,喜欢看各种毛片,当我们纠结于毛片上的马赛克时,大神大手一挥说道:这算啥,阅尽天下毛片,心中自然无码!突然想到我们在学习spark时,也可以有这种精神,当我们能读懂spark源码时,spark的技术世界也就真正为我们敞开了大门。中国台湾C++大师侯捷说过:源码面前,了无秘密!那我们就从如何单步调试spark源码开始讲起吧。 首先开发工具推荐大家选择IntelliJ,Intellij在和scala语言的结合上,
Spark学习技巧
2018/06/22
1.5K0
Python逐行写入
# -*-coding:utf-8-*-import os# 写之前,先检验文件是否存在,存在就删掉if os.path.exists("dest.txt"): os.remove("dest.txt")# 以写的方式打开文件,如果文件不存在,就会自动创建file_write_obj = open("dest.txt", 'w')var = "adfss"file_write_obj.writelines(var)file_write_obj.write('\n')file_write_obj.clo
狼啸风云
2020/07/26
2.4K0

相似问题

逐行阅读

45

逐行阅读文件

40

逐行阅读字母

317

逐行阅读streams

21

golang逐行阅读

10
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档