首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >逐行阅读Alternative Spark

逐行阅读Alternative Spark
EN

Stack Overflow用户
提问于 2021-05-05 20:37:55
回答 3查看 67关注 0票数 0

我想要逐行读取一个用例,我需要从header读取数据,并将数据发送到Header和Trailer之间的所有记录Spark不允许逐行读取-我们如何实现这样的用例。

代码语言:javascript
运行
AI代码解释
复制
*H,TextStart,1244
I,000000001,GOOD,-000000001,DMGD,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU       ,
I,000000062,GOOD,+000000004,DMGD,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU       ,
*T,TextEnd
*H,TextStart,1235
I,000000002,GOOD,-000000001,DMGD,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU       ,
I,000000035,GOOD,+000000004,DMGD,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU       ,
*T,TextEnd
*H,TextStart,1244
I,000000004,GOOD,-000000001,CRT,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU       ,
I,000000062,GOOD,+000000004,DPT,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU       ,
*T,TextEnd
*H,TextStart,1236
I,000000005,GOOD,-000000001,ABCD,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU       ,
I,000000035,GOOD,+000000004,EFGF,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU       ,
*T,TextEnd

预期的输出是从头部获取数据,并附加到头部和尾部记录。我不知道该怎么做

代码语言:javascript
运行
AI代码解释
复制
1244,I,000000001,GOOD,-000000001,DMGD,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU,    
1244,I,000000062,GOOD,+000000004,DMGD,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU,    
1235,I,000000002,GOOD,-000000001,DMGD,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU,    
1236,I,000000035,GOOD,+000000004,DMGD,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU,    
1244,I,000000004,GOOD,-000000001,CRT,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU ,    
1244,I,000000062,GOOD,+000000004,DPT,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU ,    
1236,I,000000005,GOOD,-000000001,ABCD,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU,    
1236,I,000000035,GOOD,+000000004,EFGF,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU, 

我使用了zip索引并填充了行号。

代码语言:javascript
运行
AI代码解释
复制
df = spark.read.text('/hdfsData/file.csv')
df_1 = df.rdd.map(lambda r: r).zipWithIndex().toDF(['value', 'index'])
df_11 = spark.sql("select value.value ,index from linenumber where value.value like '*H,%' or value.value like '*T,%'")

现在,我计划连接df_11和df_1,并执行一些范围连接逻辑来选择值。但是,有没有其他有效的方法来达到同样的效果呢?

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2021-05-06 20:07:38

我已经使用zip索引和窗口滞后函数实现了同样的功能。然而,这将在单个分区中完成,因为数据的性质要求它。提供适当的执行器内存。

代码语言:javascript
运行
AI代码解释
复制
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.functions import *
df = spark.read.text('MQ_out.csv')
#Adding index column each row get its row numbers 
df_1 = df.rdd.map(lambda r: r).zipWithIndex().toDF(['value', 'index'])
df_1.createOrReplaceTempView("linenumber")
#zipindex creates array making back to string
df_2 = spark.sql("select value.value as value , index  from linenumber")
df_2.createOrReplaceTempView("linenumber1")
df_3 = spark.sql("select * from linenumber1 where value is not null or trim(value)!=''")
df_3.createOrReplaceTempView("linenumber2")
#Splitting and extracting the location value from header and assigning null
df_new = spark.sql("select value,case when value like '*H,%' then split(value,',')[2] else null end as location,index from linenumber2")
#Assign previous row value to next row if the current row is null
df_new=df_new.withColumn('newlocation',F.when(df_new.location>0,df_new.location).otherwise(F.last(df_new.location,ignorenulls=True).over(Window.orderBy("index"))))
#counting the number of , in data
df_new.withColumn('Comma_Count', size(split(col("value"), r",")) - 1)
df_new.createOrReplaceTempView("FinalData")
#remove Header and tralierrows which doesnt have needed number of commans
df_final=spark.sql("select value,newlocation as location from FinalData where Comma_Count = 11")
df_final.createOrReplaceTempView("FinalData_1")
#concat to bring the required data
df_write = spark.sql("select concat(value,newlocation) from FinalData_1")
df_write.text('Filename')
票数 0
EN

Stack Overflow用户

发布于 2021-05-13 21:14:28

我通过使用两个lag函数成功地做到了这一点

