Unlocking Incremental Data in PySpark: Extracting from JDBC Sources without Debezium or AWS DMS with CDC
Video Based Tutorials
Authors
Soumil Nitin Shah
I earned a Bachelor of Science in Electronic Engineering and a double master's in Electrical and Computer Engineering. I have extensive expertise in developing scalable, high-performance software applications in Python. I run a YouTube channel where I teach people about data science, machine learning, Elasticsearch, and AWS. I work as a Lead Data Engineer, where I spend most of my time developing ingestion frameworks and building microservices and scalable architectures on AWS. I have worked with massive amounts of data, including building data lakes (1.2T) and optimizing data lake queries through partitioning and the right file formats and compression. I have also developed and worked on streaming applications that ingest real-time data via Kinesis and Firehose into Elasticsearch.
Divyansh Patel
I'm a highly skilled and motivated professional with a Master's degree in Computer Science and extensive experience in Data Engineering and AWS Cloud Engineering. I'm currently working with the renowned industry expert Soumil Shah and thrive on tackling complex problems and delivering innovative solutions. My passion for problem-solving and commitment to excellence enable me to make a positive impact on any project or team I work with. I look forward to connecting and collaborating with like-minded professionals.
Introduction
Data is the lifeblood of any organization, and the ability to extract and process it efficiently is crucial for making informed business decisions. But extracting data from databases can be a time-consuming and resource-intensive task, especially when dealing with large datasets. Fortunately, PySpark provides a powerful toolset for extracting, processing, and analyzing data efficiently, making it an ideal choice for many data extraction tasks.
In this article, we'll explore how PySpark can be used to extract incremental data from JDBC sources without the need for Debezium or AWS DMS. We'll discuss the advantages of using primary keys (PK) and updated_at columns to extract updated and newly inserted data, and how this approach can be used to pull data from any source database using JDBC.
What is Incremental Data Processing?
Incremental data processing is a technique for processing only the data that has changed since the last time it was processed, rather than processing the entire dataset every time. This approach can significantly reduce processing time and resource usage, especially when dealing with large datasets.
The primary advantages of incremental data processing are efficiency and cost-effectiveness. By processing only the changed data, you can save time and resources, and reduce the amount of data that needs to be stored and processed. This approach can be particularly beneficial when dealing with large datasets that are frequently updated, such as social media feeds, financial transactions, or sensor data.
Hands-on Labs
Step 1: Spin up a Postgres Database using Docker Compose
docker-compose up --build
This will start a Postgres database on your local machine.
Step 2: Create a Table and Populate It with Fake Data
Run the Python file:
python ingest.py
The Python file can be found here:
https://github.com/soumilshah1995/Unlocking-Incremental-Data-in-PySpark-Extracting-from-JDBC-Sources-without-Debezium-or-AWS-DMS-with/blob/main/ingest.py
The Python script creates a table called sales in the public schema.
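The actual script lives in the repository linked above; the sketch below only illustrates the general shape of such an ingest script. The connection details, column layout, and the use of psycopg2 and Faker are assumptions for a local Docker Postgres, not necessarily what the repository uses.

```python
# Hypothetical sketch of an ingest script: creates a sales table and fills it
# with fake rows. Connection details and schema are assumptions.
import psycopg2
from faker import Faker

conn = psycopg2.connect(
    host="localhost", port=5432, dbname="postgres",
    user="postgres", password="postgres",   # assumed credentials
)
conn.autocommit = True
cur = conn.cursor()

cur.execute("""
    CREATE TABLE IF NOT EXISTS public.sales (
        id          SERIAL PRIMARY KEY,
        customer    TEXT,
        amount      NUMERIC(10, 2),
        created_at  TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at  TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
""")

fake = Faker()
for _ in range(100):  # the article inserts 100 records
    cur.execute(
        "INSERT INTO public.sales (customer, amount) VALUES (%s, %s)",
        (fake.name(), fake.pydecimal(left_digits=4, right_digits=2, positive=True)),
    )

cur.close()
conn.close()
```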
Next, we will create a trigger so that whenever a record is updated, its updated_at column is automatically updated as well.
Creating the Trigger
This code is creating a PostgreSQL function and trigger that updates the "updated_at" column of a table called "sales" every time a row is updated.
The function "update_sales_updated_at()" takes no arguments and returns a "TRIGGER" object. The function sets the "updated_at" column of the "NEW" row to the current timestamp using the "CURRENT_TIMESTAMP" function and returns the "NEW" row.
The trigger "update_sales_updated_at_trigger" is created using the "CREATE TRIGGER" statement and is set to execute the "update_sales_updated_at()" function before every update on the "public.sales" table, for each row being updated.
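Reconstructed from the description above, the function and trigger might look like the following sketch, executed here from Python via psycopg2 (the connection settings are assumptions; only the function and trigger names come from the article):

```python
# Creates a function and a BEFORE UPDATE trigger so updated_at is refreshed
# automatically whenever a row in public.sales is modified.
import psycopg2

TRIGGER_SQL = """
CREATE OR REPLACE FUNCTION update_sales_updated_at()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = CURRENT_TIMESTAMP;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

DROP TRIGGER IF EXISTS update_sales_updated_at_trigger ON public.sales;

CREATE TRIGGER update_sales_updated_at_trigger
BEFORE UPDATE ON public.sales
FOR EACH ROW
EXECUTE FUNCTION update_sales_updated_at();
"""

conn = psycopg2.connect(host="localhost", port=5432, dbname="postgres",
                        user="postgres", password="postgres")  # assumed credentials
conn.autocommit = True
with conn.cursor() as cur:
    cur.execute(TRIGGER_SQL)
conn.close()
```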
We have inserted 100 records into the sales table.
Step 3: Run the PySpark Template That Pulls Incremental Data
I will explain the entire code logic in detail at the end.
Now, if I run the template again, I expect no data to be returned, since nothing has changed.
Now let's update a record and see if the template can capture it, as shown below.
Let's run the template again to see if we can capture this new change.
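A single update issued against Postgres is enough to exercise the trigger and the incremental pull. The statement below is only an illustrative example (again via psycopg2, with assumed credentials), not the exact update used in the original walkthrough:

```python
import psycopg2

conn = psycopg2.connect(host="localhost", port=5432, dbname="postgres",
                        user="postgres", password="postgres")  # assumed credentials
conn.autocommit = True
with conn.cursor() as cur:
    # The BEFORE UPDATE trigger bumps updated_at, so the next incremental
    # run should pick this row up even though its id is not new.
    cur.execute("UPDATE public.sales SET amount = amount + 1 WHERE id = 1")
conn.close()
```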
Deep Dive into the Code and Logic
We define the imports
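A minimal set of imports for such a template might look like this (a sketch; the original template's imports may differ):

```python
# Minimal imports for the incremental-pull template (illustrative only).
import os
import json

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
```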
We declare the settings
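The settings are essentially the JDBC connection details plus the columns used for incremental tracking. The values and key names below are placeholders, not the original template's configuration:

```python
# Placeholder settings; adjust to your own database and table.
settings = {
    "jdbc_url": "jdbc:postgresql://localhost:5432/postgres",
    "jdbc_driver": "org.postgresql.Driver",
    "user": "postgres",
    "password": "postgres",
    "table": "public.sales",
    "primary_key": "id",                      # monotonically increasing PK
    "updated_at_column": "updated_at",        # maintained by the trigger
    "checkpoint_path": "./checkpoint.json",   # where max id / date are persisted
}
```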
Code Logic:
If a checkpoint does not exist, the script assumes the user is running the template for the first time and pulls all the data at once. On subsequent runs, we want to pull only incremental data: we load the most recent maximum id and updated date into variables, and if checkpoints exist we set first_time_read to False, indicating that a previous run has already been recorded.
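In code, that decision might look like the sketch below, assuming the checkpoint is a small JSON file holding the last processed id and timestamp (a hypothetical format, reusing the imports and settings sketched above):

```python
# Load the checkpoint if it exists; otherwise treat this as a first-time (full) read.
first_time_read = True
last_max_id = None
last_updated_at = None

if os.path.exists(settings["checkpoint_path"]):
    with open(settings["checkpoint_path"]) as f:
        checkpoint_data = json.load(f)
    last_max_id = checkpoint_data.get("max_id")
    last_updated_at = checkpoint_data.get("max_updated_at")
    first_time_read = False
```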
These are two helper classes that hold parameters such as the max ID and updated date, along with other process information in flags, as shown in the figure.
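The figure is not reproduced here, but conceptually the two helpers could be sketched as simple dataclasses, one for the checkpoint values and one for run-time flags (the names are illustrative, not the original ones):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Checkpoint:
    """Holds the high-water marks from the previous run."""
    max_id: Optional[int] = None
    max_updated_at: Optional[str] = None

@dataclass
class RunFlags:
    """Holds process information, such as whether this is the first read."""
    first_time_read: bool = True
```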
Main Logic
This is the main logic that was explained in the flow charts above.
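As a rough sketch of that logic (not the original template's code, and reusing the imports, settings, and helper classes assumed above): on the first run the whole table is read; on later runs only rows with a larger primary key (new inserts) or a newer updated_at (updates, thanks to the trigger) are read, and the checkpoint is then refreshed with the new maxima.

```python
def incremental_pull(spark, settings, checkpoint, flags):
    """Pull either the full table (first run) or only new/updated rows."""
    if flags.first_time_read:
        query = f"SELECT * FROM {settings['table']}"
    else:
        # New inserts have a larger primary key; updated rows have a newer
        # updated_at maintained by the trigger created earlier.
        query = (
            f"SELECT * FROM {settings['table']} "
            f"WHERE {settings['primary_key']} > {checkpoint.max_id} "
            f"OR {settings['updated_at_column']} > '{checkpoint.max_updated_at}'"
        )

    df = (
        spark.read.format("jdbc")
        .option("url", settings["jdbc_url"])
        .option("driver", settings["jdbc_driver"])
        .option("user", settings["user"])
        .option("password", settings["password"])
        .option("query", query)
        .load()
    )

    if df.count() > 0:
        # Persist the new high-water marks for the next run.
        row = df.agg(
            F.max(settings["primary_key"]).alias("max_id"),
            F.max(settings["updated_at_column"]).alias("max_updated_at"),
        ).collect()[0]
        with open(settings["checkpoint_path"], "w") as f:
            json.dump({"max_id": row["max_id"],
                       "max_updated_at": str(row["max_updated_at"])}, f)
    return df
```

To run a sketch like this, the SparkSession needs the Postgres JDBC driver on its classpath (for example via spark.jars.packages with the org.postgresql:postgresql artifact); the query is pushed down to Postgres, so only the new and updated rows travel over JDBC.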
This technique, along with its template, can recognize new inserts and updates while incrementally retrieving data. It should be noted, however, that deletes are not supported through this method. If delete capture is necessary, using DMS or Debezium is recommended; if it is not a requirement, this option can be a cost-effective and faster alternative to performing full table scans.
Conclusion
In conclusion, pulling data from JDBC using Python and PySpark can be a daunting task, especially when dealing with large datasets. However, by following the step-by-step guide outlined in this blog, users can easily and efficiently incrementally pull data from JDBC with minimal effort. By utilizing PySpark's powerful features, such as filtering and aggregation, users can extract only the necessary data and improve performance. Additionally, by incorporating a combination of primary key and last updated date, users can not only pull new data but also updates to existing records. This technique can help users stay up-to-date with their data sources and make informed decisions based on the most current information available. Overall, the ability to incrementally pull data from JDBC with Python and PySpark is an invaluable skill for any data professional, and with the right tools and techniques, it can be a straightforward and efficient process.