I have a use case that requires reading a file line by line: I need to read a value from the header record and apply it to all of the records between the Header and the Trailer. Spark does not allow line-by-line reading, so how can we implement such a use case?
*H,TextStart,1244
I,000000001,GOOD,-000000001,DMGD,+000000000,SOM ,00002,+000000000,ONHAND ,LCU ,
I,000000062,GOOD,+000000004,DMGD,+000000000,SOM ,00001,+000000000,ONHAND ,LCU ,
*T,TextEnd
*H,TextStart,1235
I,000000002,GOOD,-000000001,DMGD,+000000000,SOM ,00002,+000000000,ONHAND ,LCU ,
I,000000035,GOOD,+000000004,DMGD,+000000000,SOM ,00001,+000000000,ONHAND ,LCU ,
*T,TextEnd
*H,TextStart,1244
I,000000004,GOOD,-000000001,CRT,+000000000,SOM ,00002,+000000000,ONHAND ,LCU ,
I,000000062,GOOD,+000000004,DPT,+000000000,SOM ,00001,+000000000,ONHAND ,LCU ,
*T,TextEnd
*H,TextStart,1236
I,000000005,GOOD,-000000001,ABCD,+000000000,SOM ,00002,+000000000,ONHAND ,LCU ,
I,000000035,GOOD,+000000004,EFGF,+000000000,SOM ,00001,+000000000,ONHAND ,LCU ,
*T,TextEnd
The expected output is to take the value from the header and attach it to every record between the header and trailer. I am not sure how to do this:
1244,I,000000001,GOOD,-000000001,DMGD,+000000000,SOM ,00002,+000000000,ONHAND ,LCU,
1244,I,000000062,GOOD,+000000004,DMGD,+000000000,SOM ,00001,+000000000,ONHAND ,LCU,
1235,I,000000002,GOOD,-000000001,DMGD,+000000000,SOM ,00002,+000000000,ONHAND ,LCU,
1235,I,000000035,GOOD,+000000004,DMGD,+000000000,SOM ,00001,+000000000,ONHAND ,LCU,
1244,I,000000004,GOOD,-000000001,CRT,+000000000,SOM ,00002,+000000000,ONHAND ,LCU ,
1244,I,000000062,GOOD,+000000004,DPT,+000000000,SOM ,00001,+000000000,ONHAND ,LCU ,
1236,I,000000005,GOOD,-000000001,ABCD,+000000000,SOM ,00002,+000000000,ONHAND ,LCU,
1236,I,000000035,GOOD,+000000004,EFGF,+000000000,SOM ,00001,+000000000,ONHAND ,LCU,
I used zipWithIndex to populate line numbers:
df = spark.read.text('/hdfsData/file.csv')
df_1 = df.rdd.map(lambda r: r).zipWithIndex().toDF(['value', 'index'])
df_1.createOrReplaceTempView("linenumber")
df_11 = spark.sql("select value.value as value, index from linenumber where value.value like '*H,%' or value.value like '*T,%'")
Now I plan to join df_11 with df_1 and apply some range-join logic to pick up the values. Is there a more efficient way to achieve the same result?
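For reference, a minimal sketch of that range-join idea, assuming df_1, df_11, and the linenumber view from the snippet above (the names bounds, data, start, and end are illustrative): pair each header with the index of the next marker (its trailer), then join every data row into the bracket that contains it.
from pyspark.sql import functions as F
from pyspark.sql import Window

#Each header row paired with the index of the next marker (its trailer)
bounds = (df_11
    .withColumn('next_index', F.lead('index').over(Window.orderBy('index')))
    .where(F.col('value').like('*H,%'))
    .select(F.split('value', ',')[2].alias('location'),
            F.col('index').alias('start'),
            F.col('next_index').alias('end')))
#Data rows are everything that is neither a header nor a trailer
data = (spark.table('linenumber')
    .select(F.col('value.value').alias('value'), 'index')
    .where((~F.col('value').like('*H,%')) & (~F.col('value').like('*T,%'))))
#Range join: a data row belongs to the header whose indexes bracket it
result = (data.join(bounds, (data['index'] > bounds['start']) & (data['index'] < bounds['end']))
    .select(F.concat_ws(',', 'location', 'value').alias('value')))
result.show(truncate=False)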
Posted on 2021-05-06 20:07:38
I implemented this using zipWithIndex and the window last function. Note, however, that this runs in a single partition, because the nature of the data requires it; provide adequate executor memory.
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.functions import *
df = spark.read.text('MQ_out.csv')
#Add an index column so that each row gets its line number
df_1 = df.rdd.map(lambda r: r).zipWithIndex().toDF(['value', 'index'])
df_1.createOrReplaceTempView("linenumber")
#zipWithIndex wraps each line in a struct; unwrap it back to a plain string
df_2 = spark.sql("select value.value as value, index from linenumber")
df_2.createOrReplaceTempView("linenumber1")
#Drop null and blank lines
df_3 = spark.sql("select * from linenumber1 where value is not null and trim(value) != ''")
df_3.createOrReplaceTempView("linenumber2")
#Split out the location value from the header rows; all other rows get null
df_new = spark.sql("select value, case when value like '*H,%' then split(value,',')[2] else null end as location, index from linenumber2")
#Carry the last non-null location forward to the following rows
df_new = df_new.withColumn('newlocation', F.when(df_new.location.isNotNull(), df_new.location).otherwise(F.last(df_new.location, ignorenulls=True).over(Window.orderBy("index"))))
#Count the number of commas in each row
df_new = df_new.withColumn('Comma_Count', size(split(col("value"), ",")) - 1)
df_new.createOrReplaceTempView("FinalData")
#Remove the header and trailer rows, which do not have the required number of commas
df_final = spark.sql("select value, newlocation from FinalData where Comma_Count = 11")
df_final.createOrReplaceTempView("FinalData_1")
#Prepend the location to each record to produce the required output
df_write = spark.sql("select concat(newlocation, ',', value) as value from FinalData_1")
df_write.write.text('Filename')
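Because the Window.orderBy above has no partitionBy, Spark pulls every row into a single partition (WindowExec logs a "No Partition Defined" warning), which is why the executor-memory note matters. One way to provision it is at submit time; the sizes and script name below are illustrative placeholders, not recommendations:
spark-submit --driver-memory 4g --executor-memory 8g your_job.py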
Posted on 2021-05-13 21:14:28
I managed to do this by using two lag functions:
from pyspark.sql import functions as F
from pyspark.sql import Window as W

df = spark.read.text('MQ_out.csv') # the raw lines, read as in the answer above

(df
    .withColumn('id', F.monotonically_increasing_id()) # make fake ID for window functions below
    .withColumn('lag_1', F.lag('value', 1).over(W.orderBy('id'))) # grab the previous row (offset 1)
    .withColumn('lag_2', F.lag('value', 2).over(W.orderBy('id'))) # grab the row two above (offset 2)
    .withColumn('heading', F # build heading column from whichever of lag_1/lag_2 holds the header
        .when(F.col('lag_1').startswith('*H,TextStart'), F.col('lag_1'))
        .when(F.col('lag_2').startswith('*H,TextStart'), F.col('lag_2'))
    )
    .where(F.col('heading').isNotNull()) # remove header and trailer rows, which do not have a proper heading
    .withColumn('value', F.concat(F.split(F.col('heading'), ',')[2], F.lit(','), F.col('value')))
    # concatenate the heading number with the current value
    .drop('id', 'lag_1', 'lag_2', 'heading') # remove the temporary columns
    .show(20, False)
)
# +-------------------------------------------------------------------------------------------------------+
# |value |
# +-------------------------------------------------------------------------------------------------------+
# |1244,I,000000001,GOOD,-000000001,DMGD,+000000000,SOM ,00002,+000000000,ONHAND ,LCU ,|
# |1244,I,000000062,GOOD,+000000004,DMGD,+000000000,SOM ,00001,+000000000,ONHAND ,LCU ,|
# |1235,I,000000002,GOOD,-000000001,DMGD,+000000000,SOM ,00002,+000000000,ONHAND ,LCU ,|
# |1235,I,000000035,GOOD,+000000004,DMGD,+000000000,SOM ,00001,+000000000,ONHAND ,LCU ,|
# |1244,I,000000004,GOOD,-000000001,CRT,+000000000,SOM ,00002,+000000000,ONHAND ,LCU , |
# |1244,I,000000062,GOOD,+000000004,DPT,+000000000,SOM ,00001,+000000000,ONHAND ,LCU , |
# |1236,I,000000005,GOOD,-000000001,ABCD,+000000000,SOM ,00002,+000000000,ONHAND ,LCU ,|
# |1236,I,000000035,GOOD,+000000004,EFGF,+000000000,SOM ,00001,+000000000,ONHAND ,LCU ,|
# +-------------------------------------------------------------------------------------------------------+
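Note that the two-lag trick works because every block in this file holds exactly two data records, so lag_1 and lag_2 cover all the offsets that can occur. For blocks with a variable number of records, a minimal sketch of a generalization, carrying the most recent header forward with last(..., ignorenulls=True) over the same ordering (this mirrors the last-function idea from the earlier answer; df is the raw text DataFrame as above):
from pyspark.sql import functions as F
from pyspark.sql import Window as W

result = (df
    .withColumn('id', F.monotonically_increasing_id())
    .withColumn('header', F.when(F.col('value').startswith('*H,'), F.split('value', ',')[2]))
    .withColumn('header', F.last('header', ignorenulls=True).over(W.orderBy('id'))) # fill forward
    .where((~F.col('value').startswith('*H,')) & (~F.col('value').startswith('*T,'))) # keep data rows only
    .select(F.concat_ws(',', 'header', 'value').alias('value')))
result.show(20, False)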
Posted on 2021-05-06 07:58:03
Read the file into an RDD:
val rdd = sc.textFile("/FileStore/tables/sf0.txt")
Then split each line of the file on commas and filter out the header and trailer rows:
val rdd2 = rdd.map(x => x.split(",")).filter(x => x(0) != "*H" && x(0) != "*T")
Next, convert each array into a tuple that can be converted to a DataFrame. If we skip this step, the DataFrame will have only one column containing the entire array:
val finalRDD = rdd2.map(x => (x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7), x(8), x(9), x(10)))
Convert the RDD to a DataFrame:
val myDF = finalRDD.toDF()
Check the contents of the DataFrame:
myDF.show()
+---+---------+----+----------+----+----------+----+-----+----------+--------------------+----------+
| _1| _2| _3| _4| _5| _6| _7| _8| _9| _10| _11|
+---+---------+----+----------+----+----------+----+-----+----------+--------------------+----------+
| I|000000001|GOOD|-000000001|DMGD|+000000000|SOM |00002|+000000000|ONHAND |LCU |
| I|000000062|GOOD|+000000004|DMGD|+000000000|SOM |00001|+000000000|ONHAND |LCU |
| I|000000002|GOOD|-000000001|DMGD|+000000000|SOM |00002|+000000000|ONHAND |LCU |
| I|000000035|GOOD|+000000004|DMGD|+000000000|SOM |00001|+000000000|ONHAND |LCU |
| I|000000004|GOOD|-000000001| CRT|+000000000|SOM |00002|+000000000|ONHAND |LCU |
| I|000000062|GOOD|+000000004| DPT|+000000000|SOM |00001|+000000000|ONHAND |LCU |
| I|000000005|GOOD|-000000001|ABCD|+000000000|SOM |00002|+000000000|ONHAND |LCU |
| I|000000035|GOOD|+000000004|EFGF|+000000000|SOM |00001|+000000000|ONHAND |LCU |
+---+---------+----+----------+----+----------+----+-----+----------+--------------------+----------+
https://stackoverflow.com/questions/67408448