Cheers to Real-time Analytics with Apache Flink: Part 3 of 3
Wer Ordnung hält, ist nur zu faul zum Suchen (If you keep things tidily ordered, you’re just too lazy to go searching.) —German proverb
Drawing inspiration from the principles explored in Chapter 3 of Designing Data-Intensive Applications, we embark on our quest to harness the power of Elasticsearch and Kibana for data storage and visualisation in real-time.
Building on the first two parts (Part 1, Part 2), where we explored real-time data ingestion with Apache Flink and storage in PostgreSQL, Part 3 delves into the exciting world of real-time data visualisation.
Project Architecture Recap
What is Elasticsearch?
Here's a quick rundown on Elasticsearch:
In a nutshell, Elasticsearch lets you store, search, and analyze large amounts of data quickly and efficiently, making it a valuable tool for tasks like log analysis, website search, and real-time analytics.
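As a quick, minimal illustration of that store-and-search loop, here is a DevTools-console sketch using a hypothetical logs index (not part of this project): one request indexes a document, the next searches for it by a word in one of its fields.

```
POST logs/_doc
{ "message": "user logged in", "level": "info" }

GET logs/_search
{ "query": { "match": { "message": "logged" } } }
```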
What is Kibana?
Here's a quick explanation of Kibana:
In essence, Kibana takes the power of Elasticsearch and transforms it into visually compelling insights. It allows you to interact with, analyze, and communicate your data effectively.
Why Elasticsearch and Kibana for Real-Time Data Visualisation?
Here are some of the key reasons why Elasticsearch and Kibana are a popular choice for data visualization:
Perfect Match:
Elasticsearch Strengths:
Kibana's Visualization Prowess:
Additional Considerations:
Overall, Elasticsearch and Kibana offer a powerful and versatile combination for data visualization. Their speed, scalability, data handling capabilities, and rich visualization features make them a compelling choice for a variety of use cases.
Index in Elasticsearch
In Elasticsearch, an index is a fundamental unit of organization for your data. It's like a giant filing cabinet within the larger storage system. Here's a breakdown of what an index entails:
Here's an analogy to solidify the concept:
Imagine a library with different sections (indices) like fiction, non-fiction, and biographies. Each section contains books (documents), with each book having chapters and pages (fields within a document). The library might organize the books by genre or author (similar to defining a schema).
In essence, Elasticsearch indexes help you structure, organize, and efficiently search your data. They provide a scalable and fault-tolerant way to store and retrieve information.
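To tie the analogy back to this project, an index can be created up front with an explicit mapping that declares the type of each field. Here is a sketch in the DevTools console; the field names mirror the Transaction fields used in the Scala code, but this exact mapping is an assumption for illustration, not the project's actual one:

```
PUT transactions
{
  "mappings": {
    "properties": {
      "transactionId":   { "type": "keyword" },
      "transactionDate": { "type": "date"    },
      "totalAmount":     { "type": "double"  }
    }
  }
}
```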
Getting Started With Apache Flink and Elasticsearch
The Apache Flink Elasticsearch connector acts as a bridge between your Flink streaming or batch data processing pipelines and the Elasticsearch search engine. It allows you to:
Here are some key features of the connector:
Define An Elasticsearch Sink
// Builds an IndexRequest for a single transaction, keyed by its transactionId.
// (Standalone helper; the sink below inlines the same logic in its emitter.)
def createIndexRequest(element: Transaction): IndexRequest = {
  val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)
  val json: String = mapper.writeValueAsString(element)
  Requests.indexRequest.index("transactions").id(element.transactionId).source(json, XContentType.JSON)
}
def writeToElastic(transactionStream: DataStream[Transaction]) = {
  val sink: ElasticsearchSink[Transaction] = new Elasticsearch7SinkBuilder[Transaction]
    .setHosts(new HttpHost("localhost", 9200, "http"))
    .setBulkFlushMaxActions(2)
    .setBulkFlushInterval(10L)
    .setEmitter[Transaction] {
      (transaction, context, indexer) => {
        // Serialise the Transaction to JSON and index it under its transactionId
        val mapper = new ObjectMapper()
        mapper.registerModule(DefaultScalaModule)
        val json: String = mapper.writeValueAsString(transaction)
        val indexRequest = Requests.indexRequest()
          .index("transactions")
          .id(transaction.transactionId)
          .source(json, XContentType.JSON)
        indexer.add(indexRequest)
      }
    }.build()
  transactionStream.sinkTo(sink)
}
Details of the Transaction class are available here :: https://github.com/snepar/flink-ecom/blob/master/src/main/scala/ecom/generators/Dto/Transaction.scala
Write to Elasticsearch Sink
val transactionStream = readCustomData()
transactionStream.print()
writeToElastic(transactionStream)
Refer to the complete code here :: https://github.com/snepar/flink-ecom/blob/master/src/main/scala/ecom/KafkaPGESIntegrationEcom.scala
Spin Up Your Playground
Follow the instructions here :: https://github.com/snepar/flink-ecom-infra
Kibana will be available at localhost:5601 (Elasticsearch itself listens on localhost:9200)
Then execute this application to start the data ingestion :: https://github.com/snepar/flink-ecom
Search Using Elastic
Here’s the structure of the transactions index in Elasticsearch. You can get this by running
GET transactions
in the DevTools.
Query In Elastic
GET transactions/_search
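The empty search above returns all documents. The same endpoint also accepts a query DSL body for filtering; as a sketch, assuming a productBrand field exists on the documents (the field name is an assumption for illustration):

```
GET transactions/_search
{
  "query": { "match": { "productBrand": "apple" } },
  "size": 5
}
```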
Re-Index Data in Elastic
To get a readable transaction date, we need to reindex into a different index. To reindex data in Elasticsearch, we use the _reindex API ::
_reindex
POST _reindex
{
  "source": { "index": "transactions" },
  "dest": { "index": "transaction_part1" },
  "script": {
    "source": """
      ctx._source.transactionDate = new Date(ctx._source.transactionDate).toString();
    """
  }
}
GET transaction_part1/_search
However, toString() gives us little control over the output format, so we use a more robust way to format the date.
POST _reindex
{
  "source": { "index": "transactions" },
  "dest": { "index": "transaction_part2" },
  "script": {
    "source": """
      SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
      formatter.setTimeZone(TimeZone.getTimeZone('UTC'));
      ctx._source.transactionDate = formatter.format(new Date(ctx._source.transactionDate));
    """
  }
}
GET transaction_part2/_search
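The Painless script above is essentially Java, so the same formatting logic can be sketched in plain Scala on the JVM. This is a standalone illustration of what the reindex script does to each document's transactionDate, not part of the pipeline code:

```scala
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

// Mirrors the Painless script: epoch milliseconds -> "yyyy-MM-dd'T'HH:mm:ss" in UTC.
def formatTransactionDate(epochMillis: Long): String = {
  val formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
  formatter.setTimeZone(TimeZone.getTimeZone("UTC"))
  formatter.format(new Date(epochMillis))
}

// The Unix epoch (0L) formats deterministically:
println(formatTransactionDate(0L)) // prints 1970-01-01T00:00:00
```

Pinning the formatter to UTC matters: without setTimeZone, the output would depend on the cluster node's default time zone.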
Dashboarding in Real Time With Kibana
Index on transaction_part2
Creating Donut Chart
Number Of Transactions
Top Brands
Final Dashboard (Keeping It All Together)
Thank You For Spending Some Time Here
This is the Complete Post :: https://dev.to/snehasish_dutta_007/apache-flink-e-commerce-data-pipeline-usecase-3ha9
Repositories ::
In Case You Have Missed
Acknowledgements and Inspirations
#data #bigdata #apacheflink #scala #java #kafka #streaming #python #elasticsearch #kibana