Step-by-Step Guide to Incrementally Pulling Data from JDBC with Python and PySpark
As data volumes grow and the need for real-time insights becomes more pressing, businesses are turning to incremental data loading to improve their ETL processes. By extracting only the data added since the last pull, companies can reduce processing times and improve data accuracy.
In this blog post, we'll walk through a step-by-step guide to incrementally pulling data from a JDBC source using Python and PySpark. We'll cover how to leverage the source table's primary key to pull only new rows and avoid a full table scan. We'll also mention how to combine an updated_time column with the primary key to capture updates as well.
Step 1: Set Up the Environment
First, let's spin up Postgres:
docker-compose up --build
This will start a Postgres database on your local machine.
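The compose file itself is not shown in the post; a minimal docker-compose.yml that would let `docker-compose up --build` start a local Postgres might look like this (the image tag, credentials, and port are assumptions, not taken from the repo):

```yaml
version: "3.8"
services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: postgres
    ports:
      - "5432:5432"   # expose Postgres on the default local port
```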
Step 2: Create a Table and Populate It with Fake Data
Run the Python file:
python ingest.py
The Python file can be found here:
https://github.com/soumilshah1995/Step-by-Step-Guide-to-Incrementally-Pulling-Data-from-JDBC-with-Python-and-PySpark/blob/main/ingest.py
This inserts 100 records into the sales table.
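A sketch of what ingest.py does, reduced to pure Python so it runs without a database. The column names, product list, and price range here are illustrative assumptions, not copied from the repo:

```python
import random
from datetime import datetime, timedelta

def generate_fake_sales(n, start_id=1):
    """Generate n fake sales records as dicts (a stand-in for the faker-based ingest.py)."""
    products = ["laptop", "phone", "tablet", "monitor"]
    now = datetime.now()
    records = []
    for i in range(n):
        records.append({
            "salesid": start_id + i,  # auto-incrementing primary key
            "product": random.choice(products),
            "price": round(random.uniform(10.0, 500.0), 2),
            "updated_at": now - timedelta(minutes=random.randint(0, 60)),
        })
    return records

records = generate_fake_sales(100)
# In the real ingest.py these rows would then be INSERTed into the Postgres
# sales table, e.g. with psycopg2's cursor.executemany(...).
```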
Step 3: Running the PySpark Template That Pulls Incremental Data
Code Explanation
In summary, this code connects to a PostgreSQL database through PySpark's JDBC connector and extracts data from a specified table incrementally. The main function first reads a checkpoint file to get the maximum primary key value from the previous extraction. It then builds an incremental query that uses this value to pull only rows with a higher primary key, avoiding a full table scan. After the pull, the new maximum primary key is computed from the extracted data and written back to the checkpoint file for use in the next run. Finally, the extracted incremental data is displayed on the console.
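The logic just described can be sketched as follows. The checkpoint file name, table, and column names are assumptions for illustration, and `pull_increment` expects a SparkSession and JDBC connection properties supplied by the caller:

```python
import os

CHECKPOINT_FILE = "checkpoint.txt"  # stores the max primary key seen so far (assumed name)

def read_checkpoint(path=CHECKPOINT_FILE):
    """Return the max primary key from the previous run, or 0 on the first run."""
    if os.path.exists(path):
        with open(path) as f:
            return int(f.read().strip())
    return 0

def write_checkpoint(max_pk, path=CHECKPOINT_FILE):
    """Persist the new max primary key for the next run."""
    with open(path, "w") as f:
        f.write(str(max_pk))

def incremental_query(table, pk_column, last_pk):
    """Build a pushdown subquery that selects only rows newer than the checkpoint."""
    return f"(SELECT * FROM {table} WHERE {pk_column} > {last_pk}) AS incr"

def pull_increment(spark, jdbc_url, props, table="sales", pk_column="salesid"):
    """Read only the new rows via JDBC, then advance the checkpoint."""
    last_pk = read_checkpoint()
    df = spark.read.jdbc(url=jdbc_url,
                         table=incremental_query(table, pk_column, last_pk),
                         properties=props)
    if df.count() > 0:
        new_max = df.agg({pk_column: "max"}).collect()[0][0]
        write_checkpoint(new_max)
    return df
```

Passing a subquery as the `table` argument makes the database itself apply the `WHERE` filter, so only the new rows cross the wire.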
Output of running the Python file template.py:
Running the Template Again
Run ingest.py again to add more data, then run the template to check that the new data is picked up; it should pull everything after ID 100.
Advantages of Incremental Extraction
There are several advantages to performing incremental extraction, including:
- Reduced processing time, since only new data is pulled instead of the full table
- Lower load on the source database, because full table scans are avoided
- Fresher data, as smaller pulls can be run more frequently
- Improved data accuracy, since each record is extracted exactly once
TIP
Combining the auto-incrementing primary key with a record's updated date is an effective way to identify both newly inserted and updated rows. By comparing the maximum primary key and the maximum updated date from the previous extraction against the current table, you can determine exactly which records were inserted or updated since the last run, and extract only those with PySpark.
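A sketch of how the incremental query could combine both conditions. The column names and the idea of checkpointing the max updated timestamp alongside the max primary key are assumptions of this sketch; the template shown in this post checkpoints only the primary key:

```python
def incremental_query_with_updates(table, pk_column, updated_column,
                                   last_pk, last_updated):
    """Pushdown subquery for rows that are newly inserted OR updated since the
    last run: new rows have a higher primary key, updated rows have a newer
    updated timestamp."""
    return (
        f"(SELECT * FROM {table} "
        f"WHERE {pk_column} > {last_pk} "
        f"OR {updated_column} > '{last_updated}') AS incr"
    )

query = incremental_query_with_updates(
    "sales", "salesid", "updated_at", 100, "2023-01-01 00:00:00"
)
# Pass `query` as the `table` argument to spark.read.jdbc(), as in Step 3.
```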
I'm now writing a follow-up blog post on how to also capture updated records by combining the updated_at column with the primary key.
Conclusion
Pulling data from JDBC using Python and PySpark can seem daunting, especially with large datasets. By following the step-by-step guide in this blog, however, you can incrementally pull data from a JDBC source with minimal effort. Using a primary key checkpoint, only the necessary new rows are extracted, which improves performance; combining the primary key with a last-updated date also captures updates to existing records. This technique keeps your pipeline up to date with its data sources and supports decisions based on the most current information available. Incrementally pulling data from JDBC with Python and PySpark is a valuable skill for any data professional, and with the right tools and techniques it is a straightforward, efficient process.