Data Platform: Data Ingestion Engine for Data Lake
Miguel Garcia Lorenzo
This article is a follow-up to Data Platform as a Service and Data Platform: The New Generation Data Lakes. In this case, I will describe how to design and build an automated Data Ingestion Engine based on Spark and Databricks features.
The most important principle when designing a Data Ingestion Engine is to follow an automation paradigm. Automation provides a set of key advantages for success; some of them are shown in the following diagram.
There are several questions that we have to ask ourselves before starting:
What is automation in our case?
By definition, automation means "the use of machines and computers that can operate without needing human control". We are in a software environment, where a data pipeline is already an automated computer process, so in our case "automation" means much more than that.
In this scenario, an automated process means that we don't have to develop a data pipeline for each new dataset. For example, if we need to push data from Kafka to a Data Lake or Data Warehouse as shown in the diagram:
It can be a simple scenario or a very complex one, because each topic can have different kinds of data, features, and behaviors:
Traditionally, in many companies there are ETL teams that develop a custom ETL for each dataset, regardless of the complexity of the case. We need to avoid this kind of solution and work to design automated, standard processes. ETLs have some important challenges, such as:
How much time and/or effort can we save with automation?
When we design and build a Data Platform, we always need to evaluate whether automation provides enough value to compensate for the team's effort and time. Time is the only resource that we cannot scale. We can grow the team, but the relationship between headcount and productivity is not linear.
Sometimes, when a team is very focused on the automation paradigm, people want to automate everything, even actions that only happen once or do not provide real value.
User needs evolve continuously, and new use cases keep coming up. We must evaluate each one very carefully:
Usually, this is not an easy decision and it has to be evaluated by the whole team. In the end, it is an ROI decision. I don't like this concept very much because it often focuses on economic costs and forgets about people and teams.
Are there open-source/commercial tools available?
Before starting any design and development, we have to analyze whether there are tools available to cover our needs. As software engineers, we often want to develop our own software. But from a team or product point of view, we should focus our efforts on the most valuable components and features.
For example, if there is an open-source connector to ingest data from Kafka to Snowflake and this connector covers all the requirements, developing our own connector probably doesn't provide any value to the product, and we would be spending effort that we could focus on another component.
Architecture
The goal of the Data Ingestion Engine is to make data ingestion from the data sources into our Data Platform easier by providing a standard, resilient, and automated ingestion layer.
A Data Engine is composed of different components. It is important to isolate the users from this complexity and provide them with a common and friendly self-service platform.
In my opinion, there are four main components in a Data Ingestion Engine:
Not all of them are required in all environments; it depends on the maturity of the architecture. For example, if in our architecture all the database replication cases can be solved by Change Data Capture (CDC), we can avoid developing an ETL Ingestion Engine.
Core Ingestion Engine
It is the core of the Data Platform. In this case, we describe a Core Ingestion Engine oriented to a Data Lake platform, but we can follow the same approach in other cases. We begin by analyzing what this automation provides:
The landing zone is a storage layer that acts as a staging area for data. This zone is composed of several cloud object storage accounts organized by data domains. The goal of this layer is to provide a delivery entry point for the different data sources. It is an open layer that allows teams to upload their data autonomously and, at the same time, it is the source of our Core Ingestion Engine.
To automate the consolidation of this data in the Raw layer, we are going to build an engine based on Databricks and Auto Loader. Auto Loader is an interesting Databricks Spark feature that provides out-of-the-box capabilities to automate data ingestion.
In this article, we are going to use Azure Data Lake Storage Gen2 (ADLS) as the landing zone for all the cases, although for near real-time events we could use Kafka. ADLS allows the objects/files within an account to be organized into a hierarchy of directories and nested subdirectories, in the same way as a file system. Another important feature is that it allows assigning ACLs at the file and folder level, so we can share the same ADLS storage account with many users.
How does it work?
The solution and the data flow are simple. When a file is stored in the blob storage, an event is published to a queue or topic. The Auto Loader job is subscribed to this queue; it reads the event, gets the file, and applies the incremental process.
There are several options to detect new files:
We've described the file notification behavior. File notifications can be provided by using Azure Event Hubs or Azure Queue Storage. If Azure Queue Storage is chosen, Auto Loader can create the Event Grid subscription and the queue automatically. This feature simplifies the automation a lot, but it also makes operation and monitoring more complex because the names of the queues or topics are not human readable. The monitoring features of Azure Queue Storage are not very good at the moment, so it is something to take into account.
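As a minimal sketch of these two detection modes (the schema and the landing path are placeholders, and the snippet assumes a Databricks notebook where spark is available), the behavior is controlled by the cloudFiles.useNotifications option:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Placeholder schema and landing path; the real values come from the
// metadata repository described later in this article.
val eventSchema = StructType(Seq(StructField("id", StringType), StructField("payload", StringType)))
val landingPath = "abfss://landing@storageaccount.dfs.core.windows.net/sales/"

// Directory listing mode (default): Auto Loader periodically lists the input directory.
val listingStream = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.useNotifications", "false")
  .schema(eventSchema)
  .load(landingPath)

// File notification mode: Auto Loader consumes storage events from a queue,
// which scales better for directories with many files. On Azure it also needs
// the subscription/queue options shown in the full skeleton later on.
val notificationStream = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.useNotifications", "true")
  .schema(eventSchema)
  .load(landingPath)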
All these features make it easier for us to provide a managed service. We've created a generic Spark process that covers several ingestion scenarios, so we can reuse the same process and code to load all the data.
For example, suppose there are two tables that we have to load incrementally in append mode:
If we have a generic Spark process that provides incremental functionality, we only need to launch the same process with two different configurations to load both tables. In the end, they will be two Spark jobs based on the same code.
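For instance, here is a minimal sketch of launching the same generic notebook with two different configurations; the notebook path and pipeline ids are hypothetical, and the snippet assumes it runs in a Databricks notebook where dbutils is available:

// Hedged sketch: the same generic ingestion notebook is launched with two
// different pipeline identifiers, one per table. The notebook path and the
// pipeline ids are hypothetical.
val genericIngestionNotebook = "/DataEngine/CoreIngestionEngine/autoloader_generic_process"

// Each pipeline id points to a different row of data_pipeline_configuration,
// so the same code loads two different tables in append mode.
val firstTableParams  = Map("DataPipelineId" -> "101")
val secondTableParams = Map("DataPipelineId" -> "102")

// In production each pipeline runs as its own Databricks job, because the
// generic process is a long-running streaming query. For a quick interactive
// test, a single pipeline can be launched directly:
dbutils.notebook.run(genericIngestionNotebook, 0, firstTableParams)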
How to create an automated layer
At this point, we have described the flow and the capabilities that Auto Loader provides to simplify our data ingestion. To provide an automated, low-code solution, we have to add three components:
Metadata Repository
As a repository for the Core Ingestion Engine metadata, we can use any RDBMS, but in this case we've used an Azure SQL Database. Of course, we could use a NoSQL database and design a document-based data model, but I think an RDBMS is a good option for this case.
There are three main entities that we need in order to configure a new data pipeline:
In this case, the following information is required for the "data_pipeline_configuration" table:
Landing zone information
Storage zone for the Auto Loader checkpoint (watermark)
Pipeline information
Raw zone information
Audit information
In this case, the following information is required for the "data_type_trasformation" table:
Source Dataset
Target Dataset
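As a hedged sketch, and assuming field names that are only illustrative (the exact schema is up to each implementation), these entities could be modeled as follows:

// Hedged sketch of the metadata entities; the field names are illustrative.
case class DataPipelineConfiguration(
  dataPipelineId: Long,              // unique identifier of the pipeline
  // Landing zone information
  dataStorageAccount: String,
  dataContainer: String,
  dataDirectory: String,
  // Storage zone for the Auto Loader checkpoint (watermark)
  checkpointLocation: String,
  // Pipeline information
  processType: String,               // e.g. "APPEND" or "FULL_COPY"
  fileFormat: String,
  processingTime: String,
  // Raw zone information
  rawDatabase: String,
  rawTable: String,
  rawPath: String,
  // Audit information
  createdBy: String,
  createdAt: java.sql.Timestamp
)

case class DataTypeTransformation(
  dataPipelineId: Long,
  sourceDataset: String,             // source schema definition (e.g. JSON)
  targetDataset: String              // target schema definition in the Raw zone
)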
Spark Auto Loader Generic Process
The goal is to design generic code that we can reuse for all the dataset use cases. Our generic process will receive as an input parameter the pipeline id (unique identifier), which we'll use to retrieve all the relevant information to process each dataset. We have to apply the same best practices as always in our code: clean code, modularity, minimal code duplication, tests, etc.
The following code is a simple skeleton based on a Databricks notebook in Scala and Auto Loader:
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet, Statement}
import com.databricks.dbutils_v1.DBUtilsHolder.dbutils
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{DataType, StructField, StructType}
...
...
import io.delta.tables._
/**
 * Step 6:
 *   Performs scenario-based data ingestion based on the pipeline attributes.
 */
def executePipeline(pipelineDF: DataFrame, pipelineId: Long, pipelineProcessType: String): Unit = {
  ...
  if (pipelineProcessType == "APPEND") {
    ...
  } else if (pipelineProcessType == "FULL_COPY") {
    ...
  }
}
/**
 * Step 1:
 *   Job input parameter. DataPipelineId is the unique identifier of the pipeline.
 */
val DataPipelineId = dbutils.widgets.get("DataPipelineId")
/**
 * Step 2:
 *   Get secrets from the secret scope.
 */
val scope: String = "CoreIngestionEngine"
val sqlServerConnectionString: String = dbutils.secrets.get(scope, "SqlMetadataConnection")
val subscriptionId: String = dbutils.secrets.get(scope, "SubscriptionId")
val tenantId: String = dbutils.secrets.get(scope, "TenantId")
val connectionString: String = dbutils.secrets.get(scope, "StorageAccountConnectionString")
...
...
/**
 * Step 3:
 *   Get the metadata associated with this pipeline id from the Metadata Repository.
 */
var connection: Connection = null
try {
  connection = getConnectionMetadata()
  val getDataPipelineConf = connection.prepareStatement(
    "SELECT * FROM [DataEngine].[data_pipeline_configuration] WHERE [datapipelineid] = ?")
  getDataPipelineConf.setString(1, DataPipelineId)
  val resultPipelineConfiguration = getDataPipelineConf.executeQuery()
  if (resultPipelineConfiguration.next()) {
    val landingStorageAccount = resultPipelineConfiguration.getString("DataStorageAccount")
    val landingContainer = resultPipelineConfiguration.getString("DataContainer")
    val landingDirectory = resultPipelineConfiguration.getString("DataDirectory")
    val pipelineProcessType = resultPipelineConfiguration.getString("ProcessType")
    ...
    ...
  } else {
    throw new CriticalException("Pipeline properties for " + database + "." + table + " not found...")
  }
} finally {
  if (connection != null) {
    connection.close()
  }
}
/**
 * Step 4, 5:
 *   Configure the Auto Loader process and connect to the Storage and the Queue.
 *   Call the function with the processing logic, "executePipeline", for each micro-batch.
 */
if (schemaSource != null && !schemaSource.isEmpty) {
  val schema: StructType = DataType.fromJson(schemaSource).asInstanceOf[StructType]
  spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", format)
    .option("cloudFiles.includeExistingFiles", "true")
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.connectionString", connectionString)
    .option("cloudFiles.resourceGroup", resourceGroup)
    .option("cloudFiles.subscriptionId", subscriptionId)
    .option("cloudFiles.tenantId", tenantId)
    .option("cloudFiles.clientId", spnId)
    .option("cloudFiles.clientSecret", spnSecret)
    .schema(schema)
    .load(folderPath)
    .writeStream
    .queryName("pipeline" + "#" + database + "#" + table)
    .foreachBatch { (pipelineDF: DataFrame, batchId: Long) =>
      // batchId is provided by Structured Streaming; the pipeline id comes from the job parameter.
      executePipeline(pipelineDF, DataPipelineId.toLong, pipelineProcessType)
    }
    .trigger(Trigger.ProcessingTime(processingTime))
    .option("checkpointLocation", checkpointLocation)
    .start()
    .awaitTermination()
} else {
  throw new CriticalException("Schema for " + database + "." + table + " not found...")
}
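The body of executePipeline is intentionally elided in the skeleton. As a hedged sketch of the two scenarios, assuming the Raw layer is stored as Delta tables and that rawPath comes from the pipeline metadata, the branches could look like this:

import org.apache.spark.sql.{DataFrame, SaveMode}

// Hedged sketch of the two ingestion scenarios; rawPath is assumed to be
// retrieved from the Metadata Repository in Step 3.
def executePipelineSketch(pipelineDF: DataFrame,
                          pipelineProcessType: String,
                          rawPath: String): Unit = {
  if (pipelineProcessType == "APPEND") {
    // Incremental load: append the micro-batch to the raw Delta table.
    pipelineDF.write
      .format("delta")
      .mode(SaveMode.Append)
      .save(rawPath)
  } else if (pipelineProcessType == "FULL_COPY") {
    // Full copy: replace the raw table content with the incoming data.
    pipelineDF.write
      .format("delta")
      .mode(SaveMode.Overwrite)
      .save(rawPath)
  }
}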
Self-Service Portal
We have to provide an easy configuration tool to the users. There are no rules to define what the best solution is; as usual, we have to make the decision based on user skills. For example, in very technical environments a GitOps approach can be perfect, but if most of the users are business people, a Web Portal may be a good choice. The following mockup is a very simple example:
This portal has to be integrated with the SQL Database and also with Databricks. These integrations, in my opinion, have to be:
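For example, here is a minimal sketch of what the portal backend could do when a user registers a new pipeline: write the configuration to the metadata repository and trigger the generic ingestion job through the Databricks Jobs API (run-now). The job id, connection details, and the column subset are hypothetical.

// Hedged sketch of a portal backend action; the job id, credentials and the
// column subset are hypothetical.
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import java.sql.DriverManager

def registerAndLaunchPipeline(jdbcUrl: String,
                              databricksHost: String,
                              databricksToken: String,
                              jobId: Long,
                              pipelineId: String): Unit = {
  // 1. Store the new pipeline configuration in the metadata repository.
  val connection = DriverManager.getConnection(jdbcUrl)
  try {
    val insert = connection.prepareStatement(
      "INSERT INTO [DataEngine].[data_pipeline_configuration] " +
        "(DataStorageAccount, DataContainer, DataDirectory, ProcessType) " +
        "VALUES (?, ?, ?, ?)")
    insert.setString(1, "landingstorage")
    insert.setString(2, "sales")
    insert.setString(3, "orders/")
    insert.setString(4, "APPEND")
    insert.executeUpdate()
  } finally {
    connection.close()
  }

  // 2. Trigger the generic ingestion job through the Databricks Jobs API,
  //    passing the pipeline id as a notebook parameter.
  val request = HttpRequest.newBuilder()
    .uri(URI.create(s"$databricksHost/api/2.1/jobs/run-now"))
    .header("Authorization", s"Bearer $databricksToken")
    .header("Content-Type", "application/json")
    .POST(HttpRequest.BodyPublishers.ofString(
      s"""{"job_id": $jobId, "notebook_params": {"DataPipelineId": "$pipelineId"}}"""))
    .build()
  HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
}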
Tips
Some high-level tips that will help in the development:
Conclusions
This article describes a simple case of designing and building a Data Engine based on Spark. Nowadays, there are a lot of open-source tools and features that help us automate our solutions. Very often, the key to success is to correctly select what to focus our time on and which automation generates value. We must never forget the global view, and we should analyze the value of each feature with the team and the users.
We have not discussed Kafka and data connectors; these topics will be covered in future articles in the series.
I hope you had a good time reading!