Generative AI StoreInsight Analytic Manager Using Python, Kafka, MinIO S3, OpenAI, PandasAI, Faust-Streaming & Streamlit UI. Retail Store Use-Case
Oluwaseyi Otun
Lead Backend Software Engineer ( Scala, Python, Java, Kafka, ZIO, Akka)
In recent years, the field of artificial intelligence has witnessed remarkable advancements, particularly in the realm of generative models and the tooling around them, most notably OpenAI and ChatGPT, large language models (LLMs) in general, PandasAI, Google Gemini, and LangChain.
Generative AI, a subset of artificial intelligence, focuses on creating new content, including images, text, music, code, and even entire virtual environments. The artificial intelligence market is projected to reach US$305.90 billion in 2024 and to keep growing by more than US$100 billion annually, with some forecasts predicting a market value of US$1 trillion by 2030.
The Role of Generative AI in the Retail Industry for Data Exploration and Sales Analysis
What is Generative AI?
Generative AI refers to a subset of artificial intelligence (AI) that focuses on creating new content, such as images, text, audio, or even entire virtual environments. Unlike traditional AI models that are designed for specific tasks like classification or prediction, generative AI models are trained on large datasets to learn the underlying patterns and structures of the data. Once trained, these models can generate new content that resembles the data they were trained on. Generative AI encompasses various algorithms and architectures, each with its unique approach to content generation. Some of the most common types of generative AI models include generative adversarial networks (GANs), variational autoencoders (VAEs), diffusion models, and transformer-based large language models.
This article delves into the fascinating world of generative AI, exploring its capabilities, applications, and the technology needed to build an end-to-end generative AI solution that provides data insight into the sales of a retail store. We will also touch on the benefits of building modern generative AI for real-time data analytics, along with its architecture and logic.
To give more context, we will build a real-world use case: a generative AI data assistant called StoreInsight, using a technology stack of Python, PandasAI, OpenAI, Kafka, a MinIO S3 data lake, Parquet, Faust Streaming, Streamlit UI, and Docker containers.
Use Case
Cloud 9 Store is a fictitious retail store operating across North America, with locations in Canada, the US, and Mexico. In an era marked by digital innovation and evolving consumer preferences, retail giants are increasingly turning to cutting-edge technologies to stay ahead of the curve, and Cloud 9 is poised to revolutionize its operations with the implementation of Generative AI.
This article follows Cloud 9's ambitious journey towards harnessing the power of Generative AI to gain valuable business intelligence insights, enhance customer experiences, optimize business processes, and drive growth in the competitive retail landscape.
Cloud 9 Functional and Non-Functional Requirements
Below is the high-level architecture diagram of what we will build.
With the functional and non-functional requirements above, we need to build a Generative AI StoreInsight data assistant that can give us real-time business intelligence on our data. Let's get started.
Data Ingestion Layer
In the context of a GenAI architecture, data ingestion plays a crucial role in the lifecycle of artificial intelligence (AI) models. The process of data ingestion involves collecting, preparing, and loading data from various sources into the system where AI models are developed and trained. Let's explore our data ingestion components.
Kafka
Kafka is a popular choice as a data ingestion component in many modern data architectures, including those focused on AI and machine learning. Kafka is a distributed streaming platform that provides capabilities for publishing, subscribing to, storing, and processing streams of records in real time.
Kafka is designed to handle high-throughput data ingestion from multiple sources concurrently. Its distributed architecture allows it to scale horizontally by adding more brokers to accommodate increasing data volumes.
We build a Python Kafka producer agent that ingests real-time sale event data from the POS machines. Below is the Python code snippet that does that job.
import json
import os
import traceback
from datetime import timedelta

from timeloop import Timeloop

from gendata import generate_sale_event, kafka_producer, logger, kafka_topic

tl = Timeloop()
interval_time = float(os.getenv('SCHEDULE_INTERVAL'))


@tl.job(interval=timedelta(seconds=interval_time))
def send_sales_event():
    try:
        # Generate a synthetic POS sale event and use its saleId as the Kafka message key
        sale_event = generate_sale_event()
        message_key = sale_event.saleId
        sale_event_data = json.dumps(sale_event.model_dump(), indent=4)
        logger.info(f'Message Key {message_key}')
        logger.info(f'Sale Event {sale_event_data}')
        # Publish the JSON-serialized event, then flush to guarantee delivery before the next tick
        kafka_producer.produce(kafka_topic, str(sale_event_data), message_key)
        kafka_producer.flush()
        logger.info(f'Successfully sent sale event to Kafka topic {kafka_topic}')
    except Exception as err:
        logger.error(str(err))
        traceback.print_tb(err.__traceback__)


if __name__ == "__main__":
    tl.start(block=True)
Let's quickly analyze the code above. The Timeloop scheduler fires send_sales_event every SCHEDULE_INTERVAL seconds. Each run generates a synthetic sale event, serializes it to JSON, and publishes it to the Kafka topic using the saleId as the message key, so events for the same sale always land on the same partition. Calling flush() ensures each event is delivered before the next scheduled run.
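The producer imports generate_sale_event, kafka_producer, logger, and kafka_topic from a gendata module that is not shown above. Below is a minimal sketch of what such a module might contain, assuming confluent-kafka for the producer and Pydantic for the event model; the broker address, topic name, and sample product values are illustrative assumptions, not the project's actual configuration.

import logging
import os
import random
import uuid
from datetime import datetime

from confluent_kafka import Producer
from pydantic import BaseModel

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('gendata')

# Assumed defaults; a real deployment would read these from the environment
kafka_topic = os.getenv('KAFKA_TOPIC', 'daily-sale-events')
kafka_producer = Producer({'bootstrap.servers': os.getenv('KAFKA_BROKER', 'localhost:9092')})


class SaleEvent(BaseModel):
    saleId: str
    storeCode: str
    productCode: str
    productName: str
    quantity: int
    amount: float
    categoryCode: str
    category: str
    city: str
    postalCode: str
    storeLocation: str
    province: str
    transactionDate: str


def generate_sale_event() -> SaleEvent:
    # Hypothetical random generator standing in for a real POS feed
    return SaleEvent(
        saleId=str(uuid.uuid4()),
        storeCode=random.choice(['CL9-001', 'CL9-002']),
        productCode='P-100',
        productName='Espresso Beans',
        quantity=random.randint(1, 5),
        amount=round(random.uniform(5, 120), 2),
        categoryCode='GRC',
        category='Grocery',
        city='Toronto',
        postalCode='M5V 2T6',
        storeLocation='Downtown',
        province='ON',
        transactionDate=datetime.utcnow().isoformat(),
    )

The SaleEvent fields mirror the columns used later in the Faust consumer, which keeps the producer and consumer schemas in sync.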
Data Transformation & Ingestion
Faust for streaming data, Pandas for transformation, and PyArrow Parquet for ingestion into MinIO is a powerful combination for building scalable, real-time data processing pipelines. Faust is a stream processing library for Python designed to handle high-throughput, low-latency data processing tasks, while MinIO is a high-performance, distributed, S3-compatible object storage system that serves as our data lake. The sales event data is stored in Parquet format; Apache Parquet is an open-source, column-oriented data file format designed for efficient data storage and retrieval.
Below is the Python code snippet that does that job.
import faust
import pandas as pd
import pyarrow as pa
from faust import Stream
from pyarrow import parquet

# SaleEvent, kafka_broker, sale_event_topic, no_topic_partitions, no_of_poll,
# poll_time_out, mino_bucket, logger, and s3fs come from the project's shared modules (not shown)

app = faust.App('DailySale-Events', broker=kafka_broker, topic_partitions=no_topic_partitions)
sales_event_topic = app.topic(sale_event_topic, key_type=str, value_type=SaleEvent)


@app.agent(sales_event_topic)
async def process_daily_sales_events(streams: Stream[SaleEvent]):
    # Consume in micro-batches of up to no_of_poll events, or whatever arrives within poll_time_out
    async for events in streams.take(no_of_poll, within=poll_time_out):
        save_daily_sale_events_to_minio_datalake(events)


def save_daily_sale_events_to_minio_datalake(sale_events: list[SaleEvent]):
    try:
        # Flatten the event records into tuples matching the DataFrame columns below
        tuple_data = list(map(lambda event: (event.storeCode, event.productCode, event.productName,
                                             event.quantity, event.amount, event.categoryCode,
                                             event.category, event.city, event.postalCode,
                                             event.storeLocation, event.province,
                                             event.transactionDate),
                              sale_events))
        daily_sale_event_df = pd.DataFrame(tuple_data,
                                           columns=['storeCode', 'productCode', 'productName',
                                                    'quantity', 'amount', 'categoryCode', 'category',
                                                    'city', 'postalCode', 'storeLocation', 'province',
                                                    'transactionDate'])
        # Derive year/month/day columns so the dataset can be partitioned by date
        daily_sale_event_df['transactionDate'] = pd.to_datetime(daily_sale_event_df['transactionDate'])
        daily_sale_event_df['year'] = daily_sale_event_df['transactionDate'].dt.year
        daily_sale_event_df['month'] = daily_sale_event_df['transactionDate'].dt.month
        daily_sale_event_df['day'] = daily_sale_event_df['transactionDate'].dt.day
        logger.info(f'Total sale events dataframe size {daily_sale_event_df.shape[0]}')
        # Write the batch to MinIO as a Parquet dataset partitioned by year and month
        table = pa.Table.from_pandas(daily_sale_event_df)
        minio_bucket_path = f's3://{mino_bucket}/daily_sale_event'
        parquet.write_to_dataset(table=table, root_path=minio_bucket_path, filesystem=s3fs,
                                 partition_cols=['year', 'month'],
                                 existing_data_behavior='overwrite_or_ignore')
        logger.info(f"Successfully saved {len(sale_events)} daily sale events in MinIO S3 object storage")
    except Exception as e:
        logger.error(e, exc_info=True)


if __name__ == "__main__":
    app.main()
Let's quickly analyze the code above. A Faust agent subscribes to the sale event topic and consumes events in micro-batches using streams.take. Each batch is converted into a Pandas DataFrame, enriched with year, month, and day columns derived from the transaction date, converted to a PyArrow Table, and written to the MinIO bucket as a Parquet dataset partitioned by year and month.
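One piece the snippet above takes for granted is the s3fs filesystem handle passed to parquet.write_to_dataset. A minimal sketch of how such a handle might be configured to point PyArrow at a MinIO endpoint follows; the endpoint URL and credential environment variable names are assumptions for illustration.

import os

from s3fs import S3FileSystem

# Point s3fs at the MinIO server instead of AWS S3.
# The endpoint and credential variable names are illustrative assumptions.
s3fs = S3FileSystem(
    key=os.getenv('MINIO_ACCESS_KEY'),
    secret=os.getenv('MINIO_SECRET_KEY'),
    client_kwargs={'endpoint_url': os.getenv('MINIO_ENDPOINT', 'http://localhost:9000')},
)

# With partition_cols=['year', 'month'], each batch lands in MinIO under keys such as:
#   daily_sale_event/year=2024/month=5/<uuid>.parquet

Partitioning by year and month keeps each query scoped to a small slice of the lake, which is what lets the assistant later load exactly one month of sales into memory.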
Generative AI StoreInsight Data Assistant
After acquiring and ingesting the data into our data lake, the focus shifts to leveraging generative AI techniques to derive insights, create new content, or perform various tasks based on the input data. Streamlit and PandasAI are both tools commonly used in data science and machine learning; together they let us build an interactive AI assistant that generates responses to user prompts.
Streamlit: Streamlit is an open-source Python library that makes it easy to create web applications for machine learning and data science projects. It allows developers to build interactive and customizable web apps directly from Python scripts, without requiring knowledge of web development languages like HTML, CSS, or JavaScript. Streamlit is particularly popular for quickly prototyping and sharing data-driven applications, as it provides a simple and intuitive way to create interactive user interfaces.
Learn more about Streamlit UI in the link below.
PandasAI: an open-source Python library that adds generative AI capabilities to Pandas, the popular data analysis and manipulation tool. It is designed to be used in conjunction with Pandas, not as a replacement for it. We can ask data-related questions in natural language and it gives us answers; in other words, we can speak to our data in plain English, empowering non-technical people such as business managers to gain insight into the data in real time.
Learn more about PandasAI in the link below.
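To illustrate the interaction model before we look at the full application, here is a minimal, hypothetical PandasAI example against a plain DataFrame; the sample data and prompt are made up, and import paths may vary slightly between PandasAI versions.

import pandas as pd
from pandasai import SmartDataframe
from pandasai.llm.openai import OpenAI

sales_df = pd.DataFrame({
    'productName': ['Espresso Beans', 'Green Tea', 'Oat Milk'],
    'amount': [120.50, 45.00, 89.99],
})

llm = OpenAI(api_token='YOUR_OPENAI_API_KEY')  # placeholder; never hardcode real keys
smart_df = SmartDataframe(sales_df, config={'llm': llm})

# Ask a question in plain English; PandasAI translates it into Pandas operations
print(smart_df.chat('Which product generated the highest sales amount?'))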
Below is the final POC of the Cloud 9 StoreInsight Gen AI data assistant, built with a combination of Streamlit UI, PandasAI, OpenAI, Kafka, the MinIO data lake, and Pandas.
Below are the code snippets for the Streamlit and PandasAI data assistant application.
import shelve
import sys
from dataclasses import dataclass

import pandas as pd
import streamlit as st
from pandasai import SmartDatalake
from pandasai.llm.openai import OpenAI

from co_pilot_store_manager import logger
from co_pilot_store_manager import openai_api_key
from co_pilot_store_manager import generate_select_options
from co_pilot_store_manager.co_pilot import MinIODataLakeConnector

# Chat avatars (emoji placeholders; the original characters were lost in transcription)
USER_AVATAR = "👤"
BOT_AVATAR = "🤖"
happy = "😊"
THINKING_FACE = "🤔"

minioDataLakeConnector = MinIODataLakeConnector()
# Read the API key from configuration rather than hardcoding it in source control
llm = OpenAI(api_token=openai_api_key)
select_option_list = generate_select_options()


def initialize_session_state():
    st.session_state['df'] = None


def load_data_frame_from_minio_data_lake(year: int, month: int) -> pd.DataFrame:
    # Pull the requested year/month Parquet partition from MinIO into a DataFrame
    return minioDataLakeConnector.load_data_frame(year, month)


def generate_pandas_ai_response(df: pd.DataFrame, prompt_message):
    # Wrap the DataFrame in a SmartDatalake so PandasAI can answer natural-language prompts
    smart_datalake_ai = SmartDatalake(dfs=[df], config={"llm": llm})
    logger.info(prompt_message)
    response = smart_datalake_ai.chat(prompt_message)
    return response
Let's quickly analyze the code above. The app connects to the MinIO data lake through MinIODataLakeConnector, loads the Parquet partition for a selected year and month into a Pandas DataFrame, and hands that DataFrame to a PandasAI SmartDatalake backed by the OpenAI LLM. Every prompt the user types in the Streamlit UI is passed to generate_pandas_ai_response, which returns the model's answer for display in the chat.
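The snippet above defines the helpers but omits the Streamlit page itself. Below is a minimal, hypothetical sketch of how the chat page might wire those helpers together; the widget labels, year options, and session-state keys are assumptions rather than the project's exact UI code.

st.title('Cloud 9 StoreInsight Data Assistant')

# Let the user pick which year/month partition of the data lake to analyze
year = st.sidebar.selectbox('Year', [2023, 2024])
month = st.sidebar.selectbox('Month', list(range(1, 13)))

if st.sidebar.button('Load sales data'):
    st.session_state['df'] = load_data_frame_from_minio_data_lake(year, month)

if 'messages' not in st.session_state:
    st.session_state['messages'] = []

# Replay the conversation so far
for message in st.session_state['messages']:
    avatar = USER_AVATAR if message['role'] == 'user' else BOT_AVATAR
    with st.chat_message(message['role'], avatar=avatar):
        st.write(message['content'])

# Accept a new natural-language question and answer it with PandasAI
if prompt := st.chat_input('Ask a question about your sales data'):
    st.session_state['messages'].append({'role': 'user', 'content': prompt})
    if st.session_state.get('df') is None:
        answer = 'Please load sales data first.'
    else:
        answer = generate_pandas_ai_response(st.session_state['df'], prompt)
    st.session_state['messages'].append({'role': 'assistant', 'content': str(answer)})
    st.rerun()

Keeping the chat history in st.session_state means the conversation survives Streamlit's rerun cycle, which fires on every widget interaction.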
The full source code of the Streamlit UI components is in the repository linked below.
Takeaways
In delving into the world of generative AI with PandasAI and OpenAI, leveraging Kafka for efficient data ingestion, and harnessing the power of MinIO as a robust data lake solution, we've embarked on a journey of innovation and discovery. Through this exploration, we've learned invaluable lessons about the intersection of artificial intelligence, data management, and creative expression.
PandasAI is designed to bridge the gap between traditional data analysis workflows and the realm of artificial intelligence. By combining the strengths of Pandas and Generative AI, PandasAI empowers users to engage in natural language conversations with their data. This innovative library brings a new level of interactivity and flexibility to the data analysis process.
Using Kafka as a data ingestion platform has underscored the importance of seamless data pipelines in powering AI-driven applications. Kafka's ability to handle high-throughput data streams with reliability and scalability has been instrumental in facilitating the flow of information to our AI models, enabling real-time insights and decision-making.
Using MinIO as a data lake has highlighted the critical role of robust data management infrastructure in supporting AI initiatives. With MinIO's scalable object storage capabilities and advanced data management features, we've been able to effectively store, organize, and analyze vast quantities of data, laying the foundation for AI-driven innovation and discovery.
Finally, using Streamlit has further demonstrated the power of intuitive user interfaces in democratizing access to AI-powered tools and applications. With its user-friendly design and interactive features, Streamlit empowers users of all backgrounds to engage with complex AI models and datasets, fostering collaboration and innovation across disciplines.
In essence, our journey with Generative AI, PandasAI, Kafka, MinIO, and Streamlit has taught us that the convergence of advanced technologies holds immense promise for reshaping how we create, consume, and interact with content. By embracing these tools and technologies, we have the opportunity to unlock new realms of possibility and push the boundaries of what's possible in creativity, innovation, and beyond.
Thank you for reading.
Oluwaseyi Otun is an Independent IT Consultant based in Canada focused on AI data engineering, data science, machine learning, data ingestion, and backend and system design (Python, Scala, ZIO, Akka, Kafka, Java). He is a distributed systems enthusiast, passionate about AI, with a special interest in pure functional programming in Scala (FP) and Python, mathematics and statistics, software architecture, design patterns, microservices, and clean, standard code quality.
I have good domain knowledge in Fintech, Financial Services, and Health Care solutions. I love the internal workings of a system and exploring what happens under the cover.
In recent times, I have returned to my first love, Mathematics and Statistics, as they play a crucial role in AI applications, computer science, and IT in general. I can be reached at my email [email protected].