Harnessing the Data Tidal Wave: Exploring AWS for Unstructured Data Processing
David Carnahan MD MSCE
Healthcare IT expert leading Veteran and Military Health initiatives at AWS
The opinions expressed in this blog are my own, and do not necessarily represent the opinions of Amazon, Amazon Web Services, or any other entity not named David Carnahan.
The exponential growth of data generation is a phenomenon that cannot be ignored. According to recent statistics from Statista, the world is on pace to create 463 exabytes of data per day by 2025, with every person generating roughly 2 MB of data every second. The sheer volume of data generated through messaging platforms like WhatsApp (41 million messages per minute in 2020), IoT devices (29 billion worldwide by 2030), and wearable devices (28 petabytes of data per day) is truly mind-boggling.
This blog will explore how AWS services like AWS Lambda, Amazon Textract, and Amazon Bedrock can help organizations process and extract value from unstructured data, particularly in the healthcare industry where a significant portion of data is unstructured. Through practical examples, we'll demonstrate how to leverage these tools for data management, information extraction, and unlocking the potential of unstructured healthcare data.
The Project Architecture
I always like to start my project explorations with a high-level draw.io diagram. Since I'm not pushing this to production, a high-level view is all I need as a blueprint to work from.
Summary Text Generation from Text Files
The first step was to triage each document type into its own bucket. I wanted to do this so I could create a clean, simple Lambda function for each data type -- ultimately converting everything to text files that I would then send to Amazon Bedrock (Claude 2). The following Lambda function is triggered whenever a file is added to the 'loadingbucket'; a sketch of wiring up that trigger follows the function.
import boto3
from urllib.parse import unquote_plus

s3_client = boto3.client('s3')

def lambda_handler(event, context):
    mainbucket = 'loadingbucket'
    pdfbucket = 'pdfbucket'
    txtbucket = 'txtbucket'
    imgbucket = 'imgbucket'

    # Get the object key from the event
    record = event['Records'][0]
    key = unquote_plus(record['s3']['object']['key'])
    file_extension = key.split('.')[-1].lower()

    # Determine the target bucket based on the file extension
    if file_extension in ['jpg', 'jpeg', 'png', 'gif']:
        target_bucket = imgbucket
    elif file_extension == 'pdf':
        target_bucket = pdfbucket
    elif file_extension in ['txt', 'csv', 'doc', 'docx', 'xlsx']:
        target_bucket = txtbucket
    else:
        print(f"Unsupported file type: {file_extension}")
        return

    # Copy the file to the appropriate bucket
    copy_source = {'Bucket': mainbucket, 'Key': key}
    s3_client.copy(copy_source, target_bucket, key)

    # Delete the original file from mainbucket
    s3_client.delete_object(Bucket=mainbucket, Key=key)
    print(f"File {key} was moved to {target_bucket} and deleted from {mainbucket}.")
Once I had the file types in their own buckets, I built a Lambda function for each type that triggers when a file object is created. Here is the code for the text bucket. The code is followed by example output for a file used in testing -- a forthcoming white paper on the pandemic.
import boto3
import botocore.config
import json
from datetime import datetime
from urllib.parse import unquote_plus

#------------------------------------------------------------------
# generate summary with bedrock (helper function)
#------------------------------------------------------------------
def generate_summary_from_bedrock(content: str) -> str:
    # Claude v2 expects the "\n\nHuman: ... \n\nAssistant:" prompt format
    prompt_text = f"""\n\nHuman: Summarize the content that follows. Keep the output to no more than two paragraphs and fewer than 250 words.

{content}

Assistant:"""
    body = {
        "prompt": prompt_text,
        "max_tokens_to_sample": 7000,
        "temperature": 0.1,
        "top_k": 250,
        "top_p": 0.2,
        "stop_sequences": ["\n\nHuman:"]
    }
    try:
        bedrock = boto3.client(
            "bedrock-runtime",
            region_name="us-east-1",
            config=botocore.config.Config(read_timeout=300, retries={'max_attempts': 3})
        )
        response = bedrock.invoke_model(body=json.dumps(body), modelId="anthropic.claude-v2")
        response_content = response.get('body').read().decode('utf-8')
        response_data = json.loads(response_content)
        summary = response_data["completion"].strip()
        return summary
    except Exception as e:
        print(f"Error generating the summary: {e}")
        return ""

#------------------------------------------------------------------
# save summary to s3 (helper function)
#------------------------------------------------------------------
def save_summary_to_s3_bucket(summary, s3_bucket, s3_key):
    s3 = boto3.client('s3')
    try:
        s3.put_object(Bucket=s3_bucket, Key=s3_key, Body=summary)
        print("Summary saved to S3")
    except Exception as e:
        print(f"Error when saving the summary to S3: {e}")

#------------------------------------------------------------------
# handler function (main function lambda uses)
#------------------------------------------------------------------
def lambda_handler(event, context):
    s3 = boto3.client('s3')

    # Extract bucket name and key from the event
    record = event['Records'][0]['s3']
    src_bucket = record['bucket']['name']
    src_key = unquote_plus(record['object']['key'])

    # Read the file content from S3
    file_obj = s3.get_object(Bucket=src_bucket, Key=src_key)
    file_content = file_obj['Body'].read().decode('utf-8')

    # Send the content to Bedrock for summarization
    summary = generate_summary_from_bedrock(file_content)

    if summary:
        current_time = datetime.now().strftime('%H%M%S')
        s3_key = f'summary-output/{src_key}-{current_time}.txt'
        s3_bucket = 'summarybucket'
        save_summary_to_s3_bucket(summary, s3_bucket, s3_key)
    else:
        print("No summary was generated")

    return {
        'statusCode': 200,
        'body': json.dumps("Summary generation finished")
    }
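Before wiring up the trigger, you can sanity-check the handler locally with a hand-built S3 event. Here's a minimal sketch, assuming you have credentials with access to the buckets and Bedrock, and that the bucket and key below point at a text file you've actually uploaded:

# Minimal fake S3 event for local testing -- bucket and key are placeholders
test_event = {
    'Records': [{
        's3': {
            'bucket': {'name': 'txtbucket'},
            'object': {'key': 'pandemic-white-paper.txt'}
        }
    }]
}

# Calling the handler directly exercises the full read -> summarize -> save path
print(lambda_handler(test_event, None))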
Here's the summary output for the full-text file I created from the white paper I mentioned:
Summary Text Generation from Image Files
This part of the experiment piqued my curiosity ... I wanted to know how well Claude 2 could take text extracted from an image by Amazon Textract and produce a summary that accurately described the original image. I was blown away by how well it did on my test image. I'll need to test with other images to get a feel for whether results this good are typical. My guess is that the more text an image contains, the better the summary.
Here is the code I used for this part of the project:
import boto3
from datetime import datetime
from urllib.parse import unquote_plus

##---------------------------------------------
## upload text to S3 (helper function)
##----------------------------------------------
def save_image_text_to_s3_bucket(raw_text, s3_bucket, s3_key):
    s3 = boto3.client('s3')
    try:
        s3.put_object(Bucket=s3_bucket, Key=s3_key, Body=raw_text)
        print("Text file saved to txtbucket")
    except Exception as e:
        print(f"Error when saving the extracted text to S3: {e}")

##---------------------------------------------------------
## extract text from IMG using Amazon Textract (helper function)
##---------------------------------------------------------
def detect_image_text(s3_bucket_name, document_key):
    textract = boto3.client('textract')
    extracted_text = ""

    # Call Textract synchronously to process the image directly
    response = textract.detect_document_text(
        Document={
            'S3Object': {
                'Bucket': s3_bucket_name,
                'Name': document_key
            }
        }
    )

    # Concatenate every LINE block into a single text string
    for item in response['Blocks']:
        if item['BlockType'] == 'LINE':
            extracted_text += item['Text'] + '\n'
    return extracted_text

##------------------------------------------------------------
## handler function (main function lambda uses)
##------------------------------------------------------------
def lambda_handler(event, context):
    # Extract bucket name and key from the event
    record = event['Records'][0]['s3']
    src_bucket = record['bucket']['name']
    src_key = unquote_plus(record['object']['key'])

    # Extract text from the image using Textract
    extracted_text = detect_image_text(src_bucket, src_key)
    print(f'Here is the extracted text: {extracted_text}')

    # Build the output file name
    current_time = datetime.now().strftime('%H%M%S')
    s3_key = f'img-output/{src_key}-{current_time}.txt'
    target_bucket = 'txtbucket'

    # Upload the extracted text as a '.txt' file to the 'target_bucket'
    save_image_text_to_s3_bucket(extracted_text, target_bucket, s3_key)
    print(f"Uploaded extracted text to s3://{target_bucket}/{s3_key}")
As you might have guessed, once this text lands in the 'txtbucket', it triggers the Lambda that sends the text to Bedrock for summarization.
The Lambda for PDFs is similar to the image function, but the differences are important, so I've included that function in the full code at the bottom of this blog in case anyone wants to try this experiment as well.
Drumroll please.
Here is the image I used for the test:
And here is the resulting output I got from Claude 2 in Amazon Bedrock.
Summary
In an era of exponential data growth, harnessing the immense value within this tidal wave has become crucial. I hope this blog helped you see how AWS services can transform unstructured data processing. Specifically, the code implementations and real-world examples demonstrate what AWS Lambda, Amazon Textract, and Amazon Bedrock (with the Claude 2 model) can do for your needs.
I would encourage all of you to immerse yourselves in this serverless technology and embrace the fast and furious future of data-driven insights.
Code Addendum
# Lambda 1: Triage the Document

import boto3
from urllib.parse import unquote_plus

s3_client = boto3.client('s3')

def lambda_handler(event, context):
    mainbucket = 'loadingbucket'
    pdfbucket = 'pdfbucket'
    txtbucket = 'txtbucket'
    imgbucket = 'imgbucket'

    # Get the object key from the event
    record = event['Records'][0]
    key = unquote_plus(record['s3']['object']['key'])
    file_extension = key.split('.')[-1].lower()

    # Determine the target bucket based on the file extension
    if file_extension in ['jpg', 'jpeg', 'png', 'gif']:
        target_bucket = imgbucket
    elif file_extension == 'pdf':
        target_bucket = pdfbucket
    elif file_extension in ['txt', 'csv', 'doc', 'docx', 'xlsx']:
        target_bucket = txtbucket
    else:
        print(f"Unsupported file type: {file_extension}")
        return

    # Copy the file to the appropriate bucket
    copy_source = {'Bucket': mainbucket, 'Key': key}
    s3_client.copy(copy_source, target_bucket, key)

    # Delete the original file from mainbucket
    s3_client.delete_object(Bucket=mainbucket, Key=key)
    print(f"File {key} was moved to {target_bucket} and deleted from {mainbucket}.")
# Lambda 2: Send Text Files to Amazon Bedrock for Summarization

import boto3
import botocore.config
import json
from datetime import datetime
from urllib.parse import unquote_plus

#------------------------------------------------------------------
# generate summary with bedrock (helper function)
#------------------------------------------------------------------
def generate_summary_from_bedrock(content: str) -> str:
    # Claude v2 expects the "\n\nHuman: ... \n\nAssistant:" prompt format
    prompt_text = f"""\n\nHuman: Summarize the content that follows. Keep the output to no more than two paragraphs and fewer than 250 words.

{content}

Assistant:"""
    body = {
        "prompt": prompt_text,
        "max_tokens_to_sample": 7000,
        "temperature": 0.1,
        "top_k": 250,
        "top_p": 0.2,
        "stop_sequences": ["\n\nHuman:"]
    }
    try:
        bedrock = boto3.client(
            "bedrock-runtime",
            region_name="us-east-1",
            config=botocore.config.Config(read_timeout=300, retries={'max_attempts': 3})
        )
        response = bedrock.invoke_model(body=json.dumps(body), modelId="anthropic.claude-v2")
        response_content = response.get('body').read().decode('utf-8')
        response_data = json.loads(response_content)
        summary = response_data["completion"].strip()
        return summary
    except Exception as e:
        print(f"Error generating the summary: {e}")
        return ""

#------------------------------------------------------------------
# save summary to s3 (helper function)
#------------------------------------------------------------------
def save_summary_to_s3_bucket(summary, s3_bucket, s3_key):
    s3 = boto3.client('s3')
    try:
        s3.put_object(Bucket=s3_bucket, Key=s3_key, Body=summary)
        print("Summary saved to S3")
    except Exception as e:
        print(f"Error when saving the summary to S3: {e}")

#------------------------------------------------------------------
# handler function (main function lambda uses)
#------------------------------------------------------------------
def lambda_handler(event, context):
    s3 = boto3.client('s3')

    # Extract bucket name and key from the event
    record = event['Records'][0]['s3']
    src_bucket = record['bucket']['name']
    src_key = unquote_plus(record['object']['key'])

    # Read the file content from S3
    file_obj = s3.get_object(Bucket=src_bucket, Key=src_key)
    file_content = file_obj['Body'].read().decode('utf-8')

    # Send the content to Bedrock for summarization
    summary = generate_summary_from_bedrock(file_content)

    if summary:
        current_time = datetime.now().strftime('%H%M%S')
        s3_key = f'summary-output/{src_key}-{current_time}.txt'
        s3_bucket = 'summarybucket'
        save_summary_to_s3_bucket(summary, s3_bucket, s3_key)
    else:
        print("No summary was generated")

    return {
        'statusCode': 200,
        'body': json.dumps("Summary generation finished")
    }
# Lambda 3: Send Image to Amazon Textract and then to Amazon Bedrock

import boto3
from datetime import datetime
from urllib.parse import unquote_plus

##---------------------------------------------
## upload text to S3 (helper function)
##----------------------------------------------
def save_image_text_to_s3_bucket(raw_text, s3_bucket, s3_key):
    s3 = boto3.client('s3')
    try:
        s3.put_object(Bucket=s3_bucket, Key=s3_key, Body=raw_text)
        print("Text file saved to txtbucket")
    except Exception as e:
        print(f"Error when saving the extracted text to S3: {e}")

##---------------------------------------------------------
## extract text from IMG using Amazon Textract (helper function)
##---------------------------------------------------------
def detect_image_text(s3_bucket_name, document_key):
    textract = boto3.client('textract')
    extracted_text = ""

    # Call Textract synchronously to process the image directly
    response = textract.detect_document_text(
        Document={
            'S3Object': {
                'Bucket': s3_bucket_name,
                'Name': document_key
            }
        }
    )

    # Concatenate every LINE block into a single text string
    for item in response['Blocks']:
        if item['BlockType'] == 'LINE':
            extracted_text += item['Text'] + '\n'
    return extracted_text

##------------------------------------------------------------
## handler function (main function lambda uses)
##------------------------------------------------------------
def lambda_handler(event, context):
    # Extract bucket name and key from the event
    record = event['Records'][0]['s3']
    src_bucket = record['bucket']['name']
    src_key = unquote_plus(record['object']['key'])

    # Extract text from the image using Textract
    extracted_text = detect_image_text(src_bucket, src_key)
    print(f'Here is the extracted text: {extracted_text}')

    # Build the output file name
    current_time = datetime.now().strftime('%H%M%S')
    s3_key = f'img-output/{src_key}-{current_time}.txt'
    target_bucket = 'txtbucket'

    # Upload the extracted text as a '.txt' file to the 'target_bucket'
    save_image_text_to_s3_bucket(extracted_text, target_bucket, s3_key)
    print(f"Uploaded extracted text to s3://{target_bucket}/{s3_key}")
# Lambda 4: Send PDF to Amazon Textract and then to Amazon Bedrock

import boto3
import time
from datetime import datetime
from urllib.parse import unquote_plus

##---------------------------------------------
## upload text to S3
##----------------------------------------------
def save_pdf_text_to_s3_bucket(raw_text, s3_bucket, s3_key):
    s3 = boto3.client('s3')
    try:
        s3.put_object(Bucket=s3_bucket, Key=s3_key, Body=raw_text)
        print("Text file saved to txtbucket")
    except Exception as e:
        print(f"Error when saving the extracted text to S3: {e}")

##---------------------------------------------------------
## extract text from PDF using Amazon Textract
##---------------------------------------------------------
def detect_pdf_text(s3_bucket_name, document_key):
    textract = boto3.client('textract')
    extracted_text = ""

    # Start an asynchronous text detection job (required for PDFs)
    response = textract.start_document_text_detection(
        DocumentLocation={
            'S3Object': {
                'Bucket': s3_bucket_name,
                'Name': document_key
            }
        }
    )

    # Extract job ID
    job_id = response['JobId']
    print(f"Started job with id: {job_id}")

    # Poll for job completion
    status = "IN_PROGRESS"
    while status == "IN_PROGRESS":
        status_response = textract.get_document_text_detection(JobId=job_id)
        status = status_response['JobStatus']
        print(f"Job status: {status}")
        if status == 'SUCCEEDED':
            break
        elif status == 'FAILED':
            raise Exception("Textract job failed")
        else:
            time.sleep(5)  # Wait before polling again

    # Once the job has succeeded, handle pagination and extract text
    next_token = None
    while True:
        if next_token:
            status_response = textract.get_document_text_detection(JobId=job_id, NextToken=next_token)
        else:
            status_response = textract.get_document_text_detection(JobId=job_id)

        # Iterate over pages and blocks to extract LINE text
        for block in status_response.get('Blocks', []):
            if block['BlockType'] == 'LINE':
                extracted_text += block['Text'] + '\n'

        next_token = status_response.get('NextToken', None)
        if not next_token:
            break  # Exit the loop when there are no more pages

    return extracted_text

##------------------------------------------------------------
## main function
##------------------------------------------------------------
def lambda_handler(event, context):
    # Extract bucket name and key from the event
    record = event['Records'][0]['s3']
    src_bucket = record['bucket']['name']
    src_key = unquote_plus(record['object']['key'])

    # Extract text from the PDF using Textract
    extracted_text = detect_pdf_text(src_bucket, src_key)

    # Build the output file name
    current_time = datetime.now().strftime('%H%M%S')
    s3_key = f'pdf-output/{src_key}-{current_time}.txt'
    target_bucket = 'txtbucket'

    # Upload the extracted text as a '.txt' file to the 'target_bucket'
    save_pdf_text_to_s3_bucket(extracted_text, target_bucket, s3_key)
    print(f"Uploaded extracted text to s3://{target_bucket}/{s3_key}")