Nested JSON arrays: the perfect niche for Dataflow Gen2 in Fabric

On a project I'm currently working on, I have to deal with JSON files that are heavily nested. And by heavily I mean more than 10 levels deep: running printSchema() in PySpark results in the following hierarchy:

The schema truncated

The schema above is truncated at line 41 out of 227 for readability, but I think you get the point. The task was to flatten each of the 'groups' arrays, extract the 'key' value from each, and then combine those keys with the measures in the 'values' struct into a nice, flat table.
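To make the structure concrete, here is a minimal, made-up illustration of the shape (the field names come from the schema above; the values and the depth are invented for brevity): each element of a 'groups' array carries a 'key' and another nested 'dimension.groups' array, with the 'values' measures sitting at the deepest level.

{
  "dimension": {
    "groups": [
      {
        "key": "campaign-a",
        "dimension": {
          "groups": [
            {
              "key": "channel-x",
              "values": {
                "actImpressions": 1200,
                "estImpressions": 1500,
                "plannedMediaCost": 300.0,
                "actualMediaCost": 280.0,
                "ctcEur": 0.23
              }
            }
          ]
        }
      }
    ]
  }
}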

I got it to work in PySpark by using a combination of explode and select, but it surely wasn't an enjoyable experience! You can see the main code snippet below:

from pyspark.sql.functions import explode

#Load to dataframe
df_read = df_filtered

#Explode dataframe arrays into separate columns for each level in the hierarchy keeping the data intact on each level
df_exploded = df_read.withColumn("dimension1", \
                    explode("dimension.groups"))
df_exploded = df_exploded.withColumn("dimension2", \
                     explode("dimension1.dimension.groups"))
df_exploded = df_exploded.withColumn("dimension3", \
                     explode("dimension2.dimension.groups"))
df_exploded = df_exploded.withColumn("dimension4", \
                     explode("dimension3.dimension.groups")) 
df_exploded = df_exploded.withColumn("dimension5", \
                     explode("dimension4.dimension.groups"))
df_exploded = df_exploded.withColumn("dimension6", \
                     explode("dimension5.dimension.groups")) 
df_exploded = df_exploded.withColumn("dimension7", \
                     explode("dimension6.dimension.groups")) 
df_exploded = df_exploded.withColumn("dimension8", \
                     explode("dimension7.dimension.groups")) 
df_exploded = df_exploded.withColumn("dimension9", \
                     explode("dimension8.dimension.groups")) 
df_exploded = df_exploded.withColumn("dimension10", \
                     explode("dimension9.dimension.groups"))  
df_exploded = df_exploded.withColumn("dimension11", \
                     explode("dimension10.dimension.groups")) 



#Select the columns we want to keep from each level of the hierarchy
df_selected = df_exploded.select("dimension1.key", \
           "dimension2.key", \
           "dimension3.key", \
           "dimension4.key", \
           "dimension5.key", \
           "dimension6.key", \
           "dimension7.key", \
           "dimension8.key", \
           "dimension9.key", \
           "dimension10.key", \
           "dimension11.key", \
           "dimension11.values.actImpressions", \
           "dimension11.values.estImpressions", \
           "dimension11.values.plannedMediaCost", \
           "dimension11.values.actualMediaCost", \
           "dimension11.values.ctcEur",   )        

Amid my frustration building the solution in PySpark, I got the idea to try Dataflow Gen2 in Fabric, mostly as a test to see if I could even get the data flattened into a tabular format.

Enter Dataflow Gen2 in Fabric

I usually shy away from using Dataflows because of their tight data transformation coupling (each applied step/transformation must reference the previous one), slow performance, and the often shoddy SQL they produce, but this time was different!

I leveraged the expand function to open up each of the nested arrays, traversing the hierarchy with ease while also being able to evaluate the data produced at each step:

Dataflow Gen2 expand function

This made developing the transformation into a tabular format much faster: it took me minutes to create the Dataflow, as opposed to hours writing the PySpark code.

The performance, measured on execution duration, was not too bad either! Flattening the JSON into a table took about 20 seconds with PySpark and about 30 seconds with Dataflow Gen2. That is roughly 50% slower in relative terms, but since the JSON files in question are output by an API, they will not grow significantly in size over time, making the difference moot in absolute terms.

The gripe I have about tight data transformation coupling (and thereby code reusability and maintainability) still stands when looking at the M code produced, but in this case, recreating the Dataflow from scratch wouldn't take long anyway.

Tightly coupled M code from the Advanced Editor

All in all, it seems like Dataflow Gen2 in Fabric has found a sweet spot when working with heavily nested arrays in JSON or other data formats that allow for nested arrays. The visual nature of the tool simply makes traversing the hierarchy a breeze and makes me feel in full control of the data produced.

I still ended up using PySpark notebooks for the job, as our Data Warehouse runs on Azure Databricks, which Fabric Dataflow Gen2 cannot write into.

Nathan Sundararajan

Manager - Data Engineering, Architecture and Integration

4 months

Databricks' variant data type can handle this very well. We have complex JSON, and using the variant data type made parsing easy.
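For context, a rough, untested sketch of the idea Nathan describes, assuming a Databricks Runtime recent enough to support the VARIANT type; the table name 'raw_json', the string column 'body', and the extracted paths are hypothetical, not from the post:

# Hypothetical sketch: parse the raw JSON string into a VARIANT value and
# navigate it with path expressions instead of a fixed struct schema.
df_raw.createOrReplaceTempView("raw_json")  # df_raw is assumed to hold the JSON in a string column 'body'

flat = spark.sql("""
    SELECT parse_json(body):dimension.groups[0].key::string AS top_level_key
    FROM raw_json
""")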

Łukasz Lebioda

Speaking PL, EN, IT, ESP | POWER BI Developer / People oriented leader / Lean Management enthusiast / Six Sigma Black belt / Fast learner

4 months

How about using a loop, a "while", to explode the columns until there are no more arrays to process? It should shorten the code substantially. It's hard to check the performance without running it, but writing the code would be less time-consuming.
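For what it's worth, here is a rough, untested sketch of that idea against the schema from the post; the stop condition inspects the current level's struct for another nested 'dimension.groups' array:

from pyspark.sql.functions import col, explode
from pyspark.sql.types import ArrayType, StructType

# Untested sketch of the while-loop idea: keep exploding as long as the current
# level still exposes a nested 'dimension.groups' array.
df_exploded = df_filtered.withColumn("dimension1", explode(col("dimension.groups")))
level = 1
while True:
    # Look up the struct type of the most recently exploded level
    current = df_exploded.schema[f"dimension{level}"].dataType
    has_more = (
        isinstance(current, StructType)
        and "dimension" in current.fieldNames()
        and isinstance(current["dimension"].dataType, StructType)
        and "groups" in current["dimension"].dataType.fieldNames()
        and isinstance(current["dimension"].dataType["groups"].dataType, ArrayType)
    )
    if not has_more:
        break
    level += 1
    df_exploded = df_exploded.withColumn(
        f"dimension{level}", explode(col(f"dimension{level - 1}.dimension.groups"))
    )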

Marc Van Der Wielen

Freelance Microsoft Data Engineer / Power BI developer / Solutions architect

4 months

With pyspark you can code these kinds of tasks quite efficiently; something in the direction below should do the same job. I haven't tested the code myself so it might need some tweaking, but I hope this shows the power of what can be accomplished with pyspark.

from pyspark.sql.functions import explode, col

# Load to dataframe
df_read = df_filtered

# Set the number of levels in the hierarchy
num_levels = 11

# Explode dataframe arrays into separate columns for each level in the hierarchy
df_exploded = df_read
for i in range(1, num_levels + 1):
    df_exploded = df_exploded.withColumn(
        f"dimension{i}",
        explode(col(f"dimension{i - 1}.dimension.groups") if i > 1 else col("dimension.groups")),
    )

# Define the list of columns to keep
columns_to_select = [f"dimension{i}.key" for i in range(1, num_levels + 1)] + [
    "dimension11.values.actImpressions",
    "dimension11.values.estImpressions",
    "dimension11.values.plannedMediaCost",
    "dimension11.values.actualMediaCost",
    "dimension11.values.ctcEur",
]

# Select the required columns
df_selected = df_exploded.select(*columns_to_select)
