Real-Time Sentiment Analysis with TCP Socket, Apache Spark, OpenAI, Kafka and Elasticsearch | Data Engineering pipeline project
A Python-based real-time data streaming pipeline processing a dataset of 7 million records


Introduction:

In this article, I will share my experience implementing a real-time sentiment analysis project. Following Yusuf Ganiyu's article and updating a few small aspects of it, I ventured into this intriguing project using Yelp's Customer Reviews Dataset (7 million records), Apache Spark, OpenAI, Kafka and Elasticsearch.


Project Overview:

The project's aim was to analyse opinions in real time using technologies like Apache Spark, Kafka, and Elasticsearch. The process began with setting up a Docker environment and installing necessary dependencies such as OpenAI, PySpark, and Confluent Kafka.

Project Development:

I suggest you follow along with the code in my repo:

https://github.com/Rafavermar/SparkStreaming


  • Initial Setup:

First, using PyCharm as the IDE, create a Python virtual environment.

Second, create a 'src' directory and configure Docker with 'docker-compose.yml' and 'Dockerfile.spark'.

Essential Python packages like openai, pyspark, confluent_kafka, and fastavro (see assets/cmd_commands.txt) were installed.


The requirements.txt file pins the versions of all installed dependencies and is needed by Dockerfile.spark.

Project's directory structure


You may notice that config.py is missing from my repo (it is excluded by .gitignore); no worries, here it is:

from dotenv import load_dotenv
import os

load_dotenv()  # Load environment variables from .env

config = {
    "openai": {
        "api_key": os.getenv("OPENAI_API_KEY")
    },
    "kafka": {
        "sasl.username": os.getenv("KAFKA_USERNAME"),
        "sasl.password": os.getenv("KAFKA_PASSWORD"),
        "bootstrap.servers": os.getenv("KAFKA_SERVERS"),
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'PLAIN',
        'session.timeout.ms': 50000
    },
    "schema_registry": {
        "url": "https://psrc-5j7x8.us-central1.gcp.confluent.cloud",
        "basic.auth.user.info": os.getenv("SCHEMA_REGISTRY_USER") + ":" + os.getenv("SCHEMA_REGISTRY_PASSWORD")
    }
}        
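
For reference, the .env file that load_dotenv() reads only needs to define the variables referenced above; something like this (the values are placeholders, of course):

OPENAI_API_KEY=...
KAFKA_USERNAME=...
KAFKA_PASSWORD=...
KAFKA_SERVERS=...
SCHEMA_REGISTRY_USER=...
SCHEMA_REGISTRY_PASSWORD=...
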
Docker Desktop view showing the Spark containers up and running.


  • Challenges and Solutions:

Adapting to OpenAI Changes:

The original project used an earlier version of OpenAI. Due to changes in the OpenAI API, it was necessary to update the sentiment analysis code to use the gpt-3.5-turbo model. This involved modifying the sentiment_analysis method to suit the new API.
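
For reference, a minimal sketch of the updated call with the openai>=1.x Python client looks roughly like this (the prompt wording and fallback value are illustrative; the exact implementation lives in the repo's sentiment analysis job):

from openai import OpenAI
from config import config

client = OpenAI(api_key=config["openai"]["api_key"])

def sentiment_analysis(comment: str) -> str:
    # Ask gpt-3.5-turbo to classify the review and return a single word
    if not comment:
        return "NEUTRAL"
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system",
             "content": "Classify the sentiment of the customer review as "
                        "POSITIVE, NEGATIVE or NEUTRAL. Reply with one word only."},
            {"role": "user", "content": comment},
        ],
    )
    return response.choices[0].message.content.strip()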


Docker Configuration:

The 'docker-compose.yml' was adjusted to expose port 9999, necessary for the socket within Docker.


Credential Management:

To safely handle credentials, a '.env' file was used and integrated into Docker, altering the Dockerfile and the config.py script.

At this point, remember to install the dependencies needed to load environment variables inside the Docker container:

cd src
docker exec -it spark-master /bin/bash
pip install pandas
pip install python-dotenv
        

  • Implementation and Execution:

Socket and Streaming Setup:

The first step was setting up a socket to transmit data in real time. This was done by opening a specific port (9999) in Docker, allowing communication between the container and the host.
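
A simplified sketch of what streaming-socket.py does on the sending side (the file path, function name, chunk size and throttling here are illustrative; the full script, including the resume logic described further down, is in the repo):

import json
import socket
import time

def send_data_over_socket(file_path, host="0.0.0.0", port=9999, chunk_size=2):
    # Open the port that docker-compose exposes so Spark can connect to it
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    print(f"Listening on {host}:{port}")

    conn, addr = server.accept()
    print(f"Connection accepted from {addr}")

    with open(file_path, "r") as file:
        sent_in_chunk = 0
        for line in file:
            record = json.loads(line)  # one Yelp review per line
            conn.sendall((json.dumps(record) + "\n").encode("utf-8"))
            sent_in_chunk += 1
            if sent_in_chunk == chunk_size:  # stream in small batches of 2
                sent_in_chunk = 0
                time.sleep(1)                # throttle the stream a little
    conn.close()

# send_data_over_socket("datasets/yelp_academic_dataset_review.json")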

Spark Streaming was used to process the data transmitted through the socket. This included configuring a StreamingContext and defining a schema to interpret the JSON data from the Yelp dataset.

cd src
docker exec -it spark-master /bin/bash
python jobs/streaming-socket.py
Command-line execution of streaming-socket.py


The image above shows the code fragment where the Spark streaming DataFrame is configured to read from the socket, together with its schema.
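
In case the screenshot is hard to read, here is a rough sketch of that configuration (the schema fields mirror the Avro schema shown further down; the host name is an assumption and depends on where streaming-socket.py runs, so the exact code in the repo may differ):

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, FloatType

spark = SparkSession.builder.appName("YelpSentimentStreaming").getOrCreate()

review_schema = StructType([
    StructField("review_id", StringType()),
    StructField("user_id", StringType()),
    StructField("business_id", StringType()),
    StructField("stars", FloatType()),
    StructField("date", StringType()),
    StructField("text", StringType()),
])

# The socket source yields a single string column called "value"; parse it as JSON
raw_df = (spark.readStream
          .format("socket")
          .option("host", "spark-master")   # adjust to wherever the socket script runs
          .option("port", 9999)
          .load())

stream_df = (raw_df
             .select(from_json(col("value"), review_schema).alias("data"))
             .select("data.*"))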

Command-line output of streaming-socket.py


In the image above you can see the socket streaming records in batches of size 2.


Command-line execution of spark-streaming.py


Command-line output (continued) from the execution of spark-streaming.py



Spark UI showing the master running and a single worker alive, as configured.


Just highlighting: the ability to resume sending from the last sent message, instead of starting over from the beginning, is achieved through these key components (see streaming-socket.py):

# Initialize an index to keep track of the number of messages already sent
last_sent_index = 0

# Skipping already sent lines: before reading new lines, the code skips over
# the lines that have been previously sent, ensuring that reading starts
# right after the last sent line
for _ in range(last_sent_index):
    next(file)

# Updating last_sent_index after each send: the index is incremented as
# messages are sent, so the program remembers how many lines have been
# processed and resumes from the correct position in the file on the next
# iteration
last_sent_index += 1


Sentiment Analysis with OpenAI:

A sentiment analysis function was implemented using OpenAI's gpt-3.5-turbo model. This function takes a comment as input and returns a sentiment classification (positive, negative, neutral).

The OpenAI integration was carried out via the Python API, making calls to the chat.completions service to obtain the model's responses.
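
One way to wire a function like the sentiment_analysis sketch above into the stream is as a plain UDF that fills the "feedback" column expected by the Avro schema (the repo may structure this differently):

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

sentiment_udf = udf(sentiment_analysis, StringType())

# Each incoming review text is classified and stored in the "feedback" column
enriched_df = stream_df.withColumn("feedback", sentiment_udf(col("text")))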

Code fragment setting up the back-and-forth communication and prompting between Spark and OpenAI


OpenAI platform API Keys management


Data Processing and Storage:

The data processed by Spark Streaming were sent to Apache Kafka for queue management and then stored in Elasticsearch (indexed) for analysis and visualisation.

The data flow was configured to ensure efficient processing and smooth transmission from data capture to final storage.
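
As an illustration of that handoff, a foreachBatch sink using confluent_kafka's SerializingProducer with Avro serialization could look like the sketch below (the schema file path and function names are assumptions; check spark-streaming.py in the repo for the real wiring):

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer
from config import config

schema_registry_client = SchemaRegistryClient(config["schema_registry"])

# Avro schema for the customers_review topic (shown in full below)
with open("schemas/customers_review.avsc") as f:       # path is illustrative
    avro_serializer = AvroSerializer(schema_registry_client, f.read())

producer = SerializingProducer({
    "bootstrap.servers": config["kafka"]["bootstrap.servers"],
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": config["kafka"]["sasl.username"],
    "sasl.password": config["kafka"]["sasl.password"],
    "key.serializer": StringSerializer("utf_8"),
    "value.serializer": avro_serializer,
})

def send_batch_to_kafka(batch_df, batch_id):
    # Used as a foreachBatch sink: every micro-batch is produced to Kafka
    for row in batch_df.collect():
        producer.produce(topic="customers_review",
                         key=row["review_id"],
                         value=row.asDict())
    producer.flush()

# enriched_df.writeStream.foreachBatch(send_batch_to_kafka).start().awaitTermination()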

Apache Kafka cluster overview:

Confluent Kafka cluster overview


the topic: customers_review

Kafka topic Messages metrics overview


For this topic, a schema needs to be registered in the Schema Registry, defining all the key-value fields in Avro format.

{
  "doc": "Schema for customer reviews",
  "fields": [
    {
      "name": "review_id",
      "type": "string"
    },
    {
      "name": "user_id",
      "type": "string"
    },
    {
      "name": "business_id",
      "type": "string"
    },
    {
      "name": "stars",
      "type": "float"
    },
    {
      "name": "date",
      "type": "string"
    },
    {
      "name": "text",
      "type": "string"
    },
    {
      "name": "feedback",
      "type": "string"
    }
  ],
  "name": "customers_review_schema",
  "namespace": "com.rvm",
  "type": "record"
}        
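
The schema can be registered from the Confluent Cloud UI, or programmatically; a minimal sketch with confluent_kafka's SchemaRegistryClient, assuming the JSON above is saved locally as schemas/customers_review.avsc, would be:

from confluent_kafka.schema_registry import SchemaRegistryClient, Schema
from config import config

client = SchemaRegistryClient(config["schema_registry"])

# Subject name follows the default TopicNameStrategy: <topic>-value
with open("schemas/customers_review.avsc") as f:
    schema_id = client.register_schema("customers_review-value",
                                       Schema(f.read(), schema_type="AVRO"))

print(f"Registered schema with id {schema_id}")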

The Elasticsearch sink connector's advanced configuration:

sink connector to Elasticsearch main metrics overview


sink connector to Elasticsearch advanced configuration overview


Elasticsearch deployment performance overview

Elasticsearch deployment performance. Main metrics


Elasticsearch Management dev tools (queries)

Performing queries against the indexed data: count aggregation by feedback


As soon as I change the data type of the date field from StringType to Timestamp, I will be able to run aggregations by date correctly.
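
One way to fix this upstream in Spark, assuming the Yelp date strings follow the "yyyy-MM-dd HH:mm:ss" pattern, is a simple conversion before the data leaves the stream (the Avro schema and the Elasticsearch mapping would then need to change accordingly):

from pyspark.sql.functions import to_timestamp, col

enriched_df = enriched_df.withColumn("date", to_timestamp(col("date"), "yyyy-MM-dd HH:mm:ss"))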


Kibana dashboard made on the fly:

Things to improve:

-- The date data type needs to be changed from StringType to Timestamp in order to filter and visualise more powerful insights.

-- Tokenise and index the text.keyword field differently, so that a proper tag cloud can be added to this dashboard.

-- And, of course, spend more time creating relevant and insightful visualisations in Kibana.


  • Project Execution:

To initiate the process, I ran several commands in the PyCharm terminal, starting with activating Docker and continuing with the execution of specific scripts for the socket and Spark Streaming.

Technologies and Tools Used:

  • Languages and Frameworks: Python, PySpark, OpenAI.
  • Platforms and Tools: Docker, Apache Kafka, Apache Spark, Elasticsearch-Kibana
  • Datasets: Yelp Customer Reviews Dataset.


Commands Used:

(For the complete list of commands, see assets/cmd_commands.txt).


Conclusions and Learnings:

This project was an excellent opportunity to learn about real-time data streaming and sentiment analysis. The challenges faced, such as adapting to the new version of OpenAI and configuring Docker, added valuable practical experience in solving problems in data engineering.


Acknowledgement:

This project not only replicates but also builds upon Yusuf's foundational work, providing an up-to-date, real-world application of data engineering techniques. It was inspired by airscholar's RealtimeStreamingEngineering.


