Generative AI StoreInsight Analytic Manager Using Python, Kafka, MinIO S3, OpenAI, PandasAI, Faust-Streaming & Streamlit UI. Retail Store Use-Case


In recent years, the field of artificial intelligence has witnessed remarkable advancements, particularly in the realm of generative models and the tooling around them, most notably OpenAI's GPT models, large language models (LLMs) in general, PandasAI, Google Gemini, LangChain, and ChatGPT.

Generative AI, a subset of artificial intelligence, focuses on creating new content, including code, whether it be images, text, music, or even entire virtual environments. The artificial intelligence market is projected to reach US$305.90 billion in 2024, to continue growing by over US$100 billion annually, and to reach US$1 trillion in value by 2030.

The Role of Generative AI in the Retail Industry for Data Exploration and Sales Analysis

  • Data Analysis Efficiency: Generative AI algorithms can efficiently analyze large volumes of retail data, including sales transactions, customer demographics, product attributes, and more. By automating data analysis tasks, Generative AI enables retailers to gain deeper insights into daily sales events and identify patterns, trends, and anomalies in the data more quickly and accurately than traditional methods.
  • Real-Time Insights: Generative AI can provide real-time insights into daily sales events, enabling retailers to monitor performance metrics, track key performance indicators (KPIs), and make data-driven decisions on the fly. By visualizing sales data through interactive dashboards and reports, retailers can quickly identify trends, opportunities, and areas for improvement, empowering them to take timely actions to optimize sales performance and drive business growth.
  • Optimized Inventory Management: Generative AI can optimize inventory management by analyzing sales data, stock levels, lead times, and other factors to predict demand for products accurately. By generating inventory replenishment plans and allocation strategies, retailers can ensure that the right products are available in the right quantities at the right locations to meet customer demand while minimizing carrying costs and stockouts.


What is Generative AI?

Generative AI refers to a subset of artificial intelligence (AI) that focuses on creating new content, such as images, text, audio, or even entire virtual environments. Unlike traditional AI models that are designed for specific tasks like classification or prediction, generative AI models are trained on large datasets to learn the underlying patterns and structures of the data. Once trained, these models can generate new content that resembles the data they were trained on. Generative AI encompasses various algorithms and architectures, each with its unique approach to content generation. Some of the most common types of generative AI models include:


  • Transformers: Transformers are a type of deep learning architecture originally developed for natural language processing tasks. Models like OpenAI's GPT (Generative Pre-trained Transformer) series can generate coherent and contextually relevant text based on a given prompt. Transformers have also been applied to tasks such as image generation and music composition.
  • Generative Adversarial Networks (GANs): GANs consist of two neural networks – a generator and a discriminator – that are trained simultaneously in a competitive manner. The generator creates synthetic data, while the discriminator distinguishes between real and fake data. Through this adversarial training process, GANs can produce highly realistic images, videos, and text.

This article delves into the fascinating world of generative AI, exploring its capabilities, applications, and the technology needed to build an end-to-end Generative AI solution that provides data insight into retail store sales. We will touch on the benefits of building modern Generative AI for real-time data analytics, along with its architecture and logic.

To give more context, we will build a real-world Generative AI data assistant called StoreInsight using a stack of Python, PandasAI, OpenAI, Kafka, a MinIO S3 data lake, Parquet, Faust-Streaming, a Streamlit UI, and Docker containers.

Use Case

Cloud 9 Store is a fictitious retail chain operating across North America, with locations in Canada, the US, and Mexico. In an era marked by digital innovation and evolving consumer preferences, retail giants are increasingly turning to cutting-edge technologies to stay ahead of the curve. Cloud 9 is poised to revolutionize its operations with the implementation of Generative AI.

This use case follows Cloud 9's ambitious journey towards harnessing the power of Generative AI to gain valuable business intelligence, enhance customer experiences, optimize business processes, and drive growth in the competitive retail landscape.


Cloud 9 Functional and Non-Functional Requirements

  • Generative AI algorithms should analyze historical sales data, market trends, and seasonal factors to optimize inventory management decisions, such as stock replenishment, pricing strategies, and product assortment planning.
  • Generative AI models should be able to scale horizontally across multiple servers or cloud instances to handle increased computational demand.
  • The system should be capable of processing large volumes of data efficiently to generate accurate recommendations and insights in real-time.


Below is the high-level architecture diagram of what we will build.


With the functional and non-functional requirements above, we need to build a Generative AI StoreInsight data assistant that can give us real-time business intelligence from our data. Let's get started.


Data Ingestion Layer

In the context of a GenAI architecture, data ingestion plays a crucial role in the lifecycle of artificial intelligence (AI) models. The process of data ingestion involves collecting, preparing, and loading data from various sources into the system where AI models are developed and trained. Let's explore our data ingestion components.

Kafka

Kafka is a popular choice as a data ingest component in many modern data architectures, including those focused on AI and machine learning. Kafka is a distributed streaming platform that provides capabilities for publishing, subscribing to, storing, and processing streams of records in real time.

Kafka is designed to handle high-throughput data ingestion from multiple sources concurrently. Its distributed architecture allows it to scale horizontally by adding more brokers to accommodate increasing data volumes.
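
The producer agent shown in the next section imports its Kafka client, topic name, and logger from a helper module called gendata that is not reproduced in full here. A minimal, hedged sketch of what that producer setup might look like, assuming the confluent-kafka client and broker settings supplied through environment variables, could be:

import os
import logging
from confluent_kafka import Producer

logger = logging.getLogger("sale-event-producer")
logging.basicConfig(level=logging.INFO)

# Topic name and broker address are assumptions supplied via environment variables
kafka_topic = os.getenv("KAFKA_TOPIC", "daily-sale-events")

kafka_producer = Producer({
    "bootstrap.servers": os.getenv("KAFKA_BROKER", "localhost:9092"),
    "acks": "all",      # wait for acknowledgement from in-sync replicas
    "linger.ms": 10,    # small batching window to improve throughput
})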

We build a Python Kafka producer agent that ingests real-time sales event data from the point-of-sale (POS) machines.

Below is the Python code snippet that does that job.


import json
import os
import traceback

from gendata import generate_sale_event, kafka_producer, logger, kafka_topic
from timeloop import Timeloop
from datetime import timedelta

tl = Timeloop()

# How often (in seconds) to emit a synthetic sale event
interval_time = float(os.getenv('SCHEDULE_INTERVAL'))


@tl.job(interval=timedelta(seconds=interval_time))
def send_sales_event():
    try:
        # Generate one synthetic point-of-sale event and key the message by its sale id
        sale_event = generate_sale_event()
        message_key = sale_event.saleId
        sale_event_data = json.dumps(sale_event.model_dump(), indent=4)
        logger.info(f'Message Key {message_key}')
        logger.info(f'Sale Event {sale_event_data}')
        # Publish the JSON payload to the Kafka topic and flush it to the broker
        kafka_producer.produce(kafka_topic, str(sale_event_data), message_key)
        kafka_producer.flush()
        logger.info(f'Successfully sent sale event to Kafka topic {kafka_topic}')
    except Exception as err:
        logger.error(str(err))
        traceback.print_tb(err.__traceback__)


if __name__ == "__main__":
    tl.start(block=True)
        

Let's quickly analyze the code above

  1. It generates synthetic sale event data
  2. It converts the sale event data model to JSON
  3. It sends the sale event JSON payload to Kafka using the producer client API
  4. It flushes the data to the Kafka broker
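
The generate_sale_event helper above comes from the gendata module, which the article does not list. Since the event exposes model_dump() and a saleId attribute, it is most likely a Pydantic v2 model; a hedged sketch of what that model and generator might look like, reusing the field names consumed by the Faust agent later, is shown below (all sample values are illustrative).

import random
import uuid
from datetime import datetime
from pydantic import BaseModel


class SaleEvent(BaseModel):
    # Field names mirror the columns written to the data lake later in the pipeline
    saleId: str
    storeCode: str
    productCode: str
    productName: str
    quantity: int
    amount: float
    categoryCode: str
    category: str
    city: str
    postalCode: str
    storeLocation: str
    province: str
    transactionDate: str  # ISO-8601 timestamp, parsed downstream with pd.to_datetime


def generate_sale_event() -> SaleEvent:
    # Produce a single synthetic point-of-sale event (hypothetical sample values)
    return SaleEvent(
        saleId=str(uuid.uuid4()),
        storeCode="CL9-001",
        productCode="P-1001",
        productName="Espresso Beans 1kg",
        quantity=random.randint(1, 5),
        amount=round(random.uniform(5.0, 200.0), 2),
        categoryCode="GROC",
        category="Grocery",
        city="Toronto",
        postalCode="M5V 2T6",
        storeLocation="Downtown Toronto",
        province="ON",
        transactionDate=datetime.utcnow().isoformat(),
    )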


Data Transformation & Ingestion

Faust for streaming data, Pandas for transformation, and PyArrow Parquet for ingestion into MinIO is a powerful combination for building scalable, real-time data processing pipelines. Faust is a stream processing library for Python that is designed to handle high-throughput, low-latency data processing tasks, while MinIO is a high-performance, distributed, S3-compatible object storage system that serves as our data lake. The sales event data is stored in Parquet format; Apache Parquet is an open-source, column-oriented data file format designed for efficient data storage and retrieval.
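
The agent below writes through an S3 filesystem handle named s3fs and a bucket variable named mino_bucket that are created elsewhere in the project. A minimal sketch of how that filesystem might be configured against MinIO with the s3fs library, assuming the endpoint and credentials come from environment variables, is shown here:

import os
from s3fs import S3FileSystem

# Bucket name and MinIO endpoint/credentials are assumptions; in practice they
# would come from the project's configuration or a secrets store.
mino_bucket = os.getenv("MINIO_BUCKET", "cloud9-datalake")

s3fs = S3FileSystem(
    key=os.getenv("MINIO_ACCESS_KEY"),
    secret=os.getenv("MINIO_SECRET_KEY"),
    client_kwargs={"endpoint_url": os.getenv("MINIO_ENDPOINT", "http://localhost:9000")},
)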

Below is the Python code snippet for the Faust streaming agent itself.


import faust
import pandas as pd
import pyarrow as pa
from faust import Stream
from pyarrow import parquet

# kafka_broker, no_topic_partitions, sale_event_topic, no_of_poll, poll_time_out,
# mino_bucket, the s3fs filesystem handle, the logger, and the SaleEvent record
# model are defined in the project's configuration and model modules.

app = faust.App('DailySale-Events', broker=kafka_broker, topic_partitions=no_topic_partitions)

sales_event_topic = app.topic(sale_event_topic, key_type=str, value_type=SaleEvent)


@app.agent(sales_event_topic)
async def process_daily_sales_events(streams: Stream[SaleEvent]):
    # Consume sale events in batches and persist each batch to the data lake
    async for events in streams.take(no_of_poll, poll_time_out):
        save_daily_sale_events_to_minio_datalake(events)


def save_daily_sale_events_to_minio_datalake(sale_events: list[SaleEvent]):
    try:
        # Flatten each event into a tuple of column values
        tuple_data = list(map(lambda event: (event.storeCode, event.productCode, event.productName, event.quantity,
                                             event.amount, event.categoryCode, event.category, event.city,
                                             event.postalCode, event.storeLocation, event.province,
                                             event.transactionDate),
                              sale_events))

        daily_sale_event_df = pd.DataFrame(tuple_data,
                                           columns=['storeCode', 'productCode', 'productName', 'quantity', 'amount',
                                                    'categoryCode', 'category', 'city', 'postalCode', 'storeLocation',
                                                    'province', 'transactionDate'])
        # Derive year/month/day partition columns from the transaction date
        daily_sale_event_df['transactionDate'] = pd.to_datetime(daily_sale_event_df['transactionDate'])
        daily_sale_event_df['year'] = daily_sale_event_df['transactionDate'].dt.year
        daily_sale_event_df['month'] = daily_sale_event_df['transactionDate'].dt.month
        daily_sale_event_df['day'] = daily_sale_event_df['transactionDate'].dt.day
        logger.info(f'Total sale events dataframe size {daily_sale_event_df.shape[0]}')
        # Write the batch as a Parquet dataset partitioned by year and month into MinIO
        table = pa.Table.from_pandas(daily_sale_event_df)
        minio_bucket_path = f's3://{mino_bucket}/daily_sale_event'
        parquet.write_to_dataset(table=table, root_path=minio_bucket_path, filesystem=s3fs,
                                 partition_cols=['year', 'month'],
                                 existing_data_behavior='overwrite_or_ignore')
        logger.info(f'Successfully saved {len(sale_events)} daily sale events to MinIO S3 object storage')

    except Exception as e:
        logger.error(e, exc_info=True)


if __name__ == "__main__":
    app.main()
        

Let's quickly analyze the code above

  1. It streams sales events from the Kafka topic by periodically polling a specific number of events
  2. It creates tuples of column values from the list of events using a lambda
  3. It creates a Pandas data frame from these tuples
  4. It transforms the transaction date column by deriving year, month, and day columns
  5. It creates a PyArrow table from the Pandas data frame
  6. It partitions the table using the year and month columns
  7. It writes the table as Parquet using the highly performant PyArrow library
  8. It stores the table in the MinIO S3 bucket. The write is very fast: it takes only a few milliseconds to write over 200K records. The screenshot below shows the resulting Parquet dataset in the MinIO admin console.
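
Besides browsing the bucket in the MinIO console, the partitioned dataset can also be read back programmatically. Below is a hedged sketch, assuming the same s3fs handle and mino_bucket variable as above and PyArrow's dataset API, that loads a single year/month partition into a Pandas data frame:

import pyarrow.dataset as ds


def load_daily_sale_events(year: int, month: int):
    # Open the Hive-partitioned Parquet dataset (year=YYYY/month=M directories)
    dataset = ds.dataset(
        f"{mino_bucket}/daily_sale_event",
        filesystem=s3fs,
        format="parquet",
        partitioning="hive",
    )
    # Push the partition filter down so only the requested files are read
    table = dataset.to_table(
        filter=(ds.field("year") == year) & (ds.field("month") == month)
    )
    return table.to_pandas()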


Generative AI StoreInsight Data Assistant

After acquiring and ingesting the data into our data lake, the focus shifts towards leveraging generative AI techniques to derive insights, create new content, or perform various tasks based on the input data. Streamlit and PandasAI are tools commonly used in data science and machine learning to build interactive AI assistants that can generate responses to user prompts.

Streamlit: Streamlit is an open-source Python library that makes it easy to create web applications for machine learning and data science projects. It allows developers to build interactive and customizable web apps directly from Python scripts, without requiring knowledge of web development languages like HTML, CSS, or JavaScript. Streamlit is particularly popular for quickly prototyping and sharing data-driven applications, as it provides a simple and intuitive way to create interactive user interfaces.
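
As a quick illustration, a few lines of Python are enough for a working Streamlit app; this tiny, hypothetical example simply renders a title and an interactive slider (run with `streamlit run app.py`):

import streamlit as st

st.title("Hello, Streamlit")
count = st.slider("How many sale records to preview?", min_value=5, max_value=50, value=10)
st.write(f"You chose to preview {count} records.")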

Learn more about the Streamlit UI in the link below.


PandasAI: PandasAI is an open-source Python library that adds Generative AI capabilities to pandas, the popular data analysis and manipulation tool. It is designed to be used in conjunction with pandas, not as a replacement for it. We can ask data-related questions in natural language and it gives us answers; in other words, we can speak to our data in plain English, empowering non-technical people, such as business managers, who want to gain insight into the data in real time.
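
To make this concrete, here is a minimal, hedged example of a natural-language query with PandasAI, assuming the PandasAI 1.x SmartDataframe API and an illustrative data frame and question:

import pandas as pd
from pandasai import SmartDataframe
from pandasai.llm.openai import OpenAI

# Illustrative sales data; in StoreInsight this comes from the MinIO data lake
sales_df = pd.DataFrame({
    "storeLocation": ["Toronto", "Vancouver", "Toronto"],
    "amount": [120.50, 89.99, 240.00],
})

llm = OpenAI(api_token="<your-openai-api-key>")
smart_df = SmartDataframe(sales_df, config={"llm": llm})

# Ask a question in plain English instead of writing pandas code by hand
print(smart_df.chat("Which store location has the highest total sales amount?"))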

Learn more about PandasAI in the link below.


Below is the final POC of the Cloud 9 StoreInsight Gen AI data assistant, built with a combination of the Streamlit UI, PandasAI, OpenAI, Kafka, the MinIO data lake, and Pandas.


Below are the code snippets for the Streamlit and PandasAI data assistant application.

import pandas as pd
import streamlit as st
from pandasai import SmartDatalake
from pandasai.llm.openai import OpenAI

from co_pilot_store_manager import logger, openai_api_key, generate_select_options
from co_pilot_store_manager.co_pilot import MinIODataLakeConnector

# Chat avatars and reaction emojis used by the Streamlit chat widgets
USER_AVATAR = "🧑"
BOT_AVATAR = "🤖"
happy = "😊"
THINKING_FACE = "🤔"

# Connector that reads the partitioned Parquet datasets from the MinIO data lake
minioDataLakeConnector = MinIODataLakeConnector()

# OpenAI LLM used by PandasAI; the API key is loaded from configuration rather than hardcoded
llm = OpenAI(api_token=openai_api_key)

# Year/month options presented to the user in the UI
select_option_list = generate_select_options()


def initialize_session_state():
    st.session_state['df'] = None


def load_data_frame_from_minio_data_lake(year: int, month: int) -> pd.DataFrame:
    # Load the sales data frame for the selected year/month partition
    return minioDataLakeConnector.load_data_frame(year, month)


def generate_pandas_ai_response(df: pd.DataFrame, prompt_message):
    # Wrap the data frame in a PandasAI SmartDatalake and answer the natural-language prompt
    smartDataLake_ai = SmartDatalake(dfs=[df], config={"llm": llm})
    logger.info(prompt_message)
    return smartDataLake_ai.chat(prompt_message)
   

        

Let's quickly analyze the code above

  1. We initialize our OpenAI model.
  2. We load our data as a data frame from the MinIO data lake using the partition columns year and month.
  3. We create a PandasAI SmartDatalake object with the OpenAI LLM and the data frame.
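
To show how these helpers could come together in the UI, here is a hedged sketch of a simple Streamlit chat loop that wires the period selection, data loading, and PandasAI response generation; the widget labels, the shape of select_option_list, and the layout are assumptions rather than the exact production UI:

# Hypothetical Streamlit wiring around the helpers defined above
st.title("Cloud 9 StoreInsight Data Assistant")

# Let the user pick which sales period (partition) to analyse;
# assumes each option is a (year, month) tuple
year, month = st.selectbox("Select a sales period", select_option_list)

if st.session_state.get("df") is None:
    st.session_state["df"] = load_data_frame_from_minio_data_lake(year, month)

prompt = st.chat_input("Ask a question about daily sales...")
if prompt:
    with st.chat_message("user", avatar=USER_AVATAR):
        st.write(prompt)
    with st.chat_message("assistant", avatar=BOT_AVATAR):
        answer = generate_pandas_ai_response(st.session_state["df"], prompt)
        st.write(answer)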


The full source code of the Streamlit UI components is in the repository linked below.


Takeaways

In delving into the world of Generative AI with PandasAI and OpenAI, leveraging Kafka for efficient data ingestion, and harnessing the power of MinIO as a robust data lake solution, we've embarked on a journey of innovation and discovery. Through this exploration, we've learned invaluable lessons about the intersection of artificial intelligence, data management, and creative expression.

PandasAI is designed to bridge the gap between traditional data analysis workflows and the realm of artificial intelligence. By combining the strengths of Pandas and Generative AI, PandasAI empowers users to engage in natural language conversations with their data. This innovative library brings a new level of interactivity and flexibility to the data analysis process.

Using Kafka as a data ingestion platform has underscored the importance of seamless data pipelines in powering AI-driven applications. Kafka's ability to handle high-throughput data streams with reliability and scalability has been instrumental in facilitating the flow of information to our AI models, enabling real-time insights and decision-making.

Adopting MinIO as a data lake has highlighted the critical role of robust data management infrastructure in supporting AI initiatives. With MinIO's scalable object storage capabilities and advanced data management features, we've been able to effectively store, organize, and analyze vast quantities of data, laying the foundation for AI-driven innovation and discovery.

Finally, using Streamlit has further demonstrated the power of intuitive user interfaces in democratizing access to AI-powered tools and applications. With its user-friendly design and interactive features, Streamlit empowers users of all backgrounds to engage with complex AI models and datasets, fostering collaboration and innovation across disciplines.

In essence, our journey with Generative AI, PandasAI, Kafka, MinIO, and Streamlit has taught us that the convergence of advanced technologies holds immense promise for reshaping how we create, consume, and interact with content. By embracing these tools and technologies, we have the opportunity to unlock new realms of possibility and push the boundaries of what's possible in creativity, innovation, and beyond.

Thank you for reading.


Oluwaseyi Otun is an Independent IT Consultant based in Canada focused on AI data engineering, data science, machine learning, data ingestion, and backend and system design (Python, Scala, ZIO, Akka, Kafka, Java). He is a distributed systems enthusiast who is passionate about AI, with a special interest in pure functional programming in Scala and Python, mathematics and statistics, software architecture, design patterns, microservices, and clean, standard code quality.

I have good domain knowledge in Fintech, financial services, and healthcare solutions. I love the internal workings of a system and exploring what happens under the covers.

In recent times I have moved back to my first love, mathematics and statistics, as they play a crucial role in AI applications, computer science, and IT in general. I can be reached at my email [email protected].

