Building Data Pipeline & Data Analytic Service Using Akka Stream, Apache Spark, Kafka & Cassandra. Retail Store Big Data Use Case. Part One.

In the retail industry, data has become an irreplaceable asset for driving sales and increasing profitability. Beyond the financial advantages, big data offers unique benefits for generating creative insight into sales growth, sales forecasting and analysis, and targeted advertising. A retail chain can use data on location, demographics, buying behavior, customer trends, and more to predict the success and future performance of new stores it plans to open in different parts of the world. This way, the organization mitigates the risk of opening a store in an unprofitable location and ultimately avoids store failures. It can also track sales data to make informed decisions.

This article describes a fictitious retail store use case that uses big data to track daily sales data. We are going to demonstrate how to build a big data pipeline for retail store analytics. The post is divided into two parts: part one focuses on data ingest and the data pipeline with Akka Stream, Akka HTTP, Kafka, and Cassandra, while part two covers data analysis with Apache Spark and Cassandra, all implemented in Scala.

What is Data Ingestion

Data ingestion is the process of collecting data from various sources, often in an unstructured format, and storing it somewhere so that it can be analyzed. The data can be ingested in real time or integrated in batches. Real-time data is ingested as soon as it arrives, while batch data is ingested in chunks at periodic intervals. To make this ingestion process work smoothly, we must have a data pipeline in place to move the data from source to destination.

What is a Data Pipeline

A data pipeline is a series of data processing steps. If the data is not already loaded into the data platform, it is ingested at the beginning of the pipeline. Then there is a series of steps in which each step produces an output that becomes the input to the next step. This continues until the pipeline is complete. Data pipelines may also have the same source and sink, in which case the pipeline is purely about modifying the data set. Any time data is processed between point A and point B (or points B, C, and D), there is a data pipeline between those points. In our case, we want to build in-store analytics and actionable insight into sales data.

Use Case

Lasgidi Payless Retail Stores is a fictitious North American retail outlet that operates across cities in Canada, the United States, and Mexico. It is an off-price family apparel and fashion retailer dealing in clothing for women, men, and kids. The company wants a system that can provide a detailed view of essential business metrics such as revenue, average transaction value (ATV), and units per transaction (UPT). The real-time sales data should be integrated with the Point of Sale terminals.

Below are the functional and non-functional requirements:

  • Aggregate and prepare sales data for data scientists for prediction and sales forecasts.
  • Aggregate daily sales data per store using Apache Spark aggregation and transformation functions.
  • Aggregate weekly sales data per store using Apache Spark aggregation and transformation functions.
  • Aggregate monthly sales data per store using Apache Spark aggregation and transformation functions.
  • Real-time sales data from each store's Point of Sale terminals should be routed to the data platform landing zone, i.e. Kafka, via a data ingest API service adapter.
  • The acceptable latency for sending a request to the landing zone is 20-35 ms.
  • Sales data from each store should be processed in parallel for high throughput, taking advantage of Kafka partitions for parallel processing. Each retail store's data should map to a partition of the Kafka topic, i.e. the number of retail stores should equal the number of partitions in the Kafka topic.
  • Build a flexible data pipeline that flows from a source, i.e. Kafka, to a sink data store, i.e. Cassandra, using Akka Stream.
  • All data aggregations and analyses should be stored in Cassandra so they can be consumed by business intelligence tools and data scientists.
  • Develop an Apache Spark batch job that runs monthly.

Given the functional and non-functional requirements above, we need to build a flexible data ingest service and data pipeline that can scale for fast data streaming.

[Diagram: high-level architecture - retail store POS terminals, data ingest service, Kafka, data pipeline service, Cassandra, and the Apache Spark batch job]

Each retail store outlet sends its stream of daily sales through the Data Ingest service. The ingest service is an Akka HTTP endpoint that forwards the incoming stream of data to a Kafka topic, to be consumed downstream by the data pipeline. The data pipeline service is an Akka Stream service that consumes from Kafka and sinks the data into Cassandra for analysis by an Apache Spark batch job, which runs at a scheduled interval to perform data transformation and analytics on our data. Each of these services is a separate microservice that runs and scales independently.

Data Ingest With Akka Stream, Akka HTTP & Kafka

The data ingest service is a simple HTTP endpoint service built with Akka HTTP and Akka Stream using the Kafka Producer Sink API. The daily sale record is sent via an HTTP endpoint and sinked into a Kafka topic. We want to send each inbound message originating from a retail store client API to a specific Kafka partition, i.e. each partition of the Kafka topic is mapped to a retail store ID. To achieve this requirement, we need to implement a custom Kafka Partitioner. Kafka uses the partitioner to route each message to its dedicated partition of the topic. We will use this partitioning strategy to achieve parallelism with the downstream Kafka consumer later in the workflow. A visual representation of the partitioning is shown in the diagram below.

[Diagram: mapping of retail store IDs to Kafka topic partitions]

Below are code snippets from the custom Kafka Partitioner.

import java.util

import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.{Cluster, InvalidRecordException}

// Custom partitioner that routes each message to the partition assigned to its store code.
class KafkaStoreIdPartitioner extends Partitioner {

  // Mapping of store code -> partition number, loaded from producer configuration.
  var storePartitionData: Map[String, Int] = Map.empty
  val defaultPartition = 0

  override def partition(topic: String, key: Any, keyBytes: Array[Byte], value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
    if ((keyBytes == null) || (!key.isInstanceOf[String]))
      throw new InvalidRecordException("All messages must have store code name as key")
    val storeKey = key.asInstanceOf[String]
    // Fall back to the default partition when the store code is not in the mapping.
    storePartitionData.getOrElse(storeKey, defaultPartition)
  }

  override def close(): Unit = {}

  override def configure(configs: util.Map[String, _]): Unit = {
    // Expects a "storePartition" property of the form "STORE1=0&STORE2=1&...".
    val storePartitionIds = configs.get("storePartition")
    val storeIdPartitionerPair = storePartitionIds.asInstanceOf[String]
    if (storeIdPartitionerPair == null || storeIdPartitionerPair.isBlank)
      throw new IllegalArgumentException("Store partition configuration not set or found")
    storePartitionData = storeIdPartitionerPair.split("&").map(_.split("=")).map(arr => arr(0) -> arr(1).toInt).toMap
  }
}

object KafkaStoreIdPartitioner {
  def apply(): KafkaStoreIdPartitioner = new KafkaStoreIdPartitioner()
}


Let's break the code down.

  • A Kafka Partitioner has three functions to implement: partition, configure, and close.
  • The store-to-partition mapping is built in the configure function from producer configuration. It is a key-value mapping of store ID to partition number.
  • When a message is published, Kafka uses our custom partitioner to look up the partition number by the message key (the store code). If it doesn't find one, it uses the default partition, which is 0. The sketch below shows how the partitioner and its mapping could be registered in the producer settings.
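Here is a minimal sketch of how the custom partitioner could be registered with the Alpakka Kafka producer settings. The broker address and the store-to-partition mapping string are placeholder values, not taken from the project; the "storePartition" key and its STORE=PARTITION&... format simply follow the parsing logic in the configure method above.

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer

object ProducerSettingsSketch {
  // Hypothetical values: bootstrap servers and the store-to-partition mapping are examples only.
  def settings(implicit system: ActorSystem): ProducerSettings[String, String] =
    ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")
      // Tell the Kafka producer to use our custom partitioner.
      .withProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, classOf[KafkaStoreIdPartitioner].getName)
      // Store code -> partition mapping consumed by KafkaStoreIdPartitioner.configure.
      .withProperty("storePartition", "LAG001=0&LAG002=1&LAG003=2")
}

Because Kafka passes every producer property to the partitioner's configure method, the "storePartition" entry set here is exactly what KafkaStoreIdPartitioner parses at startup.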

Below are code snippets from the Kafka Producer Service class.

import akka.Done
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import io.circe.syntax._
import org.apache.kafka.clients.producer.ProducerRecord

import scala.concurrent.Future

class KafkaProducerService(topicName: String)(implicit mat: Materializer, producerSettings: ProducerSettings[String, String]) {

  def sendDailySalesRecord(saleRecord: DailySaleRecord): Future[Done] = {
    import dailySaleRecord._ // brings the circe JSON encoder for DailySaleRecord into scope
    Source.single(saleRecord)
      // The store code is used as the record key so the custom partitioner can route it.
      .map(s => new ProducerRecord[String, String](topicName, saleRecord.store.code, s.asJson.noSpaces))
      .runWith(Producer.plainSink(producerSettings))
  }
}

object KafkaProducerService {
  def apply(topicName: String)(implicit mat: Materializer, producerSettings: ProducerSettings[String, String]) = new KafkaProducerService(topicName)
}

Let us break it down.

  • The KafkaProducerService class exposes a sendDailySalesRecord function for publishing data to Kafka.
  • We map the DailySaleRecord object to a Kafka ProducerRecord.
  • We use the Akka Stream Kafka Producer sink API to ingest the record into the Kafka topic partition, using the retail store code as the record key and the partition number assigned to that store by the custom partitioner.
  • The ingest service is stateless, so we can run multiple copies of the service instance for high availability and scalability. A sketch of the HTTP endpoint that fronts this service follows below.
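The article does not show the HTTP layer itself, so here is a minimal sketch of what the Akka HTTP ingest endpoint could look like. The route path, the status codes, and the assumption that a circe Decoder[DailySaleRecord] is in scope are all illustrative, not taken from the actual repository.

import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import io.circe.parser

// Hypothetical ingest route: accepts a JSON daily sale record and forwards it to Kafka.
class IngestRoutes(producerService: KafkaProducerService) {

  val route: Route =
    path("api" / "v1" / "sales") {
      post {
        entity(as[String]) { body =>
          parser.decode[DailySaleRecord](body) match { // assumes a circe Decoder[DailySaleRecord] is in scope
            case Right(record) =>
              // Hand the record to the Kafka producer sink and acknowledge once it is sent.
              onSuccess(producerService.sendDailySalesRecord(record)) { _ =>
                complete(StatusCodes.Accepted)
              }
            case Left(error) =>
              complete(StatusCodes.BadRequest, s"Invalid daily sale payload: ${error.getMessage}")
          }
        }
      }
    }
}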

Data Pipeline With Akka Stream, Kafka And Cassandra

The data pipeline is implemented using the Akka Stream API. The Akka Streams framework simplifies building a stable and high-performance data pipeline. Through the Alpakka connectors API, it comes with out-of-the-box source and sink connectors for Elasticsearch, Kafka, AWS S3, AWS Kinesis, HBase, Cassandra, AWS Lambda, MongoDB, Hadoop, AWS DynamoDB, JDBC, FTP, and many more.

Akka Streams is an implementation of the Reactive Streams specification. At its core, Reactive Streams aims to be "an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure."

The four main building blocks in Akka's streaming API are:

  • Source: A processing stage with exactly one output, emitting data elements whenever downstream processing stages are ready to receive them. A source can be finite or infinite.
  • Flow: A processing stage with exactly one input and one output, connecting its upstream and downstream by transforming the data elements that flow through it. A transformation stage can be as simple as validation or parsing, or a complex transformation, depending on the use case.
  • Sink: A processing stage with exactly one input, requesting and accepting data elements, possibly slowing down the upstream producer of elements.
  • RunnableGraph: A Flow that has both ends "attached" to a Source and a Sink respectively, and is ready to be run.

Akka Streams is an extremely high-performance library built for the JVM and written in Scala. Under the hood it is built on top of Akka actors: the source, flow, and sink stages are executed by actors, but it is not a distributed Akka cluster. A stream runs on a single JVM, yet it can easily be scaled out with a container orchestration engine such as Kubernetes. A minimal example of these building blocks wired together is shown below.
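To make the four building blocks concrete, here is a tiny self-contained example (not part of the retail pipeline) that builds a Source, a Flow, and a Sink, connects them into a RunnableGraph, and runs it.

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}

import scala.concurrent.Future

object BuildingBlocksExample extends App {
  implicit val system: ActorSystem = ActorSystem("building-blocks")
  import system.dispatcher

  val numbers: Source[Int, NotUsed] = Source(1 to 10)             // Source: emits the integers 1..10
  val double: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)      // Flow: transforms each element
  val sum: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _) // Sink: folds elements into a sum

  // RunnableGraph: Source, Flow and Sink attached together, ready to be materialized and run.
  val graph: RunnableGraph[Future[Int]] = numbers.via(double).toMat(sum)(Keep.right)

  graph.run().foreach { total =>
    println(s"Sum of doubled numbers: $total") // prints 110
    system.terminate()
  }
}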

To build complex data pipeline workflows, Akka Streams exposes a dedicated DSL (Domain Specific Language) that can be used to plumb the various stages and components of an Akka Stream together. Let's try to visualize our graph with a diagram; it is a nice way to design the system before focusing on the code.

[Diagram: data pipeline graph - Kafka partitioned source, conversion flow, Cassandra write flow, and Kafka offset committer sink]

Let's dive deeper into the details of the data pipeline graph components.

Kafka As an Infinite Source of Data

The Akka Stream Kafka (Alpakka Kafka) API implements both the Source and Sink APIs. In Kafka terminology, the Source is a consumer while the Sink is a producer. In our data pipeline workflow, we want to consume streams of daily sale records from Kafka and store them in Cassandra. The Alpakka Kafka consumer API comes with various implementations to choose from, depending on our use case and requirements. In Kafka, the main unit of parallelism is the number of partitions in a topic: we can spawn that number of consumer instances to read data from the same topic in parallel. In our data ingestion implementation, the data from each retail store is written to its own Kafka partition, and we want to consume from each partition in parallel. We can therefore size our Kafka consumers to the number of partitions, i.e. if we have 20 partitions we can run 20 Kafka consumers in the same consumer group in parallel to yield high throughput and a high rate of data ingest, depending on our Kafka cluster setup.

Each consumer reads data from the specific partition that was mapped by our custom Kafka partitioner. If we scale down the number of consumers, Kafka assigns the orphaned partitions to other running consumer instances. This process is called rebalancing; Kafka starts a rebalance whenever a consumer joins or leaves the group.
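For reference, here is a minimal sketch of how the consumer settings for this pipeline could be defined with Alpakka Kafka. The broker address, group id, and offset-reset policy are placeholder values; the value deserializer is a ByteArrayDeserializer to match the CommittableMessage[String, Array[Byte]] type used in the snippets below.

import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

object ConsumerSettingsSketch {
  // Hypothetical values: bootstrap servers and group id are examples only.
  def settings(implicit system: ActorSystem): ConsumerSettings[String, Array[Byte]] =
    ConsumerSettings(system, new StringDeserializer, new ByteArrayDeserializer)
      .withBootstrapServers("localhost:9092")
      // All pipeline instances share one group id so partitions are balanced across them.
      .withGroupId("daily-sales-pipeline")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
}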

Below are code snippets from the Kafka partitioned consumer source and the conversion flow.

import akka.NotUsed
import akka.kafka.{ConsumerMessage, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Flow
import io.circe.parser

// Converts a raw Kafka message into a DailySaleRecord, keeping the offset for the later commit.
val convertFromKafkaMessageToDailySalesFlow: Flow[ConsumerMessage.CommittableMessage[String, Array[Byte]], (DailySaleRecord, ConsumerMessage.Committable), NotUsed] =
  Flow[ConsumerMessage.CommittableMessage[String, Array[Byte]]].map { message =>
    import dailySale._ // brings the circe decoder and conversion helpers into scope
    val payload = new String(message.record.value())
    val key = message.record.key()
    val partition = message.record.partition()
    system.log.info(s"store data payload ${payload}  Key: ${key}  partition ${partition}")
    // Note: .toOption.get will throw if the payload is not valid JSON.
    val saleRecord = parser.decode[DailySale](payload).toOption.get
    (saleRecord.convertToDailySaleRecord, message.committableOffset)
  }

// One sub-source is emitted per assigned partition; they are processed in parallel.
Consumer
  .committablePartitionedSource(consumerSettings, Subscriptions.topics(kafkaTopic))
  .mapAsyncUnordered(maxPartitions) { case (topicPartition, source) =>
    system.log.info(s"partition-[${topicPartition.partition}] topic-[${topicPartition.topic}]")
    source
      .via(convertFromKafkaMessageToDailySalesFlow)
      // ... continued in the runnable graph below (Cassandra write flow and offset commit)
  }

Let's break it down.

  1. committablePartitionedSource emits a (topic partition, source) pair for every partition assigned to the consumer; mapAsyncUnordered processes up to maxPartitions of these per-partition sources in parallel.
  2. We convert each Kafka CommittableMessage into a DailySaleRecord using a Flow.
  3. We parse the JSON record and return a tuple of the DailySaleRecord and its CommittableOffset.
  4. With a few lines of code everything is wired up, and our Kafka source stream is ready to emit messages to the downstream Cassandra flow.

Storing Data Into Cassandra

After consuming the daily sale data from Kafka, we want to store it in Cassandra for further analysis by Apache Spark. The Alpakka Cassandra connector implements the Source, Flow, and Sink APIs: the Source API reads data from Cassandra, while the Flow API writes data to Cassandra. We will use the Flow API to save our data. The flow creates a prepared statement, and for every element in the stream a statement binder function binds the CQL (Cassandra Query Language) placeholders to the data before the row is inserted into the Cassandra table.

Below are code snippets from the Cassandra Flow.

// Binds each DailySaleRecord (paired with its Kafka offset) to the prepared insert statement.
val statementBinder: ((DailySaleRecord, ConsumerMessage.Committable), PreparedStatement) => BoundStatement =
  (dailySales, preparedStatement) => preparedStatement.bind(
    dailySales._1.store.code, dailySales._1.store.location.city,
    dailySales._1.store.location.state, dailySales._1.itemSale.productCode,
    Int.box(dailySales._1.itemSale.quantity), Double.box(dailySales._1.itemSale.unitAmount.toDouble),
    Double.box(dailySales._1.itemSale.totalAmount.toDouble), dailySales._1.transactionDate,
    Int.box(dailySales._1.month), Int.box(dailySales._1.year),
    dailySales._1.itemSale.category, dailySales._1.store.location.countryCode)

val cassandraInsertStatement = s"INSERT INTO $cassandraKeySpace.$cassandraTable(storeCode,city,state,productCode,quantity,unitAmount,totalAmount," +
  s"transactionDate,salemonth,saleyear,category,country) " +
  s"VALUES (?,?,?,?,?,?,?,?,?,?,?,?)"

// Requires an implicit CassandraSession in scope; the flow emits each element unchanged after the write.
val cassandraWriteFlow: Flow[(DailySaleRecord, ConsumerMessage.Committable), (DailySaleRecord, ConsumerMessage.Committable), NotUsed] =
  CassandraFlow.create(CassandraWriteSettings.defaults, cassandraInsertStatement, statementBinder)

Let's break it down.

  1. Initialize the statement binder function that binds the DailySaleRecord fields to the Cassandra columns.
  2. Initialize the Cassandra insert statement.
  3. Create the CassandraFlow with a (DailySaleRecord, ConsumerMessage.Committable) tuple as its input and the same tuple as its output, so the offset can still be committed downstream. A sketch of the Cassandra session and table setup follows below.
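The CassandraSession that the flow needs is not shown in the snippets above. Below is a sketch of how it could be obtained with the Alpakka CassandraSessionRegistry, together with a guess at the table schema inferred from the insert statement and binder; the keyspace name, table name, config path, column types, and primary key are assumptions, not the actual schema from the project.

import akka.actor.ActorSystem
import akka.stream.alpakka.cassandra.CassandraSessionSettings
import akka.stream.alpakka.cassandra.scaladsl.{CassandraSession, CassandraSessionRegistry}

object CassandraSetupSketch {
  // Obtain a session from the registry using the default Alpakka Cassandra config path.
  def session(implicit system: ActorSystem): CassandraSession =
    CassandraSessionRegistry.get(system).sessionFor(CassandraSessionSettings("alpakka.cassandra"))

  // Hypothetical table definition, derived only from the insert statement and binder above.
  val createTableCql: String =
    """CREATE TABLE IF NOT EXISTS retail.daily_sales (
      |  storeCode text, city text, state text, productCode text,
      |  quantity int, unitAmount double, totalAmount double,
      |  transactionDate text, salemonth int, saleyear int,
      |  category text, country text,
      |  PRIMARY KEY ((storeCode), transactionDate, productCode)
      |)""".stripMargin
}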

Runnable Graph DSL

We connect these components to build a highly scalable data pipeline as a runnable graph. The great advantage of the Akka Stream runnable graph DSL is that you can isolate your domain logic in the form of small, composable building blocks, in total separation from the code describing how they are connected.

Below are code snippets from the Data pipeline Runnable Graph.

Consumer
  .committablePartitionedSource(consumerSettings, Subscriptions.topics(kafkaTopic))
  .mapAsyncUnordered(maxPartitions) { case (topicPartition, source) =>
    system.log.info(s"partition-[${topicPartition.partition}] topic-[${topicPartition.topic}]")
    source
      .via(convertFromKafkaMessageToDailySalesFlow) // Kafka message -> (DailySaleRecord, offset)
      .via(cassandraWriteFlow)                      // write each record to Cassandra
      .map(msgCommitter => msgCommitter._2)         // keep only the committable offset
      .runWith(Committer.sink(committerSettings))   // commit offsets back to Kafka
  }
  .toMat(Sink.ignore)(DrainingControl.apply)        // materializes a DrainingControl for clean shutdown

Akka Stream provides a high-level API to compose and build complex, high-performance, and scalable data pipelines. It also makes it easy to batch data by size or time, and it propagates backpressure to both upstream and downstream components.
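As a small illustration of the size-or-time batching mentioned above (not part of the retail pipeline code), groupedWithin emits a batch as soon as either the element count or the time window is reached:

import akka.NotUsed
import akka.stream.scaladsl.Source

import scala.concurrent.duration._

// Emit a batch of up to 100 elements, or whatever has arrived within 1 second, whichever comes first.
val batched: Source[Seq[Int], NotUsed] =
  Source(1 to 1000).groupedWithin(100, 1.second)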

The full source code can be downloaded from my GitHub repository below. Feel free to clone the repo, modify it, play around with it, and enhance the source code to adapt it to your use case.

Takeaways

In this article, we explored how Akka Streams can handle asynchronous data pipelines with non-blocking backpressure. Akka Streams is an extremely high-performance tool for building big data pipelines, data transformations, and ETL pipelines. We also explored how the Alpakka Kafka API can be used to build fast data ingest, using Kafka partitions for parallel processing and Cassandra for data storage.

Read more about the Akka Stream API.

In part two of this article, we are going to discuss how to analyze the data stored in Cassandra using Apache Spark. Stay tuned.

Thank you for reading.

Oluwaseyi Otun is a backend software and data engineer (Scala, Java, Akka, Apache Spark), and a big data and distributed systems enthusiast with a special interest in functional programming, software architecture, design patterns, microservices, and clean, quality code. I love learning the internal workings of systems and exploring what happens under the covers.
