Table Parsing Made Simple with Homegrown Neural Networks - Part 2: Multi-thread Async Preprocessing (Drive Safe and Go Fast)
Image created by DALL·E



Introduction: Why Preprocessing Matters

As we alluded to in article 1, turning the raw data into a "Machine Learning (ML) ready" format is the toughest job in the project.

This was especially true for my dataset (the 1,500 Excel tables from China’s National Bureau of Statistics). The volume of files alone made manual cleanup impractical, but the real hurdle was format inconsistency: Microsoft-specific quirks make tables and charts nice and easy on the eyes but problematic when loading the data programmatically. Working with Excel files through COM-based APIs (Component Object Model) is painfully slow and prone to crashes.

This is where the preprocessing pipeline became crucial. It required a hybrid approach combining asynchronous workflows for efficient I/O operations and multithreading to parallelize file processing, dramatically improving performance.

In this article, we’ll walk through how the preprocessing pipeline was designed to overcome these challenges, covering:

  • How asynchronous I/O speeds up reading thousands of files concurrently.
  • How we handled complex formatting issues, like merged cells and missing headers.
  • How to handle real-world, machine-unfriendly datasets at scale.

See GitHub Repo for project code.


Key Concepts Behind the Pipeline

  • Concurrency & Parallelism
  • Handling Messy Real-World Data
  • "Turbocharging" the Process


Concurrency & Parallelism: Why We Chose a Hybrid Approach

Understanding the difference between asynchronous programming (concurrency) and multithreading (parallelism) is key to optimizing performance:

  • Multithreading (Parallelism): Like a kitchen with multiple chefs, each working on different dishes at the same time by using separate CPU cores.
  • Async Programming (Concurrency): Like a single chef managing several dishes, switching between tasks (e.g., stirring one pot while another simmers), maximizing efficiency without blocking progress.


See Appendix B for more details on concurrency and parallelism.

Why a Hybrid Approach Works

  • Concurrency handles the slow parts—waiting for the system to read/write data.
  • Parallelism handles the computationally heavy parts—processing large tables in memory.

In summary: concurrency (async I/O via asyncio) reads and writes multiple files concurrently to avoid I/O stalls, while parallelism (multithreading via ThreadPoolExecutor) performs CPU-heavy table operations in parallel threads to speed up processing. The combination allows the system to handle massive data volumes smoothly and scale across both I/O and CPU constraints.



Handling Messy Real-World Data

The preprocessing pipeline tackles the inconsistent, human-friendly formats often found in Excel files by addressing three key issues: merged cells, dynamic filtering for relevant table files, and failures introduced by multi-threaded, asynchronous processing.

Normalizing Table Structure: Unmerge Cells and Empty Cells

  • The Problem: Many tables used merged cells to make headers span multiple columns and/or rows for visual appeal, resulting in empty fields and misaligned data during extraction.
  • The Solution: The preprocessor class ExcelPreprocessor used xlwings's unmerge function to spread the value across all cells of the merged range (i.e., if a merged cell reads "Province", instead of A1 = "Province" with B1, A2, B2 empty, we end up with A1, B1, A2, B2 all equal to "Province"). During inference later on, when I "flattened" the header rows, I could algorithmically differentiate which cells were demerged and which were truly "empty" cells. (A minimal sketch of the unmerge step follows this list.)
  • Impact: This creates a structured grid where every cell has data, eliminating column and row misalignment.
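
Below is a minimal sketch of the unmerge-and-fill idea, assuming xlwings exposes Range.merge_cells, Range.merge_area, and Range.unmerge() (available in recent xlwings versions). The project's actual ExcelPreprocessor does more bookkeeping, but the core move is the same.

import xlwings as xw

def unmerge_and_fill(sheet):
    """Spread each merged cell's value across its full range (sketch only)."""
    for cell in sheet.used_range:
        if cell.merge_cells:              # this cell belongs to a merged area
            area = cell.merge_area        # the full merged range, e.g. A1:B2
            value = area[0, 0].value      # the value lives in the top-left cell
            area.unmerge()                # break the merge
            area.value = value            # COM broadcasts the scalar to every cell

# Example usage (hypothetical file name):
# app = xw.App(visible=False)
# book = app.books.open("E0101e.xls")
# unmerge_and_fill(book.sheets[0])
# book.close(); app.quit()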

Dynamic Table Filtering with Callable Functions

  • The Problem: Hardcoding filtering rules to extract files from source data folders (e.g., "English-only files, Excel-only files, etc.") made the pipeline inflexible, especially if I need to scale it up to include more yearly data, multiple data sources, or more custom search needs.
  • The Solution: The pipeline uses callable functions for filtering logic. Instead of fixed rules, users pass their own custom functions (e.g., filter by file names or content) to select relevant files dynamically. Example: Filter only tables with names ending in "E" for English tables or apply a custom filter for specific workflows.
  • Impact: This approach makes the pipeline more reusable and adaptable for larger datasets without changing the core code.


"Turbocharging" the Process

To maximize speed, the pipeline:

  • Runs with higher concurrency: Processes dozens of files concurrently by setting a higher number of worker threads.
  • Retries missing files: If files fail due to errors, the system logs them and retries them in subsequent passes to "exhaust" the missing files, ensuring nothing is left unprocessed. (A sketch of this pattern follows this list.)
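
As a rough sketch of how these two levers combine, assuming a blocking process_one(file) function and illustrative constants MAX_WORKERS and MAX_RETRIES (not the project's exact settings):

import asyncio
from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 10   # hypothetical: "higher concurrency" = more worker threads
MAX_RETRIES = 3    # hypothetical: how many passes over files that failed

async def process_all(files, process_one):
    """Run process_one(file) in a thread pool; retry failures on later passes."""
    loop = asyncio.get_running_loop()
    remaining, results = list(files), []
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        for attempt in range(1, MAX_RETRIES + 1):
            tasks = [loop.run_in_executor(pool, process_one, f) for f in remaining]
            outcomes = await asyncio.gather(*tasks, return_exceptions=True)
            # Keep successes; queue failures (exceptions) for the next pass
            results += [o for o in outcomes if not isinstance(o, Exception)]
            remaining = [f for f, o in zip(remaining, outcomes) if isinstance(o, Exception)]
            if not remaining:
                break
            print(f"Attempt {attempt}: {len(remaining)} files failed, retrying...")
    return results, remaining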


In summary, the preprocessing pipeline addresses messy tables by normalizing merged cells, detecting and filling missing headers, and using dynamic, callable-based filtering to handle a variety of datasets. With high concurrency and multiple retries, it ensures efficient, large-scale processing of thousands of files, even in the face of real-world inconsistencies.


Quick Code Walkthrough

Below are key steps with simplified code snippets to help you better understand the flow.

Note that the code in this section is simplified for demo purposes only (for the full project code, see the GitHub Repo).


1. Filtering Excel Files (English Only)

Select only relevant .xls and .xlsx files (English versions).

Simplified Demo Code Examples

from pathlib import Path
from typing import List, Callable

def get_filtered_files(source_data_dir: str, filter_criterion: Callable[[str], bool]) -> List[Path]:
    """
    Get Excel files from the directory and filter them dynamically using a callable function.

    Args:
        source_data_dir (str): Path to the directory containing Excel files.
        filter_criterion (Callable[[str], bool]): Function to decide if a file is included (e.g., file name check).

    Returns:
        List[Path]: List of file paths that match the filtering criterion.
    """
    # Convert the directory path to a Path object and list all .xls and .xlsx files
    all_files = list(Path(source_data_dir).glob("*.xls")) + list(Path(source_data_dir).glob("*.xlsx"))
    
    # Apply the filtering criterion (callable) to filter the files dynamically
    filtered_files = [file for file in all_files if filter_criterion(file.stem)]

    # Logging (can be replaced by print for simple debugging)
    print(f"Total files found: {len(all_files)}")
    print(f"Files after filtering: {len(filtered_files)}")

    if not filtered_files:
        print(f"No files matched the filter criterion in {source_data_dir}.")

    return filtered_files

# Example usage:
# Filters for English files by file names starting with "E" or ending with "e"
filter_english_files = lambda name: name.lower().startswith("e") or name.lower().endswith("e")

# Directory containing the Excel files
source_directory = "path/to/excel/files"

# Get filtered files using dynamic filtering
english_files = get_filtered_files(source_directory, filter_english_files)
print("Filtered English files:", english_files[:5])        

  1. File Listing: Lists all .xls and .xlsx files in the source directory.
  2. Dynamic Filtering: Applies the filter_criterion (a function) to each file name.
  3. Example Filter: The filter_english_files lambda selects file names starting or ending with "e".

The callable lets you swap in different filtering logic without changing the core function (e.g., filtering by year, keywords, or format).
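
For instance, hypothetical alternative callables can be passed to the same function without touching it:

# Hypothetical alternative filters for the same get_filtered_files() function
filter_2012_files = lambda name: "2012" in name                  # filter by year in the file name
filter_population = lambda name: "population" in name.lower()    # filter by keyword

files_2012 = get_filtered_files(source_directory, filter_2012_files)
population_files = get_filtered_files(source_directory, filter_population)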


2. Processing a Single Excel File with ExcelPreprocessor

This is the core code for making table data "ML model friendly". Specifically, it converts structured Excel tables into vectorized row representations with consistent placeholders and metadata, preparing them for machine learning classification while handling merged cells, blank rows, and inconsistencies. Key methods in the class include:

  • Excel File Handling: opens an Excel workbook using xlwings (a COM-based library).
  • Merged Cells: Detects and "unmerges" cells, filling each cell in the merged range with the same value.
  • Row Serialization: Converts each row into a structured string format, replacing empty cells with "EMPTY".

Simplified Code with Class-Based Approach

import xlwings as xw

class ExcelPreprocessor:
    def __init__(self):
        self.app = None  # Lazy initialization to avoid unnecessary Excel instances.

    def _initialize_app(self):
        """Initialize the Excel application instance only when needed."""
        if self.app is None:
            self.app = xw.App(visible=False)

    def _close_app(self):
        """Close the Excel application to free up resources."""
        if self.app:
            self.app.quit()
            self.app = None

    def process_excel_full_range(self, file_path, yearbook_source, group):
        """
        Processes the content of an Excel file:
        - Handles merged cells by filling the unmerged area with the same value.
        - Converts rows into structured strings with metadata.
        """
        self._initialize_app()  # Start the Excel instance

        try:
            workbook = self.app.books.open(file_path)
            sheet = workbook.sheets[0]  # Assume the first sheet
            print(f"Processing file: {file_path}")

            data_rows = []
            for row_idx, row in enumerate(sheet.used_range.value):
                # Replace empty cells (including those left behind by merged ranges) with a placeholder
                row_data = ["EMPTY" if cell is None else str(cell) for cell in row]

                # Add metadata for each row
                data_rows.append({
                    "text": ", ".join(row_data),  # Serialize the row into a single string
                    "row_id": row_idx + 1,  # Unique row ID
                    "group": group,  # File group for traceability
                    "yearbook_source": yearbook_source,  # Metadata for source (e.g., "2012" or "2022")
                })

            workbook.close()
            print(f"Finished processing {len(data_rows)} rows from {file_path}.")
            return data_rows

        finally:
            self._close_app()  # Ensure Excel instance is closed

# Example usage
preprocessor = ExcelPreprocessor()
rows = preprocessor.process_excel_full_range("example.xlsx", "2012", "table_group_1")
print(f"Processed {len(rows)} structured rows.")

        

Main methods include initialization (__init__ and _initialize_app), opening the workbook, processing each row (used_range.value), row serialization, handling merged cells (via unmerging), and resource cleanup (_close_app).

Critical Parts:

  • Excel COM Dependency: Since xlwings uses COM-based APIs, processing happens within a single thread, and CoInitialize() must be managed explicitly in multi-threaded setups (see the sketch after this list).
  • Merged Cells Handling: Tables often use merged cells for headers or grouped data, which can cause misalignment in CSVs. The preprocessor "flattens" merged cells into consistent row-by-row representations.
  • Metadata for ML Training: By adding group and yearbook_source, each row carries extra context (e.g., table type, source year), which helps during training and inference.
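
Here is a minimal sketch of the per-thread COM handling mentioned above; process_file_in_thread is a hypothetical wrapper name rather than the project's exact function, but the CoInitialize/CoUninitialize pairing is the essential part:

import pythoncom  # part of pywin32; needed because xlwings drives Excel over COM

def process_file_in_thread(file_path, yearbook_source, group):
    """Wrapper intended to run inside a worker thread (sketch only)."""
    pythoncom.CoInitialize()        # initialize COM for this specific thread
    try:
        preprocessor = ExcelPreprocessor()
        return preprocessor.process_excel_full_range(file_path, yearbook_source, group)
    finally:
        pythoncom.CoUninitialize()  # always release COM for this thread, even on failure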

Example Output

[
    {'text': 'Title, EMPTY, EMPTY', 'row_id': 1, 'group': 'table_group_1', 'yearbook_source': '2012'},
    {'text': 'Header1, H2, H3', 'row_id': 2, 'group': 'table_group_1', 'yearbook_source': '2012'},
...
]        

3. Multi-thread + async File Processing: Process multiple files concurrently for faster performance.

Simplified Demo Code Example

import asyncio
from concurrent.futures import ThreadPoolExecutor

async def process_multiple_excel_files_async(files):
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as executor:
        tasks = [loop.run_in_executor(executor, process_excel_full_range, file) for file in files]
        results = await asyncio.gather(*tasks)
    return results

# Example usage
files = ["file1.xlsx", "file2.xlsx", "file3.xlsx"]
processed_results = asyncio.run(process_multiple_excel_files_async(files))
print(f"Processed {len(processed_results)} files concurrently.")

        

  • asyncio.gather() runs multiple file-processing tasks concurrently using a thread pool.
  • process_excel_full_range() is wrapped inside run_in_executor() to offload the blocking work (in this simplified demo it stands in for a standalone wrapper around ExcelPreprocessor).


4. Timeout and Error Handling: Ensure that long-running tasks don’t block the pipeline.

Simplified Code Example

from concurrent.futures import ThreadPoolExecutor, TimeoutError

def process_with_timeout(file_path, timeout=600):
    try:
        with ThreadPoolExecutor() as executor:
            future = executor.submit(process_excel_full_range, file_path)
            return future.result(timeout=timeout)  # Raise TimeoutError if it exceeds limit
    except TimeoutError:
        print(f"Timeout exceeded for {file_path}")
        return None        

  • Limits the processing time for each file. If a file takes too long, it is logged and skipped (a brief usage sketch follows).
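
A short usage sketch, assuming the process_with_timeout() helper above and the standalone process_excel_full_range() from step 3; files that time out are collected so a later pass (step 6) can retry them:

failed = []
for file_path in ["file1.xlsx", "file2.xlsx"]:
    result = process_with_timeout(file_path, timeout=600)
    if result is None:         # timed out: log it and move on
        failed.append(file_path)
print(f"{len(failed)} files timed out and will be retried later.")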


5. Save Processed Data to CSV: Store the processed rows into a CSV file.

Simplified Code Example

import pandas as pd

def save_to_csv(data_rows, output_file):
    df = pd.DataFrame(data_rows, columns=["text"])
    df.to_csv(output_file, index=False)

# Example usage
save_to_csv(rows, "output.csv")  # rows from section 2 already contain a "text" field
print("Data saved to CSV.")        

  • Converts the processed rows into a pandas DataFrame and saves them as a CSV.


6. Retry Missing Files: Use a while loop to retry processing until either almost all files are processed (i.e., fewer than 15 files missing) or the maximum number of retry attempts is reached (i.e., 3 attempts).

Simplified Code Example: Check for missing file threshold

async def run_preprocessing_pipeline_async():
    """
    Orchestrates the preprocessing of yearbook datasets (2012 and 2022).
    Skips processing if both datasets meet the missing files threshold.
    """
    # Check if both datasets meet the missing files threshold
    if all(
        meet_missing_files_threshold(
            source_data_dir=dir_path,
            processed_data_file=processed_file,
            threshold=15,
            yearbook_source=year,
        )
        for dir_path, processed_file, year in [
            (YEARBOOK_2012_DATA_DIR, PREPROCESSED_2012_DATA_FILE, "2012"),
            (YEARBOOK_2022_DATA_DIR, PREPROCESSED_2022_DATA_FILE, "2022"),
        ]
    ):
        logger.info("Threshold met for both datasets. Skipping preprocessing.")
        return        

  • all() Function: Ensures that preprocessing is skipped only if both datasets meet the threshold (a hypothetical sketch of meet_missing_files_threshold() follows this list).
  • Iterates Over Tuples: Instead of repeating the same function twice, the datasets are passed as tuples of (directory, file, year) for dynamic checks.
  • Early Exit: If both datasets meet the threshold, the pipeline logs a message and exits.
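
For completeness, here is a hypothetical sketch of what meet_missing_files_threshold() could look like; the real helper in the repo may differ (for example, in which column it compares), so treat this as an assumption-laden illustration:

from pathlib import Path
import pandas as pd

def meet_missing_files_threshold(source_data_dir, processed_data_file, threshold, yearbook_source):
    """Return True if fewer than `threshold` source files are missing from the processed CSV (sketch)."""
    source_files = {f.stem for f in Path(source_data_dir).glob("*.xls*")}
    if not Path(processed_data_file).exists():
        return False  # nothing processed yet, so the threshold cannot be met
    # Assumes the processed CSV carries a "group" column naming the source file
    processed_groups = set(pd.read_csv(processed_data_file)["group"].astype(str))
    missing = source_files - processed_groups
    print(f"[{yearbook_source}] {len(missing)} files still missing (threshold: {threshold})")
    return len(missing) < threshold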

Simplified Code Example: Check for number of tries

async def preprocessing_pipeline_async():
    ...
    max_attempts = 3  # Maximum retry attempts
    attempts = 0

    while attempts < max_attempts:
        missing_files = get_missing_files(source_dir, processed_data_file)

        if not missing_files:  # Exit loop if all files are processed
            print("All files processed successfully.")
            break

        print(f"Retry attempt {attempts + 1}: {len(missing_files)} files remaining...")
        await preprocess_missing_files_async(missing_files)  # Retry missing files
        attempts += 1  # Increment retry counter

    if missing_files:
        print(f"{len(missing_files)} files could not be processed after {max_attempts} attempts.")        

  • Retries processing missing files up to 3 times by default and logs files that remain unprocessed.
  • With higher concurrency (e.g., 10+ threads), processing speeds up but increases the chances of file lock errors or timeouts. To balance this, the pipeline compensates with additional retry attempts, reprocessing failed files multiple times to ensure they succeed.


Example

async def run_pipeline():
    files = get_filtered_files("path/to/data", filter_english_files)  # reuse the callable from step 1
    processed_files = await process_multiple_excel_files_async(files)
    save_to_csv([row for file_rows in processed_files for row in file_rows], "output.csv")
    print("Pipeline completed.")

# asyncio.run(run_pipeline())

Results & Lessons Learned

  • Processing time: Cut preprocessing time from more than a few hours to under two hours.
  • Processed files: After running the pipeline a couple of times, I was able to process almost all raw data files (1,475 out of 1,482 tables).
  • Challenges: Memory constraints when processing large batches, and handling edge cases such as corrupted Excel files or files missing key sections.


Key Lessons Learned

  1. COM Initialization and Resource Leaks: Added pythoncom.CoInitialize() and _close_app() to ensure thread-safe Excel instance handling and prevent memory bloat from lingering COM objects.
  2. File Locking and Timeouts: Used tempfile.NamedTemporaryFile() for intermediate writes, increased timeout to 1000 seconds, and adjusted max_concurrent_tasks to balance speed and prevent file lock errors.
  3. Path and File Filtering Errors: Ensured consistent path comparisons with Path.resolve().samefile() and filtered only .xls/.xlsx files using name.lower().startswith("e") to avoid processing irrelevant files.
  4. Header Mismatches and Data Integrity: Created preprocessed files with expected headers when missing and validated header consistency before merging to avoid runtime errors.
  5. Infinite Loops and Encoding Issues: Added retry counters to limit loop iterations; defaulted to utf-8 encoding but did not assume it, using chardet.detect() to check the encoding first (a small sketch follows this list).
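
A small sketch of the encoding check from lesson 5, using chardet.detect() to guess the encoding before decoding a file:

import chardet

def read_text_with_detected_encoding(path):
    """Detect the file's encoding first, then decode; fall back to utf-8 (sketch only)."""
    with open(path, "rb") as f:
        raw = f.read()
    guess = chardet.detect(raw)                  # e.g. {'encoding': 'GB2312', 'confidence': 0.99}
    encoding = guess["encoding"] or "utf-8"      # fall back if detection is inconclusive
    return raw.decode(encoding, errors="replace")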


Takeaways:

  • Resource Management: Properly clean up COM instances and temporary files.
  • Thread Safety: Use thread-local resources to avoid file locks and memory bloat.
  • Error Handling and Logging: Implement retries and limit iterations to prevent infinite loops; log "excessively" at every step of the async/multi-threading process (many things can go wrong).
  • File Filtering and Validation: Process only relevant files and always validate headers before operating on them.


Next Steps

The next article will cover building the neural network, training, and inference. Stay tuned.

See GitHub Repo for full project code.


Conclusion

In this article, we explored how the preprocessing pipeline tackles the complexities of messy, real-world Excel tables. By combining asynchronous I/O for concurrency and multithreading for parallelism, the system processes thousands of files concurrently, overcoming bottlenecks typically caused by Microsoft COM-based operations in large-scale data preparation.

This hybrid approach makes the pipeline adaptive and resilient, capable of handling diverse datasets and real-world inconsistencies, resulting in an automated, scalable solution that transforms chaotic data into clean, structured, and vectorized table rows—ready for machine learning tasks.

Questions for You:

"Have you faced similar issues with merged cells?"

"What strategies have you used to process large volumes of inconsistent data? Have you experimented with asynchronous workflows or other optimizations?"


Appendix A: More on Microsoft COM

Microsoft COM (Component Object Model) is a legacy interface built in the pre-cloud days that allows software components to interact with each other within Windows applications. It’s commonly used to control programs like Excel and Word programmatically through external scripts or code. For example, VBA (which, much to the chagrin of IT security professionals, plenty of people still use) relies on COM. It can access advanced Excel features (e.g., unmerging cells), but COM-based operations are "singular (non-parallel) and linear (synchronous)" by design, which makes them slow, resource-intensive, and prone to crashes on large datasets.


Appendix B: Concurrency & Parallelism

Concurrency and parallelism are often used interchangeably (they both make programs go faster), but it's important to understand the subtleties between the two:

Concurrency (Asynchronous I/O for File Operations)

  • What It Is: Concurrency means managing multiple tasks at the same time by switching between them during "waiting" periods (e.g., while reading or writing a file).
  • What’s Happening: The asyncio library allows the pipeline to read and write multiple Excel files concurrently. When one file read is waiting for disk I/O to complete, asyncio switches to the next file, keeping the system busy.

Reading/writing files is an I/O-bound task (input/output), meaning the bottleneck is the speed of disk operations, not CPU calculations. Concurrency improves throughput by overlapping these waiting times, ensuring multiple file operations are in progress simultaneously.
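
A tiny, self-contained illustration of that overlap (using asyncio.sleep() as a stand-in for waiting on disk I/O): two one-second "reads" finish in roughly one second total, not two.

import asyncio
import time

async def fake_read(name, seconds):
    await asyncio.sleep(seconds)   # stands in for waiting on a disk read
    return name

async def main():
    start = time.perf_counter()
    results = await asyncio.gather(fake_read("File A", 1), fake_read("File B", 1))
    print(results, f"took ~{time.perf_counter() - start:.1f}s")  # ~1.0s, not 2.0s

asyncio.run(main())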

Parallelism (Multithreading for Table Processing)

  • What It Is: Parallelism means performing multiple tasks at the same time by using separate CPU cores (see the note on the GIL below).
  • What’s Happening: Once a file is loaded, tasks like unmerging cells, filling empty rows, and formatting are handled in parallel using ThreadPoolExecutor from the concurrent.futures module (a Future represents work whose result becomes available later). Each thread processes one file at a time, running its computations independently of the main thread.

Concurrency (Asynchronous I/O)

  • The asyncio library is used to read, write, and process Excel files concurrently. When the system reads an Excel file from disk (an I/O-bound task), asyncio switches to another file operation while waiting for the first to complete. This prevents the system from sitting idle while the disk reads or saves data, allowing multiple files to be processed concurrently.
  • Number of Threads: The pipeline typically uses 4 to 10 threads, depending on the number of available CPU cores. Each thread processes one file at a time.
  • Example: While waiting for File A to load into memory, the system starts loading File B. When File A is ready, it resumes processing, while File B might still be reading.

Parallelism (Multithreading for CPU-Bound Tasks)

  • Purpose: Perform computationally expensive operations (like unmerging cells or restructuring rows) on multiple files at the same time by using separate CPU threads.
  • What It’s Doing: The concurrent library (concurrent.futures.ThreadPoolExecutor) is used to create parallel threads that run independently. When an Excel file is loaded into memory, tasks like unmerging cells, filling missing values, and formatting rows are offloaded to separate threads. Each thread processes a different file (or a part of a file) in parallel, taking advantage of multiple CPU cores.
  • Example: While Thread 1 is unmerging and formatting File A, Thread 2 is processing File B. Both threads run at the same time, utilizing different CPU cores to complete their tasks faster.

Is ThreadPoolExecutor Truly Parallel?

There is a subtlety about whether ThreadPoolExecutor is "truly parallel." In the training and inference pipelines, I will use multi-processing for GPU operations, which represents "real parallelism" because the work is physically split up across the GPU.

Technically speaking, ThreadPoolExecutor behaves more like "virtual multi-threading" for CPU-bound tasks due to Python's GIL (global interpreter lock), which allows only one thread to execute Python bytecode at a time, even in a multi-threaded program. It's like having a full kitchen staff but only one designated stove. This has been a constant complaint from experienced Python developers, and according to the Python Software Foundation's latest annual general meeting (AGM), we should expect some relaxation of the GIL in coming releases.

However, for operations like ours (file I/O and calls into libraries like pandas and openpyxl), the GIL is released while waiting on I/O and inside many native-code calls. This allows threads to run almost simultaneously, so in practice it was very close to true parallel processing.

