Source table
/Company/Engineering/DataTeam 45
/Company/Engineering/Mobile 50
Output table
/Company 45
/Company/Engineering 45
/Company/Engineering/DataTeam 45
/Company 50
/Company/Engineering 50
/Company/Engineering/Mobile 50
So my question, essentially: looking at the source and output tables above, how can this source-to-output transformation be implemented in Spark?
I can't use a UDF, because a UDF can't return rows. My next idea was to build a DataFrame in memory inside a UDF and append rows to it, but the problem with that approach is that the resulting DataFrame will have more than a billion rows, and I'm not sure that's feasible.
Any suggestions on how to achieve this with Spark?
Posted on 2019-01-28 13:48:04
In a UDF you can return a Seq[String], which you can then explode to get multiple rows.
Check this out:
scala> val df = Seq(("/Company/Engineering/DataTeam",45),("/Company/Engineering/Mobile",50)).toDF("a","b")
df: org.apache.spark.sql.DataFrame = [a: string, b: int]
scala> df.show(false)
+-----------------------------+---+
|a |b |
+-----------------------------+---+
|/Company/Engineering/DataTeam|45 |
|/Company/Engineering/Mobile |50 |
+-----------------------------+---+
scala> val udf_hier_str = udf( (x:String) => x.split('/').drop(1).scanLeft(""){(acc, next) => acc + "/" + next}.drop(1) )
udf_hier_str: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,ArrayType(StringType,true),Some(List(StringType)))
scala> df.withColumn("gen_hier",explode(udf_hier_str('a))).show(false)
+-----------------------------+---+-----------------------------+
|a |b |gen_hier |
+-----------------------------+---+-----------------------------+
|/Company/Engineering/DataTeam|45 |/Company |
|/Company/Engineering/DataTeam|45 |/Company/Engineering |
|/Company/Engineering/DataTeam|45 |/Company/Engineering/DataTeam|
|/Company/Engineering/Mobile |50 |/Company |
|/Company/Engineering/Mobile |50 |/Company/Engineering |
|/Company/Engineering/Mobile |50 |/Company/Engineering/Mobile |
+-----------------------------+---+-----------------------------+
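To match the two-column shape of the output table in the question exactly, a final projection should be enough; a minimal sketch (the path alias is just illustrative, not part of the original answer):

scala> df.withColumn("gen_hier", explode(udf_hier_str('a))).select('gen_hier.as("path"), 'b).show(false)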
Posted on 2020-12-24 06:33:15
Another solution, without udf():
val df = spark.sql("""
select '/Company/Engineering/DataTeam' a, 45 b union
select '/Company/Engineering/Mobile' a, 50 b
""")
df.createOrReplaceTempView("df")
df.show(false)
+-----------------------------+---+
|a |b |
+-----------------------------+---+
|/Company/Engineering/DataTeam|45 |
|/Company/Engineering/Mobile |50 |
+-----------------------------+---+
Solution 1: using posexplode() (the leading "/" produces an empty token at position 0, which the x <> 0 filter removes)
spark.sql("""
select *, concat_ws("/",slice(split(a,"/"),1,x+1)) z from (
  select a,b, posexplode(split(a,"/")) (x,y) from df
) where x <> 0
""").show(false)
+-----------------------------+---+---+-----------+-----------------------------+
|a |b |x |y |z |
+-----------------------------+---+---+-----------+-----------------------------+
|/Company/Engineering/DataTeam|45 |1 |Company |/Company |
|/Company/Engineering/DataTeam|45 |2 |Engineering|/Company/Engineering |
|/Company/Engineering/DataTeam|45 |3 |DataTeam |/Company/Engineering/DataTeam|
|/Company/Engineering/Mobile |50 |1 |Company |/Company |
|/Company/Engineering/Mobile |50 |2 |Engineering|/Company/Engineering |
|/Company/Engineering/Mobile |50 |3 |Mobile |/Company/Engineering/Mobile |
+-----------------------------+---+---+-----------+-----------------------------+
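For reference, the same posexplode can also be written with the classic LATERAL VIEW syntax; a sketch assuming the df temp view registered above, equivalent to the subquery form:

spark.sql("""
select a, b, concat_ws("/",slice(split(a,"/"),1,x+1)) z
from df
lateral view posexplode(split(a,"/")) t as x, y
where x <> 0
""").show(false)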
Solution 2: using the transform() higher-order function (the empty prefix it generates for position 0 is dropped by the length(z2) > 0 filter).
spark.sql("""
select * from (
  select a, b, explode_outer(z) z2 from (
    select a,b, transform(split(a,"/"), (x,i) -> concat_ws("/",slice(split(a,"/"),1,i+1))) z from df
  )
) where length(z2) > 0
""").show(false)
+-----------------------------+---+-----------------------------+
|a |b |z2 |
+-----------------------------+---+-----------------------------+
|/Company/Engineering/DataTeam|45 |/Company |
|/Company/Engineering/DataTeam|45 |/Company/Engineering |
|/Company/Engineering/DataTeam|45 |/Company/Engineering/DataTeam|
|/Company/Engineering/Mobile |50 |/Company |
|/Company/Engineering/Mobile |50 |/Company/Engineering |
|/Company/Engineering/Mobile |50 |/Company/Engineering/Mobile |
+-----------------------------+---+-----------------------------+
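The same transform() logic is also available through the Scala DataFrame API (Spark 3.1+, since the Column-based slice overload is used); a minimal sketch, not part of the original answers:

import org.apache.spark.sql.functions._

val parts = split(col("a"), "/")
df.withColumn("z2", explode(transform(parts, (_, i) =>
    concat_ws("/", slice(parts, lit(1), i + 1)))))   // prefix of elements 1..i+1
  .where(length(col("z2")) > 0)                      // drop the empty leading prefix
  .select("a", "b", "z2")
  .show(false)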
Original question: https://stackoverflow.com/questions/54373706