代码语言:javascript
运行
AI代码解释
复制
(df
    .withColumn('id', F.monotonically_increasing_id())            # make fake ID for window functions below
    .withColumn('lag_1', F.lag('value', 1).over(W.orderBy('id'))) # grab the previous row (offset 1)
    .withColumn('lag_2', F.lag('value', 2).over(W.orderBy('id'))) # grab the previous row (offset 2)
    .withColumn('heading', F                                      # make heading column based on availability of both lag_1 and lag_2
        .when(F.col('lag_1').startswith('*H,TextStart'), F.col('lag_1'))
        .when(F.col('lag_2').startswith('*H,TextStart'), F.col('lag_2'))
    )
    .where(F.col('heading').isNotNull())                          # remove header and trailer rows, which does not have proper heading
    .withColumn('value', F.concat(F.split(F.col('heading'), ',')[2], F.lit(','), F.col('value')))
                                                                  # concatenate heading number with current value
    .drop('id', 'lag_1', 'lag_2', 'heading')                      # remove temporary columns
    .show(20, False)
)

# +-------------------------------------------------------------------------------------------------------+
# |value                                                                                                  |
# +-------------------------------------------------------------------------------------------------------+
# |1244,I,000000001,GOOD,-000000001,DMGD,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU       ,|
# |1244,I,000000062,GOOD,+000000004,DMGD,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU       ,|
# |1235,I,000000002,GOOD,-000000001,DMGD,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU       ,|
# |1235,I,000000035,GOOD,+000000004,DMGD,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU       ,|
# |1244,I,000000004,GOOD,-000000001,CRT,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU       , |
# |1244,I,000000062,GOOD,+000000004,DPT,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU       , |
# |1236,I,000000005,GOOD,-000000001,ABCD,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU       ,|
# |1236,I,000000035,GOOD,+000000004,EFGF,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU       ,|
# +-------------------------------------------------------------------------------------------------------+
票数 1
EN

Stack Overflow用户

发布于 2021-05-06 07:58:03

将文件读入RDD

代码语言:javascript
运行
AI代码解释
复制
val rdd = sc.textFile("/FileStore/tables/sf0.txt")

然后用逗号分隔文件的每一行,并过滤掉头部和尾部。

代码语言:javascript
运行
AI代码解释
复制
val rdd2 = rdd.map(x => x.split(",")).filter(x => x(0) != "*H" && x(0) != "*T")

接下来,将数组转换为可转换为dataframe的元组。如果我们跳过这一步,dataframe将只有一列,其中包含数组中的所有内容。

代码语言:javascript
运行
AI代码解释
复制
val finalRDD = rdd2.map(x => (x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7), x(8), x(9), x(10)))

将RDD转换为dataframe

代码语言:javascript
运行
AI代码解释
复制
val myDF = finalRDD.toDF()

检查数据帧内容:

代码语言:javascript
运行
AI代码解释
复制
myDF.show()

+---+---------+----+----------+----+----------+----+-----+----------+--------------------+----------+
| _1|       _2|  _3|        _4|  _5|        _6|  _7|   _8|        _9|                 _10|       _11|
+---+---------+----+----------+----+----------+----+-----+----------+--------------------+----------+
|  I|000000001|GOOD|-000000001|DMGD|+000000000|SOM |00002|+000000000|ONHAND              |LCU       |
|  I|000000062|GOOD|+000000004|DMGD|+000000000|SOM |00001|+000000000|ONHAND              |LCU       |
|  I|000000002|GOOD|-000000001|DMGD|+000000000|SOM |00002|+000000000|ONHAND              |LCU       |
|  I|000000035|GOOD|+000000004|DMGD|+000000000|SOM |00001|+000000000|ONHAND              |LCU       |
|  I|000000004|GOOD|-000000001| CRT|+000000000|SOM |00002|+000000000|ONHAND              |LCU       |
|  I|000000062|GOOD|+000000004| DPT|+000000000|SOM |00001|+000000000|ONHAND              |LCU       |
|  I|000000005|GOOD|-000000001|ABCD|+000000000|SOM |00002|+000000000|ONHAND              |LCU       |
|  I|000000035|GOOD|+000000004|EFGF|+000000000|SOM |00001|+000000000|ONHAND              |LCU       |
+---+---------+----+----------+----+----------+----+-----+----------+--------------------+----------+
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67408448

