Step-by-Step Guide to Incrementally Pulling Data from JDBC with Python and PySpark

As data volumes increase and the need for real-time insights becomes more pressing, businesses are turning to incremental data loading to improve their ETL processes. By only extracting the new data since the last data pull, companies can reduce processing times and improve their data accuracy.

In this blog post, we'll walk through a step-by-step guide to incrementally pulling data from a JDBC source using Python and PySpark. We'll also cover how to leverage the source table's primary key to pull only new data and avoid a full table scan. Additionally, we'll touch on how to use the updated_time column in combination with the primary key to capture updates as well.

Step 1: Set Up the Environment

First, let's spin up Postgres:

docker-compose up --build

This will start a Postgres database on your local machine.
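For reference, a minimal docker-compose.yml that brings up such a database might look like the sketch below. The image tag, credentials, and port mapping here are assumptions, not the exact file from the repository.

# docker-compose.yml (illustrative sketch; image tag, credentials, and port are assumptions)
version: "3.8"
services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: postgres
    ports:
      - "5432:5432"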

Step 2: Create a Table and Populate It with Fake Data

Run the Python file:

python ingest.py

The Python file can be found here:

https://github.com/soumilshah1995/Step-by-Step-Guide-to-Incrementally-Pulling-Data-from-JDBC-with-Python-and-PySpark/blob/main/ingest.py


We have inserted 100 records into the sales table.
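As a rough sketch, an ingest script of this kind might create the table and insert fake rows as follows. The connection settings, table schema, and use of the Faker library are assumptions; see ingest.py in the linked repository for the actual script.

# ingest_sketch.py -- illustrative fake-data loader; connection settings,
# schema, and column names are assumptions, not the exact repository code.
import psycopg2
from faker import Faker

fake = Faker()

conn = psycopg2.connect(
    host="localhost", port=5432,
    dbname="postgres", user="postgres", password="postgres",
)
cur = conn.cursor()

# Auto-incrementing primary key plus an updated_time column for later use.
cur.execute("""
    CREATE TABLE IF NOT EXISTS sales (
        id           SERIAL PRIMARY KEY,
        customer     TEXT,
        amount       NUMERIC(10, 2),
        updated_time TIMESTAMP DEFAULT NOW()
    )
""")

# Insert 100 fake sales records.
for _ in range(100):
    cur.execute(
        "INSERT INTO sales (customer, amount) VALUES (%s, %s)",
        (fake.name(), fake.pydecimal(left_digits=4, right_digits=2, positive=True)),
    )

conn.commit()
cur.close()
conn.close()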



Step 3: Run the PySpark Template That Pulls Incremental Data


Code Explanation

In summary, this code connects to a PostgreSQL database through PySpark's JDBC connector and extracts data from a specified table incrementally. The main function reads a checkpoint file to get the maximum primary key value from the previous extraction and constructs an incremental query using that value, so only rows inserted since the last run are pulled from the database. After extraction, the new maximum primary key value is computed from the returned data and written back to the checkpoint file for use in the next run. Finally, the extracted incremental data is displayed on the console.
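A minimal sketch of that flow is shown below. The checkpoint file name, table name, and connection properties are assumptions; the actual code lives in template.py in the linked repository.

# template_sketch.py -- illustrative checkpoint-based incremental extraction.
# File paths, table name, and connection properties are assumptions.
import os
from pyspark.sql import SparkSession

CHECKPOINT_FILE = "checkpoint.txt"

def read_checkpoint():
    # Return the max primary key from the previous run, or 0 on the first run.
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE) as f:
            return int(f.read().strip())
    return 0

def write_checkpoint(max_id):
    with open(CHECKPOINT_FILE, "w") as f:
        f.write(str(max_id))

def main():
    spark = (
        SparkSession.builder
        .appName("incremental-jdbc-pull")
        .config("spark.jars.packages", "org.postgresql:postgresql:42.5.4")
        .getOrCreate()
    )

    last_max_id = read_checkpoint()

    # Push the filter down to Postgres so only new rows cross the network.
    incremental_query = f"(SELECT * FROM sales WHERE id > {last_max_id}) AS t"

    df = (
        spark.read.format("jdbc")
        .option("url", "jdbc:postgresql://localhost:5432/postgres")
        .option("dbtable", incremental_query)
        .option("user", "postgres")
        .option("password", "postgres")
        .option("driver", "org.postgresql.Driver")
        .load()
    )

    # Advance the checkpoint only if new rows were actually pulled.
    if df.count() > 0:
        new_max_id = df.agg({"id": "max"}).collect()[0][0]
        write_checkpoint(new_max_id)

    df.show()

if __name__ == "__main__":
    main()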

Output of running template.py:


Running the Template Again

Running the template again without inserting new data should return no new records, since the checkpoint already holds the latest primary key.


Next, we run ingest.py again to add more data, then run the template to verify that only the new data is pulled; it should pull everything with an ID greater than 100.




Advantages of Incremental Extraction

There are several advantages to performing incremental extraction, including:

  1. Reduced network traffic: Incremental extraction only retrieves the data that has changed since the last extraction, which reduces the amount of data transferred over the network.
  2. Reduced workload on the database: Incremental extraction reduces the workload on the database by only retrieving the data that has changed.
  3. Faster processing time: With less data to extract and transfer, each ETL run completes more quickly.


TIP

Combining the auto-incrementing primary key with a record's updated date is an effective way to identify both newly inserted and updated records in a database. By comparing the maximum primary key and the maximum updated date from the previous extraction against the current database records, you can determine which rows have been inserted or updated since the last run and extract only those, as sketched below.
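As an illustrative sketch, the incremental predicate could combine both columns as shown here. The column names (id, updated_time) and the checkpoint variables are assumptions carried over from the sketches above, not the exact code from the repository.

# Sketch: catch both new inserts (id) and updates to existing rows (updated_time).
# last_max_id and last_max_updated_time come from the previous run's checkpoint.
incremental_query = f"""(
    SELECT * FROM sales
    WHERE id > {last_max_id}
       OR updated_time > '{last_max_updated_time}'
) AS t"""

After each run, both the new maximum id and the new maximum updated_time would be written back to the checkpoint so the next extraction resumes from the right position on both columns.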

I am also working on a follow-up blog post on how to capture updated records using the updated_at column in combination with the primary key.

Conclusion

In conclusion, pulling data from JDBC using Python and PySpark can be a daunting task, especially when dealing with large datasets. However, by following the step-by-step guide outlined in this blog, users can easily and efficiently incrementally pull data from JDBC with minimal effort. By utilizing PySpark's powerful features, such as filtering and aggregation, users can extract only the necessary data and improve performance. Additionally, by incorporating a combination of primary key and last updated date, users can not only pull new data but also updates to existing records. This technique can help users stay up-to-date with their data sources and make informed decisions based on the most current information available. Overall, the ability to incrementally pull data from JDBC with Python and PySpark is an invaluable skill for any data professional, and with the right tools and techniques, it can be a straightforward and efficient process.
