Track object state in Kusto/ADX/Eventhouse with ease

While working with my team on time-series data analytics, I ran into an interesting problem. It was not the first time I had seen it, but in the past I was using technologies other than the Kusto database (the engine behind Azure Data Explorer, ADX, and Fabric Eventhouse databases), and those made the last predicate somewhat easier to use. Let's define the problem statement first.

Problem statement

We assume that sensor data is received as a time-series stream of objects with the following properties:

  • Key - unique key of the sensor
  • SourceTimeStamp - exact date and time of the measurement (UTC, or with the time zone specified)
  • Props - of type "dynamic": an array of properties and their values (similar to a POJO), with no hierarchy or nested objects

Every received message is stored in a time-series table in the Kusto database.

Each sensor may produce only a partial set of properties at any given SourceTimeStamp; in other words, sensor data can arrive in parts. This is a typical scenario when streaming OPC UA data via a message broker, even if the producer implements some kind of batching to improve message delivery efficiency.

Objective: define a query that calculates the actual (last) values for a given timespan and time window. A null value shall not be considered valid if there is any non-null value for the given property in the given interval.

Solution

For the sake of simplicity I do not create a table in Kusto; I just use a declarative table defined as:

Input dataset

What we have here is a unique key Id (it can be a synthetic key that uniquely identifies each row), followed by the properties defined above: Key, SourceTimeStamp and Props.
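As an illustration, an input dataset of this shape could be declared with the datatable operator. Only the column names come from the description above; the table name equipment, the sensor keys and all property values are hypothetical, and Props is assumed here to be a flat property bag.

```kusto
// Hypothetical sample data: keys, names and values are illustrative only.
let equipment = datatable(Id:int, Key:string, SourceTimeStamp:datetime, Props:dynamic)
[
    1, "sensor-1", datetime(2019-02-07 16:31:05), dynamic({"Name":"Pump A","SN":"1001"}),
    2, "sensor-1", datetime(2019-02-07 16:31:40), dynamic({"Name":"Pump A1"}),
    3, "sensor-1", datetime(2019-02-07 16:32:10), dynamic({"Temperature":21.5}),
    4, "sensor-2", datetime(2019-02-07 16:31:20), dynamic({"Name":"Valve B","SN":"2001"})
];
equipment
```

Rows 1 and 2 mimic the partial delivery described in the problem statement: the second message carries only a corrected Name and no SN.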

Intuitively, the solution is to implement some kind of window function over timestamp bins. With Kusto we seem to have a couple of options:

  • last predicate
  • scan, reduce predicate
  • arg_max aggregation function
  • take_any aggregation function

The last predicate requires a column, so it can't generally be used to get the last valid (non-null) values of properties.

The take_any aggregation is non-deterministic if the same key is found more than once in a bin; we need the actual, i.e. last, value of each property.

So we will try arg_max, which is deterministic and can work on [key, value] pairs. scan could do the job as well, but I haven't tried it for this use case, so I'm leaving room for experimentation.
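To make the choice of arg_max concrete, here is a minimal sketch on two hypothetical rows that carry the same property for the same sensor. The column names propKey and propValue anticipate the walkthrough further down; the data is made up.

```kusto
// Two hypothetical observations of the same property; the later one should win.
datatable(Key:string, SourceTimeStamp:datetime, propKey:string, propValue:string)
[
    "sensor-1", datetime(2019-02-07 16:31:05), "Name", "Pump A",
    "sensor-1", datetime(2019-02-07 16:31:40), "Name", "Pump A1"
]
// arg_max returns the row with the greatest SourceTimeStamp per group,
// together with the requested extra column.
| summarize arg_max(SourceTimeStamp, propValue) by Key, propKey
```

The query returns a single row holding the 16:31:40 timestamp and the value "Pump A1". Replacing arg_max with take_any would give no such guarantee about which of the two rows is picked.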

The final result looks like the following code snippet:

Complete example code

Note that the properties Name and SN were sent twice with different values, e.g. as a result of a user entry correction in the source system. The same generally applies to any slowly changing properties or telemetry properties.

Let's discuss briefly:

let lookupWindow = 1min;        

defines the length of the time window.

equipmentBinned
| mv-expand kind=array Props
| extend propKey=tostring(Props[0]), propValue=Props[1], binKey = strcat(tsBin,tostring(Props[0]))        

expands the dynamic Props column into key/value pairs and creates a unique "sub-bin" key (binKey) from the time bin and the property key.
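The effect of this step is easier to see on a single row. The sketch below is a hedged illustration: the sample row is hypothetical, Props is assumed to be a property bag, and tsBin is assumed to be computed with bin(SourceTimeStamp, lookupWindow), which the fragment above does not show.

```kusto
let lookupWindow = 1min;
// One hypothetical message carrying two properties.
datatable(Key:string, SourceTimeStamp:datetime, Props:dynamic)
[
    "sensor-1", datetime(2019-02-07 16:31:05), dynamic({"Name":"Pump A","SN":"1001"})
]
// Assumed definition of the time bin used by the original query.
| extend tsBin = bin(SourceTimeStamp, lookupWindow)
// kind=array turns every bag entry into a two-element [key, value] array.
| mv-expand kind=array Props
| extend propKey = tostring(Props[0]), propValue = Props[1], binKey = strcat(tsBin, tostring(Props[0]))
| project Key, tsBin, propKey, propValue, binKey
```

The single input row becomes two rows, one per property, each with its own binKey built from the 16:31 bin and the property name.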

| summarize arg_max(SourceTimeStamp, *) by Key,binKey
| where SourceTimeStamp < datetime(2019-02-07 16:32:15.903)        

This is the most important part, together with the creation of binKey. It finds the row with the maximum SourceTimeStamp in each group defined by the sensor (Key) and the sub-bin (binKey, i.e. time bin plus property key), and filters the result to the moment for which we want to track the state. By applying this logic we discard older values of the same property; in our model case it is the property Name, which was sent twice.

| extend propsBag=bag_pack(propKey, propValue) 
| summarize propsMerged = make_bag(propsBag) by Key,tsBin
| order by Key asc,tsBin asc;        

This final piece of code simply merges all properties of each bin back into a single dynamic value.

The result obviously depends on two factors:

  • window bin setting
  • history we want to take (filter on SourceTimeStamp)

The result looks like this:


Results of time window aggregation to track object state

Conclusion

We have demonstrated, on a synthetic dataset, that a time-window object-tracker-like query can be built with the Kusto language and database. Refactoring it into a function is easy to do; again, I'm leaving room for experimentation.

No benchmarks have been performed. However, it would be super interesting to see how the query (or better, a function) behaves on large datasets, and whether this is a scalable approach for tracking the state of individual "objects" in Kusto.
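One possible shape for such a function is sketched below, as a starting point rather than a finished implementation. The function name trackState, its parameters T and asOf, the table name equipment and all sample values are hypothetical. The sketch also makes three assumptions that differ from the fragments above: the SourceTimeStamp filter is applied before arg_max, so that a value later than the cutoff cannot hide an earlier one in the same bin; rows are grouped by Key, tsBin and propKey directly instead of by a concatenated binKey; and null property values are dropped altogether, which is a simplification of the stated objective.

```kusto
// Hypothetical sample data: keys, names and values are illustrative only.
let equipment = datatable(Id:int, Key:string, SourceTimeStamp:datetime, Props:dynamic)
[
    1, "sensor-1", datetime(2019-02-07 16:31:05), dynamic({"Name":"Pump A","SN":"1001"}),
    2, "sensor-1", datetime(2019-02-07 16:31:40), dynamic({"Name":"Pump A1"}),
    3, "sensor-1", datetime(2019-02-07 16:32:10), dynamic({"Temperature":21.5}),
    4, "sensor-2", datetime(2019-02-07 16:31:20), dynamic({"Name":"Valve B","SN":"2001"})
];
// Hypothetical function: name and parameters are not from the original query.
let trackState = (T:(Key:string, SourceTimeStamp:datetime, Props:dynamic), lookupWindow:timespan, asOf:datetime)
{
    T
    // Keep only the history up to the moment of interest (assumption: filter first).
    | where SourceTimeStamp < asOf
    | extend tsBin = bin(SourceTimeStamp, lookupWindow)
    // One row per property, as a [key, value] pair.
    | mv-expand kind=array Props
    | extend propKey = tostring(Props[0]), propValue = Props[1]
    // Simplification: ignore null values entirely.
    | where isnotnull(propValue)
    // Latest value of each property per sensor and time bin.
    | summarize arg_max(SourceTimeStamp, propValue) by Key, tsBin, propKey
    // Merge the surviving properties back into one bag per sensor and bin.
    | extend propsBag = bag_pack(propKey, propValue)
    | summarize propsMerged = make_bag(propsBag) by Key, tsBin
    | order by Key asc, tsBin asc
};
// Pass only the columns the function declares.
let input = equipment | project Key, SourceTimeStamp, Props;
trackState(input, 1min, datetime(2019-02-07 16:32:15.903))
```

With the sample rows above, the earlier Name value of sensor-1 in the 16:31 bin should be replaced by the later one, while SN from the first message is carried along unchanged.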
