Graph Feature Engineering for Longitudinal Data (aka Time Series)
TL;DR: Graph feature engineering in Neo4j for longitudinal (aka time-series) data allows path analysis and other analytics that would be much harder to accomplish in a data lake.
In response to my previous blog, someone asked about Neo4j in comparison/contrast to data lakes. The commenter pointed out that many companies now use data lakes for intermediate data analysis. This is true. It is also true that data lakes – and in particular Databricks – are one of the most common sources of data for Neo4j.
A different prospective customer on a conference call asked about Neo4j graph feature engineering & longitudinal data. I had to look that one up ("longitudinal data")....
Turns out that one of the clearest examples of when Neo4j is a fit vs. doing the analysis in a data lake is when one considers longitudinal/time-series data.
...sooooo..... killing two birds with one stone.....
Data Lakes – today's ODS?
But first, when considering Neo4j vis-à-vis data lakes and the roles of the respective technologies, take a step back in time to Bill Inmon's (aka the father of data warehousing) classic architecture for data warehouses:
Initially, data lakes started as a place to archive OLTP data to prevent OLTP systems from slowing down or becoming unsustainable due to exploding data volumes. I think this has shifted over time – and will likely continue to do so as data lake providers incorporate new technologies into their platforms. My personal opinion is that the data lakes of today are arguably filling the role of the ODS – they have become the “central” data amalgamation point and “data hub” for many of the source systems within larger enterprises. Many operations such as data cleansing, standardization, amalgamation and curation happen in this area. The core technology du jour here seems to be Spark-based systems such as Databricks, etc. Arguably, if Bill Inmon updated his architecture for the modern technology stack and implementations, it would likely resemble:
Drilling into the technologies a bit closer, you’d probably see some common trends such as:
But does that mean data lake technologies are a replacement for EDW and data mart technologies? Hardly. The biggest problem facing true data lakes is the huge trade-off between storage and query performance. The three core technologies in this space have vastly different capabilities in this area. First, BLOB-based storage is hardly a fast query engine – but it definitely can be a repository for just about anything data-wise, such as emails, voicemails and other non-structured and semi-structured data – and considering the legal need to retain these, there is a fit. The second – at the other end of the scale – column-based storage can be extremely fast for global aggregations (i.e., slicing/dicing KPI performance metrics) as well as probably the most cost-effective way to store structured data and support ad-hoc queries.
The last and most common data lake implementation technologies are typically Apache Spark-based DBMS platforms such as Databricks. To understand the limitations of Apache Spark, we need to look under the hood a bit.
Going back ~10 years to 2014, SAP’s Sybase IQ set a Guinness World Record for the largest database at the time – 12 petabytes (PB) of raw data – the third such record set by Sybase IQ over the years. Likely there are larger ones now. However, it was compared to what was then the foundational technology underneath data lakes – Hadoop. The comparison was ugly:
Some may say – that was Hadoop 10 years ago – this is Spark now. Welllllllllll….maybe. Yes, at a physical level, today’s 30+ core CPUs and 20+ TB disks would shrink the datacenter footprint to something more affordable as well as actually attainable. I say "attainable" because one of the early scaling demonstrations for Neo4j Fabric (now Composite Database technology in 5.x) involved ~1,000 AWS EC2 instances and exhausted all the available instances in that region.
However, at the software layer, if you take Spark apart a bit, you will find that Apache Spark consists of a Spark Core which implements the scheduling, the optimizations and (most importantly) the RDD abstraction – but runs on top of HDFS (Hadoop!), S3, RDBMS, Parquet, ElasticSearch, etc.
At its core, Apache Spark is a massively parallel processing (MPP) implementation in which queries are coordinated from the driver and distributed to (often) hundreds or thousands of worker nodes (aka Executors). So, a programmatic API such as the classic DataFrame API creates a job that is decomposed into one or more stages, which are then subdivided into tasks (units of execution) that are sent to the worker nodes for execution. Parallel query processing (sometimes referred to as distributed query processing) is an extremely complicated task, with notions such as eager aggregation, relocatable joins, result merging, etc. – but in the case of Spark, it is notable that the stages are constructed as Directed Acyclic Graphs (DAGs... see – we told you – graphs are everywhere!).
But this brings up a very fundamental point – data lakes are extremely good at organizing and storing massive amounts of data, as well as serving as a central clearing house for data dissemination... but querying with any responsiveness requires absolutely massive amounts of brute-force parallelism that simply is not economically viable.
Longitudinal (aka Time-Series) Data
Now, how does this apply to Data Lakes vs. Neo4j….and longitudinal data?
One of the common use cases we see in graph DBMSs is what some call longitudinal data – others call it time series. For the sake of sanity, it is simply a series of events that happen in a sequence. Arguably, longitudinal data makes up the vast majority of data volume for a structured database. Your order entry system? Customers & products – mere GBs; the orders and order details – TBs! The same is true of health care or anything else. It is the business events vs. the business entities that drive data volume. In a graph database, these events are typically stored as nodes. To understand the graph feature engineering aspects of the problem, we first need to understand what the typical use case is vs. RDBMS – not to pick on them – but RDBMS is ubiquitous… and since my background evolved from there, it’s a common point of reference.
In the relational world, “time series” data is typically implemented by adding either system-triggered or application-modified timestamps to the primary key. In some cases, this is just a single timestamp such as eventTime. In other cases, when there might be overlap or gaps, both startEffective and endEffective timestamps are added to the primary key. As an example of the latter, here in the US, auto insurance typically has startEffective and endEffective policy dates. A P&C (property & casualty) company might be looking to ensure continuous coverage or looking for gaps in coverage. As an example of the former, a patient care health system might simply have a visitDate. An RDBMS can typically perform normal aggregations such as min(), max() and avg() using what are known as SQL “window” queries. Such queries are extremely common for showing current trends, such as the average stock price for the past hour using 5-minute windows of time. This was known as real-time analytics, and early implementations used optimized engines called “streaming databases” (aka complex event processing databases) such as Coral8, Aleri, StreamBase, GemFire – and to a lesser extent now, Kafka. For those types of window queries, these probably are still the best solution.
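For the sake of comparison only, here is a minimal Cypher sketch – not from the article’s examples, and assuming hypothetical :Tick nodes carrying a price property and an eventTime datetime property – of what the same kind of 5-minute bucket average over the past hour would look like:
// minimal sketch, assuming :Tick nodes with price and eventTime (datetime)
MATCH (t:Tick)
WHERE t.eventTime >= datetime() - duration('PT1H')
WITH datetime.truncate('minute', t.eventTime) as m, t.price as price
WITH m - duration({minutes: m.minute % 5}) as bucket, price
RETURN bucket, avg(price) as avgPrice
ORDER BY bucket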
However, one of the common use cases for longitudinal data in graph databases is path analysis – e.g., patient or customer journey. Consider the following whiteboard sketch of a high-level data model for patient data:
Now, at the simplest of levels, whether RDBMS or GDBMS, a simple path from the 1st to the nth event is a simple query with an ORDER BY clause on the event date – ala:
// RDBMS SQL
SELECT e.*
FROM events e
WHERE e.customerID = 123
ORDER BY e.eventTime
// Neo4j Cypher
MATCH (e:Event)
WHERE e.customerID=123
RETURN e
ORDER BY e.eventTime
Both work – and, assuming proper indexing, both work quickly.
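On the Neo4j side, “proper indexing” would typically mean a composite range index over the two properties used above – a minimal sketch (the index name is just illustrative):
// composite index supporting the lookup/sort above
CREATE INDEX event_customer_time IF NOT EXISTS
FOR (e:Event) ON (e.customerID, e.eventTime)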
A very common situation is when you are looking at the results of a query that returned an event and you want to find the next or previous one. From a SQL RDBMS point of view, that query looks like the following:
// find next event from current one
SELECT e1.*
FROM events e1
WHERE e1.customerID=123
  AND e1.eventTime = (SELECT min(e2.eventTime)
                        FROM events e2
                       WHERE e2.customerID = e1.customerID
                         AND e2.eventTime > '2023-06-12T14:35Z'
                     )
Again, assuming proper indices… it works fairly quickly. One reason is the B-Tree structure that is typically used: the DBMS query optimizer simply takes the known eventTime (assuming customerID and eventTime are in a composite index, which is very likely) and does a quick index step to the next value. This is a common optimization for min()/max() type operations when all the predicates are available in a composite index and the min()/max() is on the last column of the index.
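For reference, the Cypher side of that same “next event” lookup – a sketch against the :Event model above, with no refactoring yet and assuming eventTime is stored as a datetime – is just an ordered match with a LIMIT:
// find next event from current one (no refactoring yet)
MATCH (e:Event)
WHERE e.customerID = 123
  AND e.eventTime > datetime("2023-06-12T14:35Z")
RETURN e
ORDER BY e.eventTime
LIMIT 1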
It gets slightly more complicated (but still easily achievable) when you get questions such as “For patient X (who is here today and being treated for asthma), what were the last 5 visits for this condition and which drugs were prescribed?”
Now this is where it gets a bit fun. In typical pseudo code, we would think about:
1. First, retrieve all the visits prior to today for asthma
2. Sort in descending order and take the top 5
3. Then join with the Prescription table
There are typically several ways to do this – the first attempts would likely involve a temp table, and those wanting a single query would end up using a subquery with an IN clause or a derived table. All doable. The problem is that the min()/max() query optimization just got voided, since it is unlikely that condition was indexed – or at least not in combination with customerID, condition, etc. You could index it... and that is how most analytical schemas end up with so many indices that index space consumption greatly exceeds the actual data space consumption.
All of this is not difficult – but the SQL starts to get more and more complicated with subqueries, derived tables or whatever….and depending on the query, some of the index optimizations for aggregations are no longer usable.
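For contrast, here is roughly how that same asthma question could be sketched in Cypher against the whiteboard model – the property and relationship names are assumptions for illustration, not taken from the article’s dataset:
// last 5 asthma visits for one patient, plus the drugs prescribed (sketch)
MATCH (p:Patient {customerID: 123})-[:HAS_EPISODES]->(e:Episode)
      -[:DIAGNOSIS]->(:Condition {name: "Asthma"})
WHERE e.visitDate < date()
WITH e
ORDER BY e.visitDate DESC
LIMIT 5
MATCH (e)-[:WAS_PRESCRIBED]->(d:Drug)
RETURN e.visitDate, collect(d.name) as drugsPrescribed
ORDER BY e.visitDate DESC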
Neo4j Graph Feature Engineering & Longitudinal Data
Now let’s look at Neo4j. One of the most common tricks for doing this in Neo4j is to avoid the whole subquery issue by creating a “linked list” of events – using a “NEXT” relationship. So, for a patient journey example, each individual patient’s events will have a NEXT_EPISODE relationship (or a similar name) to the next episode for that same patient. Typically, in such implementations, a “LAST” or “MOST_CURRENT” relationship is also added from the patient to their last visit.
This allows queries such as “show the last 10 visits for this patient” to be expressed as:
MATCH path=(p:Patient)-[:LAST_VISIT]->(e:Episode)
<-[:NEXT_EPISODE*1..10]-(:Episode)
WHERE p.customerID=123
RETURN path
If the database already exists and doesn’t have this relationship, it is easily added by refactoring with Cypher similar to:
// refactor our database to add relationship linking patient
// visits in sequence
MATCH (p:Patient)-[:HAS_EPISODES]->(e:Episode)
WITH p, e
ORDER BY p, e.visitDate
WITH p, collect(e) as episodeList
WITH p, episodeList, range(1,size(episodeList)-1) as idxList
UNWIND idxList as curIdx
WITH episodeList[curIdx-1] as prevEpisode, episodeList[curIdx] as curEpisode
MERGE (prevEpisode)-[:NEXT_EPISODE]->(curEpisode)
..and done. Then find the first/last and add those relationships if desired. Easy, peasy.
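For completeness, the “last visit” hook used by the query above can be added in the same pass – one possible sketch, assuming the same model and that the most recent episode is the one with no outgoing NEXT_EPISODE:
// add the LAST_VISIT shortcut from each patient to their most recent episode
MATCH (p:Patient)-[:HAS_EPISODES]->(e:Episode)
WHERE NOT (e)-[:NEXT_EPISODE]->(:Episode)
MERGE (p)-[:LAST_VISIT]->(e)
A FIRST_VISIT relationship could be added the same way by testing for the absence of an incoming NEXT_EPISODE.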
Useful for a doctor – but not too useful for a pharmaceutical company looking at drug efficacy. Remember our earlier example with asthma – welllllllll… one advantage of having relationships as first-class objects in Neo4j is that we can create multiple relationships – in this case, linked lists – without worrying about the impact on the Patient node. So we could create another relationship, NEXT_EPISODE_SAME_DIAGNOSIS (or a similar name), and we could have a query like:
MATCH path=(p:Patient)-[:LAST_VISIT]->(e1:Episode)
<-[:NEXT_EPISODE_SAME_DIAGNOSIS*1..10]-(e2:Episode)
WHERE p.customerID=123
RETURN path
This is actually not that hard to implement – even on an existing database. You would simply write a query similar to:
// refactor our database to add a separate relationship to create a
// linked list connecting episodes with the same diagnosis for each
// patient
MATCH (p:Patient)-[:HAS_EPISODES]->(e:Episode)-[:DIAGNOSIS]->(c:Condition)
WITH p, c, e
ORDER BY p, c, e.visitDate
WITH p, c, collect(e) as episodeList
WITH p, c, episodeList, range(1,size(episodeList)-1) as idxList
UNWIND idxList as curIdx
WITH episodeList[curIdx-1] as prevEpisode
, episodeList[curIdx] as curEpisode
MERGE (prevEpisode)-[:NEXT_EPISODE_SAME_DIAGNOSIS]->(curEpisode)
Even so, this is still not that useful for a pharmaceutical company. They are likely trying to evaluate not just one patient, but the similar paths across all patients diagnosed with the same condition – and in particular, which drugs were prescribed, or prescribed in combination.
Now, this is where the power of a graph database really shines – I can create a “NEXT” relationship between drugs based on patient prescription sequences. However, given that there are millions of patients, I need to uniquely identify each relationship with the patientID and the condition being treated. In Neo4j 5.7+ you can do this by creating a unique constraint on relationship properties; prior to that, you can simply create an index (a DDL sketch follows the next query). So, the Cypher for this might be something like:
MATCH (p:Patient)-[:HAS_EPISODES]->(e:Episode)-[:WAS_PRESCRIBED]->(d:Drug)
, (e)-[:DIAGNOSIS]->(c:Condition)
WITH p, c, d, e
ORDER BY p, c, d, e.visitDate
WITH p, c, d, collect(e) as episodeList
WITH p, c, d, episodeList, range(1,size(episodeList)-1) as idxList
UNWIND idxList as curIdx
WITH episodeList[curIdx-1] as prevEpisode
, episodeList[curIdx] as curEpisode
MERGE (prevEpisode)-[:NEXT_EPISODE_SAME_DIAGNOSIS]->(curEpisode)
;
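As for the DDL mentioned above – on Neo4j 5.7+ a relationship property uniqueness constraint can cover the NEXT_DRUG key, while earlier versions can use a relationship property index for the same lookup. A sketch (the constraint/index names are illustrative):
// Neo4j 5.7+: uniqueness constraint on the NEXT_DRUG key properties
CREATE CONSTRAINT next_drug_unique IF NOT EXISTS
FOR ()-[r:NEXT_DRUG]-()
REQUIRE (r.patientId, r.diagnosis) IS UNIQUE;
// pre-5.7: a relationship property index instead
CREATE INDEX next_drug_idx IF NOT EXISTS
FOR ()-[r:NEXT_DRUG]-()
ON (r.patientId, r.diagnosis);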
….and more specifically to focus on the drugs prescribed for the conditions/diagnosis, a quick refactoring using graph feature engineering such as:
// add the drug/diagnosis links for each patient into a linked list
MATCH (p:Patient)-[:HAS_ENCOUNTER]->(e:Encounter)-[:HAS_CONDITION]->
(c:Condition)
    , (e)-[:HAS_DRUG]->(d:Drug)
WITH p, c, d, e.date as encounterDate
ORDER BY p, c, d, encounterDate
WITH p, c, collect(d) as drugList, count(*) as numEncounters
WHERE numEncounters > 1
WITH p, c, drugList, range(1,size(drugList)-1) as idxList
UNWIND idxList as curIdx
WITH p, c, drugList[curIdx-1] as prevDrug, drugList[curIdx] as nextDrug
MERGE (prevDrug)-[:NEXT_DRUG {patientId: p.id
, diagnosis: c.description}]->(nextDrug)
;
// add a starting link from the patient to the first drug/diagnosis
// linked list
MATCH path=(d1:Drug)-[rel:NEXT_DRUG]->(d2:Drug)
WHERE NOT EXISTS ((:Drug)-[:NEXT_DRUG {patientId: rel.patientId
, diagnosis: rel.diagnosis}]->(d1))
MATCH (p:Patient {id: rel.patientId})
MERGE (p)-[:DRUG_PATH {diagnosis: rel.diagnosis}]->(d1)
;
Now we can run some more interesting queries such as:
MATCH p=(n:Patient)-[dp:DRUG_PATH]->(:Drug)
-[:NEXT_DRUG* {patientId: n.id, diagnosis: dp.diagnosis}]->(:Drug)
WHERE dp.diagnosis STARTS WITH "Alzheimer's disease"
RETURN p LIMIT 100
;
Which yields a graph such as the following:
Now, any real drug analytics would likely include additional facts such as the time between prescriptions, the dosages, observed side-effects, outcomes, etc.
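As one hedged example of what that could look like (again, the names and the e.date date property are assumptions), the NEXT_DRUG refactor above could carry the gap between consecutive prescriptions on the relationship itself:
// variation of the NEXT_DRUG refactor that also stores the number of days
// between consecutive prescriptions (assumes e.date is a date/datetime)
MATCH (p:Patient)-[:HAS_ENCOUNTER]->(e:Encounter)-[:HAS_CONDITION]->(c:Condition)
    , (e)-[:HAS_DRUG]->(d:Drug)
WITH p, c, d, e.date as encounterDate
ORDER BY p, c, encounterDate
WITH p, c, collect({drug: d, date: encounterDate}) as rxList
WHERE size(rxList) > 1
UNWIND range(1, size(rxList)-1) as curIdx
WITH p, c, rxList[curIdx-1] as prevRx, rxList[curIdx] as curRx
WITH p, c, prevRx.drug as prevDrug, prevRx.date as prevDate
    , curRx.drug as curDrug, curRx.date as curDate
MERGE (prevDrug)-[nd:NEXT_DRUG {patientId: p.id, diagnosis: c.description}]->(curDrug)
SET nd.daysBetween = duration.inDays(prevDate, curDate).days
;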
Using GDS to boil the ocean...
However, the next step might be to aggregate the patients into clusters based on similarities such as comorbidities, genetic traits, age/gender/race demographics, etc. This involves a second step (or is it the third or fourth by now?) of graph feature engineering around patient similarity:
// create a “fake” property for scoring without having a real weight in
// the data
CALL db.createProperty("weight");
// project a (bi-partite) GDS graph with Patients and some interesting
// aspects we think would form communities
CALL gds.graph.project(
     "patientSimilarity",
     // Labels
     ["Patient","Condition","Procedure","Drug","City"],
     // Relationships
     { HAS_HISTORY_OF:
          {type: "HAS_HISTORY_OF", orientation: "NATURAL",
            properties: {
               weight: {property: "weight", defaultValue: 1.0}
            }
          },
       HAS_PRESCRIPTION_DRUG:
          {type: "HAS_PRESCRIPTION_DRUG", orientation: "NATURAL",
            properties: {
               weight: {property: "weight", defaultValue: 0.5}
            }
          },
       PROCEDURE_HISTORY:
          {type: "PROCEDURE_HISTORY", orientation: "NATURAL",
            properties: {
               weight: {property: "weight", defaultValue: 0.7}
            }
          },
       PATIENT_LOCALE:
          {type: "PATIENT_LOCALE", orientation: "NATURAL",
            properties: {
               weight: {property: "weight", defaultValue: 0.3}
            }
          }
     },
     {readConcurrency: 4}
);
// compute Jaccard similarity between patients to create a mono-partite
// graph for community detection
CALL gds.nodeSimilarity.write(
"patientSimilarity",
{relationshipWeightProperty: "weight"
, writeRelationshipType: "PATIENT_SIMILARITY"
, writeProperty: "jaccardScore"
, similarityCutoff: 0.4999}
)
;
…followed by a bit of community detection and feature engineering of new nodes representing those communities….
// project a new (mono-partite) GDS graph based on patient similarity
// relationships
CALL gds.graph.project(
     "patientNetwork",
     // Labels
     ["Patient"],
     // Relationships
     { PATIENT_SIMILARITY:
          {type: "PATIENT_SIMILARITY", orientation: "UNDIRECTED",
            properties: {
               weight: {property: "jaccardScore", defaultValue: 0.1}
            }
          }
     },
     {readConcurrency: 4}
);
// Use Louvain algorithm to detect communities
CALL gds.louvain.mutate(
"patientNetwork",
{relationshipWeightProperty: "weight"
, includeIntermediateCommunities: false
, mutateProperty: "louvainCommunity"
}
)
;
// write the community id to the Patient node
CALL gds.graph.nodeProperties.write(
"patientNetwork"
,["louvainCommunity"]?
,["Patient"]
, {writeConcurrency: 4}
)
;
// create a new community node - this will allow us to track aggregate
// metrics for the community as well as easy visualization via a
// community ID
CALL gds.graph.nodeProperty.stream("patientNetwork"
,"louvainCommunity",["Patient"])
YIELD nodeId, propertyValue
WITH gds.util.asNode(nodeId) as myPatient
, propertyValue as PatientCommunityID
WITH PatientCommunityID, collect(myPatient) as PatientList
, count(*) as numPatients
WHERE numPatients >= 20  // filter out communities < 20
MERGE (pc:PatientCommunity {algo: "Louvain"
, communityID: PatientCommunityID})
WITH pc, PatientList
UNWIND PatientList as curPatient
MERGE (curPatient)-[:IS_MEMBER_OF]->(pc)
;
// add some stats to the network - add the total number of patients
// in the community
MATCH (pc:PatientCommunity)<-[:IS_MEMBER_OF]-(p:Patient)
WITH pc, count(p) as numPatients
SET pc.numPatients=numPatients
;
….and then finally migrating the individual patient drug paths to the community level…
MATCH (pc:PatientCommunity)<-[:IS_MEMBER_OF]-(p:Patient)
-[dp:DRUG_PATH]->(d:Drug)
MERGE (pc)-[:COMMUNITY_DRUG_PATH {diagnosis: dp.diagnosis}]->(d)
;
This cleans things up a bit, so the above query becomes:
MATCH p=(n:PatientCommunity)-[dp:COMMUNITY_DRUG_PATH]->(:Drug)
-[:NEXT_DRUG* {diagnosis: dp.diagnosis}]->(:Drug)
WHERE dp.diagnosis STARTS WITH "Alzheimer's disease"
RETURN p LIMIT 100
;
Which then looks like:
Which is a bit cleaner… and probably the first step into path similarities or other possible analyses.
Could you do all of this in a data lake? Probably. I wouldn’t want to be the person writing the queries or maintaining them… The complexity would depend on the technology being used for the data lake.
Nor would I want to be the person trying to run the analytics, as they likely would not perform sprightly, to say the least. One reason is that for longitudinal data where you are performing deep traversals of unknown length – at least in SQL – you would be stitching together tens of SQL fragments with UNION ALL, ala:
//pseudo code
SQL with 1 self join for 1 hop
UNION ALL
SQL with 2 self joins for 2 hops
UNION ALL
SQL with 3 self joins for 3 hops
UNION ALL
SQL with 4 self joins for 4 hops
UNION ALL
SQL with 5 self joins for 5 hops
…
....or create a join table and do 15+ outer joins. The pic at the top of this article is from a dummy dataset and even there, the patient journey is 50+ levels deep. 50+ UNION ALL's ....or 50+ outer joins. Ugh. Either way....not fast...and not fun.
But with Neo4j....it runs on a laptop. In less than 1 second.
Net, net: longitudinal data is just easier in Neo4j.