复制
相关文章
servlet的工作原理_hibernate工作原理
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
全栈程序员站长
2022/11/09
1.4K0
servlet的工作原理_hibernate工作原理
Asp.Net Core EndPoint 终结点路由工作原理解读
在本打算写一篇关于Identityserver4 的文章时候,却发现自己对EndPoint -终结点路由还不是很了解,故暂时先放弃了IdentityServer4 的研究和编写;所以才产生了今天这篇关于EndPoint (终结点路由) 的文章。
Jlion
2022/04/07
1K0
Asp.Net Core EndPoint 终结点路由工作原理解读
View的工作原理
ViewRoot对应于ViewRootImpl类,它是连接WindowManager和DecorView的纽带,View的三大流程均是通过ViewRoot完成的。在ActivityThread中,当Activity对象被创建完毕后,会将DecorView添加到Window中,同时创建ViewRootImpl对象,并将ViewRootImpl对象和DecorView建立关联,这个过程可参看源码:
见得乐
2022/07/20
4190
View的工作原理
dubbo的工作原理[通俗易懂]
说一下的dubbo的工作原理?注册中心挂了可以继续通信吗?说说一次rpc请求的流程?
全栈程序员站长
2022/07/22
5460
dubbo的工作原理[通俗易懂]
View的工作原理
在 WindowManagerGlobal 的 addView(View view, ViewGroup.LayoutParams params, Display display, Window parentWindow) 方法中,创建了 ViewRootImpl 对象,将 ViewRootImpl 和 DecorView 相关联:
俞其荣
2022/07/28
1.2K0
View的工作原理
舵机的工作原理
大家好,又见面了,我是你们的朋友全栈君。舵机的伺服系统由可变宽度的脉冲来进行控制,控制线是用来传送脉冲的。脉冲的参数有最小值,最大值,和频率。一般而言,舵机的基准信号都是周期为20ms,宽度为1.5ms。这个基准信号定义的位置为中间位置。舵机有最大转动角度,中间位置的定义就是从这个位置到最大角度与最小角度的量完全一样。最重要的一点是,不同舵机的最大转动角度可能不相同,但是其中间位置的脉冲宽度是一定的,那就是1.5ms。如下图:
全栈程序员站长
2022/08/11
8020
舵机的工作原理
SSH的工作原理
SSH简介 传统的网络服务程序,比如FTP,POP,Telnet,本质上都是不安全的,因为它们在网络上用明文传送数据、用户账号和用户口令,很容易受到中间人攻击方式的攻击,攻击者会冒充真正的服务器接收用户传给服务器的数据,然后再冒充用户把数据传给真正的服务器。 为了满足安全性的需求,IETF的网络工作小组制定了Secure Shell(缩写为SSH),这是一项创建在应用层和传输层基础上的安全协议,为计算机上的Shell提供安全的传输和使用环境。 SSH是目前较可靠,专为远程登录会话和其他网络服务提供安全性的协
大蟒传奇
2018/06/20
1.4K0
Feign的工作原理
Feign是一个伪Java Http 客户端,Feign 不做任何的请求处理。Feign 通过处理注解生成Request模板,从而简化了Http API 的开发。开发人员可以使用注解的方式定制Request API模板。
全栈程序员站长
2022/11/09
6600
Feign的工作原理
Feign的工作原理
Feign是一个声明式的HTTP客户端框架,它能够帮助开发者将HTTP请求转化为Java接口的方法调用。在Feign的内部实现中,它主要依赖于动态代理和反射机制来实现。本文将详细介绍Feign的工作原理,包括Feign的核心组件、请求流程、动态代理和反射机制等方面。
堕落飞鸟
2023/04/07
1K0
adb的工作原理
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/100712.html原文链接:https://javaforall.cn
全栈程序员站长
2022/06/29
7990
RTMP的工作原理
 点击上方“LiveVideoStack”关注我们 翻译:Alex 技术审校:章琦 本文来自OTTVerse,作者为Krishna Rao Vijayanagar。 ▲扫描图中二维码或点击阅读原文▲ 了解音视频技术大会更多信息 RTMP Easy-Tech #028# 什么是RTMP? RTMP(Real-Time Messaging Protocol,实时消息传输协议)是一种用于低延迟、实时音视频和数据传输的双向互联网通信协议,由Macromedia(后被Adobe收购)开发。RTMP的工作原理
