Understanding Batch and Real-Time Processing in DataBricks
Krishna Yogi Kolluru
Synopsis
Introduction
In today’s data-driven landscape, businesses face the challenge of efficiently processing and analyzing large volumes of data in both batch and real-time scenarios. With the evolution of technologies like Azure Databricks and Apache Spark, data engineers and analysts have powerful tools at their disposal to tackle these challenges effectively.
In this blog, we embark on a journey to explore the nuances of batch and real-time/stream processing, beginning with a clear differentiation between the two paradigms. We unravel the complexities of batch processing, where data is processed in fixed intervals, and contrast it with the dynamic nature of real-time processing, where data is analyzed as it streams in, enabling near-instantaneous insights.
You will need the following prerequisites:
You can find code samples and additional resources for the chapter in the following GitHub repository:
Mounting Azure Data Lake in Databricks
Azure Data Lake is a powerful service provided by Microsoft for storing and analyzing big data of various forms. In Azure Databricks, you can mount Azure Data Lake Storage (ADLS) to seamlessly access and manipulate your data.
Creating an Azure Data Lake instance
To create an Azure Data Lake instance, follow these steps:
Accessing Azure Data Lake in Databricks
Now that you have created your Azure Data Lake instance, you need to access it securely from Azure Databricks using OAuth 2.0 with an Azure AD service principal application.
Follow these steps:
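The individual portal steps are not reproduced here. Once the service principal exists, a common pattern is to keep its client secret in a Databricks secret scope rather than pasting it into a notebook; the following is a minimal sketch, with hypothetical scope and key names:

# Retrieve the service principal's client secret from a Databricks secret scope
# (the scope and key names below are placeholders for illustration)
client_secret = dbutils.secrets.get(scope="<secret-scope-name>", key="<client-secret-key>")

The retrieved value can then be used wherever the client secret is required, such as in the mount configuration below.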
Mounting Data Lake in Databricks
Now, let’s mount the Azure Data Lake in Databricks using Python code:
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "<application-id>",
    "fs.azure.account.oauth2.client.secret": "<application-client-secret-value>",
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<directory-id>/oauth2/token"
}

dbutils.fs.mount(
    source="abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/",
    mount_point="/mnt/<mount-name>",
    extra_configs=configs
)
Replace the placeholders with the appropriate values obtained earlier.
This code snippet mounts the Data Lake to a specified mount point in Databricks, enabling seamless access to your data.
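To confirm the mount worked, you can list the mounted path and unmount it when it is no longer needed; this is a quick sketch using the same placeholder mount name:

# List the contents of the mounted path to verify the mount
display(dbutils.fs.ls("/mnt/<mount-name>"))

# Unmount when no longer needed
dbutils.fs.unmount("/mnt/<mount-name>")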
Differentiating Batch Versus Real-Time Processing
Here are the key differences between batch processing and real-time processing:

1. Data Arrival: In batch processing, data is collected over a period and processed together at scheduled intervals; in real-time processing, data is processed continuously as it arrives.
2. Use Cases: Batch processing suits workloads such as periodic reporting, historical analysis, and scheduled ETL jobs; real-time processing suits use cases such as live dashboards, monitoring, alerting, and fraud detection.
3. Latency: Batch processing typically operates with latencies of minutes to hours, while real-time processing delivers results within seconds or less.
Understanding these differences is crucial for designing and implementing effective data processing pipelines in various scenarios, allowing organizations to choose the appropriate processing model based on their specific requirements and objectives.
Working with Batch Processing
We will now learn the techniques for reading, transforming, and writing data, which is the same pattern followed in any ETL code or script.
Sorting Data
Sorting data in Azure Databricks can be achieved using the orderBy() method, which sorts the DataFrame based on one or more columns. Here's how you can sort data in Databricks:
# Sort data by a single column in ascending order
sorted_data = df.orderBy("column_name")
Replace "column_name" with the actual column name, you want to sort by. You can specify ascending or descending order by using?.asc() or?.desc() respectively.
Filtering Data
Filtering data in Azure Databricks allows you to select specific rows based on certain conditions. You can use the filter() or where() methods to apply filtering. Here's how:
# Filter data based on a condition
filtered_data = df.filter(df["column_name"] > 100)
Replace "column_name" and the condition with your desired filtering criteria.
Grouping Data
Grouping data in Azure Databricks is useful for performing aggregate operations on subsets of data. You can use the groupBy() method to group data based on one or more columns, and then apply aggregate functions like count(), sum(), avg(), etc.
# Group data by a single column and perform aggregation
grouped_data = df.groupBy("column_name").agg({"other_column": "sum"})
In the example above, "column_name" is the column you want to group by, and agg() specifies the aggregation function(s) you want to apply.
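If you need several named aggregates at once, the functions in pyspark.sql.functions can be passed to agg() with aliases; a minimal sketch reusing the placeholder column names:

from pyspark.sql.functions import sum as sum_, avg, count

# Group by one column and compute several named aggregates
grouped_multi = (df.groupBy("column_name")
                   .agg(sum_("other_column").alias("total"),
                        avg("other_column").alias("average"),
                        count("*").alias("row_count")))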
Joins
Joining data in Azure Databricks combines rows from two or more DataFrames based on a related column between them. You can perform different types of joins such as inner join, outer join, left join, and right join.
# Perform an inner join
joined_data = df1.join(df2, df1["common_column"] == df2["common_column"], "inner")
Replace "common_column" with the column that you want to use for joining the DataFrames.
These are some of the essential data processing operations you can perform in Azure Databricks using PySpark. They provide powerful capabilities for manipulating and analyzing large datasets efficiently.
Limiting Output Rows
Limiting the number of output rows in PySpark can be achieved using the limit() method. This method limits the number of rows returned by the DataFrame.
# Limit the output to 10 rows
limited_data = df.limit(10)
This code snippet limits the output to the first 10 rows of the DataFrame df.
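If the goal is simply to inspect a few rows rather than create a new DataFrame, show() prints a limited number of rows directly:

# Print the first 10 rows to the console
df.show(10)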
Renaming a Column
Renaming a column in PySpark is done using the withColumnRenamed() method. This method renames the specified column to a new name.
# Rename a column
renamed_data = df.withColumnRenamed("old_column_name", "new_column_name")
Replace "old_column_name" the existing column name "new_column_name" with the desired new name.
Adding/Selecting Columns
Adding a column in PySpark is done with the withColumn() method, which adds a new column to the DataFrame or replaces an existing one, while select() returns a DataFrame containing only the specified columns.
# Add a new column
new_data = df.withColumn("new_column", df["existing_column"] * 2)
# Select columns
selected_data = df.select("column1", "column2")
In the first example, a new column named "new_column" is added by performing a transformation on an existing column. In the second example, only specific columns "column1" and "column2" are selected from the DataFrame.
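Constant and derived columns can be combined in one pass; the sketch below reuses the placeholder column names from the examples above and uses lit() for a constant value:

from pyspark.sql.functions import col, lit

# Add a constant column and a derived column
enriched = (df.withColumn("source", lit("batch"))
              .withColumn("double_value", col("existing_column") * 2))

# select() also accepts expressions, not just column names
projected = df.select(col("column1"), (col("existing_column") * 2).alias("existing_doubled"))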
Printing Schema
Printing the schema of a DataFrame in PySpark is done using the printSchema() method. This method prints the schema of the DataFrame, including data types and nullable properties.
# Print the schema of the DataFrame
df.printSchema()
This code snippet prints the schema of the DataFrame df to the console.
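The schema is also available programmatically, which is handy when you need to act on it in code:

# Column names and (name, type) pairs
print(df.columns)
print(df.dtypes)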
These operations provide powerful capabilities for data manipulation and analysis in Azure Databricks using PySpark, allowing you to efficiently handle large datasets and derive meaningful insights.
Working with Structured Streaming
Let us now look at some important concepts of stream processing, illustrated with short code snippets:
Triggers:
This code sets a trigger to process data every 5 seconds in the streaming job.
# Fixed interval trigger: process available data every 5 seconds
stream_query = (stream_read
    .writeStream
    .trigger(processingTime="5 seconds")
    .format("console")
    .start())
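Other trigger options exist as well; the sketch below shows two of them, and their availability depends on your Spark/Databricks Runtime version:

# Process all data available now in one or more micro-batches, then stop
available_now_query = (stream_read
    .writeStream
    .trigger(availableNow=True)
    .format("console")
    .start())

# One-time trigger (an older option, superseded by availableNow in newer runtimes)
once_query = (stream_read
    .writeStream
    .trigger(once=True)
    .format("console")
    .start())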
Managing Streams:
This code snippet demonstrates various methods to manage streaming jobs such as stopping, getting IDs, explaining, and getting progress.
# Stop a running streaming job
stream_query.stop()
# Get unique identifier of the running stream
stream_id = stream_query.id
# Get unique identifier of the run of the stream
stream_run_id = stream_query.runId
# Get the name of the stream
stream_name = stream_query.name
# Explain the physical query plan of the streaming job
stream_query.explain()
# Get array of most recent progress updates of the streaming query
recent_progress = stream_query.recentProgress
# Get the last progress of the streaming query
last_progress = stream_query.lastProgress
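A few more handles are worth knowing; this is a sketch against the same stream_query and SparkSession as above:

# Current status of the query (for example, whether it is actively processing data)
print(stream_query.status)

# List all streaming queries active on this SparkSession
for query in spark.streams.active:
    print(query.id, query.name)

# Block until the stream terminates (optionally with a timeout in seconds)
# stream_query.awaitTermination(timeout=60)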
Sorting Data (Not directly supported in streaming DataFrame):
Sorting is not supported directly on a streaming DataFrame; it can be applied only after an aggregation, with the query running in complete output mode, as shown below.
# Sorting is not directly supported on streaming DataFrames
# You can sort only after aggregating, using the complete output mode
sorted_stream = (stream_read
    .groupBy("time")
    .count()
    .sort("time"))
Workloads:
These notes describe how Databricks Jobs can be configured to run and manage multiple streams in parallel in production environments.
# Configure Databricks job to run one or more streams in parallel
# This can be done through the Databricks UI by creating a job and scheduling it
# The job can run a notebook containing streaming code
# Databricks automatically restarts streams in case of failure
Reading Data:
This code defines a schema and reads streaming data from a specified location into a DataFrame.
# Define the schema of the incoming JSON events
schema = "time STRING, action STRING"

stream_read = (spark
    .readStream
    .format("json")
    .schema(schema)
    .load("dbfs:/databricks-datasets/structured-streaming/events/")
)
Writing Data:
The code writes the streaming DataFrame to Delta Lake while specifying a checkpoint location for fault tolerance.
(stream_read
    .writeStream
    .format("delta")
    .option("checkpointLocation", "dbfs:/mnt/<mount_point>/write_stream_checkpointing")
    .start("dbfs:/mnt/<mount_point>/write_stream"))
Streaming Job Output Modes:
This code snippet demonstrates the different output modes for writing streaming data: append, complete, and update.
# Append mode: only new rows are written to the sink
append_query = (stream_read
    .writeStream
    .outputMode("append")
    .format("console")
    .start())

# Complete mode: the entire result table is rewritten on every trigger
# (complete mode requires a streaming aggregation)
complete_query = (stream_read
    .groupBy("action")
    .count()
    .writeStream
    .outputMode("complete")
    .format("console")
    .start())

# Update mode: only the rows that changed since the last trigger are written
update_query = (stream_read
    .groupBy("action")
    .count()
    .writeStream
    .outputMode("update")
    .format("console")
    .start())
Checkpoints:
The provided code illustrates how checkpointing enables fault tolerance by recording the state of the stream for resuming from failures.
# Checkpointing lets a stream resume from where it left off after a failure
# Specify the checkpoint location when writing the stream
checkpoint_location = "dbfs:/mnt/<mount_point>/write_stream_checkpointing"

(stream_read
    .writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_location)
    .start("dbfs:/mnt/<mount_point>/write_stream"))
Example of Structured Streaming:
This end-to-end example defines a schema, reads streaming JSON data at a rate of one file per trigger, and writes the result to Delta Lake with checkpointing, tying together the concepts covered above.
# Define the schema for the incoming streaming data
schema = "time STRING, action STRING"

# Create a streaming DataFrame, reading at most one file per trigger
stream_read = (spark
    .readStream
    .format("json")
    .schema(schema)
    .option("maxFilesPerTrigger", 1)
    .load("dbfs:/databricks-datasets/structured-streaming/events/")
)

# Write the streaming DataFrame to Delta Lake with checkpointing
(stream_read
    .writeStream
    .format("delta")
    .option("checkpointLocation", "dbfs:/mnt/<mount_point>/write_stream_checkpointing")
    .start("dbfs:/mnt/<mount_point>/write_stream"))
Conclusion
In this blog, we covered batch and stream processing, exploring their differences and how to set up Azure storage on Databricks. We delved into batch processing and Spark transformations, and walked through a practical batch ETL workflow. Additionally, we introduced Spark Structured Streaming, a tool for real-time data processing that is crucial for applications like real-time dashboards. In the next blog, we’ll dive into machine learning and graph processing in Databricks, with plenty of examples to facilitate learning.