Table Parsing Made Simple with Homegrown Neural Networks - Part 2: Multi-thread Async Preprocessing (Drive Safe and Go Fast)
Introduction: Why Preprocessing Matters
As we alluded to in Article 1, turning raw data into a "Machine Learning (ML) ready" format is the toughest job in the project.
This was especially true for my dataset (the 1,500 Excel tables from China's National Bureau of Statistics). The volume of files alone made manual cleanup impractical, but the real hurdle was format inconsistency: the Microsoft-specific quirks that make tables and charts easy on the eyes are problematic when loading the data programmatically. Working with Excel files through COM-based APIs (Component Object Model) is painfully slow and prone to crashes.
This is where the preprocessing pipeline became crucial. It required a hybrid approach combining asynchronous workflows for efficient I/O operations and multithreading to parallelize file processing, dramatically improving performance.
In this article, we'll walk through how the preprocessing pipeline was designed to overcome these challenges, covering the key concepts behind the pipeline, how it handles messy real-world data, a quick code walkthrough, and the lessons learned along the way.
See GitHub Repo for project code.
Key Concepts Behind the Pipeline
Concurrency & Parallelism: Why We Chose a Hybrid Approach
Understanding the difference between asynchronous programming (concurrency) and multithreading (parallelism) is key to optimizing performance:
See Appendix B for more details on concurrency and parallelism.
Why a Hybrid Approach Works
In summary: concurrency (async I/O) reads and writes multiple files concurrently to avoid I/O stalls (asyncio), while parallelism (multithreading) performs CPU-heavy table operations in parallel threads to speed up processing (ThreadPoolExecutor). The combination lets the system handle massive data volumes smoothly and scale across both I/O and CPU constraints.
Handling Messy Real-World Data
The preprocessing pipeline tackles the inconsistent, human-friendly formats often found in Excel files by addressing three key issues: merged cells, dynamically filtering for relevant table files, and failures due to multi-threading and async processing.
Normalizing Table Structure: Unmerge Cells and Empty Cells
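The pipeline flattens merged cells into a plain grid and replaces blanks with an explicit "EMPTY" placeholder so every row has the same shape. In the project this happens through xlwings and Excel's COM layer (see Appendix A); the sketch below is a minimal illustration of the same idea using openpyxl on .xlsx input, which is an assumption made for readability, not the project's actual implementation.

from openpyxl import load_workbook

def normalize_sheet(path: str):
    """Illustrative only: unmerge cells and fill blanks with an 'EMPTY' placeholder."""
    wb = load_workbook(path)
    ws = wb.active

    # Copy each merged range's top-left value into every cell of the range,
    # then unmerge so the grid becomes a plain rectangular table.
    for merged_range in list(ws.merged_cells.ranges):
        top_left = ws.cell(merged_range.min_row, merged_range.min_col).value
        ws.unmerge_cells(str(merged_range))
        for row in ws.iter_rows(min_row=merged_range.min_row, max_row=merged_range.max_row,
                                min_col=merged_range.min_col, max_col=merged_range.max_col):
            for cell in row:
                cell.value = top_left

    # Replace any remaining empty cells with an explicit placeholder.
    for row in ws.iter_rows():
        for cell in row:
            if cell.value is None:
                cell.value = "EMPTY"
    return wb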
Dynamic Table Filtering with Callable Functions
"Turbocharging" the Process
To maximize speed, the pipeline relies on high concurrency and multiple retry passes, as summarized below.
In summary, the preprocessing pipeline addresses messy tables by normalizing merged cells, detecting and filling missing headers, and using dynamic, callable-based filtering to handle a variety of datasets. With high concurrency and multiple retries, it ensures efficient, large-scale processing of thousands of files, even in the face of real-world inconsistencies.
Quick Code Walkthrough
Below are key steps with simplified code snippets to help you better understand the flow.
Note that the code in this section is simplified for demo purposes only (for the full project code, see the GitHub Repo.)
1. Filtering Excel Files (English Only)
Select only relevant .xls and .xlsx files (English versions).
Simplified Demo Code Examples
from pathlib import Path
from typing import List, Callable

def get_filtered_files(source_data_dir: str, filter_criterion: Callable[[str], bool]) -> List[Path]:
    """
    Get Excel files from the directory and filter them dynamically using a callable function.

    Args:
        source_data_dir (str): Path to the directory containing Excel files.
        filter_criterion (Callable[[str], bool]): Function to decide if a file is included (e.g., a file-name check).

    Returns:
        List[Path]: List of file paths that match the filtering criterion.
    """
    # Convert the directory path to a Path object and list all .xls and .xlsx files
    all_files = list(Path(source_data_dir).glob("*.xls")) + list(Path(source_data_dir).glob("*.xlsx"))

    # Apply the filtering criterion (callable) to filter the files dynamically
    filtered_files = [file for file in all_files if filter_criterion(file.stem)]

    # Logging (can be replaced by print for simple debugging)
    print(f"Total files found: {len(all_files)}")
    print(f"Files after filtering: {len(filtered_files)}")
    if not filtered_files:
        print(f"No files matched the filter criterion in {source_data_dir}.")

    return filtered_files

# Example usage:
# Filters for English files by file names starting with "E" or ending with "e"
filter_english_files = lambda name: name.lower().startswith("e") or name.lower().endswith("e")

# Directory containing the Excel files
source_directory = "path/to/excel/files"

# Get filtered files using dynamic filtering
english_files = get_filtered_files(source_directory, filter_english_files)
print("Filtered English files:", english_files[:5])
The callable lets you swap in different filtering logic without changing the core function (e.g., filtering by year, keywords, or format).
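For instance, the swap is a one-line change. The filters below are purely hypothetical (the file-name patterns are assumptions, not the dataset's actual naming scheme):

# Hypothetical alternative criteria; the file-name patterns are assumptions for illustration
filter_2012_files = lambda name: "2012" in name            # filter by year in the file name
filter_gdp_tables = lambda name: "gdp" in name.lower()     # filter by keyword

gdp_files = get_filtered_files(source_directory, filter_gdp_tables)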
2. Processing a Single Excel File with ExcelPreprocessor
This is the core code that makes table data "ML model friendly". Specifically, it converts structured Excel tables into vectorized row representations with consistent placeholders and metadata, preparing them for machine learning classification while handling merged cells, blank rows, and other inconsistencies. The key methods in the class are described below the code.
Simplified Code with Class-Based Approach
import xlwings as xw

class ExcelPreprocessor:
    def __init__(self):
        self.app = None  # Lazy initialization to avoid unnecessary Excel instances.

    def _initialize_app(self):
        """Initialize the Excel application instance only when needed."""
        if self.app is None:
            self.app = xw.App(visible=False)

    def _close_app(self):
        """Close the Excel application to free up resources."""
        if self.app:
            self.app.quit()
            self.app = None

    def process_excel_full_range(self, file_path, yearbook_source, group):
        """
        Processes the content of an Excel file:
        - Handles merged cells by filling the unmerged area with the same value.
        - Converts rows into structured strings with metadata.
        """
        self._initialize_app()  # Start the Excel instance
        try:
            workbook = self.app.books.open(file_path)
            sheet = workbook.sheets[0]  # Assume the first sheet
            print(f"Processing file: {file_path}")

            data_rows = []
            for row_idx, row in enumerate(sheet.used_range.value):
                # Unmerge and handle missing values
                row_data = ["EMPTY" if cell is None else str(cell) for cell in row]
                # Add metadata for each row
                data_rows.append({
                    "text": ", ".join(row_data),          # Serialize the row into a single string
                    "row_id": row_idx + 1,                # Unique row ID
                    "group": group,                       # File group for traceability
                    "yearbook_source": yearbook_source,   # Metadata for source (e.g., "2012" or "2022")
                })

            workbook.close()
            print(f"Finished processing {len(data_rows)} rows from {file_path}.")
            return data_rows
        finally:
            self._close_app()  # Ensure Excel instance is closed

# Example usage
preprocessor = ExcelPreprocessor()
rows = preprocessor.process_excel_full_range("example.xlsx", "2012", "table_group_1")
print(f"Processed {len(rows)} structured rows.")
Main methods include initialization (__init__ and _initialize_app), opening the workbook, iterating each row via used_range.value, row serialization, handling merged cells (via unmerging), and resource cleanup (_close_app).
Critical Parts:
Example Output
[
{'text': 'Title, EMPTY, EMPTY', 'row_id': 1, 'group': 'table_group_1', 'yearbook_source': '2012'},
{'text': 'Header1, H2, H3', 'row_id': 2, 'group': 'table_group_1', 'yearbook_source': '2012'},
...
]
3. Multi-thread + async File Processing: Process multiple files concurrently for faster performance.
Simplified Demo Code Example
import asyncio
from concurrent.futures import ThreadPoolExecutor

# Assumes process_excel_full_range is a module-level helper that wraps
# ExcelPreprocessor().process_excel_full_range(...) for a single file.
async def process_multiple_excel_files_async(files):
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as executor:
        # Each file is handed to a worker thread; gather awaits them all concurrently.
        tasks = [loop.run_in_executor(executor, process_excel_full_range, file) for file in files]
        results = await asyncio.gather(*tasks)
    return results

# Example usage
files = ["file1.xlsx", "file2.xlsx", "file3.xlsx"]
processed_results = asyncio.run(process_multiple_excel_files_async(files))
print(f"Processed {len(processed_results)} files concurrently.")
4. Timeout and Error Handling: Ensure that long-running tasks don’t block the pipeline.
Simplified Code Example
from concurrent.futures import ThreadPoolExecutor, TimeoutError

def process_with_timeout(file_path, timeout=600):
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(process_excel_full_range, file_path)
    try:
        return future.result(timeout=timeout)  # Raises TimeoutError if it exceeds the limit
    except TimeoutError:
        print(f"Timeout exceeded for {file_path}")
        return None
    finally:
        # wait=False lets the caller return immediately even if the timed-out
        # worker thread is still running in the background.
        executor.shutdown(wait=False)
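A quick usage sketch (the file name and 60-second limit are placeholders). Files that time out simply return None and are left for the retry pass described in step 6:

result = process_with_timeout("large_table.xlsx", timeout=60)
if result is None:
    print("Skipping file for now; the retry loop will pick it up later.")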
5. Save Processed Data to CSV: Store the processed rows into a CSV file.
Simplified Code Example
import pandas as pd

def save_to_csv(data_rows, output_file):
    # data_rows is a list of dicts; keep only the serialized "text" column for the CSV.
    df = pd.DataFrame(data_rows, columns=["text"])
    df.to_csv(output_file, index=False)

# Example usage: `rows` already holds dicts with a "text" key (see step 2)
save_to_csv(rows, "output.csv")
print("Data saved to CSV.")
6. Retry Missing Files: Use a while loop to retry processing until either almost all files are processed (i.e., fewer than 15 files are still missing) or the maximum number of retry attempts is reached (i.e., 3 attempts).
Simplified Code Example: Check for missing file threshold
async def run_preprocessing_pipeline_async():
    """
    Orchestrates the preprocessing of yearbook datasets (2012 and 2022).
    Skips processing if both datasets meet the missing-files threshold.
    """
    # Check if both datasets meet the missing-files threshold
    if all(
        meet_missing_files_threshold(
            source_data_dir=dir_path,
            processed_data_file=processed_file,
            threshold=15,
            yearbook_source=year,
        )
        for dir_path, processed_file, year in [
            (YEARBOOK_2012_DATA_DIR, PREPROCESSED_2012_DATA_FILE, "2012"),
            (YEARBOOK_2022_DATA_DIR, PREPROCESSED_2022_DATA_FILE, "2022"),
        ]
    ):
        logger.info("Threshold met for both datasets. Skipping preprocessing.")
        return
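The meet_missing_files_threshold helper isn't shown above. Conceptually it just compares how many source files still lack processed rows against the threshold; here is a hedged sketch of one way it could look, reusing the hypothetical get_missing_files helper sketched after the retry loop below (the yearbook_source argument is kept only to mirror the call signature):

def meet_missing_files_threshold(source_data_dir, processed_data_file, threshold, yearbook_source):
    # True when few enough files remain unprocessed to consider the dataset "done"
    missing = get_missing_files(source_data_dir, processed_data_file)
    print(f"{yearbook_source}: {len(missing)} files still missing (threshold {threshold})")
    return len(missing) < threshold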
Simplified Code Example: Check for number of tries
async def preprocessing_pipeline_async():
    ...
    max_attempts = 3  # Maximum retry attempts
    attempts = 0
    while attempts < max_attempts:
        missing_files = get_missing_files(source_dir, processed_data_file)
        if not missing_files:  # Exit loop if all files are processed
            print("All files processed successfully.")
            break
        print(f"Retry attempt {attempts + 1}: {len(missing_files)} files remaining...")
        await preprocess_missing_files_async(missing_files)  # Retry missing files
        attempts += 1  # Increment retry counter

    # Re-check after the final attempt so the report reflects the latest state
    missing_files = get_missing_files(source_dir, processed_data_file)
    if missing_files:
        print(f"{len(missing_files)} files could not be processed after {max_attempts} attempts.")
Example
async def run_pipeline():
async def run_pipeline():
    files = get_filtered_files("path/to/data", lambda name: "2012" in name)  # second argument must be a callable (see step 1)
    processed_files = await process_multiple_excel_files_async(files)
    save_to_csv([row for file_rows in processed_files for row in file_rows], "output.csv")
print("Pipeline completed.")
Results & Lessons Learned
Key Lessons Learned
Takeaways:
Next Steps
The next article will cover building the neural network, training, and inference. Stay tuned.
See GitHub Repo for full project code.
Conclusion
In this article, we explored how the preprocessing pipeline tackles the complexities of messy, real-world Excel tables. By combining asynchronous I/O for concurrency and multithreading for parallelism, the system processes thousands of files concurrently, overcoming bottlenecks typically caused by Microsoft COM-based operations in large-scale data preparation.
This hybrid approach makes the pipeline adaptive and resilient, capable of handling diverse datasets and real-world inconsistencies, resulting in an automated, scalable solution that transforms chaotic data into clean, structured, and vectorized table rows—ready for machine learning tasks.
Questions for You:
"Have you faced similar issues with merged cells?"
"What strategies have you used to process large volumes of inconsistent data? Have you experimented with asynchronous workflows or other optimizations?"
Appendix A: More on Microsoft COM
Microsoft COM (Component Object Model) is a legacy interface from the pre-cloud era that lets software components interact with each other within Windows applications. It is commonly used to control programs like Excel and Word programmatically through external scripts or code; VBA (which, much to the chagrin of today's IT security professionals, plenty of people still use) relies on COM. COM can access advanced Excel features (e.g., unmerging cells), but COM-based operations are singular (non-parallel) and linear (synchronous) by design, which makes them slow, resource-intensive, and crash-prone on large datasets.
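To make this concrete, here is a minimal sketch of what touching the COM layer looks like from Python via xlwings (Windows-only, assuming the default pywin32 backend; the file name is a placeholder). Every call is a synchronous round trip into the Excel process, which is exactly why it doesn't scale on its own:

import xlwings as xw

app = xw.App(visible=False)          # launches a real Excel process via COM
try:
    wb = app.books.open("example.xlsx")
    sheet = wb.sheets[0]
    # .api exposes the raw COM object, e.g. the VBA-style UnMerge call
    sheet.used_range.api.UnMerge()
    wb.save()
    wb.close()
finally:
    app.quit()                       # always release the Excel process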
Appendix B: Concurrency & Parallelism
Concurrency and parallelism are often used interchangeably (they both make programs go faster), but it's important to understand the subtleties between the two:
Concurrency (Asynchronous I/O for File Operations)
Reading/writing files is an I/O-bound task (input/output), meaning the bottleneck is the speed of disk operations, not CPU calculations. Concurrency improves throughput by overlapping these waiting times, ensuring multiple file operations are in progress simultaneously.
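As a tiny illustration of the idea (file names are placeholders; asyncio.to_thread requires Python 3.9+), the sketch below starts several blocking file reads and lets the event loop overlap their wait times:

import asyncio
from pathlib import Path

async def read_all(paths):
    # Each read runs in a worker thread; while one waits on the disk,
    # the event loop lets the others proceed instead of idling.
    return await asyncio.gather(*(asyncio.to_thread(Path(p).read_bytes) for p in paths))

contents = asyncio.run(read_all(["a.xlsx", "b.xlsx", "c.xlsx"]))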
Parallelism (Multithreading for Table Processing)
Concurrency (Asynchronous I/O)
Parallelism (Multithreading for CPU-Bound Tasks)
Is ThreadPoolExecutor Truly Parallel?
There is a subtlety about whether ThreadPoolExecutor is "truly parallel." In the training and inference pipelines, I will use multiprocessing for GPU operations, which represents "real parallelism" because the workload is physically split up.
Technically speaking, ThreadPoolExecutor behaves more like "virtual multithreading" for CPU-bound tasks because of Python's GIL (global interpreter lock), which allows only one thread to execute Python bytecode at a time, even in a multi-threaded program. It's like having a full kitchen staff but only one designated stove. The GIL has been a constant complaint from more experienced Python developers, and according to the Python Software Foundation's latest annual general meeting (AGM), we should expect some relaxation of this constraint in coming releases.
However, for workloads like ours (file I/O, plus table manipulation with libraries like pandas and openpyxl), threads spend most of their time waiting on I/O, during which the GIL is released. That lets the threads make progress almost simultaneously, so in practice we got very close to true parallel processing.
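If you want to see the GIL's effect for yourself, here is a rough experiment (timings are machine-dependent and the task sizes are arbitrary): pure-Python CPU work barely benefits from four threads, while sleep/I/O-style waits do.

import time
from concurrent.futures import ThreadPoolExecutor

def cpu_task(n=5_000_000):
    return sum(i * i for i in range(n))   # holds the GIL while computing

def io_task(seconds=1.0):
    time.sleep(seconds)                   # releases the GIL while waiting

for task, label in [(cpu_task, "CPU-bound"), (io_task, "I/O-style")]:
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=4) as pool:
        list(pool.map(lambda _: task(), range(4)))
    print(f"{label}: {time.perf_counter() - start:.2f}s with 4 threads")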
link to article 1: https://www.dhirubhai.net/pulse/table-parsing-made-simple-homegrown-neural-networks-part-zhang-urr6e/?trackingId=Macv2yIsR5CD9ZzIYZbDxg%3D%3D