Understanding Batch and Real-Time Processing in DataBricks

Synopsis

  • Mounting Azure DataLake in DataBricks
  • Differences between Batch Processing and Stream Processing
  • Working with Batch Processing
  • Working with Stream Processing

Introduction

In today’s data-driven landscape, businesses face the challenge of efficiently processing and analyzing large volumes of data in both batch and real-time scenarios. With the evolution of technologies like Azure Databricks and Apache Spark, data engineers and analysts have powerful tools at their disposal to tackle these challenges effectively.

In this blog, we embark on a journey to explore the nuances of batch and real-time/stream processing, beginning with a clear differentiation between the two paradigms. We unravel the complexities of batch processing, where data is processed in fixed intervals, and contrast it with the dynamic nature of real-time processing, where data is analyzed as it streams in, enabling near-instantaneous insights.

You will need the following prerequisites:

  • An active Azure subscription with a provisioned Databricks resource and authorization to create Azure storage accounts.
  • Access to Azure Active Directory (Azure AD) and permissions to generate service principal applications.
  • Databricks notebooks and a functioning Spark cluster to execute the code samples and demonstrations.
  • Adequate privileges to assign roles within Azure resources for necessary configurations and access control.

You can find code samples and additional resources for the chapter in the following GitHub repository:

https://github.com/PacktPublishing/Optimizing-Databricks-Workload/tree/main/Chapter02.

Mounting Azure Data Lake in Databricks

Azure Data Lake is a powerful service provided by Microsoft for storing and analyzing big data of various forms. In Azure Databricks, you can mount Azure Data Lake Storage (ADLS) to seamlessly access and manipulate your data.

Creating an Azure Data Lake Instance

To create an Azure Data Lake instance, follow these steps:

  1. Go to Azure Portal: Sign in to your Azure account and navigate to the Azure Portal.
  2. Create a Storage Account: Click on “Create a resource”, and then “Storage account” in the marketplace.
  3. Configure Settings: Set the resource group, storage account name, region, performance, and redundancy.
  4. Enable Hierarchical Namespace: Check the box next to “Enable hierarchical namespace” for better data organization.
  5. Review and Create: Review your settings and click on “Create” to create the Data Lake instance.

Accessing Azure Data Lake in Databricks

Now that you have created your Azure Data Lake instance, you need to access it securely from Azure Databricks using OAuth 2.0 with an Azure AD service principal application.

Follow these steps:

  1. Create a Container: Go to your Data Lake instance, click on “Containers”, and create a new container with suitable permissions.
  2. Register an Application in Azure AD: Go to Azure Active Directory, click on “App registrations”, and register a new application.
  3. Note Down Application Details: Note down the Application (client) ID and Directory (tenant) ID, and generate a client secret for later use (see the secret-scope sketch after this list).
  4. Assign Role in Storage Account: Assign the “Storage Blob Data Contributor” role to the service principal application in your storage account.
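
Rather than hard-coding the client secret in a notebook, it is good practice to store it in a Databricks secret scope and read it at runtime. Here is a minimal sketch, assuming a secret scope named "adls-scope" with a key named "sp-client-secret" has already been created (both names are placeholders):

# Retrieve the service principal's client secret from a Databricks secret scope
# ("adls-scope" and "sp-client-secret" are placeholder names)
client_secret = dbutils.secrets.get(scope="adls-scope", key="sp-client-secret")

The value returned by dbutils.secrets.get() can then be passed into the OAuth configuration shown in the next section instead of a pasted secret value.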

Mounting Data Lake in Databricks

Now, let’s mount the Azure Data Lake in Databricks using Python code:

configs = {
  "fs.azure.account.auth.type": "OAuth",
  "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
  "fs.azure.account.oauth2.client.id": "<application-id>",
  "fs.azure.account.oauth2.client.secret": "<application-client-secret-value>",
  "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<directory-id>/oauth2/token"
}
dbutils.fs.mount(
  source="abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/",
  mount_point="/mnt/<mount-name>",
  extra_configs=configs
)        

Replace the placeholders with the appropriate values obtained earlier.

This code snippet mounts the Data Lake to a specified mount point in Databricks, enabling seamless access to your data.
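
As a quick sanity check, you can list the contents of the new mount point, assuming the same placeholder mount name used above:

# List the files and folders at the root of the mounted Data Lake container
display(dbutils.fs.ls("/mnt/<mount-name>"))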

Differentiating Batch Versus Real-Time Processing

Here are the key differences between batch processing and real-time processing:

  1. Processing Speed:

  • Batch Processing: Typically slower due to processing data in fixed intervals.
  • Real-Time Processing: Near real-time or real-time processing, allowing for immediate analysis and response as data arrives.

  2. Data Arrival:

  • Batch Processing: Data is processed in fixed intervals, regardless of when it was generated or arrived.
  • Real-Time Processing: Data is processed as soon as it arrives, allowing for continuous analysis of streaming data.

  3. Use Cases:

  • Batch Processing: Well-suited for ETL (Extract, Transform, Load) jobs and periodic analytics that can tolerate delays in processing.
  • Real-Time Processing: Ideal for real-time analytics, such as monitoring systems, detecting anomalies, and processing IoT (Internet of Things) data streams.

  4. Latency:

  • Batch Processing: Typically involves higher latency as data is processed in fixed intervals, leading to delays in insights and responses.
  • Real-Time Processing: Offers lower latency or near real-time processing, enabling rapid insights and immediate actions on streaming data.

Understanding these differences is crucial for designing and implementing effective data processing pipelines in various scenarios, allowing organizations to choose the appropriate processing model based on their specific requirements and objectives.
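
To make the contrast concrete, here is a minimal sketch of the two models in PySpark, using the sample events dataset that ships with Databricks (used again later in this blog). The batch read processes a bounded snapshot once, while the streaming read treats the same location as an unbounded source and picks up new files as they arrive:

# Batch: read the full dataset once as a bounded DataFrame
batch_df = spark.read.format("json").load("dbfs:/databricks-datasets/structured-streaming/events/")

# Streaming: treat the same source as an unbounded table
# (streaming reads require an explicit schema)
stream_df = (spark.readStream
             .format("json")
             .schema("time STRING, action STRING")
             .load("dbfs:/databricks-datasets/structured-streaming/events/"))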

Working with Batch Processing


We will now learn the techniques for reading, transforming, and writing data, which is the same pattern followed in any ETL code or script.
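
The snippets in this section operate on a DataFrame named df. Here is a minimal setup sketch that creates one from a hypothetical CSV file in the mounted Data Lake (the path and file name are placeholders):

# Read a sample CSV file from the mounted Data Lake into a DataFrame
df = (spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("/mnt/<mount-name>/sample_data.csv"))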

Sorting Data

Sorting data in Azure Databricks can be achieved using the orderBy() method, which sorts the DataFrame based on one or more columns. Here's how you can sort data in Databricks:

# Sort data by a single column in ascending order
sorted_data = df.orderBy("column_name")        

Replace "column_name" with the actual column you want to sort by. You can specify ascending or descending order using .asc() or .desc(), respectively, as shown in the sketch below.
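
For example, a sketch of descending and multi-column sorts (column names are placeholders):

from pyspark.sql.functions import col

# Sort by a single column in descending order
sorted_desc = df.orderBy(col("column_name").desc())

# Sort by multiple columns with mixed sort directions
sorted_multi = df.orderBy(col("column_a").asc(), col("column_b").desc())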

Filtering Data

Filtering data in Azure Databricks allows you to select specific rows based on certain conditions. You can use the filter() or where() methods to apply filtering. Here's how:

# Filter data based on a condition
filtered_data = df.filter(df["column_name"] > 100)        

Replace "column_name" and the condition with your desired filtering criteria.
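
Conditions can also be combined. Here is a small sketch using placeholder columns; note that each condition must be wrapped in parentheses:

from pyspark.sql.functions import col

# Combine conditions with & (and), | (or), and ~ (not)
filtered_data = df.where((col("column_name") > 100) & (col("other_column") == "some_value"))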

Grouping Data

Grouping data in Azure Databricks is useful for performing aggregate operations on subsets of data. You can use the groupBy() method to group data based on one or more columns, and then apply aggregate functions like count(), sum(), avg(), etc.

# Group data by a single column and perform aggregation
grouped_data = df.groupBy("column_name").agg({"other_column": "sum"})        

In the example above, "column_name" represents the column you want to group by, and agg() specifies the aggregation function(s) you want to apply.
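
You can also compute several aggregates at once by passing column expressions to agg(). A sketch with placeholder columns and explicit aliases for readable output names:

from pyspark.sql.functions import sum as sum_, avg, count

# Group by one column and compute several aggregates with aliases
grouped_data = (df.groupBy("column_name")
                .agg(sum_("other_column").alias("total"),
                     avg("other_column").alias("average"),
                     count("*").alias("row_count")))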

Joins

Joining data in Azure Databricks combines rows from two or more DataFrames based on a related column between them. You can perform different types of joins such as inner join, outer join, left join, and right join.

# Perform an inner join
joined_data = df1.join(df2, df1["common_column"] == df2["common_column"], "inner")        

Replace "common_column" with the column that you want to use for joining the DataFrames.
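
When both DataFrames share the join column name, you can pass it once and pick the join type with the how argument. For example, a left join that keeps every row of df1:

# Left join: keep all rows from df1; unmatched rows from df2 become nulls
left_joined = df1.join(df2, on="common_column", how="left")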

These are some of the essential data processing operations you can perform in Azure Databricks using PySpark. They provide powerful capabilities for manipulating and analyzing large datasets efficiently.

Limiting Output Rows

Limiting the number of output rows in PySpark can be achieved using the limit() method. This method limits the number of rows returned by the DataFrame.

# Limit the output to 10 rows
limited_data = df.limit(10)        

This code snippet limits the output to the first 10 rows of the DataFrame df.

Renaming a Column

Renaming a column in PySpark is done using the withColumnRenamed() method. This method renames the specified column to a new name.

# Rename a column
renamed_data = df.withColumnRenamed("old_column_name", "new_column_name")        

Replace "old_column_name" with the existing column name and "new_column_name" with the desired new name.

Adding/Selecting Columns

Adding or selecting columns in PySpark involves using the withColumn() method. This method adds a new column to the DataFrame or replaces an existing one.

# Add a new column
new_data = df.withColumn("new_column", df["existing_column"] * 2)
# Select columns
selected_data = df.select("column1", "column2")        

In the first example, a new column named "new_column" is added by performing a transformation on an existing column. In the second example, only specific columns "column1" and "column2" are selected from the DataFrame.

Printing Schema

Printing the schema of a DataFrame in PySpark is done using the printSchema() method. This method prints the schema of the DataFrame, including data types and nullable properties.

# Print the schema of the DataFrame
df.printSchema()        

This code snippet prints the schema of the DataFrame df to the console.

These operations provide powerful capabilities for data manipulation and analysis in Azure Databricks using PySpark, allowing you to efficiently handle large datasets and derive meaningful insights.
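
Putting these operations together, here is a minimal batch ETL sketch: it reads raw data from the mounted Data Lake, applies the transformations covered above, and writes a curated result back in Delta format. The path and the column names (amount, region) are hypothetical:

from pyspark.sql.functions import col, sum as sum_

# Extract: read raw data from the mounted Data Lake
raw_df = (spark.read
          .option("header", "true")
          .option("inferSchema", "true")
          .csv("/mnt/<mount-name>/raw/sales.csv"))

# Transform: filter, aggregate, and sort
summary_df = (raw_df
              .filter(col("amount") > 0)
              .groupBy("region")
              .agg(sum_("amount").alias("total_amount"))
              .orderBy(col("total_amount").desc()))

# Load: write the curated result back to the Data Lake in Delta format
(summary_df.write
 .format("delta")
 .mode("overwrite")
 .save("/mnt/<mount-name>/curated/sales_summary"))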

Working with Structured Streaming


Here, let us see some important concepts of stream processing with short code examples. The snippets below operate on a streaming DataFrame named stream_read, which is created in the Reading Data section further down.

Triggers:

This code sets a trigger to process data every 5 seconds in the streaming job.

# Fixed interval trigger: process new data every 5 seconds
stream_query = (stream_read
                .writeStream
                .trigger(processingTime="5 seconds")
                .format("console")
                .start())
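
Besides a fixed processing interval, Structured Streaming also supports a one-time trigger that processes everything currently available and then stops, which is handy for cost-controlled, batch-like runs of the same streaming code:

# One-time trigger: process all available data once, then stop the query
stream_query_once = (stream_read
                     .writeStream
                     .trigger(once=True)
                     .format("console")
                     .start())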

Managing Streams:

This code snippet demonstrates various methods to manage streaming jobs such as stopping, getting IDs, explaining, and getting progress.

# Stop a running streaming job
stream_query.stop()

# Get unique identifier of the running stream
stream_id = stream_query.id

# Get unique identifier of the run of the stream
stream_run_id = stream_query.runId

# Get the name of the stream
stream_name = stream_query.name

# Explain the physical query plan of the streaming job
stream_query.explain()

# Get array of most recent progress updates of the streaming query
recent_progress = stream_query.recentProgress

# Get the last progress of the streaming query
last_progress = stream_query.lastProgress        
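
Two more methods worth knowing are status, which reports what the query is currently doing, and awaitTermination(), which blocks the calling notebook cell until the stream stops:

# Get the current status of the streaming query (e.g., waiting for data, processing)
current_status = stream_query.status

# Block until the streaming query terminates (optionally pass a timeout in seconds)
stream_query.awaitTermination()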

Sorting Data (not directly supported on streaming DataFrames):

Sorting is not supported directly on a streaming DataFrame; it becomes possible after an aggregation, when the query is written with complete output mode (see the sketch after the code below).

# Sorting is not directly supported on streaming DataFrames
# You can sort after aggregating and using complete output mode
sorted_stream = (stream_read
                 .groupBy("time")
                 .count()
                 .sort("time"))        
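
To actually run the sorted aggregation above, write it out with complete output mode, for example to the console sink for inspection:

# Sorting an aggregated streaming DataFrame requires complete output mode
sorted_query = (sorted_stream
                .writeStream
                .outputMode("complete")
                .format("console")
                .start())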

Workloads:

This code segment describes how to configure Databricks jobs to run and manage multiple streams in parallel for production environments.

# Configure Databricks job to run one or more streams in parallel
# This can be done through the Databricks UI by creating a job and scheduling it
# The job can run a notebook containing streaming code
# Databricks automatically restarts streams in case of failure        

Reading Data:

This code defines a schema and reads streaming data from a specified location into a DataFrame.

# Define the schema as a DDL string (streaming reads require an explicit schema)
schema = "time STRING, action STRING"

stream_read = (spark
               .readStream
               .format("json")
               .schema(schema)
               .load("dbfs:/databricks-datasets/structured-streaming/events/"))

Writing Data:

The code writes the streaming DataFrame to Delta Lake while specifying a checkpoint location for fault tolerance.

(stream_read
 .writeStream
 .format("delta")
 .option("checkpointLocation", "dbfs:/mnt/<mount_point>/write_stream_checkpointing")
 .start("dbfs:/mnt/<mount_point>/write_stream"))        

Streaming Output Modes:

This code snippet demonstrates the three output modes for writing streaming data: append, complete, and update. Note that complete mode is only valid for queries that contain an aggregation.

# Append mode: only new rows added since the last trigger are written to the sink
stream_write = (stream_read
                .writeStream
                .outputMode("append")
                .format("console")
                .start())

# Complete mode: the entire result table is rewritten on every trigger
# (requires an aggregation, so we aggregate on "action" here)
stream_write = (stream_read
                .groupBy("action")
                .count()
                .writeStream
                .outputMode("complete")
                .format("console")
                .start())

# Update mode: only rows that changed since the last trigger are written
stream_write = (stream_read
                .writeStream
                .outputMode("update")
                .format("console")
                .start())

Checkpoints:

The provided code illustrates how checkpointing enables fault tolerance by recording the state of the stream for resuming from failures.

# Checkpointing helps in case of failure to resume from where it left off
# Specify checkpoint location when writing the stream
checkpoint_location = "dbfs:/mnt/<mount_point>/write_stream_checkpointing"

(stream_read
 .writeStream
 .format("delta")
 .option("checkpointLocation", checkpoint_location)
 .start("dbfs:/mnt/<mount_point>/write_stream"))        

Example of Structured Streaming:

This code defines a schema, reads the streaming source one file per trigger, and writes the result to Delta Lake, showcasing a basic end-to-end example of Structured Streaming.

# Define schema for reading streaming
schema = "time STRING, action STRING"

# Create a streaming DataFrame
stream_read = (spark
 .readStream
 .format("json")
 .schema(schema)
 .option("maxFilesPerTrigger", 1)
 .load("dbfs:/databricks-datasets/structured-streaming/events/")
)

# Write the streaming DataFrame to Delta Lake
(stream_read
 .writeStream
 .format("delta")
 .option("checkpointLocation", "dbfs:/mnt/<mount_point>/write_stream_checkpointing")
 .start("dbfs:/mnt/<mount_point>/write_stream"))        
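
Once the stream has processed some data, the Delta output can be read back as an ordinary batch DataFrame to inspect the results (using the same placeholder mount point as above):

# Read the Delta table written by the stream as a batch DataFrame
result_df = spark.read.format("delta").load("dbfs:/mnt/<mount_point>/write_stream")
display(result_df)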

Conclusion

In this blog, we covered batch and stream processing, exploring their differences and how to mount Azure storage in Databricks. We delved into batch processing and Spark transformations, and walked through a practical batch ETL flow. Additionally, we introduced Spark Structured Streaming, a tool for real-time data processing that is crucial for applications like real-time dashboards. In the next blog, we’ll dive into machine learning and graph processing in Databricks, with plenty of examples to facilitate learning.
