Harnessing the Data Tidal Wave: Exploring AWS for Unstructured Data Processing
An estimated 80% of healthcare data is unstructured


The opinions expressed in this blog are my own, and do not necessarily represent the opinions of Amazon, Amazon Web Services, or any other entity not named David Carnahan.


The exponential growth of data generation is a phenomenon that cannot be ignored. According to recent statistics from Statista, we are facing an estimated 463 exabytes of data created each day by 2025, with every person generating nearly 2 MB of data every second. The sheer volume of data generated through messaging platforms like WhatsApp (41 million messages per minute in 2020), IoT devices (a projected 29 billion worldwide by 2030), and wearable devices (28 petabytes of data per day) is truly mind-boggling.

This blog will explore how AWS services like AWS Lambda, Amazon Textract, and Amazon Bedrock can help organizations process and extract value from unstructured data, particularly in the healthcare industry where a significant portion of data is unstructured. Through practical examples, we'll demonstrate how to leverage these tools for data management, information extraction, and unlocking the potential of unstructured healthcare data.

The Project Architecture

I always like to start my project explorations with a high-level draw.io plan. Given that I'm not pushing this to production, a high-level diagram is all I need to create a blueprint to work from.
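
In plain text, the flow the diagram lays out is roughly this:

    loadingbucket --(triage Lambda)--> imgbucket / pdfbucket / txtbucket
    imgbucket --(Textract Lambda)--> txtbucket
    pdfbucket --(Textract Lambda)--> txtbucket
    txtbucket --(Bedrock Lambda, Claude 2)--> summarybucket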

Summary Text Generation from Text Files

The first step I wanted to take was to triage each document type into its own bucket. I wanted to do this so I could create clean, simple Lambda functions for each data type -- ultimately converting them all to text files that I would then send to Amazon Bedrock (Claude 2.0). This Lambda function is triggered when a file is added to the 'loadingbucket'.

import boto3
from urllib.parse import unquote_plus

s3_client = boto3.client('s3')

def lambda_handler(event, context):
    mainbucket = 'loadingbucket'
    pdfbucket = 'pdfbucket'
    txtbucket = 'txtbucket'
    imgbucket = 'imgbucket'

    # Get the object from the event
    record = event['Records'][0]
    key = unquote_plus(record['s3']['object']['key'])
    file_extension = key.split('.')[-1].lower()

    # Determine the target bucket based on the file extension
    if file_extension in ['jpg', 'jpeg', 'png', 'gif']:
        target_bucket = imgbucket
    elif file_extension == 'pdf':
        target_bucket = pdfbucket
    elif file_extension in ['txt', 'csv', 'doc', 'docx', 'xlsx']:
        target_bucket = txtbucket
    else:
        print(f"Unsupported file type: {file_extension}")
        return

    # Copy the file to the appropriate bucket
    copy_source = {'Bucket': mainbucket, 'Key': key}
    s3_client.copy(copy_source, target_bucket, key)

    # Delete the original file from mainbucket
    s3_client.delete_object(Bucket=mainbucket, Key=key)

    print(f"File {key} was moved to {target_bucket} and deleted from {mainbucket}.")        

Once I had the file types in their own buckets ... I built a Lambda function for each type, triggered when the file object is created. Here is the code for the text-files bucket, followed by an example output for a file used in testing -- a white paper on the pandemic that will be published in the near future.

import boto3
import botocore.config
import json
from datetime import datetime
from urllib.parse import unquote_plus

#-------------------------------------------------------------
# generate summary with bedrock (helper function)
#------------------------------------------------------------------

def generate_summary_from_bedrock(content: str) -> str:
    # Claude v2 expects prompts that start with "\n\nHuman:" and end with "\n\nAssistant:"
    prompt_text = f"\n\nHuman: Summarize the content that follows. Make the output no more than 2 paragraphs or less than 250 words. {content}\n\nAssistant:"

    body = {
        "prompt": prompt_text,
        "max_tokens_to_sample": 7000,   # upper bound on the summary length
        "temperature": 0.1,             # low temperature for focused, repeatable summaries
        "top_k": 250,
        "top_p": 0.2,
        "stop_sequences": ["\n\nHuman:"]
    }

    try:
        bedrock = boto3.client(
            "bedrock-runtime",
            region_name="us-east-1",
            config=botocore.config.Config(read_timeout=300, retries={'max_attempts': 3})
        )
        response = bedrock.invoke_model(body=json.dumps(body), modelId="anthropic.claude-v2")
        response_content = response.get('body').read().decode('utf-8')
        response_data = json.loads(response_content)
        summary = response_data["completion"].strip()
        return summary

    except Exception as e:
        print(f"Error generating the summary: {e}")
        return ""

#------------------------------------------------------------------
# save summary to s3 (helper function)
#------------------------------------------------------------------

def save_summary_to_s3_bucket(summary, s3_bucket, s3_key):

    s3 = boto3.client('s3')

    try:
        # Use the bucket passed in by the caller rather than a hardcoded name
        s3.put_object(Bucket=s3_bucket, Key=s3_key, Body=summary)
        print("Summary saved to s3")

    except Exception as e:
        print(f"Error when saving the summary to s3: {e}")


#------------------------------------------------------------------
# handler function (main function lambda uses)
#------------------------------------------------------------------

def lambda_handler(event, context):
    
    s3 = boto3.client('s3')

    # Extract bucket name and key from the event
    record = event['Records'][0]['s3']
    src_bucket = record['bucket']['name']
    src_key = unquote_plus(record['object']['key'])
    
    # Read the file content from S3
    file_obj = s3.get_object(Bucket=src_bucket, Key=src_key)
    file_content = file_obj['Body'].read().decode('utf-8')
    
    # send content to bedrock  
    summary = generate_summary_from_bedrock(file_content)
   
    if summary:
        current_time = datetime.now().strftime('%H%M%S') 
        s3_key = f'summary-output/{src_key}-{current_time}.txt'
        s3_bucket = 'summarybucket'

        save_summary_to_s3_bucket(summary, s3_bucket, s3_key)

    else:
        print("No summary was generated")


    return {
        'statusCode':200,
        'body':json.dumps("Summary generation finished")
    }        
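
If you want to sanity-check this handler without waiting on a real S3 event, you can feed it a hand-built event from any Python shell where AWS credentials are configured. A minimal sketch, assuming the object below actually exists in 'txtbucket' (the key name is just an example):

# A hand-built S3 event for local testing -- the bucket/key are examples,
# and the object must exist for get_object to succeed.
fake_event = {
    "Records": [{
        "s3": {
            "bucket": {"name": "txtbucket"},
            "object": {"key": "pandemic-whitepaper.txt"}
        }
    }]
}

print(lambda_handler(fake_event, None))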

Here's the summary output for the full text file I created of the white paper I mentioned:

Summary Text Generation from Image Files

This part of the experiment piqued my curiosity ... I wanted to know how well Claude 2 could take text extracted from an image by Amazon Textract and produce a summary that accurately described the original image. I was blown away by how well it did for my test image. I'll need to test more images to get a feel for whether results this good are typical, but my guess is: the more text the image has, the better the summary.

Here is the code I used for this part of the project:

import boto3
from datetime import datetime
from urllib.parse import unquote_plus


##---------------------------------------------
## upload text to S3 (helper function)
##----------------------------------------------

def save_image_text_to_s3_bucket(raw_text, s3_bucket, s3_key):

    s3 = boto3.client('s3')

    try:
        # Use the bucket passed in by the caller rather than a hardcoded name
        s3.put_object(Bucket=s3_bucket, Key=s3_key, Body=raw_text)
        print("Text file saved to txtbucket")

    except Exception as e:
        print(f"Error when saving the extracted text to s3: {e}")


##---------------------------------------------------------
## extract text from IMG using Amazon Textract (helper function)
##---------------------------------------------------------

def detect_image_text(s3_bucket_name, document_key):

    textract = boto3.client('textract')
    extracted_text = ""

    # Call Textract to process the image directly
    response = textract.detect_document_text(
        Document={
            'S3Object': {
                'Bucket': s3_bucket_name,
                'Name': document_key
            }
        }
    )

    # Extract text
    for item in response['Blocks']:
        if item['BlockType'] == 'LINE':
            extracted_text += item['Text'] + '\n'

    return extracted_text


##------------------------------------------------------------
## handler function (main function lambda uses)
##------------------------------------------------------------

def lambda_handler(event, context):

    # Extract bucket name and key from the event
    record = event['Records'][0]['s3']
    src_bucket = record['bucket']['name']
    src_key = unquote_plus(record['object']['key'])

    # Extract text from the image using Textract
    extracted_text = detect_image_text(src_bucket, src_key)
    print(f'here is the extracted text: {extracted_text}')

    # Build a timestamped file name for the extracted text
    current_time = datetime.now().strftime('%H%M%S')
    s3_key = f'img-output/{src_key}-{current_time}.txt'
    target_bucket = 'txtbucket'

    # Upload the extracted text as a '.txt' file to the 'target_bucket'
    save_image_text_to_s3_bucket(extracted_text, target_bucket, s3_key)

    print(f"Uploaded extracted text to s3://{target_bucket}/{s3_key}")
        

As you guessed, once this text hits the 'txtbucket' ... it triggers the Lambda that sends the text to Bedrock for summarization.

The Lambda for PDFs is similar to the image function, but the differences are important: Textract can read an image synchronously with detect_document_text, while multi-page PDFs require the asynchronous start_document_text_detection job, which you then poll (and paginate) for results. I'll provide that function as part of the overall code at the bottom of the blog in case anyone is interested in trying this experiment as well.

Drumroll please.

Here is the image I used for the test:

And here is the resultant output I got from Claude 2 in Amazon Bedrock.

Summary

In the era of exponential data growth, harnessing the immense value within this tidal wave has become crucial. I hope this blog helped you see how AWS services can revolutionize unstructured data processing. The code implementations and real-world examples should demonstrate the transformative potential of AWS Lambda, Amazon Textract, and Amazon Bedrock with the Claude 2 model for your own needs.

I encourage all of you to immerse yourselves in this serverless technology and embrace the fast and furious future of data-driven insights.

Code Addendum

# Lambda 1: Triage the Document

import boto3
from urllib.parse import unquote_plus

s3_client = boto3.client('s3')

def lambda_handler(event, context):
    mainbucket = 'loadingbucket'
    pdfbucket = 'pdfbucket'
    txtbucket = 'txtbucket'
    imgbucket = 'imgbucket'

    # Get the object from the event
    record = event['Records'][0]
    key = unquote_plus(record['s3']['object']['key'])
    file_extension = key.split('.')[-1].lower()

    # Determine the target bucket based on the file extension
    if file_extension in ['jpg', 'jpeg', 'png', 'gif']:
        target_bucket = imgbucket
    elif file_extension == 'pdf':
        target_bucket = pdfbucket
    elif file_extension in ['txt', 'csv', 'doc', 'docx', 'xlsx']:
        target_bucket = txtbucket
    else:
        print(f"Unsupported file type: {file_extension}")
        return

    # Copy the file to the appropriate bucket
    copy_source = {'Bucket': mainbucket, 'Key': key}
    s3_client.copy(copy_source, target_bucket, key)

    # Delete the original file from mainbucket
    s3_client.delete_object(Bucket=mainbucket, Key=key)

    print(f"File {key} was moved to {target_bucket} and deleted from {mainbucket}.")



# Lambda 2: Send Text Files to Amazon Bedrock for Summarization

import boto3
import botocore.config
import json
from datetime import datetime
from urllib.parse import unquote_plus

#-------------------------------------------------------------
# generate summary with bedrock (helper function)
#------------------------------------------------------------------

def generate_summary_from_bedrock(content: str) -> str:
    # Claude v2 expects prompts that start with "\n\nHuman:" and end with "\n\nAssistant:"
    prompt_text = f"\n\nHuman: Summarize the content that follows. Make the output no more than 2 paragraphs or less than 250 words. {content}\n\nAssistant:"

    body = {
        "prompt": prompt_text,
        "max_tokens_to_sample": 7000,   # upper bound on the summary length
        "temperature": 0.1,             # low temperature for focused, repeatable summaries
        "top_k": 250,
        "top_p": 0.2,
        "stop_sequences": ["\n\nHuman:"]
    }

    try:
        bedrock = boto3.client(
            "bedrock-runtime",
            region_name="us-east-1",
            config=botocore.config.Config(read_timeout=300, retries={'max_attempts': 3})
        )
        response = bedrock.invoke_model(body=json.dumps(body), modelId="anthropic.claude-v2")
        response_content = response.get('body').read().decode('utf-8')
        response_data = json.loads(response_content)
        summary = response_data["completion"].strip()
        return summary

    except Exception as e:
        print(f"Error generating the summary: {e}")
        return ""

#------------------------------------------------------------------
# save summary to s3 (helper function)
#------------------------------------------------------------------

def save_summary_to_s3_bucket(summary, s3_bucket, s3_key):

    s3 = boto3.client('s3')

    try:
        # Use the bucket passed in by the caller rather than a hardcoded name
        s3.put_object(Bucket=s3_bucket, Key=s3_key, Body=summary)
        print("Summary saved to s3")

    except Exception as e:
        print(f"Error when saving the summary to s3: {e}")


#------------------------------------------------------------------
# handler function (main function lambda uses)
#------------------------------------------------------------------

def lambda_handler(event, context):
    
    s3 = boto3.client('s3')

    # Extract bucket name and key from the event
    record = event['Records'][0]['s3']
    src_bucket = record['bucket']['name']
    src_key = unquote_plus(record['object']['key'])
    
    # Read the file content from S3
    file_obj = s3.get_object(Bucket=src_bucket, Key=src_key)
    file_content = file_obj['Body'].read().decode('utf-8')
    
    # send content to bedrock  
    summary = generate_summary_from_bedrock(file_content)
   
    if summary:
        current_time = datetime.now().strftime('%H%M%S') 
        s3_key = f'summary-output/{src_key}-{current_time}.txt'
        s3_bucket = 'summarybucket'

        save_summary_to_s3_bucket(summary, s3_bucket, s3_key)

    else:
        print("No summary was generated")


    return {
        'statusCode':200,
        'body':json.dumps("Summary generation finished")
    }



# Lambda 3: Send Image to Amazon Textract and then to Amazon Bedrock

import boto3
from datetime import datetime
from urllib.parse import unquote_plus


##---------------------------------------------
## upload text to S3 (helper function)
##----------------------------------------------

def save_image_text_to_s3_bucket(raw_text, s3_bucket, s3_key):

    s3 = boto3.client('s3')

    try:
        # Use the bucket passed in by the caller rather than a hardcoded name
        s3.put_object(Bucket=s3_bucket, Key=s3_key, Body=raw_text)
        print("Text file saved to txtbucket")

    except Exception as e:
        print(f"Error when saving the extracted text to s3: {e}")


##---------------------------------------------------------
## extract text from IMG using Amazon Textract (helper function)
##---------------------------------------------------------

def detect_image_text(s3_bucket_name, document_key):

    textract = boto3.client('textract')
    extracted_text = ""

    # Call Textract to process the image directly
    response = textract.detect_document_text(
        Document={
            'S3Object': {
                'Bucket': s3_bucket_name,
                'Name': document_key
            }
        }
    )

    # Extract text
    for item in response['Blocks']:
        if item['BlockType'] == 'LINE':
            extracted_text += item['Text'] + '\n'

    return extracted_text


##------------------------------------------------------------
## handler function (main function lambda uses)
##------------------------------------------------------------

def lambda_handler(event, context):

    # Extract bucket name and key from the event
    record = event['Records'][0]['s3']
    src_bucket = record['bucket']['name']
    src_key = unquote_plus(record['object']['key'])

    # Extract text from the image using Textract
    extracted_text = detect_image_text(src_bucket, src_key)
    print(f'here is the extracted text: {extracted_text}')

    # Build a timestamped file name for the extracted text
    current_time = datetime.now().strftime('%H%M%S')
    s3_key = f'img-output/{src_key}-{current_time}.txt'
    target_bucket = 'txtbucket'

    # Upload the extracted text as a '.txt' file to the 'target_bucket'
    save_image_text_to_s3_bucket(extracted_text, target_bucket, s3_key)

    print(f"Uploaded extracted text to s3://{target_bucket}/{s3_key}")



# Lambda 4: Send PDF to Amazon Textract and then to Amazon Bedrock

import boto3
import time
from datetime import datetime
from urllib.parse import unquote_plus


##---------------------------------------------
## upload text to S3
##----------------------------------------------

def save_pdf_text_to_s3_bucket(raw_text, s3_bucket, s3_key):

    s3 = boto3.client('s3')

    try:
        # Use the bucket passed in by the caller rather than a hardcoded name
        s3.put_object(Bucket=s3_bucket, Key=s3_key, Body=raw_text)
        print("Text file saved to txtbucket")

    except Exception as e:
        print(f"Error when saving the extracted text to s3: {e}")


##---------------------------------------------------------
## extract text from PDF using Amazon Textract
##---------------------------------------------------------

def detect_pdf_text(s3_bucket_name, document_key):
    
    textract = boto3.client('textract')
    extracted_text = ""

    # Start a text detection job
    response = textract.start_document_text_detection(
        DocumentLocation={
            'S3Object': {
                'Bucket': s3_bucket_name,
                'Name': document_key
            }
        }
    )

    # Extract job ID
    job_id = response['JobId']
    print(f"Started job with id: {job_id}")

    # Poll for job completion
    status = "IN_PROGRESS"
    while status == "IN_PROGRESS":
        status_response = textract.get_document_text_detection(JobId=job_id)
        status = status_response['JobStatus']
        print(f"Job status: {status}")

        if status == 'SUCCEEDED':
            break
        elif status == 'FAILED':
            raise Exception("Textract job failed")
        else:
            time.sleep(5)  # Wait before polling again

    # Once job is SUCCEEDED, handle pagination and extract text
    next_token = None
    while True:
        if next_token:
            status_response = textract.get_document_text_detection(JobId=job_id, NextToken=next_token)
        else:
            status_response = textract.get_document_text_detection(JobId=job_id)
        
        # Iterate over pages and blocks to extract text
        for block in status_response.get('Blocks', []):
            if block['BlockType'] == 'LINE':
                extracted_text += block['Text'] + '\n'

        next_token = status_response.get('NextToken', None)
        if not next_token:
            break  # Exit the loop when there are no more pages

    return extracted_text

##------------------------------------------------------------
## main function
##------------------------------------------------------------

def lambda_handler(event, context):
    
    # Extract bucket name and key from the event
    record = event['Records'][0]['s3']
    src_bucket = record['bucket']['name']
    src_key = unquote_plus(record['object']['key'])

    # Extract text from the PDF using Textract
    extracted_text = detect_pdf_text(src_bucket, src_key)
    
    # File name
    current_time = datetime.now().strftime('%H%M%S') 
    s3_key = f'pdf-output/{src_key}-{current_time}.txt'
    target_bucket = 'txtbucket'
    
    # Upload the extracted text as a '.txt' file to the 'target_bucket'
    save_pdf_text_to_s3_bucket(extracted_text, target_bucket, s3_key)
    
    print(f"Uploaded extracted text to s3://{target_bucket}/{s3_key}")        

