Track object state in Kusto/ADX/Eventhouse with ease
Ondřej Spilka
Our greatest glory is not in never failing, but in rising every time we fail ― Confucius
While working with my team on time-series data analytics, I faced an interesting problem. It was not the first time; however, in the past I was using technologies other than the Kusto database (the engine behind Azure Data Explorer (ADX) and Fabric Eventhouse databases), and those made it somewhat easier to use a last predicate. Let's define the problem statement first.
Problem statement
It is assumed that sensor data is received as a time-series stream of objects with properties:
Every received message is stored in a time-series table in a Kusto database.
Each sensor may produce only a partial set of properties at any given SourceTimeStamp; in other words, sensor data can arrive in parts. This is a typical scenario when streaming OPC UA data via a message broker, even if the producer implements some kind of batching to improve message delivery efficiency.
Objective: define a query that calculates the actual (last) values for a given timespan and time window. A null value shall not be considered valid if there are any non-null values for the given property in the given interval.
Solution
For the sake of simplicity I do not define a table in Kusto; I just use a declarative table defined as:
What we have here is a unique key Id (it can be a synthetic key uniquely identifying any row) and the remaining properties Key, SourceTimeStamp, and Props.
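A minimal sketch of such a declarative table, assuming Props is a dynamic property bag. The name equipment, the column types, and all row values are hypothetical and serve only as an illustration:

```kusto
// Hypothetical sample data; names, types, and values are assumptions.
let equipment = datatable(Id:int, Key:string, SourceTimeStamp:datetime, Props:dynamic)
[
    // sensor-1 sends Name and SN, later a partial update, later a correction
    1, "sensor-1", datetime(2019-02-07 16:31:05.000), dynamic({"Name": "Pump A", "SN": "0001"}),
    2, "sensor-1", datetime(2019-02-07 16:31:20.000), dynamic({"Temperature": 21.5}),
    3, "sensor-1", datetime(2019-02-07 16:31:40.000), dynamic({"Name": "Pump A1", "SN": "0002"}),
    4, "sensor-2", datetime(2019-02-07 16:31:10.000), dynamic({"Name": "Valve B"})
];
equipment
```

Each row carries only the properties the sensor happened to send at that timestamp, which mirrors the partial-delivery scenario from the problem statement.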
Intuitively, the solution is to implement some window function over timestamp bins. With Kusto we seem to have a couple of options:
The last predicate requires a column, so it can't generally be used to get the last valid (non-null) values of properties.
The take_any aggregation is non-deterministic if the same key is found more than once in a bin; we need the actual, i.e. the last, value of the property.
So we will try arg_max, which is deterministic and can work on [key, value] pairs. The scan operator could do the job as well; I haven't tried it for this use case, leaving space for experimentation.
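To illustrate why arg_max fits here, a small sketch on hypothetical rows (the column Name and its values are assumptions) where the same key reports a property twice:

```kusto
// Hypothetical rows: the same key reports Name twice within one window.
datatable(Key:string, SourceTimeStamp:datetime, Name:string)
[
    "sensor-1", datetime(2019-02-07 16:31:05.000), "Pump A",
    "sensor-1", datetime(2019-02-07 16:31:40.000), "Pump A1"
]
// arg_max returns, per Key, the row with the greatest SourceTimeStamp,
// so the later entry is the one that is kept.
| summarize arg_max(SourceTimeStamp, Name) by Key
```

Replacing arg_max with take_any(Name) would return an arbitrary one of the two names, which is not acceptable when the last value is required.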
The final result looks like the following code snippet:
Note that the properties Name and SN were sent twice with different values, e.g. as a result of a user entry correction in the source system. The same generally applies to any slowly changing properties or telemetry properties.
Let's discuss briefly:
let lookupWindow = 1min;
defines the time window length,
equipmentBinned
| mv-expand kind=array Props
| extend propKey=tostring(Props[0]), propValue=Props[1], binKey = strcat(tsBin,tostring(Props[0]))
expands the dynamic Props column into key/value pairs and creates a unique "sub-bin" from the time bin and the property key,
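The expansion step can be tried in isolation. The sketch below assumes that tsBin is simply the timestamp rounded down with bin() and that Props is a property bag; the sample row is hypothetical:

```kusto
let lookupWindow = 1min;
// Hypothetical single message carrying two properties.
datatable(Key:string, SourceTimeStamp:datetime, Props:dynamic)
[
    "sensor-1", datetime(2019-02-07 16:31:05.000), dynamic({"Name": "Pump A", "SN": "0001"})
]
// Assumption: tsBin is the timestamp rounded down to the window length.
| extend tsBin = bin(SourceTimeStamp, lookupWindow)
// kind=array turns every bag entry into a two-element [key, value] array.
| mv-expand kind=array Props
| extend propKey = tostring(Props[0]), propValue = Props[1]
| project Key, tsBin, propKey, propValue
```

This yields one row per property, which is what allows each property to be aggregated independently of the others.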
| summarize arg_max(SourceTimeStamp, *) by Key,binKey
| where SourceTimeStamp < datetime(2019-02-07 16:32:15.903)
the most important part, together with the creation of binKey. It finds the "maximum" over SourceTimeStamp within groups defined by the sensor (Key) and the sub-bin (binKey, i.e. the time bin plus the property key), and then filters to the moment for which we want to track the state. By applying this logic we discard older values of the same property; in our model case this is the property Name, which was sent twice.
| extend propsBag=bag_pack(propKey, propValue)
| summarize propsMerged = make_bag(propsBag) by Key,tsBin
| order by Key asc,tsBin asc;
This final piece of code just merges all properties in each bin back into a single dynamic value.
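Putting the discussed pieces together, a self-contained sketch might look as follows. The table name equipment, its sample rows, and the definition of equipmentBinned via bin() are assumptions made for illustration; the pipeline itself follows the fragments discussed above:

```kusto
let lookupWindow = 1min;
// Hypothetical input data; names and values are assumptions.
let equipment = datatable(Id:int, Key:string, SourceTimeStamp:datetime, Props:dynamic)
[
    1, "sensor-1", datetime(2019-02-07 16:31:05.000), dynamic({"Name": "Pump A", "SN": "0001"}),
    2, "sensor-1", datetime(2019-02-07 16:31:20.000), dynamic({"Temperature": 21.5}),
    3, "sensor-1", datetime(2019-02-07 16:31:40.000), dynamic({"Name": "Pump A1", "SN": "0002"}),
    4, "sensor-2", datetime(2019-02-07 16:31:10.000), dynamic({"Name": "Valve B"})
];
// Assumption: tsBin is the timestamp rounded down to the window length.
let equipmentBinned = equipment
    | extend tsBin = bin(SourceTimeStamp, lookupWindow);
equipmentBinned
// One row per property, each as a [key, value] array.
| mv-expand kind=array Props
| extend propKey=tostring(Props[0]), propValue=Props[1], binKey = strcat(tsBin,tostring(Props[0]))
// Keep the latest row per sensor and per (time bin, property) sub-bin.
| summarize arg_max(SourceTimeStamp, *) by Key,binKey
| where SourceTimeStamp < datetime(2019-02-07 16:32:15.903)
// Re-assemble the surviving properties into one bag per sensor and bin.
| extend propsBag=bag_pack(propKey, propValue)
| summarize propsMerged = make_bag(propsBag) by Key,tsBin
| order by Key asc,tsBin asc;
```

One design point worth experimenting with: because the where clause runs after arg_max, a sub-bin whose latest message is newer than the cut-off is dropped entirely. Moving the filter ahead of the summarize would instead keep the last value received before the cut-off.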
The result depends obviously on two factors:
Result looks like:
Conclusion
It has been demonstrated on a synthetic data set that a time-window, object-tracker-like query can be achieved using the Kusto language and database. Refactoring it into a function is easy to do; again, I'm leaving space for experimentation.
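As a starting point for that experiment, a query-scoped function could be sketched like this. The function name trackState, its parameters lookupWindow and asOf, and the sample rows are all hypothetical:

```kusto
// Hypothetical function: the name and parameters are assumptions.
let trackState = (T:(Key:string, SourceTimeStamp:datetime, Props:dynamic), lookupWindow:timespan, asOf:datetime)
{
    T
    | extend tsBin = bin(SourceTimeStamp, lookupWindow)
    | mv-expand kind=array Props
    | extend propKey = tostring(Props[0]), propValue = Props[1], binKey = strcat(tsBin, tostring(Props[0]))
    | summarize arg_max(SourceTimeStamp, *) by Key, binKey
    | where SourceTimeStamp < asOf
    | extend propsBag = bag_pack(propKey, propValue)
    | summarize propsMerged = make_bag(propsBag) by Key, tsBin
    | order by Key asc, tsBin asc
};
// Hypothetical input matching the function's tabular parameter schema.
let equipment = datatable(Key:string, SourceTimeStamp:datetime, Props:dynamic)
[
    "sensor-1", datetime(2019-02-07 16:31:05.000), dynamic({"Name": "Pump A", "SN": "0001"}),
    "sensor-1", datetime(2019-02-07 16:31:40.000), dynamic({"Name": "Pump A1", "SN": "0002"})
];
trackState(equipment, 1min, datetime(2019-02-07 16:32:15.903))
```

Passing the window length and the cut-off as parameters makes it easy to compare different bin sizes on the same data.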
Benchmarks haven't been performed; however, it would be super interesting to see how the query (or better, a function) behaves on large datasets and whether this is a scalable approach to tracking the state of individual "objects" in Kusto.