Snippet: Speeding up Bulk Uploads to Elastic with Parallelisation in Python
Han Xiang Choong
Senior Customer Architect - APJ @ Elastic | Applied AI/ML | Search Experiences | Delivering Real-World Impact
Scenario:
Uploading 35,000 large text documents in the format below, roughly 1-1500 words each, to an Elastic Cloud index without any additional processing or ingest pipelines. Total data size is 350MB.
{
    "_id": "d2ab8863-c548-43a4-8645-402d0986a33b",
    "text": "Database Administrator - Family Private Care LLC Lawrenceville, GA A self-motivated Production SQL Server Database Administrator .... "
}
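The function further down expects these documents as a list of dicts in memory. A minimal loading sketch, assuming they are stored in a local pickle file (the file name resumes.pkl is hypothetical):

import pickle

# Hypothetical file name; the documents could come from any source
with open("resumes.pkl", "rb") as f:
    documents = pickle.load(f)  # expected shape: [{"_id": str, "text": str}, ...]

print(f"Loaded {len(documents)} documents")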
Setting
Destination: Elastic Cloud on GCP, asia-southeast1, Compute-Optimized cluster
Origin: M3 Pro, 12 threads
Problem
Uploading under the prescribed scenario with the elasticsearch.helpers.bulk API in batches of 500 documents takes 70-80 seconds on average. We want it faster. A baseline sketch follows.
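For reference, a minimal sketch of the sequential baseline being measured here, assuming an existing Elasticsearch client es and the documents list described above:

from elasticsearch import helpers

def sequential_upload(es, data, index_name, batch_size=500):
    '''
    Sequential baseline: one helpers.bulk call per batch of batch_size actions
    '''
    total_success, total_failed = 0, 0
    for start in range(0, len(data), batch_size):
        actions = [
            {"_index": index_name, "_id": doc["_id"], "body": doc["text"]}
            for doc in data[start:start + batch_size]
        ]
        success, failed = helpers.bulk(es, actions, raise_on_error=False)
        total_success += success
        total_failed += len(failed) if isinstance(failed, list) else failed
    return total_success, total_failed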
Solution
Parallel batch upload with the concurrent.futures module.
Result
Naive (Sequential, batch size 1) - 695 seconds
Base (Sequential, batch size 500) - 73.4 seconds
Parallel (5 workers maximum, batch size 500) - 37.1 seconds
Parallel (10 workers maximum, batch size 500) - 27.2 seconds
Parallel (10 workers, batch size 250) - 29 seconds
Parallel (10 workers, batch size 1000) - 41 seconds
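In other words, the best parallel configuration (10 workers, batch size 500) is roughly 2.7x faster than the sequential baseline (73.4 s / 27.2 s ≈ 2.7) and about 25x faster than single-document indexing (695 s / 27.2 s ≈ 25.6); pushing the batch size up to 1000 or halving it to 250 both performed worse than 500.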
Code Snippet
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

from elasticsearch import Elasticsearch, helpers  # elasticsearch==8.14.0
from tqdm import tqdm  # tqdm==4.66.4
def bulk_upload_pickle_to_elasticsearch(data, index_name, es, batch_size=500, max_workers=10):
    '''
    data: [ {document} ]
    document: {
        "_id": str
        ...
    }
    index_name: str
    es: Elasticsearch
    batch_size: int
    max_workers: int
    '''
    total_documents = len(data)
    success_bar = tqdm(total=total_documents, desc="Successful uploads", colour="green")
    failed_bar = tqdm(total=total_documents, desc="Failed uploads", colour="red")

    def create_action(doc):
        '''
        Define upload action from source document
        '''
        return {
            "_index": index_name,
            "_id": doc["_id"],
            "body": doc["text"]  # non-metadata keys become the indexed document source
        }

    def read_and_create_batches(data):
        '''
        Yield batches of batch_size actions
        '''
        batch = []
        for doc in data:
            batch.append(create_action(doc))
            if len(batch) == batch_size:
                yield batch
                batch = []
        if batch:
            yield batch

    def upload_batch(batch):
        '''
        Make one bulk call for a batch
        '''
        try:
            success, failed = helpers.bulk(es, batch, raise_on_error=False, request_timeout=45)
            # With raise_on_error=False, helpers.bulk returns the errors as a list
            if isinstance(failed, list):
                failed = len(failed)
            return success, failed
        except Exception as e:
            print(f"Error during bulk upload: {str(e)}")
            return 0, len(batch)

    # Parallel execution of batch uploads: one future per batch,
    # progress bars updated as futures complete
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_batch = {
            executor.submit(upload_batch, batch): batch
            for batch in read_and_create_batches(data)
        }
        for future in as_completed(future_to_batch):
            success, failed = future.result()
            success_bar.update(success)
            failed_bar.update(failed)

    # Collect final totals and close the progress bars
    total_uploaded = success_bar.n
    total_failed = failed_bar.n
    success_bar.close()
    failed_bar.close()
    return total_uploaded, total_failed
try:
    es_endpoint = os.environ.get("ELASTIC_ENDPOINT")
    es_client = Elasticsearch(
        es_endpoint,
        api_key=os.environ.get("ELASTIC_API_KEY")
    )
except Exception as e:
    print(f"Error creating Elasticsearch client: {str(e)}")
    es_client = None

# documents: the list of {"_id", "text"} dicts described in the Scenario
total_uploaded, total_failed = bulk_upload_pickle_to_elasticsearch(documents, "resumes_base", es_client)
print(f"Total uploaded: {total_uploaded}, Total failed: {total_failed}")