Exploding data and domesticating wild JSON with Databricks

Recently I wanted to wrangle some information from wild and unruly JSON data. The challenge wasn't the structure itself, but the fact that it was as unpredictable as a bucking bronco.

Databricks is my trusty six-shooter and the Assistant is my loyal sidekick. Still, it took us some good old web searches and a couple of rides to lasso the logic. I hope this becomes the piece of advice I was trying to find.

The Corral

We have company information in a database, and need to keep it current with updates from an information provider. The updates cover hundreds if not thousands of attributes, but we're only interested in a fraction of them.

Wild JSON

There's no pattern to which attribute values get updated. In any period, one company gets updates to certain attributes, another company gets updates to a different set of attributes, and a third gets none.

The changes come as a list of elements per company with previous and current values of the attributes of each element.

The central part of the structure can be illustrated as

{
  "companyid": "abc123",
  "elements": [
    {
      "elementType": "numberOfEmployees",
      "previous": [
        {"value": 500,
         "scope": "headquarters"},
        {"value": 5000,
         "scope": "global"}
      ],
      "current": [
        {"value": 550,
         "scope": "headquarters"},
        {"value": 6000,
         "scope": "global"}
      ]
    },
    {
      "elementType": "latestFinancials",
      "previous": {
        "units": "MUSD",
        "TotalRevenue": 500,
        "netAssets": 100,
        "totalLiabilities": 100
      },
      "current": {
        "units": "MUSD",
        "TotalRevenue": 510,
        "netAssets": 90,
        "totalLiabilities": 110
      }
    }
  ]
}

  • number and type of elements per company vary
  • the structure of each element varies
  • structures are nested and contain arrays of variable length
  • only a fraction of the attributes within those structures are needed

The works

Before the end credits, I'll describe what works. In an unnecessary sequel I might present a couple of approaches that didn't work, and why.

I pull daily updates from a REST API and store the raw JSON data in a Delta table along with metadata. The table schema can be illustrated with the sketch below.
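
A minimal sketch, assuming the column names that the later queries rely on (companyid and api_raw_data); the metadata columns are placeholders for illustration, not the actual table definition.

    -- Raw landing table: one row per update payload pulled from the API.
    -- Only companyid and api_raw_data are used in the queries below;
    -- the metadata columns are assumed examples.
    CREATE TABLE IF NOT EXISTS rawdata_table (
      companyid      STRING,
      api_raw_data   STRING,     -- the JSON payload stored as-is
      load_timestamp TIMESTAMP,  -- example metadata (assumption)
      source_system  STRING      -- example metadata (assumption)
    ) USING DELTA ;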

I wanted to use SQL as much as possible, because data moves from one Delta table to another, SQL parallelises well, and most employees in our organisation know it. No cowboy coding!

The transformation process has three main parts

  1. get the different element types into separate rows
  2. read each element type based on its schema and select relevant attributes
  3. aggregate results into one row per company

The explosions

To handle the arrays, I repeatedly use from_json() with a schema to parse the JSON and explode() to get rows for the array entries. In the schema I list only the fields I am interested in; from_json() simply ignores anything the schema doesn't mention.
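
A minimal, self-contained illustration of that behaviour, with invented literal values:

    -- Only 'a' is listed in the schema, so 'b' is silently dropped.
    SELECT FROM_JSON('{"a": 1, "b": 2}', 'STRUCT<a: INT>') AS parsed ;
    -- parsed = {"a": 1}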

I separate all elements per company into a temp view so that there's one row per element type. Idea:

    DECLARE update_schema =
      'STRUCT<
        transactionID: STRING
        ,elements: ARRAY<STRUCT<
            current: STRING
            ,elementType: STRING
            ,previous: STRING
            ,timestamp: STRING
        >>
        ,organization: STRUCT<company: STRING>
      >' ;

    -- current and previous stay as plain strings here;
    -- they are parsed per element type in the next step.
    CREATE OR REPLACE TEMP VIEW updates AS
      SELECT
      companyid
      ,transactionID
      ,e.timestamp
      ,e.elementType
      ,e.current
      FROM (
        SELECT
        companyid
        ,js.transactionID
        ,EXPLODE(js.elements) AS e
        FROM (
          SELECT
          * EXCEPT (api_raw_data)
          ,FROM_JSON(api_raw_data, update_schema) AS js
          FROM rawdata_table
        )
      ) ;

Breaking it down

  • from_json() parses the field api_raw_data into an object with the structure update_schema. Without it, explode() would not work, because api_raw_data is just a string.
  • explode() works on the elements array and makes one row for each element in the array while keeping the other fields constant (companyid and transactionID in this example); see the sketch right after this list.
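
A minimal sketch of that behaviour on invented literal data:

    -- Each array entry becomes its own row; companyid repeats on every row.
    SELECT companyid, EXPLODE(elements) AS e
    FROM VALUES ('abc123', ARRAY('numberOfEmployees', 'latestFinancials'))
      AS t(companyid, elements) ;
    -- Result: two rows for abc123, one per array entry.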

Bridling the JSON

Now that the updates view has separate rows per elementType, I can read the rows by elementType and apply a matching schema. Idea:

    DECLARE ne_schema =
      'ARRAY<STRUCT<
        value: INT
        ,scope: STRING
      >>' ;

    CREATE OR REPLACE TEMP VIEW number_of_employees AS
      SELECT companyid
      ,TRY_CAST(js.value AS INT) AS value
      ,js.scope
      FROM (
        SELECT companyid
        ,EXPLODE(FROM_JSON(current, ne_schema)) AS js
        FROM updates
        WHERE elementType = 'numberOfEmployees' ) ;

In practice the schemas were nested and contained arrays, but then it was just a matter of repeating explode(), as in the sketch below.
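
A sketch of what repeating explode() can look like, assuming a hypothetical element type whose current payload is an array of offices, each holding its own array of contacts. The element type, field names and schema here are invented for illustration.

    DECLARE office_schema =
      'ARRAY<STRUCT<
        officeName: STRING
        ,contacts: ARRAY<STRUCT<name: STRING, role: STRING>>
      >>' ;

    CREATE OR REPLACE TEMP VIEW office_contacts AS
      SELECT companyid
      ,office.officeName
      -- second explode: one row per contact within each office
      ,EXPLODE(office.contacts) AS contact
      FROM (
        SELECT companyid
        -- first explode: one row per office in the array
        ,EXPLODE(FROM_JSON(current, office_schema)) AS office
        FROM updates
        WHERE elementType = 'offices' ) ;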

Driving the herd home

From the previous steps, I have many elementType-specific temp views. One companyid can have rows in many of the temp views.

However, I need a table with only one line per companyid so that I can merge the results into the final curated table.

The plan is to join all the temp views into one wide table. Because of all the arrays, the join results in a sparse matrix: lots of rows and fields per company, but few of them have values. In fact, for each field, only one of the rows has a value. That's why I can aggregate the rows with MAX() and group by companyid. Idea:

    CREATE OR REPLACE TABLE companydata_merge AS
      SELECT
      ne_global.companyid
      ,MAX(ne_global.value) AS global
      -- etc. etc.
      ,MAX(lf.totalLiabilities) AS totalLiabilities
      FROM (
        SELECT companyid, value FROM number_of_employees WHERE scope = 'global'
      ) AS ne_global
      -- etc. etc. joins
      LEFT JOIN latest_financials AS lf ON ne_global.companyid = lf.companyid
      GROUP BY ne_global.companyid ;

And finally I can merge it all into the curated table. By the way, I really love that the Assistant, based on very simple prompts, wrote the table joins and assignment statements with dataFieldsWithDescriptiveButReallyAwkwardlyLongNames.

    MERGE INTO companydata c
      USING companydata_merge m
      ON c.companyid = m.companyid
      WHEN MATCHED THEN UPDATE
      SET
      -- coalesce because the merge table is so sparse
      c.hqLevel = COALESCE(m.hqLevel, c.hqLevel)
      -- etc. etc.
    ;

Riding off into the sunset

As the dust settles, I see that the curated table has data ready for action. The wrangling is broken down into steps, which should be easier to read and understand than one massive query. Runtimes are OK, because most steps take place in temporary views that the engine stacks up and optimises as a whole.

As well as being a reminder for myself, I hope this text helps the fellow wrangler who lands here in search of ways to domesticate some unruly JSON.

Happy trails, data wranglers!



