首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >将模式数据类型JSON混合到PySpark DataFrame

将模式数据类型JSON混合到PySpark DataFrame
EN

Stack Overflow用户
提问于 2022-03-28 08:05:17
回答 3查看 445关注 0票数 0

我需要将JSON的列表转换为pySpark DataFrames。JSON都有相同的架构。问题是JSON中dicts的值条目有不同的数据类型。

例如:字段complex是一个Dicts数组,Dict有四个键,但类型不同(整数、字符串、浮点数和嵌套Dict)。参见下面的示例JSON。

如果我使用df = spark.createDataFrame(json_list)从jsons中创建我的DataFrame,因为他无法正确地推断模式,所以pyspark“删除”了一些数据。PySpark决定complex-field的架构应为:StructType("complex", ArrayType(MapType(StringType(), LongType()))) ,这将导致非长类型值为空.

我试图提供一个模式,但是由于我需要设置一个特定的(?)DataType用于嵌套MapType的值字段-它并不是统一的,但是不同.

代码语言:javascript
运行
复制
myschema = StructType([
                             StructField("Id", StringType(), True),
                             StructField("name", StringType(), True),
                             StructField("sentTimestamp", LongType(), True),
                             StructType("complex", ArrayType(MapType(StringType(), StringType())))
                             ])

MapType(StringType(), StringType())))意味着dict中的一些值字段被空化,因为它不能被映射。

似乎只有当所有值的数据类型都相同时,PySpark才能处理dicts。

如何在不丢失数据的情况下将JSON转换为pyspark?

代码语言:javascript
运行
复制
[{
    "Id": "2345123",
    "name": "something",        
    "sentTimestamp": 1646732402,
    "complex":
    [
        {
            "key1": 1,
            "key2": "(1)",
            "key3": 0.5,
            "key4":
            {
                "innerkey1": "random",
                "innerkey2": 5.4,
                "innerkey3": 1
            }
        },
        {
            "key1": 2,
            "key2": "(2)",
            "key3": 0.5,
            "key4":
            {
                "innerkey1": "left",
                "innerkey2": 7.8,
                "innerkey3": 1
            }
        }
    ]
}]
EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2022-03-28 08:24:38

可以将complex列的架构指定为结构数组。

代码语言:javascript
运行
复制
myschema = StructType(
    [
        StructField("Id", StringType(), True),
        StructField("name", StringType(), True),
        StructField("sentTimestamp", LongType(), True),
        StructField(
            "complex",
            ArrayType(StructType(
                [
                    StructField("key1", LongType(), True),
                    StructField("key2", StringType(), True),
                    StructField("key3", StringType(), True),
                    StructField(
                        "key4",
                        StructType(
                            [
                                StructField("innerkey1", StringType(), True),
                                StructField("innerkey2", StringType(), True),
                                StructField("innerkey3", IntegerType(), True),
                            ]
                        )
                    )
                ]
            ))
        )
    ]
)
票数 1
EN

Stack Overflow用户

发布于 2022-03-28 10:04:11

如果不希望传递模式或希望spark从3.0+检测模式,则可以将json写入表中。

代码语言:javascript
运行
复制
%sql

CREATE TABLE newtable AS SELECT
'{
    "Id": "2345123",
    "name": "something",        
    "sentTimestamp": 1646732402,
    "complex":
    [
        {
            "key1": 1,
            "key2": "(1)",
            "key3": 0.5,
            "key4":
            {
                "innerkey1": "random",
                "innerkey2": 5.4,
                "innerkey3": 1
            }
        },
        {
            "key1": 2,
            "key2": "(2)",
            "key3": 0.5,
            "key4":
            {
                "innerkey1": "left",
                "innerkey2": 7.8,
                "innerkey3": 1
            }
        }
    ]
}'as original

将表转换为数据格式

代码语言:javascript
运行
复制
df1 =spark.sql('select * from newtable')

rdd表中的单个列

代码语言:javascript
运行
复制
rdd=df1.select(col("original").alias("jsoncol")).rdd.map(lambda x: x.jsoncol)

利用.read读取rdd模式和集合是可以的

代码语言:javascript
运行
复制
newschema=spark.read.json(rdd).schema

使用select将架构分配给列

代码语言:javascript
运行
复制
df3=df1.select("*",from_json("original", newschema).alias("transrequest"))

df3.select('transrequest.*').show(truncate=False)

+-------+----------------------------------------------------------------+---------+-------------+
|Id     |complex                                                         |name     |sentTimestamp|
+-------+----------------------------------------------------------------+---------+-------------+
|2345123|[{1, (1), 0.5, {random, 5.4, 1}}, {2, (2), 0.5, {left, 7.8, 1}}]|something|1646732402   |
+-------+----------------------------------------------------------------+---------+-------------+

模式

代码语言:javascript
运行
复制
root
 |-- Id: string (nullable = true)
 |-- complex: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key1: long (nullable = true)
 |    |    |-- key2: string (nullable = true)
 |    |    |-- key3: double (nullable = true)
 |    |    |-- key4: struct (nullable = true)
 |    |    |    |-- innerkey1: string (nullable = true)
 |    |    |    |-- innerkey2: double (nullable = true)
 |    |    |    |-- innerkey3: long (nullable = true)
 |-- name: string (nullable = true)
 |-- sentTimestamp: long (nullable = true)
票数 1
EN

Stack Overflow用户

发布于 2022-03-28 09:51:31

添加到@过过招的答案中,下面是我个人使用的方法,因为它在定义过过招模式时涉及较少的代码。

输入JSON

代码语言:javascript
运行
复制
jsonstr = """[{
    "Id": "2345123",
    "name": "something",        
    "sentTimestamp": 1646732402,
    "complex":
    [
        {
            "key1": 1,
            "key2": "(1)",
            "key3": 0.5,
            "key4":
            {
                "innerkey1": "random",
                "innerkey2": 5.4,
                "innerkey3": 1
            }
        },
        {
            "key1": 2,
            "key2": "(2)",
            "key3": 0.5,
            "key4":
            {
                "innerkey1": "left",
                "innerkey2": 7.8,
                "innerkey3": 1
            }
        }
    ]
}]"""

把它转换成RDD -

代码语言:javascript
运行
复制
import json

rdd = sc.parallelize(json.loads(jsonstr))

创建dataframe -

代码语言:javascript
运行
复制
df=spark.createDataFrame(rdd, 'Id string, name string, sentTimestamp long, complex array<struct<key1:int, key2:string, key3:float, key4:struct<innerkey1:string,innerkey2:float,innerkey3:int>>>')
df.show(truncate=False)

#Output Data
+-------+---------+-------------+----------------------------------------------------------------+
|Id     |name     |sentTimestamp|complex                                                         |
+-------+---------+-------------+----------------------------------------------------------------+
|2345123|something|1646732402   |[{1, (1), 0.5, {random, 5.4, 1}}, {2, (2), 0.5, {left, 7.8, 1}}]|
+-------+---------+-------------+----------------------------------------------------------------+

#Output Schema
root
 |-- Id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- sentTimestamp: long (nullable = true)
 |-- complex: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key1: integer (nullable = true)
 |    |    |-- key2: string (nullable = true)
 |    |    |-- key3: float (nullable = true)
 |    |    |-- key4: struct (nullable = true)
 |    |    |    |-- innerkey1: string (nullable = true)
 |    |    |    |-- innerkey2: float (nullable = true)
 |    |    |    |-- innerkey3: integer (nullable = true)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71644191

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档