Generic Data Ingestion Process in Apache Spark

In this article, we will understand how to write a generic ingestion process using Spark, and we will be using Databricks for it. Our goal is to create an ingestion framework which can ingest files of the following formats from any cloud location and load them into any database table or cloud directory.

We will create a single notebook to accomplish this. We can extend this framework to ingest data over ODBC/JDBC, but we will leave that for the next article.

Our present focus is ingesting from files, and we will restrict ourselves to the formats below, though we could very well extend it to any supported file format.

  1. Parquet Files
  2. JSON Files
  3. CSV Files

What is Databricks

Databricks is a fast, easy, and collaborative Apache Spark-based analytics service. Databricks builds on the capabilities of Spark by providing a zero-management cloud platform that includes:

  • Fully managed Spark clusters
  • An interactive workspace for exploration and visualization
  • A platform for powering your favourite Spark-based applications

Let's get started.

Step1: We will create a cluster and a Notebook. (This is pretty easy on Databricks.) We will name the notebook Generic_Ingestion_Notebook. We will be working in PySpark, so this is a Python notebook.

Step2: We will create 5 parameters for our Notebook (a sketch of the widget cell follows the list).

  1. InputPath - The path of your cloud location
  2. InputFile - The name of your source file
  3. TargetTable - The target table where we want to load our data
  4. TargetPath - The target cloud path where we want to load our data
  5. LoadType - Table or File
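The original screenshot is not reproduced here, so below is a minimal sketch of what the parameter cell could look like using dbutils.widgets. The variable names (input_path, input_file, target_table, target_path, load_type) are illustrative, not taken from the article.

```python
# Illustrative sketch of the parameter (widget) cell.
dbutils.widgets.text("InputPath", "")
dbutils.widgets.text("InputFile", "")
dbutils.widgets.text("TargetTable", "")
dbutils.widgets.text("TargetPath", "")
dbutils.widgets.dropdown("LoadType", "Table", ["Table", "File"])

# getArgument("InputPath") is the older way to read a widget value;
# dbutils.widgets.get() is the current equivalent.
input_path   = dbutils.widgets.get("InputPath")
input_file   = dbutils.widgets.get("InputFile")
target_table = dbutils.widgets.get("TargetTable")
target_path  = dbutils.widgets.get("TargetPath")
load_type    = dbutils.widgets.get("LoadType")
```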

Step3: Getting the Type of File and printing the File name

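A minimal sketch of this cell; full_path is an illustrative name, and it assumes the widget variables from Step2.

```python
import os

# Build the full path of the incoming file and print it.
full_path = os.path.join(input_path, input_file)
print("File to ingest:", input_file)
print("Full path:", full_path)
```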

Step4: Extracting the extension of the file, i.e. (.csv), (.json), (.parquet)

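A possible version of the extension-extraction cell (file_format is an illustrative name; os was imported in the previous sketch):

```python
# Extract and normalise the extension, e.g. ".CSV" -> "csv".
file_format = os.path.splitext(input_file)[1].lower().lstrip(".")
print("Detected file format:", file_format)
```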

Step5: Function to get the DataFrame based on the File Format

Remember, this is very basic; we can add complexity as required. Also, ideally this should not be part of the main generic notebook: we should keep all reusable methods in a separate folder and call that folder at the beginning of our generic ingestion process.

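A simple sketch of such a function; get_dataframe is an illustrative name, and the read options are kept deliberately minimal.

```python
def get_dataframe(path, file_format):
    """Return a Spark DataFrame for the given path, based on the file format."""
    if file_format == "csv":
        return (spark.read
                     .option("header", "true")
                     .option("inferSchema", "true")
                     .csv(path))
    elif file_format == "json":
        return spark.read.json(path)
    elif file_format == "parquet":
        return spark.read.parquet(path)
    else:
        raise ValueError(f"Unsupported file format: {file_format}")
```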

Step6: Calling the function and getting the DataFrame

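Calling it could look like this (df and the other names carry over from the earlier sketches):

```python
df = get_dataframe(full_path, file_format)
display(df)  # quick sanity check inside the Databricks notebook
```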

Step7: Optionally, if we wish to print / store the schema of the processed file

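For example, something along these lines:

```python
# Print the schema to the notebook output and keep a JSON copy
# in case we want to persist it for auditing later.
df.printSchema()
schema_json = df.schema.json()
print(schema_json)
```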

Step8: Also, if we want to record the count of the processed file.

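A one-liner is enough for a basic version:

```python
record_count = df.count()
print(f"Number of records in {input_file}: {record_count}")
```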

Step9: Now we have to save this DataFrame either to a table or to a file system, provided we have valid connection credentials.

Here, we are loading into the Snowflake cloud data warehouse. (Note: I have removed the Snowflake credentials cell after running it.)

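Since the credentials cell was removed, the sketch below uses placeholder Snowflake options; it assumes the Databricks Snowflake connector and the load_type/target_table/target_path variables from the earlier sketches.

```python
# Placeholder connection options standing in for the removed credentials cell.
sf_options = {
    "sfUrl":       "<account>.snowflakecomputing.com",
    "sfUser":      "<user>",
    "sfPassword":  "<password>",
    "sfDatabase":  "<database>",
    "sfSchema":    "<schema>",
    "sfWarehouse": "<warehouse>",
}

if load_type == "Table":
    # Write the DataFrame into the Snowflake target table.
    (df.write
       .format("snowflake")
       .options(**sf_options)
       .option("dbtable", target_table)
       .mode("overwrite")
       .save())
else:
    # LoadType == "File": write to the target cloud path instead.
    df.write.mode("overwrite").parquet(target_path)
```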

Running the Notebook

We can basically run our notebook in two ways: either manually, or on a schedule at a fixed time (the way we do with a CRON job).

We can create another notebook - Run_Notebook - and use the magic command %run to run our Generic_Ingestion_Notebook with different parameters, like the example below.

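Each cell in Run_Notebook could look something like this; the path and parameter values are made up for illustration, and each %run argument populates the matching widget in Generic_Ingestion_Notebook.

```python
%run ./Generic_Ingestion_Notebook $InputPath="/mnt/raw/sales" $InputFile="sales_data.csv" $TargetTable="SALES_RAW" $TargetPath="" $LoadType="Table"
```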

Note: We can create more complex workflows via dbutils.notebook.run, but that is out of scope for today.

So let's run one of the cells and see what it looks like.


Yeah, that's it. We have loaded the CSV file data into a Snowflake table via our generic ingestion process.

So if you followed it till the end, you can see how easy it is to create a very basic generic framework for ingesting data from heterogeneous sources and loading it anywhere. We can extend this framework to cater to more complex use cases. Please try it yourself and let me know if you are able to recreate it. Also, do let me know if anything needs clarification.

Thanks for reading. I hope you found it informative. Please press the "like" button if you liked it.

Gonzalo Nistal

Sr. Data Engineer at QuintoAndar Group

3 years ago

Great article Deepak, many thanks for sharing your knowledge.

Jeffrey Hasenbos

Founder @ Datashark Consultancy | Crafting Data Platforms, Warehouses, and Solutions for Analytics and Business Intelligence

3 years ago

Nice article! Do you have the code on Github or something?

Antriksh Chourasia

Data Engineer - Databricks, AWS Glue, AWS S3, Redshift, Hive, Pyspark, Shell Scripting, Jenkins CI/CD, GitHub.

3 years ago

Great article to learn from. Liked every part of it. Deepak Rajak can you please publish one for data transformation like removing duplicates from multiple columns, adding new column based on certain conditions or anything you can think of about manipulating the dataset.

Sourabh Joshi

Lead Data Engineer | Wantrepreneur | Blogger

3 years ago

Looks fine! One suggestion - maybe you could add some lines to validate the data before loading it onto the target?

Rakesh Sahoo

Data Engineer| AWS | Python | Core Java | SQL | HIVE | JP Morgan Chase and Ex-TCSer

4 years ago

Adding the getArgument() while creating a notebook is not there in the Databricks free Community Edition???
