Real-Time Sentiment Analysis with TCP Socket, Apache Spark, OpenAI, Kafka and Elasticsearch | Data Engineering pipeline project
Introduction:
In this article, I share my experience implementing a real-time sentiment analysis project. Following and updating a few aspects of Yusuf Ganiyu's article, I ventured into this intriguing project using Yelp's Customer Reviews Dataset (7 million records), Apache Spark, OpenAI, Kafka and Elasticsearch.
Project Overview:
The project's aim was to analyse opinions in real time using technologies like Apache Spark, Kafka, and Elasticsearch. The process began with setting up a Docker environment and installing necessary dependencies such as OpenAI, PySpark, and Confluent Kafka.
Project Development:
I suggest you follow the code alongside this article in my repo:
First, using PyCharm as the IDE, we create a Python virtual environment.
Second, we create a 'src' directory and configure Docker with 'docker-compose.yml' and 'Dockerfile.spark'.
Essential Python packages like openai, pyspark, confluent_kafka, and fastavro (see assets/cmd_commands.txt) were installed.
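For reference, installing them from the terminal boils down to something like this (exact versions are the ones pinned in requirements.txt):
pip install openai pyspark confluent_kafka fastavro python-dotenv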
The requirements.txt contains all the dependency versions installed, which the Dockerfile.spark needs.
You may notice config.py is missing from my repo (it is excluded via .gitignore); no worries, here it is:
from dotenv import load_dotenv
import os

load_dotenv()  # Loads the environment variables from .env

config = {
    "openai": {
        "api_key": os.getenv("OPENAI_API_KEY")
    },
    "kafka": {
        "sasl.username": os.getenv("KAFKA_USERNAME"),
        "sasl.password": os.getenv("KAFKA_PASSWORD"),
        "bootstrap.servers": os.getenv("KAFKA_SERVERS"),
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "session.timeout.ms": 50000
    },
    "schema_registry": {
        "url": "https://psrc-5j7x8.us-central1.gcp.confluent.cloud",
        "basic.auth.user.info": os.getenv("SCHEMA_REGISTRY_USER") + ":" + os.getenv("SCHEMA_REGISTRY_PASSWORD")
    }
}
Adapting to OpenAI Changes:
The original project used an earlier version of OpenAI. Due to changes in the OpenAI API, it was necessary to update the sentiment analysis code to use the gpt-3.5-turbo model. This involved modifying the sentiment_analysis method to suit the new API.
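In short, the old module-level call was replaced by a client-based one. A minimal sketch of the mapping (the prompt and parameters here are illustrative, not the exact ones in my code):

# Before (openai < 1.0):
# response = openai.ChatCompletion.create(model="gpt-3.5-turbo", messages=messages)
# sentiment = response["choices"][0]["message"]["content"]

# After (openai >= 1.0), as used in the updated sentiment_analysis method:
from openai import OpenAI

client = OpenAI()  # picks up OPENAI_API_KEY from the environment
response = client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Classify the sentiment of this review: ..."}],
)
sentiment = response.choices[0].message.content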
Docker Configuration:
The 'docker-compose.yml' was adjusted to expose port 9999, necessary for the socket within Docker.
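As a sketch, the relevant fragment of the compose file looks roughly like this (the service name is illustrative; check the repo for the exact file):

spark-master:
  ports:
    - "9999:9999"   # TCP socket used to stream the Yelp reviews into Spark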
Credential Management:
To safely handle credentials, a '.env' file was used and integrated into Docker, altering the Dockerfile and the config.py script.
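A '.env' template matching the variables read in config.py would look like this (values are placeholders):

OPENAI_API_KEY=your_openai_api_key
KAFKA_USERNAME=your_confluent_kafka_api_key
KAFKA_PASSWORD=your_confluent_kafka_api_secret
KAFKA_SERVERS=your_bootstrap_server:9092
SCHEMA_REGISTRY_USER=your_schema_registry_api_key
SCHEMA_REGISTRY_PASSWORD=your_schema_registry_api_secret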
At this point, remember to install the dependencies needed to load environment variables inside the Docker container:
cd src
docker exec -it spark-master /bin/bash
pip install pandas
pip install python-dotenv
Socket and Streaming Setup:
Next, a socket was set up to transmit the data in real time. This was done by opening a specific port (9999) in Docker, allowing communication between the container and the host.
Spark Streaming was used to process the data transmitted through the socket. This included configuring a StreamingContext and defining a schema to interpret the JSON data from the Yelp dataset.
cd src
docker exec -it spark-master /bin/bash
python jobs/streaming-socket.py
In the image above you can see the code fragment where the Spark streaming DataFrame is configured to read from the socket, along with its schema.
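In case the image does not come through, here is a minimal sketch of that configuration, assuming the same field names as the Yelp review JSON (host and app name are illustrative and depend on where each script runs):

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, FloatType

spark = SparkSession.builder.appName("SocketStreamConsumer").getOrCreate()

# Schema matching the Yelp review JSON records sent over the socket
review_schema = StructType([
    StructField("review_id", StringType()),
    StructField("user_id", StringType()),
    StructField("business_id", StringType()),
    StructField("stars", FloatType()),
    StructField("date", StringType()),
    StructField("text", StringType()),
])

# Read the raw JSON lines from the TCP socket exposed on port 9999
stream_df = (spark.readStream
             .format("socket")
             .option("host", "localhost")  # or the container name, depending on the setup
             .option("port", 9999)
             .load())

# Parse each line into structured columns
reviews_df = (stream_df
              .select(from_json(col("value"), review_schema).alias("data"))
              .select("data.*"))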
In the image above you can see how the socket is sending streaming records in batches of size 2.
Just to highlight: the functionality to resume sending messages from the last sent message, instead of starting from the beginning, is achieved through these key components (see streaming-socket.py; a consolidated sketch follows after the fragments):
last_sent_index = 0
# Initializes an index to keep track of the number of messages already sent

# Skipping already sent lines
for _ in range(last_sent_index):
    next(file)
# Before reading new lines, the code skips over the lines that have been
# previously sent, ensuring that the reading starts right after the last
# sent line
# Updating last_sent_index after each send:
last_sent_index += 1
# The index is incremented as messages are sent, enabling the program to
# remember how many lines have been processed and resume from the correct
# position in the file during the next iteration.
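Putting those pieces together, a minimal sketch of the sender loop (the file path, host and error handling are illustrative; the real streaming-socket.py differs in details):

import json
import socket
import time

def send_data_over_socket(file_path, host="0.0.0.0", port=9999, chunk_size=2):
    # Listen on the port exposed in docker-compose and wait for the Spark job to connect
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind((host, port))
    server.listen(1)

    last_sent_index = 0  # number of lines already sent across connections

    while True:
        conn, _ = server.accept()
        try:
            with open(file_path, "r") as file:
                # Skip the lines that were already sent before the last disconnect
                for _ in range(last_sent_index):
                    next(file, None)

                batch = []
                for line in file:
                    batch.append(json.loads(line))
                    if len(batch) == chunk_size:
                        for record in batch:
                            conn.sendall((json.dumps(record) + "\n").encode("utf-8"))
                            last_sent_index += 1
                        batch = []
                        time.sleep(1)  # throttle the stream a little
                # (any remaining partial batch is omitted here for brevity)
        except (BrokenPipeError, ConnectionResetError):
            # The consumer dropped; wait for a new connection and resume from last_sent_index
            continue
        finally:
            conn.close()

# Example usage (the dataset file name is hypothetical):
# send_data_over_socket("datasets/yelp_academic_dataset_review.json")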
Sentiment Analysis with OpenAI:
A sentiment analysis function was implemented using OpenAI's gpt-3.5-turbo model. This function takes a comment as input and returns a sentiment classification (positive, negative, neutral).
The integration of OpenAI was carried out via Python API, making calls to the chat.completions service to obtain the model's responses.
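A minimal sketch of that function with the client-based API (the exact prompt wording in my code may differ), plus one common way to plug it into the streaming DataFrame as a UDF:

from openai import OpenAI
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

from config import config

client = OpenAI(api_key=config["openai"]["api_key"])

def sentiment_analysis(comment: str) -> str:
    """Return POSITIVE, NEGATIVE or NEUTRAL for a single review text."""
    if not comment:
        return "Empty"
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system",
             "content": "Classify the sentiment of the customer review as POSITIVE, "
                        "NEGATIVE or NEUTRAL. Reply with one word only."},
            {"role": "user", "content": comment},
        ],
    )
    return response.choices[0].message.content.strip()

# Wrap it as a UDF and add a 'feedback' column matching the Avro schema used later
# for Kafka; reviews_df is the parsed streaming DataFrame from the socket sketch above.
sentiment_udf = udf(sentiment_analysis, StringType())
reviews_with_feedback = reviews_df.withColumn("feedback", sentiment_udf(col("text")))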
Data Processing and Storage:
The data processed by Spark Streaming were sent to Apache Kafka for queue management and then stored in Elasticsearch (indexed) for analysis and visualisation.
The data flow was configured to ensure efficient processing and smooth transmission from data capture to final storage.
Apache Kafka cluster overview:
The topic: customers_review
For this topic, a schema needs to be registered in the Schema Registry, passing all the key-value pairs in Avro format (a producer sketch using this schema follows after the schema definition below):
{
  "doc": "Schema for customer reviews",
  "fields": [
    {
      "name": "review_id",
      "type": "string"
    },
    {
      "name": "user_id",
      "type": "string"
    },
    {
      "name": "business_id",
      "type": "string"
    },
    {
      "name": "stars",
      "type": "float"
    },
    {
      "name": "date",
      "type": "string"
    },
    {
      "name": "text",
      "type": "string"
    },
    {
      "name": "feedback",
      "type": "string"
    }
  ],
  "name": "customers_review_schema",
  "namespace": "com.rvm",
  "type": "record"
}
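A hedged sketch of how a record can be produced to this topic with that Avro schema, using confluent_kafka's SerializingProducer and the Schema Registry settings from config.py (the schema file path and helper name are hypothetical; the producer code in the repo may differ):

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer

from config import config

TOPIC = "customers_review"

# The Avro schema shown above, loaded as a string (illustrative path)
with open("schemas/customers_review.avsc") as f:
    schema_str = f.read()

schema_registry_client = SchemaRegistryClient(config["schema_registry"])
avro_serializer = AvroSerializer(schema_registry_client, schema_str)

producer = SerializingProducer({
    **config["kafka"],
    "key.serializer": StringSerializer("utf_8"),
    "value.serializer": avro_serializer,
})

def send_review(review: dict):
    # 'review' must contain the fields defined in the schema, including the
    # 'feedback' value returned by sentiment_analysis(review["text"])
    producer.produce(topic=TOPIC, key=review["review_id"], value=review)
    producer.flush()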
The Elasticsearch sink connector's advanced configurations:
Elasticsearch deployment performance overview
Elasticsearch Management dev tools (queries)
As soon as I change the data type of the date field from StringType to Timestamp, I will be able to query aggregations by date correctly.
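For example, once the date field is mapped as a proper date type, an aggregation like this one works as expected in the dev tools (the index name is illustrative; syntax for Elasticsearch 7+):

GET customers_review/_search
{
  "size": 0,
  "aggs": {
    "reviews_per_month": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month"
      }
    }
  }
}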
Kibana dashboard made on the fly:
Things to improve:
-- The date data type needs to be changed from StringType to Timestamp in order to filter and visualise more powerful insights.
-- Tokenize and index the text.keyword field differently, so that a proper tag cloud can be added to this dashboard.
-- And, of course, spend more time creating relevant and insightful visualisations in Kibana.
To initiate the process, I ran several commands in the PyCharm terminal, starting with activating Docker and continuing with the execution of specific scripts for the socket and Spark Streaming.
Technologies and Tools Used:
Commands Used:
(For the complete list of commands, see assets/cmd_commands.txt).
Conclusions and Learnings:
This project was an excellent opportunity to learn about real-time data streaming and sentiment analysis. The challenges faced, such as adapting to the new version of OpenAI and configuring Docker, added valuable practical experience in solving problems in data engineering.
Acknowledgement:
This project not only replicates but also builds upon Yusuf's foundational work, providing an up-to-date, real-world application of data engineering techniques. It was inspired by airscholar's RealtimeStreamingEngineering.