F1 project - loading to the database
Arthur Ladwein
Alteryx ACE since 2019 | Freelance data | Data Analysis, Reporting, Strategic Customer Support
After extracting data with Python, the next step is to push that data into a database. Doing so not only helps mimic an enterprise-style deployment, but it also makes your data accessible to end users and ready for further processing with SQL. Before we load anything, however, we need to integrate the data. I’ll walk through the steps I used, splitting them into two parts—just like the previous article—starting with “simple data,” which doesn’t need extra work before loading, and then moving on to “complex data,” which needs a bit more processing first.
One twist this time around is my use of custom modules. These modules allowed me to write reusable code blocks for both the simple and complex data loads.
What Are Modules?
If you’re not familiar, a module in Python is essentially a file containing functions, classes, or variables, which can then be imported into other scripts. It’s a great way to keep your code organized and reusable—just make sure you keep it generic enough that it isn’t locked into handling edge cases too narrowly.
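A minimal illustration, with a made-up file name just for this example: a module is nothing more than a .py file whose contents you import elsewhere.

# text_utils.py -- a module is just a Python file
def clean_name(name):
    """Strip whitespace and normalise casing."""
    return name.strip().title()

Any other script in the project can then reuse it:

# main.py
from text_utils import clean_name

print(clean_name("  lewis HAMILTON "))  # -> "Lewis Hamilton"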
JSON Reader
My first module is a JSON reader, which reads and combines the JSON files generated by the previous process into a pandas DataFrame. The reason I built a dedicated reader was that I needed to flatten my data. The JSON structure from the API was complex, and flattening it in a single place saved me from repeating the same code in multiple scripts.
json_reader module:
import os
import glob
import json
import pandas as pd
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler()]
)

def get_files_to_process(file_pattern, exclude_keys):
    """
    Identifies files to process by excluding those matching given keys.

    Parameters:
        file_pattern (str): Glob pattern to match files.
        exclude_keys (list): Keys to exclude based on file names.

    Returns:
        list: List of file paths to process.
    """
    files = glob.glob(file_pattern)
    files_to_process = [
        file for file in files
        if int(file.split("_")[-1].split(".")[0]) not in exclude_keys
    ]
    logging.info(f"Files to process: {files_to_process}")
    return files_to_process

def read_and_process_files(files):
    """
    Reads JSON files and returns the raw data in a DataFrame.

    Parameters:
        files (list): List of file paths to read.

    Returns:
        pd.DataFrame: Raw data in DataFrame format (no parsing or aggregation).
    """
    all_data = []
    for file_path in files:
        logging.info(f"Processing file: {file_path}")
        with open(file_path, "r") as file:
            data_json = json.load(file)
        flattened_data = [record for sublist in data_json for record in sublist]
        data_df = pd.DataFrame(flattened_data)
        all_data.append(data_df)

    if not all_data:
        logging.info("No data to load. Exiting...")
        return pd.DataFrame()  # Return an empty DataFrame if no data

    data_df = pd.concat(all_data, ignore_index=True)
    return data_df
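In practice the two functions are called back to back. The pattern and exclude keys below are placeholders; the only real requirement is that each file name ends in an integer key before the extension (for example results_1042.json), since get_files_to_process parses that key to decide what to skip.

from json_reader import get_files_to_process, read_and_process_files

# Hypothetical pattern and keys, purely for illustration
files = get_files_to_process("data/raw/results_*.json", exclude_keys=[1042, 1043])
df = read_and_process_files(files)
print(df.shape)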
Data Loader
The second module handles loading data into the database. It initializes the database connection, then inserts the data in batches into the target table. It also maps each column to the appropriate field, so the data lands where it’s expected.
data_loader module:
import logging
from sqlalchemy import create_engine, text

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

class DataLoader:
    def __init__(self, db_connection_string):
        """
        Initialize the DataLoader with a database connection string.

        Args:
            db_connection_string (str): SQLAlchemy connection string to the database.
        """
        self.engine = create_engine(db_connection_string)

    def load_to_db(self, df, table_name, schema_name=None, batch_size=1000):
        """
        Load a DataFrame into a PostgreSQL table in batches.

        Args:
            df (pd.DataFrame): The data to be loaded.
            table_name (str): Name of the target table.
            schema_name (str, optional): Name of the schema. Defaults to None.
            batch_size (int, optional): Number of rows to insert per batch. Defaults to 1000.

        Returns:
            None
        """
        total_rows = len(df)
        if total_rows == 0:
            logging.info("No data to load.")
            return

        # Build the full table name
        full_table_name = f"{schema_name}.{table_name}" if schema_name else table_name

        # Get column names dynamically from the DataFrame
        columns = ", ".join(df.columns)
        placeholders = ", ".join([f":{col}" for col in df.columns])

        # Prepare the insert query
        insert_query = f"""
            INSERT INTO {full_table_name} ({columns})
            VALUES ({placeholders});
        """

        # Log the start of the process
        logging.info(f"Starting to load {total_rows} rows into table {full_table_name} in batches of {batch_size}.")

        with self.engine.connect() as conn:
            # Convert DataFrame to list of dictionaries for batch inserts
            data_dict = df.to_dict(orient='records')

            # Insert data in batches
            for i in range(0, total_rows, batch_size):
                batch_data = data_dict[i:i + batch_size]
                conn.execute(text(insert_query), batch_data)
                conn.commit()

                # Log progress
                batch_end = min(i + batch_size, total_rows)
                logging.info(f"Inserted rows {i + 1} to {batch_end}.")

        # Log completion
        logging.info(f"Finished loading {total_rows} rows into table {full_table_name}.")
Simple Data
Even though the script for simple data has multiple steps, the logic is fairly straightforward: find the new JSON files, read and flatten them into a DataFrame, and load the result into the target table.
This flow works well for data that doesn’t require extra processing. You simply bring in your flat JSON files, set up the table, and push the records directly to the database.
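To make that concrete, here is a minimal sketch of what such a script could look like, chaining the two modules above. The file pattern, connection string, schema and table name are all placeholders for illustration.

from json_reader import get_files_to_process, read_and_process_files
from data_loader import DataLoader

# Placeholder names, purely for illustration
FILE_PATTERN = "data/raw/drivers_*.json"
DB_CONNECTION = "postgresql://user:password@localhost:5432/f1"

def main():
    # 1. Find the files that have not been loaded yet
    files = get_files_to_process(FILE_PATTERN, exclude_keys=[])

    # 2. Read and flatten them into a single DataFrame
    df = read_and_process_files(files)
    if df.empty:
        return

    # 3. Push the records to the target table
    loader = DataLoader(DB_CONNECTION)
    loader.load_to_db(df, table_name="drivers", schema_name="staging")

if __name__ == "__main__":
    main()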
Complex Data
For larger datasets or data that needs preprocessing, things get more interesting. If you read the previous article, you may recall how big some of these datasets can get, which calls for a more deliberate approach to avoid overly long queries and processing times.
Here’s the general process I use: identify the files to load, break the data into manageable chunks, apply whatever preprocessing each dataset needs, and load the chunks one at a time.
Breaking data into manageable chunks not only protects the database (and your own machine) from heavy loads, but also simplifies error handling.
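As a rough illustration, here is a minimal sketch of a chunked load. It treats each JSON file as its own chunk and wraps each one in its own error handling; the transform_lap_times function, file pattern and table names are hypothetical stand-ins for whatever preprocessing your data actually needs.

import logging
from json_reader import get_files_to_process, read_and_process_files
from data_loader import DataLoader

# Placeholder names, purely for illustration
FILE_PATTERN = "data/raw/lap_times_*.json"
DB_CONNECTION = "postgresql://user:password@localhost:5432/f1"

def transform_lap_times(df):
    """Hypothetical preprocessing step -- replace with your own logic."""
    return df.drop_duplicates()

def main():
    loader = DataLoader(DB_CONNECTION)
    files = get_files_to_process(FILE_PATTERN, exclude_keys=[])

    # Process and load one file (chunk) at a time so a failure
    # only affects the current chunk, not the whole run
    for file_path in files:
        try:
            df = read_and_process_files([file_path])
            if df.empty:
                continue
            df = transform_lap_times(df)
            loader.load_to_db(df, table_name="lap_times", schema_name="staging")
            logging.info(f"Finished chunk: {file_path}")
        except Exception:
            logging.exception(f"Chunk failed, moving on: {file_path}")

if __name__ == "__main__":
    main()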
Improvement Ideas
There are plenty of ways to refine this workflow.
Conclusion
Overall, loading data into a database can be pretty straightforward once you split the work into logical steps and modules. Keeping your code in separate scripts also allows you to run only the parts you need, which can save time and effort. Of course, if your script is rock-solid, you could merge it with the API-querying process for a single, streamlined workflow.
Previous Articles
Feel free to let me know how you’d approach this! If you have any suggestions or improvements, I’d love to hear them.