
PySpark: exploded schema column does not match the underlying nested schema

Posted: 2022-07-17 13:53:45

Tags: json, data

I am using PySpark with Azure Synapse. I am reading multiple nested JSON files that all share the structure of the following sample into a dataframe:

{
    "AmountOfOrders": 2,
    "TotalEarnings": 1800,
    "OrderDetails": [
        {
            "OrderNumber": 1,
            "OrderDate": "2022-7-06",
            "OrderLine": [
                {
                    "LineNumber": 1,
                    "Product": "Laptop",
                    "Price": 1000
                },
                {
                    "LineNumber": 2,
                    "Product": "Tablet",
                    "Price": 500
                },
                {
                    "LineNumber": 3,
                    "Product": "Mobilephone",
                    "Price": 300
                }
            ]
        },
        {
            "OrderNumber": 2,
            "OrderDate": "2022-7-06",
            "OrderLine": [
                {
                    "LineNumber": 1,
                    "Product": "Printer",
                    "Price": 100,
                    "Discount": 0
                },
                {
                    "LineNumber": 2,
                    "Product": "Paper",
                    "Price": 50,
                    "Discount": 0
                },
                {
                    "LineNumber": 3,
                    "Product": "Toner",
                    "Price": 30,
                    "Discount": 0
                }
            ]
        }
    ]
}

I am trying to get the order lines of OrderNumber 1 into a separate dataframe, using generic functions that unpack the arrays and structs of a dataframe. I use the following code:

from pyspark.sql.functions import col, explode
from pyspark.sql.types import ArrayType, StructType

def read_nested_structure(df, excludeList, messageType, coll):
    # Flatten the struct column `coll`, skipping any field whose name
    # appears in excludeList.
    display(df.limit(10))
    cols = []
    for field in df.schema[coll].dataType.fields:
        if field.name not in excludeList:
            cols.append(col(coll + "." + field.name).alias(field.name))
    return df.select(cols)

def read_nested_structure_2(df, excludeList, messageType):
    cols = []
    for coll in df.schema.names:
        if isinstance(df.schema[coll].dataType, ArrayType):
            # Explode arrays in place; the resulting struct column is
            # unpacked on the next pass.
            df = df.withColumn(coll, explode(coll))
            cols.append(coll)
        elif isinstance(df.schema[coll].dataType, StructType):
            if messageType == 'Header':
                for field in df.schema[coll].dataType.fields:
                    cols.append(col(coll + "." + field.name).alias(coll + "_" + field.name))
            elif messageType == 'Content':
                for field in df.schema[coll].dataType.fields:
                    cols.append(col(coll + "." + field.name).alias(field.name))
        else:
            if coll not in excludeList:
                cols.append(coll)
    return df.select(cols)

df = spark.read.load(datalakelocation + '/test.json', format='json')
df = read_nested_structure_2(df, exclude, 'Content')  # unpack OrderDetails
df = df.filter(df.OrderNumber == 1)
df = read_nested_structure_2(df, exclude, 'Content')  # unpack OrderLine
display(df.limit(10))

This results in the following dataframe (screenshot not reproduced here):

As you can see, the attributes marked in yellow were added to the dataframe even though they do not belong to OrderNumber 1. How can I filter a row in the dataframe in a way that also updates the schema (in this case, dropping the Discount attribute)?
