Read from Kafka & Write to Snowflake via Spark Databricks


In this article, we will learn how to read from Kafka and write that data into a Snowflake table via Databricks using Spark Structured Streaming.

But before we start, let's understand some of the key concepts.

What is Kafka?

Kafka is software designed around the publish/subscribe messaging pattern. In publish/subscribe messaging, a sender (publisher) sends a message that is not specifically directed to a receiver (subscriber). The publisher classifies the message in some way, and the receiver subscribes to receive certain categories of messages. There are other usage patterns for Kafka, but this is the pattern we focus on in this article.

Publisher/subscriber systems typically have a central point where messages are published, called a broker. The broker receives messages from publishers, assigns offsets to them and commits messages to storage.

The Kafka unit of data is an array of bytes called a message. A message can also contain a bit of information related to partitioning, called a key. In Kafka, messages are categorized into topics.


What is Snowflake?

Snowflake is a single platform comprising storage, compute, and services layers that are logically integrated but scale independently of one another.

Snowflake is a cloud data warehouse built on top of public cloud (AWS / Azure / GCP) infrastructure and is a true SaaS offering. There is no hardware (virtual or physical) for you to select, install, configure, or manage. There is no software for you to install, configure, or manage. All ongoing maintenance, management, and tuning is handled by Snowflake.


The Kafka Server & Schema

The Kafka server is fed by a separate TCP server that reads the Wikipedia edits, in real time, from the various language-specific IRC channels to which Wikipedia posts them. 

That server parses the IRC data, converts the results to JSON, and sends the JSON to a Kafka server, with the edits segregated by language. The various languages are topics.

For example, the Kafka topic "en" corresponds to edits for en.wikipedia.org.



Note: This Kafka server is already set up by Databricks, which provides access to it so that we can explore. But if you want to explore your own Kafka, I would suggest creating a free trial account on Confluent Cloud.

Of course, you need to bring some data into it, e.g. Twitter data via a Twitter developer account (some work needs to be done there :) ).

Below is the link for Confluent: https://www.confluent.io/

Luckily, Databricks has made our life easy by already providing a full-fledged Kafka server. We will use it to read the data, process it in Databricks notebooks, and write it into Snowflake tables.

Let's start the step-by-step process:

Step 1: Reading from the Kafka server into Spark on Databricks


In this example, the only column we want to keep is the value column, because that's the column that contains the JSON data. A sketch of this step is shown below.
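Here is a minimal sketch of the read, assuming PySpark; the broker address is a placeholder (use the one provided in your Databricks training environment), and the "en" topic carries the English Wikipedia edits.

from pyspark.sql.functions import col

kafkaDF = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<kafka-broker>:9092")  # placeholder broker address
    .option("subscribe", "en")                                 # topic "en" = edits for en.wikipedia.org
    .load()
    .select(col("value").cast("string")))                      # keep only value, cast bytes to string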

Step 2: Defining the schema for the streaming data

Note: We can't infer the schema for streaming data, so we must define it explicitly.

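Below is a sketch of the schema definition. The field names here are an illustrative subset of the Wikipedia-edit JSON; the real feed carries more columns.

from pyspark.sql.types import StructType, StructField, StringType, BooleanType, TimestampType

schema = StructType([
    StructField("wikipedia", StringType(), True),    # which wiki the edit belongs to
    StructField("page", StringType(), True),         # page that was edited
    StructField("user", StringType(), True),         # editor's user name
    StructField("isRobot", BooleanType(), True),     # whether the edit was made by a bot
    StructField("timestamp", TimestampType(), True)  # when the edit happened
])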

Step 3: Parsing the input JSON we got from Kafka (the value field) via the from_json function

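A minimal sketch of the parsing step, continuing from the kafkaDF and schema defined above:

from pyspark.sql.functions import from_json, col

# Parse the JSON string in the value column into a nested struct column called "json".
jsonDF = kafkaDF.select(from_json(col("value"), schema).alias("json"))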

Step 4: Filtering records (reducing the volume) and promoting the nested columns one level up by creating aliases

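A sketch of this step, assuming we drop bot edits as the example filter (your filter condition may differ):

# Filter first, then flatten the nested fields into top-level columns via aliases.
filteredDF = (jsonDF
    .filter(col("json.isRobot") == False)
    .select(col("json.wikipedia").alias("wikipedia"),
            col("json.page").alias("page"),
            col("json.user").alias("user"),
            col("json.timestamp").alias("timestamp")))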

Step 5: Optionally, we can now apply any transformation, just as we would when processing batch data (please try it out). A small example is sketched below.
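As one illustrative transformation (an assumption, not part of the original notebook), we can derive a date column from the timestamp:

from pyspark.sql.functions import to_date

transformedDF = filteredDF.withColumn("edit_date", to_date(col("timestamp")))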

Step 6: Setting up credentials for Snowflake

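A sketch of the connection options for the Snowflake Spark connector. The account URL, database, schema, warehouse, and the secret scope/key names are placeholders; ideally, pull the credentials from Databricks secrets rather than hard-coding them.

sfOptions = {
    "sfURL":       "<your_account>.snowflakecomputing.com",
    "sfUser":      dbutils.secrets.get(scope="snowflake", key="username"),
    "sfPassword":  dbutils.secrets.get(scope="snowflake", key="password"),
    "sfDatabase":  "DEMO_DB",
    "sfSchema":    "PUBLIC",
    "sfWarehouse": "COMPUTE_WH"
}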

Step 7: Loading data into Snowflake


Note: We don't have a direct streaming sink that writes data into Snowflake, so I have written a small custom function, writeToSnowFlake(), which takes the dataframe and the epochId. This function is called once for each micro-batch.
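A sketch of that pattern using foreachBatch, assuming the Snowflake Spark connector is installed on the cluster; the target table name and checkpoint path are hypothetical.

def writeToSnowFlake(batchDF, epochId):
    # Called once per micro-batch: write that batch to Snowflake in append mode.
    (batchDF.write
        .format("snowflake")
        .options(**sfOptions)
        .option("dbtable", "WIKIPEDIA_EDITS")   # hypothetical target table
        .mode("append")
        .save())

streamingQuery = (transformedDF.writeStream
    .foreachBatch(writeToSnowFlake)
    .option("checkpointLocation", "/tmp/checkpoints/kafka_to_snowflake")  # hypothetical path
    .start())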

Step 8: Viewing data in Snowflake

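You can query the target table directly from the Snowflake UI. As an alternative sketch, you can also read it back from Databricks as a batch (non-streaming) read to verify the load, again using the hypothetical table name from above:

checkDF = (spark.read
    .format("snowflake")
    .options(**sfOptions)
    .option("dbtable", "WIKIPEDIA_EDITS")
    .load())

display(checkDF.limit(10))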

Note: The streaming job will keep running until we terminate it, either manually or programmatically. We need to decide when, or based on what condition, to terminate our streaming job.

Below is the command we can use to stop it.

streamingQuery.stop()


So, with a step-by-step process, we are able to read data from Kafka and write it to a Snowflake table via Databricks.

The same approach can be applied to other streaming sources such as Amazon Kinesis and Azure Event Hubs, and the loading can likewise target other cloud data warehouses such as Amazon Redshift and Azure Synapse Analytics.

Please try it out, and let me know if you need my help connecting to those sources and targets.

That marks the end of this article. Please like/share if you found it useful, and provide your feedback in the comments section.

Thanks for reading it. Will talk to you in my next article.

