Elastic Serverless, Search, RAG & Nasuni, Oh My!
I thought it would be fun to take a look at Elastic Serverless in the context of indexing a small Nasuni file directory: first building a basic prototype search interface, then taking advantage of Elastic's recently released 'Playground', a low-code experience that lets developers ground their LLMs with their own private data (i.e. Retrieval Augmented Generation, or RAG).
For this small Search and RAG prototype we are going to use a directory of PDFs stored on a Nasuni share.
With RAG, and with search in general, the 'garbage in, garbage out' rule always applies: if you index a large, un-curated data set, don't be surprised when what comes out the other end is not quite what you expected!
To begin we need an Elastic Serverless account. Luckily, Elastic provides a trial that can be accessed here.
Once we have this we need to create an Elastic API key, which can easily be done within the Elastic Dashboard environment, and we need to take note of the Elastic Cloud ID and the Elastic host endpoint.
Now we have these we can implement our indexer code:
import os
import mimetypes
from elasticsearch import Elasticsearch, helpers, exceptions
from PyPDF2 import PdfReader
import docx
from bs4 import BeautifulSoup
import logging

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Elasticsearch connection details
ELASTIC_CLOUD_ID = "Cloud_ID_Here"     # Replace with your actual Cloud ID
ELASTIC_API_KEY = "API_Key_Here"       # Replace with your actual API key
ELASTIC_HOSTS = ["Elastic_Host_Here"]  # Replace with your Elasticsearch host if not using Cloud ID
INDEX_NAME = "local_projects_index"    # Name of the index you wish to create

# Connect to Elasticsearch
def connect_to_elasticsearch():
    try:
        es = Elasticsearch(cloud_id=ELASTIC_CLOUD_ID, api_key=ELASTIC_API_KEY)
        logging.info(f"Successfully connected to Elasticsearch cluster: {es.info()['cluster_name']}")
        return es
    except exceptions.AuthenticationException as e:
        logging.error(f"Authentication failed: {str(e)}")
        return None
    except exceptions.ConnectionError as e:
        logging.error(f"Failed to connect to Elasticsearch: {str(e)}")
        return None
    except Exception as e:
        logging.error(f"An error occurred while connecting to Elasticsearch: {str(e)}")
        return None

es = connect_to_elasticsearch()
if not es:
    logging.error("Failed to establish connection to Elasticsearch. Exiting.")
    exit(1)
def read_text_file(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as file:
            return file.read()
    except Exception as e:
        logging.error(f"Error reading text file {file_path}: {str(e)}")
        return None

def read_pdf_file(file_path):
    try:
        with open(file_path, 'rb') as file:
            pdf = PdfReader(file)
            text = ""
            for page in pdf.pages:
                text += page.extract_text()
            return text
    except Exception as e:
        logging.error(f"Error reading PDF file {file_path}: {str(e)}")
        return None

def read_docx_file(file_path):
    try:
        doc = docx.Document(file_path)
        return " ".join([paragraph.text for paragraph in doc.paragraphs])
    except Exception as e:
        logging.error(f"Error reading DOCX file {file_path}: {str(e)}")
        return None

def read_html_file(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as file:
            soup = BeautifulSoup(file, 'html.parser')
            return soup.get_text()
    except Exception as e:
        logging.error(f"Error reading HTML file {file_path}: {str(e)}")
        return None

def get_file_content(file_path):
    _, file_extension = os.path.splitext(file_path)
    file_extension = file_extension.lower()
    if file_extension in ['.txt', '.md', '.py', '.js', '.java', '.cpp']:
        return read_text_file(file_path)
    elif file_extension == '.pdf':
        return read_pdf_file(file_path)
    elif file_extension in ['.docx', '.doc']:
        return read_docx_file(file_path)
    elif file_extension in ['.html', '.htm']:
        return read_html_file(file_path)
    else:
        logging.warning(f"Unsupported file type: {file_extension} for file: {file_path}")
        return None
def index_directory(directory_path):
    for root, _, files in os.walk(directory_path):
        for file in files:
            file_path = os.path.join(root, file)
            content = get_file_content(file_path)
            if content:
                logging.info(f"Indexing file: {file_path}")
                yield {
                    "_index": INDEX_NAME,
                    "_source": {
                        "file_name": file,
                        "file_path": file_path,
                        "content": content
                    }
                }
            else:
                logging.warning(f"Skipping file due to empty content: {file_path}")
# Create the index with appropriate mappings
index_body = {
    "mappings": {
        "properties": {
            "file_name": {"type": "keyword"},
            "file_path": {"type": "keyword"},
            "content": {"type": "text"}
        }
    }
}

try:
    if not es.indices.exists(index=INDEX_NAME):
        es.indices.create(index=INDEX_NAME, body=index_body)
        logging.info(f"Created index: {INDEX_NAME}")
    else:
        logging.info(f"Index {INDEX_NAME} already exists")
except exceptions.RequestError as e:
    logging.error(f"An error occurred while creating the index: {str(e)}")
    exit(1)

# Index the documents
projects_directory = r"Insert_DIR"  # Using raw string for Windows drive paths
logging.info(f"Starting indexing process for directory: {projects_directory}")

try:
    success, failed = helpers.bulk(es, index_directory(projects_directory))
    logging.info(f"Successfully indexed {success} documents")
    logging.info(f"Failed to index {len(failed)} documents")
    if failed:
        for item in failed:
            logging.error(f"Failed to index: {item}")
except Exception as e:  # catches bulk/transport errors; ElasticsearchException is not present in 8.x clients
    logging.error(f"An error occurred while indexing documents: {str(e)}")

logging.info("Indexing process completed")
Make sure to insert the appropriate Elastic endpoints and API keys, as well as the directory path you wish to index data from. Once that is done you can save and run this Python code, which will create an index in your serverless instance called 'local_projects_index' (unless you changed the index name!).
You should be able to see this index now from your Elastic console.
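If you prefer to check from code rather than the console, a quick document count against the new index confirms the bulk load worked. This is a minimal sketch that reuses the es client and INDEX_NAME from the indexer script above:

# Quick sanity check: count the documents that were just indexed
# (reuses the es client and INDEX_NAME from the indexer script above)
es.indices.refresh(index=INDEX_NAME)   # make newly indexed documents visible to search
doc_count = es.count(index=INDEX_NAME)["count"]
print(f"{INDEX_NAME} now contains {doc_count} documents")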
At this point we have indexed the content of the files in the nominated directory, in my case PDF files stored on a Nasuni directory share that contain 'how to' information about using Nasuni.
Now let's build a quick Streamlit client to test our indexed data:
import streamlit as st
from elasticsearch import Elasticsearch
from urllib.parse import quote

# Set page config at the very beginning
st.set_page_config(page_title="Nasuni Elastic Search Client", layout="wide")

# Elasticsearch connection details
ELASTIC_CLOUD_ID = "Cloud_ID_Here"     # Replace with your actual Cloud ID
ELASTIC_API_KEY = "API_Key_Here"       # Replace with your actual API key
ELASTIC_HOSTS = ["Elastic_Host_Here"]  # Replace with your Elasticsearch host if not using Cloud ID
INDEX_NAME = "local_projects_index"

# Connect to Elasticsearch
@st.cache_resource
def connect_to_elasticsearch():
    try:
        es = Elasticsearch(cloud_id=ELASTIC_CLOUD_ID, api_key=ELASTIC_API_KEY)
        # Test the connection
        if not es.ping():
            raise ValueError("Connection failed")
        return es
    except Exception as e:
        st.error(f"Failed to connect to Elasticsearch: {str(e)}")
        return None

es = connect_to_elasticsearch()
def search_documents(query):
    try:
        result = es.search(index=INDEX_NAME, body={
            "query": {
                "multi_match": {
                    "query": query,
                    "fields": ["content", "file_name"],
                    "type": "best_fields",
                    "tie_breaker": 0.3
                }
            },
            "highlight": {
                "fields": {
                    "content": {"fragment_size": 200, "number_of_fragments": 3},
                    "file_name": {}
                }
            }
        })
        return result['hits']['hits']
    except Exception as e:
        st.error(f"Search failed: {str(e)}")
        return []

# Function to encode file path for URL
def encode_file_path(file_path):
    return quote(file_path)
# Streamlit UI
col1, col2, col3 = st.columns([1, 2, 1])
with col2:
    st.image("Your_Logo_URL", width=200)  # Replace with your logo URL
    st.title("Nasuni Elastic Search Client")

# Search input
query = st.text_input("Enter your search query")

if query and es:
    results = search_documents(query)
    if results:
        st.write(f"Found {len(results)} results:")
        for hit in results:
            with st.expander(f"{hit['_source']['file_name']} (Score: {hit['_score']:.2f})"):
                if 'highlight' in hit:
                    if 'content' in hit['highlight']:
                        st.markdown("**Content Snippets:**")
                        for fragment in hit['highlight']['content']:
                            st.markdown(f"...{fragment}...", unsafe_allow_html=True)
                    if 'file_name' in hit['highlight']:
                        st.markdown("**File Name Highlight:**")
                        st.markdown(hit['highlight']['file_name'][0], unsafe_allow_html=True)
                file_path = hit['_source']['file_path']
                encoded_path = encode_file_path(file_path)
                st.markdown(f"[Open Document](file://{encoded_path})", unsafe_allow_html=True)
    else:
        st.write("No results found.")
elif query:
    st.write("Unable to perform search due to connection issues.")
else:
    st.write("Enter a search query to begin.")

# Add some styling
st.markdown("""
<style>
.stApp {
    max-width: 1200px;
    margin: 0 auto;
}
.stExpander {
    background-color: #f0f2f6;
    border-radius: 5px;
    margin-bottom: 10px;
}
</style>
""", unsafe_allow_html=True)
When we run the Streamlit client we can now test the keyword search capabilities of the BM25 index that we just created:
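Under the hood the Streamlit client is simply issuing a multi_match query against the index, so the same lexical (BM25) search can be reproduced directly with the Python client if you want to experiment outside the UI. A minimal sketch, using the same connection and index name as the scripts above and an example query string of your own:

# Reproduce the client's keyword (BM25) search directly against Elasticsearch
response = es.search(index=INDEX_NAME, body={
    "query": {
        "multi_match": {
            "query": "how do I configure a Nasuni share",  # example query text
            "fields": ["content", "file_name"]
        }
    },
    "size": 5
})
for hit in response["hits"]["hits"]:
    print(hit["_score"], hit["_source"]["file_name"])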
We can also now leverage this within the Elastic Playground to converse with the data, but to do that we first have to add the API key of the LLM we wish to use in the Elastic Dashboard. Playground supports models such as GPT-4o from OpenAI, Azure OpenAI, or Anthropic through Amazon Bedrock.
Any data can be used (even BM25-based indices). Data fields can optionally be transformed using text embedding models (such as Elastic's zero-shot semantic search model ELSER), but this is not a requirement.
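As a rough illustration of what that optional step can look like, a semantic_text field backed by ELSER can be defined at index-creation time. Availability of ELSER and the exact field behaviour depend on your Elastic Serverless project, so treat this as a sketch rather than a required step; the index and field names below are just examples:

# Optional: an index whose content is also embedded by ELSER via a semantic_text field.
# Assumes ELSER is available in your serverless project; index and field names are examples only.
semantic_index = "local_projects_semantic_index"
es.indices.create(index=semantic_index, body={
    "mappings": {
        "properties": {
            "file_name": {"type": "keyword"},
            "file_path": {"type": "keyword"},
            "content": {"type": "text"},
            "content_semantic": {"type": "semantic_text"}  # embeddings are generated at ingest time
        }
    }
})
# When indexing, write the extracted text into both fields, e.g.
# {"file_name": ..., "file_path": ..., "content": text, "content_semantic": text}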
Getting started is very simple: we just select the indices we want to use to ground our answers and start asking questions.
Once we select our local_projects_index we can start to chat with our data set:
To quote Elastic, "While prototyping conversational search, the ability to rapidly iterate on and experiment with key components of a RAG workflow (for example: hybrid search, or adding reranking) are important— to get accurate and hallucination-free responses from LLMs."
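For example, once a semantic_text field like the one sketched earlier exists, lexical and semantic retrieval can be combined with reciprocal rank fusion. The exact retriever syntax varies by Elastic release, so this is only an illustrative sketch:

# Hybrid search sketch: combine BM25 and semantic retrieval with reciprocal rank fusion (RRF).
# Assumes the example 'content_semantic' semantic_text field from the earlier sketch exists.
question = "How do I restore a previous version of a file?"
response = es.search(index="local_projects_semantic_index", body={
    "retriever": {
        "rrf": {
            "retrievers": [
                {"standard": {"query": {"match": {"content": question}}}},
                {"standard": {"query": {"semantic": {"field": "content_semantic", "query": question}}}}
            ]
        }
    },
    "size": 5
})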
Once you are happy with the results, this can be integrated into your own application. The 'View code' offers example application code for how to do this within your own API:
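The exported code follows the familiar retrieve-then-generate pattern. A simplified sketch of that flow, not the exact Playground 'View code' export, and assuming the openai package is installed with an OPENAI_API_KEY set in the environment:

# Simplified retrieve-then-generate flow, not the exact Playground export.
# Assumes the openai package is installed and OPENAI_API_KEY is set in the environment.
from openai import OpenAI

openai_client = OpenAI()

def answer_question(question):
    # 1. Retrieve the most relevant passages from the index we built earlier
    hits = es.search(index=INDEX_NAME, body={
        "query": {"multi_match": {"query": question, "fields": ["content", "file_name"]}},
        "size": 3
    })["hits"]["hits"]
    context = "\n\n".join(hit["_source"]["content"][:2000] for hit in hits)

    # 2. Ask the LLM to answer using only the retrieved context
    completion = openai_client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Answer using only the provided context. If the answer is not in the context, say so."},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}
        ]
    )
    return completion.choices[0].message.content

print(answer_question("How do I share a folder with an external user?"))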
The institutional knowledge use case, the ability to chat with historic data, is a primary enterprise use of RAG, and the ability to prototype and experiment with search quickly matters, so the addition of Elastic Serverless and the Playground is very welcome.