Streaming analysis with Kafka, InfluxDB and Grafana
Valentina Alto
AI and App Innovation Technical Architect at Microsoft | Tech Author | MSc in Data Science
If you are dealing with the streaming analysis of your data, there are some tools which can offer performing and easy-to-interpret results. First, we have Kafka, which is a distributed streaming platform which allows its users to send and receive live messages containing a bunch of data (you can read more about it here). We will use it as our streaming environment. Then, if we want to visualize, in real time, our results, we need a tool which can capture our data and predictions: it is Grafana, and among its data sources, it can be connected to InfluxDB, an open source time series database. So, through this article we will build an ML algorithm which can extract information and make predictions, in real time, on our data, throughout the following steps:
- Loading our data and preparing them to be processed
- Building the algorithm and save it to spark environment
- Writing a script which reads our data and sends them to a Kafka topic as it receives them, so that we are simulating a streaming gathering
- Writing a script which collects data from Kafka topic and processes them through the algorithm we trained and saved. Then, it sends the results to another Kafka topic
- Finally, writing a third script which reads from the second topic and sends the message directly to InfluxDB, so that we can plot our results on Grafana.
So let's start. The task I want to simulate is the following: imagine we are provided with some data about the temperature of a working machine. We know that this machine works on different regimes, which correspond to different temperature levels and fluctuations. Hence, we first want to identify those work regimes (which are unknown), then train a classification algorithm which can make predictions on real-time data and predict, based on the actual temperature, the correspondent working regime. As you might have guessed, we are facing an unsupervised task which we are converting into a supervised one.
First, I'm going to create a random dataset containing my temperature variable. To code these lines (as well as those which will be following), I've used Apache Zeppelin with the %pyspark interpreter.
%pyspark
import pandas as pd
import numpy as np
df = pd.DataFrame({'Temperature': np.random.randint(40,70, 50000)})
Then, I converted it into a spark dataframe, so that we can connect it to our Spark environment, through a Spark Context I've initialized:
%pyspark
from pyspark import SparkContext
sc = SparkContext("local[*]", "Example")
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
spark_df = sqlContext.createDataFrame(df)
Then, in order for our dataset to be correctly processed by our algorithm, we have to vectorize it. The idea is that you want to convert the columns you will use as features to a vector of values, stored in a new column called, of course, 'Feature'. Note that here we will use only Temperature as a feature, however the format needs to be a vector anyway:
%pyspark
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['Temperature'], outputCol = 'Feature')
vspark_df = vectorAssembler.transform(spark_df)
vspark_df.show()
As you can see, the format of our variable changed to a vector type. Now our feature is ready to be processed.
The first thing we want to do, since we are dealing with unlabelled data, is clustering them with an unsupervised algorithm. For this purpose, I will use K-means (you can read my article here if you want to know something more about this algorithm).
Hence, I will start identifying the number of centroids I want to set with the Elbow method:
%pyspark
import matplotlib.pyplot as plt
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
cost = np.zeros(20)
for k in range(2,20):
kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("Feature")
model = kmeans.fit(vspark_df.sample(False,0.1, seed=42))
cost[k] = model.computeCost(vspark_df)
fig, ax = plt.subplots(1,1, figsize =(8,6))
ax.plot(range(2,20),cost[2:20])
ax.set_xlabel('k')
ax.set_ylabel('cost')
It seems that, after the sixth centroid, the within-cluster variance is not decreasing significantly. Hence, we can set K=6. Let's apply our K-means to our dataset:
%pyspark
from pyspark.ml.clustering import KMeans
k = 6
kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("Feature")
model = kmeans.fit(vspark_df)
from pyspark.sql.functions import col
df_transformed=model.transform(vspark_df).select('Temperature','Feature', col('prediction').alias('label'))
df_transformed.show(2)
As you can see, we now have a third column, which is our target, our label. Hence, we can now train a classification algorithm in a supervised way and, more specifically, we will train a Neural Network with 2 hidden layers (with input=1 since we have only one feature, and output=6 since we have 6 clusters).
%pyspark
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Split the data into train and test
splits = df_transformed.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]
layers = [1, 6, 8, 6]
# create the trainer and set its parameters
trainer = MultilayerPerceptronClassifier(featuresCol='Feature', labelCol='label', maxIter=200, layers=layers, blockSize=128, seed=1234)
# train the model
model_NN = trainer.fit(train)
# compute accuracy on the test set
result = model_NN.transform(test)
predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))
Output: Test set accuracy = 0.802813400934
The accuracy of our NN is around 80%, so we are satisfied and we can save our model like so:
%pyspark
from pyspark.ml.classification import MultilayerPerceptronClassificationModel
model_NN.save("file:///home/cloudera/Desktop/Neural_Network")
To save our dataset too, we first need to parse it to a json format (the key-value type):
%pyspark
from pyspark.sql import functions as F
json_file=final_df.select(F.to_json(F.struct([final_df[x] for x in final_df.columns])).alias("value"))
json_file.show()
Nice. Now let's move to our Pyspark scripts. First, I'm writing the program which will collect data from our mydata.json and send them, as in streaming, to our Kafka consumer, under the topic 'example_1'.
#! /usr/bin/env python2.7
import json
from pprint import pprint
from time import sleep
from json import dumps
from kafka import SimpleProducer, KafkaClient
kafka = KafkaClient('quickstart.cloudera:9092')
producer = SimpleProducer(kafka)
with open('mydata.json', 'r') as f:
for line in f:
data = json.loads(line)
for val in data.values():
producer.send_messages('example_1', val.encode('utf-8'))
sleep(5) #it will send a line to kafka every 5 seconds
Now, I'm writing a script which reads data from Kafka topic 'example_1'. To do that, I'm using the Spark Structured Streaming approach, very useful if you want your streaming flow to be structured rather than unstructured (which is the typical format streaming data are collected through a Spark Streaming Context). Here there is the full guide of Spark Structured Streaming with Kafka Integration.
Then, once collected my data, I'm using my stored ML model to extract the work regime from my feature Temperature, and then collect the output in a new column called 'prediction'.
Finally, I will write my final dataframe to Kafka again, but in a different topic called 'example_2' (indeed, 'example_1' is already taken by the queue of my initial dataframe's rows).
#! /usr/bin/env python2.7
#initializing my Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[2]")\
.appName("Influx").getOrCreate()
from pyspark.sql.functions import col, from_json, to_json
from pyspark.sql.types import *
#initializing the schema of my json file
schema = StructType([
StructField("Temperature", IntegerType()),
StructField("label", IntegerType())])
#reading structured streaming data from kafka topic 'example_1'
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "quickstart.cloudera:9092") \
.option("subscribe", "example_1") \
.option("failOnDataLoss", False)\
.load()
#extracting from the df our values through the json schema
df_2=df.selectExpr("CAST(value AS STRING)")\
.select(from_json(col("value"), schema)\
.alias("tmp"))\
.select("tmp.*")
#vectorize our feature
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['Temperature'], outputCol = 'Feature')
vspark_df = vectorAssembler.transform(df_2.dropna())
#loading our NN
from pyspark.ml.classification import MultilayerPerceptronClassificationModel
NN_model=MultilayerPerceptronClassificationModel.load("file:///home/cloudera/Desktop/Neural_Network")
#predicting the work regime of our data
NN_predictions = NN_model.transform(vspark_df)
final_df=NN_predictions.select("prediction","Temperature","Feature")
#writing our final df to kafka topic 'example_2'
ds=final_df.select(to_json(struct("prediction","Temperature")).alias("value"))\
.writeStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "quickstart.cloudera:9092")\
.option("topic","example_2")\
.option("checkpointLocation", "file:///home/cloudera/Desktop/checkpoint")\
.start()
ds.awaitTermination()
Finally, I will create a script which reads from Kafka topic 'example_2' and sends its content straight to InfluxDB.
#! /usr/bin/env python2.7
import json
from influxdb import InfluxDBClient
client = InfluxDBClient(host='localhost', port=8087, database='example')
client.create_database('example')
import kafka
from kafka import KafkaConsumer
consumer = KafkaConsumer('example_2', bootstrap_servers=['quickstart.cloudera:9092'])
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *
for message in consumer:
json_body = [
{
"measurement": "new_table", #this table will be automatically created
"tags": {
},
"fields":json.loads(message.value)
}
]
client.write_points(json_body)
Now, in three different terminals, we run our scripts:
#this is a spark job, hence we run it through spark-submit
[root@quickstart] ./bin/spark-submit --packages\
org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0, \
org.apache.kafka:kafka-clients:0.10.2.1, \
org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.0 reading_from_kafka.py
#then we write our two further scripts:
[root@quickstart] ./writing_to_kafka.py
[root@quickstart] ./writing_to_influx.py
Now let's jump to Grafana. After having connected our dashboard to InfluxDB (you can easily see how to do that on Grafana website), we can create our graph, which shows both the actual temperate and the work regime predicted by our NN.
The green line is our temperature, while the yellow one is the correspondent work regime.
You can add many variants to your graphs: it is possible to aggregate, group or compute operations on your data by adding as many queries as your want. Furthermore, even though here I've been working with time series, there is plenty of graphs Grafana offers you, depending on the task. With the integration of these three tools, you can reach the goal of not only building real-time models, but also efficiently presenting them.