Snippet: Speeding Up Bulk Uploads to Elastic with Parallelisation in Python

Scenario

Uploading 35,000 text documents in the format below, roughly 1-1500 words each, to an Elastic Cloud index without additional processing or ingest pipelines. The total corpus size is 350MB.

{
  "_id": "d2ab8863-c548-43a4-8645-402d0986a33b",
  "text": "Database Administrator - Family Private Care LLC Lawrenceville, GA A self-motivated Production SQL Server Database Administrator .... "
}

Setting

Destination: Elastic Cloud on GCP, asia-southeast1, Compute-Optimized cluster

Origin: M3 Pro, 12 threads
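
The snippet below assumes the destination index already exists. If it doesn't, it can be created up front; a minimal sketch, assuming a plain text mapping for the body field the snippet writes (the actual mapping isn't specified in the original):

import os
from elasticsearch import Elasticsearch

es = Elasticsearch(os.environ.get("ELASTIC_ENDPOINT"), api_key=os.environ.get("ELASTIC_API_KEY"))

# Create the target index with an explicit mapping for the indexed text field
es.indices.create(
    index="resumes_base",
    mappings={"properties": {"body": {"type": "text"}}},
)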

Problem

Uploading under the prescribed scenario with the elasticsearch.helpers.bulk API in batches of 500 documents takes 70-80 seconds on average. We want this faster.
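
For reference, the sequential baseline looks roughly like this; a sketch, since the baseline code isn't shown (helpers.bulk splits the action generator into batch-sized requests via chunk_size):

from elasticsearch import helpers

# documents: list of {"_id": ..., "text": ...} dicts; es: a connected Elasticsearch client
actions = ({"_index": "resumes_base", "_id": doc["_id"], "body": doc["text"]} for doc in documents)
success, errors = helpers.bulk(es, actions, chunk_size=500, raise_on_error=False)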

Solution

Parallel batch upload with the concurrent.futures module.

Result

Naive (Sequential, batch size 1) - 695 seconds

Base (Sequential, batch size 500) - 73.4 seconds

Parallel (5 workers maximum, batch size 500) - 37.1 seconds

Parallel (10 workers maximum, batch size 500) - 27.2 seconds

Parallel (10 workers, batch size 250) - 29 seconds

Parallel (10 workers, batch size 1000) - 41 seconds

Ten workers with a batch size of 500 was the sweet spot here, roughly a 2.7x speedup over the sequential 500-document baseline.
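
These timings can be reproduced with a plain wall-clock wrapper around the upload call; a minimal sketch using the function from the Code Snippet below:

import time

start = time.perf_counter()
uploaded, failed = bulk_upload_pickle_to_elasticsearch(documents, "resumes_base", es_client)
print(f"Uploaded {uploaded} documents ({failed} failed) in {time.perf_counter() - start:.1f}s")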


Code Snippet

import os
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from elasticsearch import Elasticsearch, helpers # elasticsearch==8.14.0
from tqdm import tqdm # tqdm==4.66.4

def bulk_upload_pickle_to_elasticsearch(data, index_name, es, batch_size=500, max_workers=10):
    ''' 
    data: [ {document} ]
        document: {
                    "_id": str
                    ...
                  }
    index_name: str 
    es: Elasticsearch 
    batch_size: int 
    max_workers: int
    '''
    total_documents = len(data)
    success_bar = tqdm(total=total_documents, desc="Successful uploads", colour="green")
    failed_bar = tqdm(total=total_documents, desc="Failed uploads", colour="red")

    def create_action(doc):
        '''
        Define upload action from source documents
        '''
        return {
            "_index": index_name,
            "_id": doc["_id"],  # matches the "_id" key in the source documents
            "body": doc["text"]  # non-meta keys become fields of the indexed document
        }

    def read_and_create_batches(data):
        ''' 
        Yield document batches
        '''
        batch = []
        for doc in data:
            batch.append(create_action(doc))
            if len(batch) == batch_size:
                yield batch
                batch = []
        if batch:
            yield batch

    def upload_batch(batch):
        ''' 
        Make bulk call for batch
        '''
        try:
            # Per-request options like request_timeout go through .options() in the 8.x client
            success, failed = helpers.bulk(es.options(request_timeout=45), batch, raise_on_error=False)
            # With raise_on_error=False, failures are returned as a list of error dicts
            if isinstance(failed, list):
                failed = len(failed)
            return success, failed
        except Exception as e:
            print(f"Error during bulk upload: {str(e)}")
            return 0, len(batch)

    # Upload batches in parallel; each future resolves to (success_count, failed_count)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_batch = {executor.submit(upload_batch, batch): batch for batch in read_and_create_batches(data)}
        for future in as_completed(future_to_batch):
            success, failed = future.result()
            success_bar.update(success)
            failed_bar.update(failed)

    # Collect totals and close the progress bars
    total_uploaded = success_bar.n
    total_failed = failed_bar.n
    success_bar.close()
    failed_bar.close()

    return total_uploaded, total_failed

try:
    es_client = Elasticsearch(
        os.environ.get("ELASTIC_ENDPOINT"),
        api_key=os.environ.get("ELASTIC_API_KEY")
    )
    es_client.info()  # the constructor doesn't connect, so force a round trip to fail fast
except Exception as e:
    raise SystemExit(f"Could not connect to Elasticsearch: {e}")

# documents: list of {"_id": ..., "text": ...} dicts, loaded elsewhere (e.g. from a pickle)
total_uploaded, total_failed = bulk_upload_pickle_to_elasticsearch(documents, "resumes_base", es_client)
print(f"Total uploaded: {total_uploaded}, Total failed: {total_failed}")        
