Experimenting with OSDU Community Implementation and Microsoft Fabric
Kadri Umay
OT IT | Energy Data Platform | IASA Distinguished Architect | Carbon Capture and Storage | Ecosystem | Technology Evangelist | Public Speaker | Industry Standards OSDU, OPC, PIDX, Product Developer
In the first two installments of this series, I showed how an OSDU Community Implementation (CI) can be stood up in 5 minutes. In this third article, I share the results of my experiments connecting OSDU to Microsoft Fabric, an integrated data platform that provides a comprehensive set of services for managing, processing, and analyzing data, with components for data ingestion, transformation, orchestration, and reporting. My main motivation is to surface OSDU data seamlessly in Microsoft's artificial intelligence stack and the Power Platform.
A few years back I developed a Power BI connector for OSDU / ADME, but it had one major flaw: because of the limitations of the OSDU query API, which only returns metadata, I was not able to access the underlying data. In this experiment I was able to do exactly that, very easily. Fabric lets you connect the deep data oceans in OSDU to the rich productivity applications and tools of Microsoft.
Questions, questions, questions
How can I make my data in OSDU useful for analytics without having to create a copy or ETL/ELT?
How does OSDU store the data: what goes into files and what goes into databases?
How can I correlate my UWI (unique well identifier) with the parquet files that hold the well logs?
How can I preserve data security settings between OSDU and Analytics?
If these are some of the questions you have in mind, keep going.
Experimenting with the OSDU CI
Building on the OSDU CI deployment outlined in the previous articles, I will: ingest the TNO dataset into the Wellbore DDMS with a simple Python application; show how the different components of the data are stored across the blob folders and databases, and how they are correlated; build a Fabric shortcut to MinIO using the generic S3 connector; write a very simple PySpark notebook to extract the data and land it in OneLake parquet delta tables; build a GraphQL interface; and finally demonstrate a Power BI dashboard that visualizes various plots of the well logs.
While doing this, I had to buy a DNS domain and an SSL certificate: the Fabric shortcut has to resolve the endpoint, so it needs DNS, and the endpoint has to present a valid SSL certificate, because a self-signed one throws an error.
Let's get going.
Ingesting the TNO and getting into the guts of OSDU CI
Getting the TNO dataset ingested into the Wellbore DDMS was a bit of a challenge; there are tools and Python scripts in the community GitLab, but I ended up writing a simple script that reads the LAS files:
import os

import lasio
import pandas as pd

# Specify the directory that holds the TNO LAS files
directory = './well-logs/'

# data_partition_id, acl_domain, legal_tag and welllog_dms_url are configured elsewhere
# Iterate over each file in the directory
for filename in os.listdir(directory):
    # Only process .las files
    if os.path.isfile(os.path.join(directory, filename)) and filename.endswith('.las'):
        las = lasio.read(os.path.join(directory, filename))
        # Remove the dashes from the well name; in production this might need to be changed
        wellbore_id = las.well["WELL"].value.replace("-", "")
        # Build the WellLog record from the LAS header and publish it
        record = create_record_from_header(data_partition_id, acl_domain, legal_tag,
                                           las.well["STRT"].value, las.well["STOP"].value,
                                           wellbore_id, las.curves[0].mnemonic, las.keys())
        record_id = publish_record(welllog_dms_url, record)
        # Convert the curve data to parquet and push it as the record's bulk data
        df_from_arr = pd.DataFrame(las.data, columns=las.keys())
        data_to_send_parquet = df_from_arr.to_parquet(path=None, engine="pyarrow")
        response = publish_parquet_data(welllog_dms_url, record_id, data_to_send_parquet)
and a helper that creates a simple WellLog manifest (record) from the LAS header:
def create_record_from_header(data_partition_id, acl_domain, legal_tag, start, stop, wellbore_id, ref_curve_id, curves):
    record = {}
    record["kind"] = "osdu:wks:work-product-component--WellLog:1.2.0"
    record["acl"] = {}
    record["acl"]["viewers"] = [f"data.default.viewers@{data_partition_id}.{acl_domain}"]
    record["acl"]["owners"] = [f"data.default.owners@{data_partition_id}.{acl_domain}"]
    record["legal"] = {}
    record["legal"]["legaltags"] = [f"{legal_tag}"]
    record["legal"]["otherRelevantDataCountries"] = ["US"]
    record["data"] = {}
    record["data"]["ReferenceCurveID"] = ref_curve_id
    record["data"]["SamplingStart"] = start
    record["data"]["SamplingStop"] = stop
    record["data"]["WellboreID"] = f"osdu:wks::master-data--Wellbore:{wellbore_id}:"
    record["data"]["Curves"] = []
    for each in curves:
        record["data"]["Curves"].append({"CurveID": each, "NumberOfColumns": 1})
    record["version"] = 0
    return record
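For one of the LAS files, the record this helper produces looks roughly like the following; the partition, domain, legal tag, well name and curve list shown here are illustrative placeholders, not values from the actual TNO data:

{
    "kind": "osdu:wks:work-product-component--WellLog:1.2.0",
    "acl": {
        "viewers": ["[email protected]"],
        "owners": ["[email protected]"]
    },
    "legal": {
        "legaltags": ["opendes-public-usa-dataset"],
        "otherRelevantDataCountries": ["US"]
    },
    "data": {
        "ReferenceCurveID": "DEPT",
        "SamplingStart": 30.0,
        "SamplingStop": 1500.0,
        "WellboreID": "osdu:wks::master-data--Wellbore:NL01S01:",
        "Curves": [
            {"CurveID": "DEPT", "NumberOfColumns": 1},
            {"CurveID": "GR", "NumberOfColumns": 1}
        ]
    },
    "version": 0
}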
The record and the parquet data are published using the OSDU Python SDK client:
def publish_record(welllog_dms_url, record):
    # 'client' is an authenticated HTTP client created elsewhere (OSDU Python SDK)
    response = client.post(welllog_dms_url, json=[record])
    print_response(response)
    record_id = response.json()["recordIds"][0]
    print(record_id)
    return record_id

def publish_parquet_data(welllog_dms_url, record_id, data):
    # The Wellbore DDMS accepts the bulk data as an application/parquet payload
    headers = {'content-type': 'application/parquet'}
    response = client.post(f'{welllog_dms_url}/{record_id}/data', data=data, headers=headers)
    print_response(response)
    return response
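If you do not have the OSDU Python SDK at hand, a plain requests session with a bearer token works as a stand-in for the client used above. This is only a minimal sketch; the token, partition id and DDMS URL are placeholders you need to replace with your own:

import requests

# Placeholders: substitute your own token, partition and Wellbore DDMS endpoint
access_token = "<bearer-token>"
data_partition_id = "<your-partition-id>"
welllog_dms_url = "https://<your-osdu-host>/<welllog-ddms-path>"

client = requests.Session()
client.headers.update({
    "Authorization": f"Bearer {access_token}",
    "data-partition-id": data_partition_id,
})

def print_response(response):
    # Minimal helper used by the publish functions above
    print(response.status_code, response.reason)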
Where does the data ingested into OSDU go, and what does the internal structure look like? It lands in two main places: the Postgres tables, which hold the full manifest, and blob storage, which has two folders, one storing the parquet files and the other storing part of the manifest, with the two correlated via a record id.
Two manifest JSONs are stored for each file. The important thing to know is that they only hold the "data" section; the information related to entitlements, document properties, etc. is stripped off. One holds the raw manifest as ingested, and the second one contains the BulkUri field, which is going to be very important for correlating header information such as the UWI with the actual log data in the parquet file.
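To make the correlation concrete, here is a minimal sketch (plain Python, no Spark) that pulls the bulk URI out of one of these stripped manifests. The file name and surrounding structure are illustrative, but the JSON path matches what the Spark code later in this article extracts:

import json

# Illustrative path to one of the stripped manifest JSONs in the records folder
with open("records/sample-welllog-record.json") as f:
    manifest = json.load(f)

# The second manifest variant carries the bulk URI under ExtensionProperties
bulk_uri = manifest["data"]["ExtensionProperties"]["wdms"]["bulkURI"]
print(bulk_uri)  # e.g. urn:wdms-1:uuid:<uuid> -- the <uuid> names the parquet folder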
The parquet files themselves are stored in a separate part of the blob folder structure; observe the directory layout. Since the well logs are natively stored in parquet, this greatly simplifies our experiment. There are, however, similar approaches that extract multi-dimensional data from SEG-Y, RESQML, etc. and store it in a series of parquet files.
Making this complex data structure analytics ready
Now that we understand how the data is stored, the second step was to create shortcuts to OSDU. Since MinIO exposes the S3 API, it was very straightforward to use the generic S3 shortcut connector.
Reading the parquet files recursively and storing them in a delta table was very simple. A couple of gotchas here:
1- You have to strip the urid from the file name; you will need it later to join the curve data with the header information. I built a very simple UDF for this purpose:
# UDF to generate the record urn from the file name
from pyspark.sql.functions import col, udf

@udf
def get_urid(abfss):
    # The path segment at index 8 is the record uuid; adjust if your shortcut path depth differs
    return "urn:wdms-1:uuid:" + abfss.strip().split("/")[8]
2- You have to use the schema merge option: the log files contain different sets of wireline curves, and without it you lose columns depending on the structure of the first file you read.
3- You can ingest the two manifest files with a single line of code, but you have to remove the rows with empty BulkUri values (see the filter sketch after the manifest ingestion code below).
The code to ingest the parquet files is:
directory_path = "/lakehouse/default/Files/wellbore"
from pyspark.sql.functions import input_file_name
df = spark.read.option("recursiveFileLookup", "true").option("pathGlobFilter","*.parquet").option("mergeSchema", "true").parquet("Files/wellbore")
df=df.withColumn("urid", get_urid(input_file_name()))
df.write.format("delta").mode("append").option("mergeSchema", "true").save("Tables/LogCurve2")
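As an optional sanity check, you can read the delta table back and confirm the urid column is populated; this short sketch uses the LogCurve2 table created above:

# Read the delta table back and spot-check a few urids
check_df = spark.read.format("delta").load("Tables/LogCurve2")
check_df.select("urid").distinct().show(5, truncate=False)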
You have to define two more UDFs to extract the wellbore id and the wdms bulk id from the urns in the JSON documents. Defining them as UDFs makes the extraction possible in a single line of code:
from pyspark.sql.functions import col, udf

@udf
def get_wellboreid(urn):
    # The wellbore id is the next-to-last segment of the WellboreID urn
    return urn.strip().split(':')[len(urn.strip().split(':')) - 2]

@udf
def get_wdms(urn):
    # The wdms bulk id is the last segment of a urn:wdms-1:uuid:<uuid> reference
    if urn is not None:
        if len(urn.strip().split(':')) == 4:
            return urn.strip().split(':')[3]
The code is:
from pyspark.sql.functions import get_json_object

# The stripped manifest JSONs are exposed under the records shortcut
directory_path = "Files/records"
df_json = spark.read.option("recursiveFileLookup", "true").text("Files/records")

# Pull the wellbore id and the bulk URI (urid) out of each manifest
df_json = df_json.withColumn("wellboreid", get_wellboreid(get_json_object(col("value"), "$.data.WellboreID")))
df_json = df_json.withColumn("urid", get_json_object(col("value"), "$.data.ExtensionProperties.wdms.bulkURI"))

df_json.write.format("delta").mode("append").option("mergeSchema", "true").save("Tables/WellboreRecords2")
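As noted in gotcha 3, the manifests without a BulkUri should be dropped; a minimal sketch of that filter, applied to the same df_json before the write:

# Keep only the manifest variant that actually carries a bulk URI
df_json = df_json.filter(col("urid").isNotNull() & (col("urid") != ""))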
And now you have two tables with all the information you need; the header info is connected to the well logs via the urid. Note that the manifest I created is the absolute minimum, so a more enriched one will give you much richer data.
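To illustrate the join, here is a minimal Spark SQL sketch that brings the header information and the curves together on urid, assuming the delta tables saved under Tables/ above are registered in the lakehouse and queryable by name:

# Join the wellbore header records to their log curves on the shared urid
joined = spark.sql("""
    SELECT w.wellboreid, c.*
    FROM WellboreRecords2 AS w
    JOIN LogCurve2 AS c
      ON w.urid = c.urid
""")
joined.show(5)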
Microsoft Fabric supports creating GraphQL APIs. I tested a simple case by creating a relationship between the WellboreRecords and LogCurve tables, connected via the urid field. It is all configuration driven, a zero-coding experience.
What's next? You can create a semantic data model and start playing around. Here are some examples of displaying the well logs for a well.
What about the entitlements
I can hear the question: what about the entitlements, how do we protect the data? And you are totally right; as the title suggests, this is only experimentation, something for me to have fun with on a rainy Seattle weekend. There are multiple ways to skin this cat. One, with zero changes on the OSDU side, relies on the fact that the Postgres records carry the entitlements in each record. By ingesting this data and providing a third join point, we can have all records identified with their entitlements, and access to the data could be provided through views or queries. The acl stored with each record looks like this:
"acl": {"owners": ["[email protected]"], "viewers": ["[email protected]"]}
A second option, which is easier, would be to change the OSDU service to write the full manifest instead of stripping it down to the data portion; however, this might have some downstream implications.
Thirdly, and probably the most architecturally sound approach, is to build a landing zone where the data in OSDU is exposed in an AI-ready shape, which can then be connected to Microsoft Fabric via shortcuts. In this experiment I chose the Wellbore DDMS because the data is already in parquet format and easy to test. For seismic data in formats like SEG-Y or ZGY, or reservoir data in RESQML, which is an HDF5-based format, the data has to be converted to parquet files; this could be part of the ingestion DAGs.
Conclusion
I have demonstrated a way to experiment with data in an OSDU Community Implementation by connecting it to Microsoft Fabric OneLake. It is not a production-ready solution, as there are numerous enterprise concerns, such as data security, that have to be addressed.