MI-ETLx: Incremental Extract and Load Module for Python
Paschal Chukwuemeka Amah
Chief Technology Officer @ Wowzi | Lead Technology + Software + Product + Data Engineering Strategy and Management | Serverless Microservices | Event-Driven Architecture | Automation Junkie
Overview
MI-ETLx (Mongo Incremental Extract, Transform, and Load) is a Python module designed for both Extract, Load, Transform (ELT) and Extract, Transform, Load (ETL) operations; the 'x' in its name reflects this flexibility, which lets users fit the module to their specific data processing needs. It provides an efficient way to manage data pipelines, addressing the inefficiencies associated with traditional full-load pipelines.
Beyond incremental extraction and loading, MI-ETLx has two further aims. The first is to give power back to the data engineer who is willing to keep working as a software engineer focused primarily on data solutions. The second is to push back against the cost and tool lock-in that have become commonplace in data engineering: heavy tooling has left many engineers no longer able, or willing, to build pipelines, plumbing their way through an amalgamation of tools instead and dragging in enormous cost and feature lock-in around whatever those tools provide.
Designed to work with AWS first, the module essentially implements three steps:
1. Extracts data from a MongoDB database, flattening nested documents where applicable.
2. Optionally loads the extracted data into a data lake, for when the pipeline is also meant to serve as a source of slowly changing dimension (SCD) data. This utility is disabled by default.
3. Loads the extracted data into a warehouse as an upsert, catering to changes to existing records as well as entirely new inserts.
Transformation is minimal and open to the data engineer to adapt to any upstream or downstream data reality in their organization.
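As a purely illustrative sketch of where such a transform could sit: the snippet below masks email addresses between extraction and loading. The record shape here (a dict mapping collection names to lists of flattened documents) is an assumption for illustration, not a guarantee of what extract_oplog_data() returns; adapt it to the actual output of your version of the module. The names mirror the usage examples further down.

def mask_emails(extracted):
    # 'extracted' is assumed to map collection names to lists of record dicts.
    for collection, records in extracted.items():
        for record in records:
            # Redact the local part of any 'email' field before loading downstream.
            if "email" in record and "@" in str(record["email"]):
                record["email"] = "***@" + str(record["email"]).split("@")[-1]
    return extracted

# extracted_data = mask_emails(data_extraction.extract_oplog_data())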
Features
Change Data Capture Utilization
MI-ETLx initially leverages MongoDB's change data capture (CDC) feature, specifically the oplog, to move only the data that has changed since the last run. This approach minimizes data transfer, resulting in a significantly reduced payload.
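For intuition only, here is a minimal, standalone sketch of the idea oplog-based CDC rests on: MongoDB's capped local.oplog.rs collection (available on replica sets) records every write, so reading only the entries newer than the timestamp checkpointed at the end of the previous run yields just the changed data. This is plain pymongo against the oplog, not MI-ETLx's internal code; the connection string, namespace, and checkpoint value are placeholders.

from pymongo import MongoClient
from bson.timestamp import Timestamp

# Placeholder connection string; the deployment must be a replica set for the oplog to exist.
client = MongoClient("mongodb://localhost:27017")

# Checkpoint saved at the end of the previous run (placeholder value).
last_run = Timestamp(1703721600, 0)

# Inserts ('i'), updates ('u'), and deletes ('d') on one namespace since the checkpoint.
changes = client.local["oplog.rs"].find(
    {
        "ts": {"$gt": last_run},
        "ns": "sample_analytics.collection_1",
        "op": {"$in": ["i", "u", "d"]},
    }
)
for entry in changes:
    print(entry["ts"], entry["op"], entry.get("o"))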
Controlled Payloads
MI-ETLx keeps payloads fully controlled, making runtime, compute cost, and other metrics predictable and manageable. This simplifies budgeting for data pipeline operations.
Unlike traditional full-load pipelines that move multiple gigabytes of data in each run, MI-ETLx has the potential to reduce the load to less than 5MB per run. This substantial decrease in payload size improves overall efficiency.
Schema Resolution
MI-ETLx discovers the schema on the destination when a destination table already exists and adapts the incoming payload in either or both of two ways: (a) dropping, and reporting, columns from the source that do not exist on the destination; (b) nulling out columns that exist on the destination but are missing from the source.
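The reconciliation idea can be sketched in a few lines. This is an illustration of the behaviour described above, not the module's internal implementation; dest_columns stands in for whatever column list is discovered on the destination table.

def reconcile(records, dest_columns):
    """Align flattened source records to the destination table's columns."""
    dropped = set()
    aligned = []
    for record in records:
        # Columns the source sends but the destination lacks are dropped and reported.
        dropped.update(set(record) - set(dest_columns))
        # Columns the destination has but the source omits are nulled out.
        aligned.append({col: record.get(col) for col in dest_columns})
    return aligned, dropped

rows, dropped_cols = reconcile(
    [{"id": 1, "name": "Ada", "legacy_flag": True}],
    dest_columns=["id", "name", "created_at"],
)
# rows -> [{'id': 1, 'name': 'Ada', 'created_at': None}], dropped_cols -> {'legacy_flag'}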
Extendable to Various Databases
While initially focused on MongoDB, MI-ETLx is designed to be extendable to cover the most popular NoSQL as well as SQL databases. Future releases will include support for loading data into other SQL databases and big data stores, starting with those offered by the biggest three cloud players: AWS, GCP, Azure.
Event-Driven Serverless Service
MI-ETLx is conceived to be run as an event-driven serverless service. This architectural choice enhances scalability, cost-effectiveness, and adaptability to varying workloads, making it an ideal solution for modern, dynamic data processing environments.
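As one possible deployment sketch, not a prescribed setup: the whole pipeline from the usage examples below fits inside a single AWS Lambda handler, triggered on a schedule (for example by EventBridge) or by any event of your choosing. The environment variable names mirror those examples and are assumptions of this sketch.

import os

from MI_ETL.Connector import Source
from MI_ETL.data_extraction import DataExtraction
from MI_ETL.loader import Loader


def handler(event, context):
    # Extract only what changed since the last run from every collection in the database.
    conn = Source.mongo(os.getenv("oplog_test_source_url"))
    extraction = DataExtraction(connection=conn, extract_all=[], db="sample_analytics")
    extracted_data = extraction.extract_oplog_data()

    # No SCD copy in this sketch, so the data lake step stays disabled.
    loader = Loader(mongo_conn=conn, data=extracted_data, datalake=False, datawarehouse=True, aws={})
    result = loader.run(
        host=os.getenv("oplog_test_host"),
        user=os.getenv("oplog_test_user"),
        password=os.getenv("oplog_test_password"),
        db=os.getenv("oplog_test_db"),
        port=5432,
    )
    return {"status": "completed", "load_meta": str(result)}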
Cost Leadership
With drastically reduced payloads and a design well suited to serverless deployment, costs become highly transparent, and the module can easily run within the generous free tiers offered by the top cloud providers.
Empowering Data Engineering Teams
MI-ETLx places control back into the hands of data engineering teams, allowing them to manage their data pipelines more effectively and with greater precision. Being a Python module, it can be extended by the engineers themselves without waiting for official coverage of other source and destination systems. Deployed as a serverless solution, the engineer focuses on the pipeline logic and leaves infrastructure and scaling concerns to the cloud provider. This empowerment leads to improved efficiency and better resource utilization.
Usage Examples
Example 1
Set extract_all to an empty list to extract data from all collections within the specified database. Since no SCD copy is being written, datalake is set to False in the Loader initialization and aws is left empty.
from MI_ETL.Connector import Source
from MI_ETL.data_extraction import DataExtraction
from MI_ETL.loader import Loader
import os

host = os.getenv('oplog_test_host')
user = os.getenv('oplog_test_user')
password = os.getenv('oplog_test_password')
db = os.getenv('oplog_test_db')

required_params = {"host": host, "user": user, "password": password, "db": db}

if all(required_params.values()):
    conn = Source.mongo(os.getenv('oplog_test_source_url'))
    data_extraction = DataExtraction(connection=conn, extract_all=[], db='sample_analytics')
    extracted_data = data_extraction.extract_oplog_data()

    # Initiate the loader.
    # The MongoDB connection is also used to update the time metadata for the next run,
    # ensuring that each run's timing information is accurately recorded.
    loader = Loader(mongo_conn=conn, data=extracted_data, datalake=False, datawarehouse=True, aws={})

    # Provide the connection to the data warehouse. NOTE: only Redshift and Postgres
    # databases are supported as of this release.
    # result holds metadata about the load process: whether it passed or failed,
    # schema information, etc.
    result = loader.run(host=host, user=user, password=password, db=db, port=5432)
else:
    for key, val in required_params.items():
        if not val:
            print(f"'{key}' is needed for the destination database connection")
Example 2
Pass specific collection names in extract_all to limit extraction to those collections, and supply a backfill date so that data modified after that date is extracted. As in Example 1, no SCD copy is written, so datalake is False and aws is empty.
from MI_ETL.Connector import Source
from MI_ETL.data_extraction import DataExtraction
from MI_ETL.loader import Loader
import os

host = os.getenv('oplog_test_host')
user = os.getenv('oplog_test_user')
password = os.getenv('oplog_test_password')
db = os.getenv('oplog_test_db')

required_params = {"host": host, "user": user, "password": password, "db": db}

if all(required_params.values()):
    conn = Source.mongo(os.getenv('oplog_test_source_url'))

    # Extract from 'collection_1' and 'collection_2' in 'sample_analytics',
    # picking up data modified after '2023/12/28' (the backfill date).
    data_extraction = DataExtraction(connection=conn, extract_all=['collection_1', 'collection_2'], db='sample_analytics', backfill='2023/12/28')
    extracted_data = data_extraction.extract_oplog_data()

    # Initiate the loader.
    # The MongoDB connection is also used to update the time metadata for the next run,
    # ensuring that each run's timing information is accurately recorded.
    loader = Loader(mongo_conn=conn, data=extracted_data, datalake=False, datawarehouse=True, aws={})

    # Provide the connection to the data warehouse. NOTE: only Redshift and Postgres
    # databases are supported as of this release.
    # result holds metadata about the load process: whether it passed or failed,
    # schema information, etc.
    result = loader.run(host=host, user=user, password=password, db=db, port=5432)
else:
    for key, val in required_params.items():
        if not val:
            print(f"'{key}' is needed for the destination database connection")
Note that in each example the MongoDB connection is passed to the Loader as well; it is used to update the time metadata for the next run, ensuring that each run's timing information is accurately recorded.
These examples demonstrate the flexibility and efficiency of MI-ETLx in extracting and loading data incrementally while providing control and predictability over the ETL/ELT process.
Expansion plans include broadening source coverage and support for various other databases, making it a versatile solution for modern data processing challenges.
Running as an event-driven serverless service adds scalability and cost-effectiveness to its list of benefits.
The engineers working on this module are Paschal Chukwuemeka Amah, Nelson Ogbeide, and Joseph Ojo. We look forward to input from engineers who share our vision, to consolidate this work and take it mainstream. We are looking forward to the fun and pure utility ahead.