I need to convert a list of JSONs into pySpark DataFrames. The JSONs all share the same schema. The problem is that the value entries of the dicts in the JSON have different data types. For example, the field complex is an array of dicts, and each dict has four keys with values of different types (integer, string, float and a nested dict). See the example JSON below.

If I create my DataFrame from the JSONs with df = spark.createDataFrame(json_list), pyspark "drops" some of the data because it cannot infer the schema correctly. PySpark decides that the schema of the complex field should be ArrayType(MapType(StringType(), LongType())), which nulls out every value that is not a long.

I tried to provide a schema myself, but I have to set one specific(?) DataType for the value field of the nested MapType, and the value types are not uniform but diverse:
from pyspark.sql.types import (StructType, StructField, StringType,
                               LongType, ArrayType, MapType)

myschema = StructType([
    StructField("Id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("sentTimestamp", LongType(), True),
    StructField("complex", ArrayType(MapType(StringType(), StringType())), True)
])
The MapType(StringType(), StringType()) means that some of the value fields of the dicts are nulled because they cannot be mapped to strings. It seems PySpark can only handle dicts when all of their values share the same data type. How can I convert the JSON to pyspark DataFrames without losing data?
[{
    "Id": "2345123",
    "name": "something",
    "sentTimestamp": 1646732402,
    "complex": [
        {
            "key1": 1,
            "key2": "(1)",
            "key3": 0.5,
            "key4": {
                "innerkey1": "random",
                "innerkey2": 5.4,
                "innerkey3": 1
            }
        },
        {
            "key1": 2,
            "key2": "(2)",
            "key3": 0.5,
            "key4": {
                "innerkey1": "left",
                "innerkey2": 7.8,
                "innerkey3": 1
            }
        }
    ]
}]
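For reference, a minimal repro sketch of the data loss (the variable name json_str for the raw JSON text is an assumption, not part of the original):

import json

json_list = json.loads(json_str)   # json_str: the example JSON above, as a string
df = spark.createDataFrame(json_list)
df.printSchema()                   # complex comes back as array<map<string,bigint>>
df.show(truncate=False)            # key2, key3 and key4 entries are nulled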
Posted on 2022-03-28 08:24:38
You can specify the schema of the complex column as an array of structs.
from pyspark.sql.types import (StructType, StructField, StringType,
                               LongType, DoubleType, ArrayType)

myschema = StructType([
    StructField("Id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("sentTimestamp", LongType(), True),
    StructField(
        "complex",
        ArrayType(StructType([
            StructField("key1", LongType(), True),
            StructField("key2", StringType(), True),
            StructField("key3", DoubleType(), True),
            StructField(
                "key4",
                StructType([
                    StructField("innerkey1", StringType(), True),
                    StructField("innerkey2", DoubleType(), True),
                    StructField("innerkey3", LongType(), True),
                ])
            )
        ]))
    )
])
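A quick usage sketch (it assumes the parsed example JSON is available as a Python list json_list, a name that is not in the original answer):

df = spark.createDataFrame(json_list, schema=myschema)
df.printSchema()
df.show(truncate=False)   # all four value types survive; nothing is nulled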
Posted on 2022-03-28 10:04:11
If you don't want to pass a schema, or you want Spark (3.0+) to detect the schema for you, you can write the JSON into a table first.
%sql
CREATE TABLE newtable AS SELECT
'{
    "Id": "2345123",
    "name": "something",
    "sentTimestamp": 1646732402,
    "complex": [
        {
            "key1": 1,
            "key2": "(1)",
            "key3": 0.5,
            "key4": {
                "innerkey1": "random",
                "innerkey2": 5.4,
                "innerkey3": 1
            }
        },
        {
            "key1": 2,
            "key2": "(2)",
            "key3": 0.5,
            "key4": {
                "innerkey1": "left",
                "innerkey2": 7.8,
                "innerkey3": 1
            }
        }
    ]
}' AS original
Convert the table to a dataframe:

df1 = spark.sql('select * from newtable')
Map the single column of the table to an RDD of JSON strings:

from pyspark.sql.functions import col

rdd = df1.select(col("original").alias("jsoncol")).rdd.map(lambda x: x.jsoncol)
Read the RDD with .read so that Spark infers the schema:

newschema = spark.read.json(rdd).schema
Apply the inferred schema to the column using select:

from pyspark.sql.functions import from_json

df3 = df1.select("*", from_json("original", newschema).alias("transrequest"))
df3.select('transrequest.*').show(truncate=False)
+-------+----------------------------------------------------------------+---------+-------------+
|Id |complex |name |sentTimestamp|
+-------+----------------------------------------------------------------+---------+-------------+
|2345123|[{1, (1), 0.5, {random, 5.4, 1}}, {2, (2), 0.5, {left, 7.8, 1}}]|something|1646732402 |
+-------+----------------------------------------------------------------+---------+-------------+
Schema:
root
|-- Id: string (nullable = true)
|-- complex: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- key1: long (nullable = true)
| | |-- key2: string (nullable = true)
| | |-- key3: double (nullable = true)
| | |-- key4: struct (nullable = true)
| | | |-- innerkey1: string (nullable = true)
| | | |-- innerkey2: double (nullable = true)
| | | |-- innerkey3: long (nullable = true)
|-- name: string (nullable = true)
|-- sentTimestamp: long (nullable = true)
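As a side note, the same inference trick works without materialising a table: wrap the raw JSON string in an RDD and let spark.read.json infer the nested types directly. A sketch, assuming the single-object JSON string used in the table above is held in a variable jsonstr:

# Infer the schema straight from an RDD of JSON strings; no table needed.
rdd = spark.sparkContext.parallelize([jsonstr])
df = spark.read.json(rdd)
df.printSchema()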
Posted on 2022-03-28 09:51:31
Adding to @过过招's answer, below is the approach I personally use, since it involves less code for defining the schema.
Input JSON:
jsonstr = """[{
    "Id": "2345123",
    "name": "something",
    "sentTimestamp": 1646732402,
    "complex": [
        {
            "key1": 1,
            "key2": "(1)",
            "key3": 0.5,
            "key4": {
                "innerkey1": "random",
                "innerkey2": 5.4,
                "innerkey3": 1
            }
        },
        {
            "key1": 2,
            "key2": "(2)",
            "key3": 0.5,
            "key4": {
                "innerkey1": "left",
                "innerkey2": 7.8,
                "innerkey3": 1
            }
        }
    ]
}]"""
Convert it to an RDD:

import json

rdd = sc.parallelize(json.loads(jsonstr))
Create the dataframe with a DDL-formatted schema string:

df = spark.createDataFrame(rdd, 'Id string, name string, sentTimestamp long, complex array<struct<key1:int, key2:string, key3:float, key4:struct<innerkey1:string,innerkey2:float,innerkey3:int>>>')
df.show(truncate=False)
#Output Data
+-------+---------+-------------+----------------------------------------------------------------+
|Id |name |sentTimestamp|complex |
+-------+---------+-------------+----------------------------------------------------------------+
|2345123|something|1646732402 |[{1, (1), 0.5, {random, 5.4, 1}}, {2, (2), 0.5, {left, 7.8, 1}}]|
+-------+---------+-------------+----------------------------------------------------------------+
#Output Schema
root
|-- Id: string (nullable = true)
|-- name: string (nullable = true)
|-- sentTimestamp: long (nullable = true)
|-- complex: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- key1: integer (nullable = true)
| | |-- key2: string (nullable = true)
| | |-- key3: float (nullable = true)
| | |-- key4: struct (nullable = true)
| | | |-- innerkey1: string (nullable = true)
| | | |-- innerkey2: float (nullable = true)
| | | |-- innerkey3: integer (nullable = true)
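Once complex is an array of structs instead of maps, the nested values can be accessed with typed column paths; a small follow-up sketch (field names taken from the schema above):

from pyspark.sql import functions as F

# One output row per struct in `complex`, with typed nested fields pulled out.
df.select("Id", F.explode("complex").alias("c")) \
  .select("Id", "c.key1", "c.key3", "c.key4.innerkey2") \
  .show()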
https://stackoverflow.com/questions/71644191