LiveVideoStack
2022/05/30
1.4K0
RTMP的工作原理
kubeadm 的工作原理
相信使用二进制部署过 k8s 集群的同学们都知道,二进制部署集群太困难了,有点基础的人部署起来还有成功的希望,要不然只能跟着别人的教程一步一步的去部署,部署的时候完全不知道这样操作的意义是啥?出问题了无从下手解决。对于初学者来说真的是浪费生命,那有没有什么简单的方式来部署集群呢?这个问题在前几年可能没有很好的答案,但是在现在,答案简直太多了,比如 kubeadm,rke 等方式,我们今天就来介绍下 kubeadm 部署集群的工作原理。
张琳兮
2020/06/05
6310
kubeadm 的工作原理
相信使用二进制部署过 k8s 集群的同学们都知道,二进制部署集群太困难了,有点基础的人部署起来还有成功的希望,要不然只能跟着别人的教程一步一步的去部署,部署的时候完全不知道这样操作的意义是啥?出问题了无从下手解决。对于初学者来说真的是浪费生命,那有没有什么简单的方式来部署集群呢?这个问题在前几年可能没有很好的答案,但是在现在,答案简直太多了,比如 kubeadm,rke 等方式,我们今天就来介绍下 kubeadm 部署集群的工作原理。
张琳兮
2020/06/07
1.4K0
listview的工作原理
/**      * Unsorted views that can be used by the adapter as a convert view.      */ private ArrayList<View>[] mScrapViews;   private int mViewTypeCount;   private ArrayList<View> mCurrentScrap;   /**      * Fill ActiveViews wi
xiangzhihong
2018/01/30
2.5K0
listview的工作原理
SpringMVC的工作原理
2、 DispatcherServlet收到请求调用HandlerMapping处理器映射器。
用户4283147
2022/10/27
5720
SpringMVC的工作原理
[TCP/IP] HTTPS的工作原理
一.首先HTTP请求服务端生成证书,客户端对证书的有效期、合法性、域名是否与请求的域名一致、证书的公钥(RSA加密)等进行校验; 二.客户端如果校验通过后,就根据证书的公钥的有效, 生成随机数,随机数使用公钥进行加密(RSA加密); 三.消息体产生的后,对它的摘要进行MD5(或者SHA1)算法加密,此时就得到了RSA签名; 四.发送给服务端,此时只有服务端(RSA私钥)能解密。 五.解密得到的随机数,再用AES加密,作为密钥(此时的密钥只有客户端和服务端知道)。 六.秘钥的协商过程是非对称加密,之后的通讯过程是使用此秘钥的对称加密, 非对称加密算法的性能是非常低的,一般的HTTPS连接只在第一次握手时使用非对称加密,通过握手交换对称加密密钥,在之后的通信走对称加密
唯一Chat
2019/09/10
5900
[TCP/IP] HTTPS的工作原理
HashMap的工作原理
HashMap的工作原理是近年来常见的Java面试题。几乎每个Java程序员都知道HashMap,都知道哪里要用HashMap,知道Hashtable和HashMap之间的区别,那么为何这道面试题如此特殊呢?是因为这道题考察的深度很深。这题经常出现在高级或中高级面试中。投资银行更喜欢问这个问题,甚至会要求你实现HashMap来考察你的编程能力。ConcurrentHashMap和其它同步集合的引入让这道题变得更加复杂。让我们开始探索的旅程吧! 先来些简单的问题 “你用过HashMap吗?” “什么是Hash
java达人
2018/01/31
8010
HashMap的工作原理
JSONP 的工作原理
很简单,就是利用<script>标签没有跨域限制的“漏洞”(历史遗迹啊)来达到与第三方通讯的目的。当需要通讯时,本站脚本创建一个<script>元素,地址指向第三方的API网址,形如:
一个会写诗的程序员
2018/08/17
3690
HashMap的工作原理
    HashMap的工作原理是近年来常见的Java面试题。几乎每个Java程序员都知道HashMap,都知道哪里要用HashMap,知道Hashtable和HashMap之间的区别,那么为何这道面试题如此特殊呢?是因为这道题考察的深度很深。这题经常出现在高级或中高级面试中。投资银行更喜欢问这个问题,甚至会要求你实现HashMap来考察你的编程能力。ConcurrentHashMap和其它同步集合的引入让这道题变得更加复杂。让我们开始探索的旅程吧!
java架构师
2018/09/26
6100
SpringMvc的工作原理
  6.2 @RequestMapping:请求到处理器功能方法的映射规则,可定义到类和方法
用户10196776
2022/11/22
1.2K0
SpringMvc的工作原理

相似问题

ASP.NET ViewState的工作原理

20

ASP.Net Session.Timeout - StateServer和Programmatic Session.Timeout足够好吗?

15

asp.net捆绑的内部工作原理

10

ASP.NET核心UrlHelper及其工作原理

14

Asp.net MasterPage在MVC中的工作原理

10
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档