F1 project - loading to the database

After extracting data with Python, the next step is to push that data into a database. Doing so not only helps mimic an enterprise-style deployment, but it also makes your data accessible to end users and ready for further processing with SQL. Before we load anything, however, we need to integrate the data. I’ll walk through the steps I used, splitting them into two parts—just like the previous article—starting with “simple data,” which doesn’t need extra work before loading, and then moving on to “complex data,” which needs a bit more processing first.

One twist this time around is my use of custom modules. These modules allowed me to write reusable code blocks for both the simple and complex data loads.


What Are Modules?

If you’re not familiar, a module in Python is essentially a file containing functions, classes, or variables, which can then be imported into other scripts. It’s a great way to keep your code organized and reusable—just make sure you keep it generic enough that it isn’t locked into handling edge cases too narrowly.
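
For example, a hypothetical utils.py could hold a small helper function that any other script can then import (the file and function names here are made up for illustration):

# utils.py (a module is just a .py file)
def to_seconds(lap_time):
    """Convert a lap time string like '1:31.456' into seconds."""
    minutes, seconds = lap_time.split(":")
    return int(minutes) * 60 + float(seconds)


# any_other_script.py
# from utils import to_seconds
# print(to_seconds("1:31.456"))  # 91.456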


JSON Reader

My first module is a JSON reader, which reads and combines the JSON files generated by the previous process into a pandas DataFrame. The reason I built a dedicated reader was that I needed to flatten my data. The JSON structure from the API was complex, and flattening it in a single place saved me from repeating the same code in multiple scripts.

json_reader module:

import glob
import json
import pandas as pd
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler()]
)

def get_files_to_process(file_pattern, exclude_keys):
    """
    Identifies files to process by excluding those matching given keys.
    
    Parameters:
        file_pattern (str): Glob pattern to match files.
        exclude_keys (list): Keys to exclude based on file names.

    Returns:
        list: List of file paths to process.
    """
    files = glob.glob(file_pattern)
    # File names are expected to end with a numeric key, e.g. <name>_<session_key>.json
    files_to_process = [
        file for file in files
        if int(file.split("_")[-1].split(".")[0]) not in exclude_keys
    ]
    logging.info(f"Files to process: {files_to_process}")
    return files_to_process


def read_and_process_files(files):
    """
    Reads JSON files and returns the raw data in a DataFrame.
    
    Parameters:
        files (list): List of file paths to read.
    
    Returns:
        pd.DataFrame: Raw data in DataFrame format (no parsing or aggregation).
    """
    all_data = []

    for file_path in files:
        logging.info(f"Processing file: {file_path}")
        with open(file_path, "r") as file:
            data_json = json.load(file)

        flattened_data = [record for sublist in data_json for record in sublist]
        data_df = pd.DataFrame(flattened_data)
        all_data.append(data_df)

    if not all_data:
        logging.info("No data to load. Exiting...")
        return pd.DataFrame()  # Return an empty DataFrame if no data

    data_df = pd.concat(all_data, ignore_index=True)
    return data_df
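
Using the module from another script then looks something like this (the file pattern and the excluded session keys are made up for illustration):

from json_reader import get_files_to_process, read_and_process_files

# Skip files whose session key has already been handled
files = get_files_to_process("data/laps_*.json", exclude_keys=[9158, 9159])
laps_df = read_and_process_files(files)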
        

Data Loader

The second module handles loading data into the database. It initializes the database connection, then inserts the data in batches into the target table. It also maps each column to the appropriate field, so the data lands where it’s expected.

data_loader module:

import logging
from sqlalchemy import create_engine, text

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

class DataLoader:
    def __init__(self, db_connection_string):
        """
        Initialize the DataLoader with a database connection string.

        Args:
            db_connection_string (str): SQLAlchemy connection string to the database.
        """
        self.engine = create_engine(db_connection_string)

    def load_to_db(self, df, table_name, schema_name=None, batch_size=1000):
        """
        Load a DataFrame into a PostgreSQL table in batches.

        Args:
            df (pd.DataFrame): The data to be loaded.
            table_name (str): Name of the target table.
            schema_name (str, optional): Name of the schema. Defaults to None.
            batch_size (int, optional): Number of rows to insert per batch. Defaults to 1000.

        Returns:
            None
        """
        total_rows = len(df)
        if total_rows == 0:
            logging.info("No data to load.")
            return

        # Build the full table name
        full_table_name = f"{schema_name}.{table_name}" if schema_name else table_name

        # Get column names dynamically from the DataFrame
        columns = ", ".join(df.columns)
        placeholders = ", ".join([f":{col}" for col in df.columns])

        # Prepare the insert query
        insert_query = f"""
        INSERT INTO {full_table_name} ({columns})
        VALUES ({placeholders});
        """

        # Log the start of the process
        logging.info(f"Starting to load {total_rows} rows into table {full_table_name} in batches of {batch_size}.")

        with self.engine.connect() as conn:
            # Convert DataFrame to list of dictionaries for batch inserts
            data_dict = df.to_dict(orient='records')
            
            # Insert data in batches
            for i in range(0, total_rows, batch_size):
                batch_data = data_dict[i:i + batch_size]
                conn.execute(text(insert_query), batch_data)
                conn.commit()
                # Log progress
                batch_end = min(i + batch_size, total_rows)
                logging.info(f"Inserted rows {i + 1} to {batch_end}.")

            

        # Log completion
        logging.info(f"Finished loading {total_rows} rows into table {full_table_name}.")
        

Simple Data

Even though the script for simple data has multiple steps, the logic is fairly straightforward:

  1. Activate the database connection.
  2. Read the source files.
  3. If the table doesn’t exist, create it.
  4. Truncate the table (clear existing rows).
  5. Load the new data into the table.

This flow works well for data that doesn’t require extra processing. You simply bring in your flat JSON files, set up the table, and push the records directly to the database.
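
Put together with the two modules above, a minimal version of that script could look like this (the connection string, schema, table, and columns are placeholders for whatever dataset you're loading):

from sqlalchemy import text
from json_reader import get_files_to_process, read_and_process_files
from data_loader import DataLoader

# 1. Activate the database connection (placeholder credentials)
loader = DataLoader("postgresql+psycopg2://user:password@localhost:5432/f1")

# 2. Read the source files (nothing excluded for simple data)
files = get_files_to_process("data/drivers_*.json", exclude_keys=[])
drivers_df = read_and_process_files(files)

with loader.engine.connect() as conn:
    # 3. Create the table if it doesn't exist (illustrative columns)
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS f1.drivers (
            driver_number INT,
            full_name     TEXT,
            team_name     TEXT,
            session_key   INT
        );
    """))
    # 4. Truncate the table so reruns don't duplicate rows
    conn.execute(text("TRUNCATE TABLE f1.drivers;"))
    conn.commit()

# 5. Load the new data into the table
if not drivers_df.empty:
    loader.load_to_db(drivers_df, table_name="drivers", schema_name="f1")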


Complex Data

For larger datasets or data that needs preprocessing, things get more interesting. If you read the previous article, you may recall how big some of these datasets can get, which calls for a more deliberate approach to avoid overly long queries and processing times.

Here’s the general process I use:

  1. Activate the database connection.
  2. Figure out which files have already been loaded, using the session_key (since there’s one JSON file per session).
  3. Use a custom module to compare loaded sessions against the list of files, so you only load what’s new.
  4. Read and process only the files needed. (In practice, this is typically one file at a time, thanks to earlier data extraction steps.)
  5. Optionally transform or aggregate data. (In many cases this step isn’t strictly necessary, but I keep the door open for added flexibility.)
  6. Load the processed data to the database.
  7. Loop through all relevant files until done.

Breaking data into manageable chunks not only protects the database (and your own machine) from heavy loads, but also simplifies error handling.
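
In code, the loop looks roughly like this (again, the schema, table, and file names are placeholders):

from sqlalchemy import text
from json_reader import get_files_to_process, read_and_process_files
from data_loader import DataLoader

# 1. Activate the database connection (placeholder credentials)
loader = DataLoader("postgresql+psycopg2://user:password@localhost:5432/f1")

# 2. Find out which sessions have already been loaded
with loader.engine.connect() as conn:
    result = conn.execute(text("SELECT DISTINCT session_key FROM f1.laps;"))
    loaded_keys = [row[0] for row in result]

# 3. Keep only the files whose session_key isn't in the table yet
files = get_files_to_process("data/laps_*.json", exclude_keys=loaded_keys)

# 4-7. Read, (optionally) transform, and load one file at a time
for file_path in files:
    laps_df = read_and_process_files([file_path])
    if laps_df.empty:
        continue
    # 5. Any transformation or aggregation would go here
    # 6. Load the processed data
    loader.load_to_db(laps_df, table_name="laps", schema_name="f1")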


Improvement Ideas

There are plenty of ways to refine this workflow; a short sketch illustrating both ideas follows the list:

  1. Table Creation Logic for Complex Data: One potential enhancement would be automatically checking if the table exists before creating it. In my case, I only create the table once when I first run the script, so I haven’t bothered to add that logic.
  2. Database Connection Security: For personal projects on my own machine, I just store credentials in the script. In a more enterprise setting, though, you’d typically use a DSN (Data Source Name) or another secure method to manage credentials and connections. That’s especially helpful if you’re using Windows 11, because it can be tricky dealing with 64-bit vs. 32-bit ODBC drivers.
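
As a rough sketch of both ideas (the environment variable name, schema, and table definition are all made up):

import os
from sqlalchemy import create_engine, inspect, text

# Idea 2: pull the connection string from the environment instead of hard-coding it
engine = create_engine(os.environ["F1_DB_URL"])  # F1_DB_URL is a hypothetical variable name

# Idea 1: only create the table when it doesn't exist yet
inspector = inspect(engine)
if not inspector.has_table("laps", schema="f1"):
    with engine.connect() as conn:
        conn.execute(text("""
            CREATE TABLE f1.laps (
                session_key  INT,
                lap_number   INT,
                lap_duration FLOAT
            );
        """))
        conn.commit()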


Conclusion

Overall, loading data into a database can be pretty straightforward once you split the work into logical steps and modules. Keeping your code in separate scripts also allows you to run only the parts you need, which can save time and effort. Of course, if your script is rock-solid, you could merge it with the API-querying process for a single, streamlined workflow.


Feel free to let me know how you’d approach this! If you have any suggestions or improvements, I’d love to hear them.


Previous Articles

  • F1 project - querying APIs using python
  • F1 project - data analysis overview
