Ensuring Data Quality in Databricks with Great Expectations: A Practical How-to Guide
In today's data-driven world, data quality is an integral part of every data platform. Ensuring the accuracy, reliability, and consistency of the data you work with is crucial for making informed decisions and driving business success. One of the most comprehensive frameworks for data quality is Great Expectations, a powerful and flexible tool that allows you to define, test, and monitor the quality of your data.
As Databricks Lakehouse continues to play an ever larger role in modern data platforms, it becomes increasingly important to integrate data quality checks seamlessly into your Databricks workflows. In this article, we will dive into the practical aspects of integrating Great Expectations with Databricks, providing a comprehensive how-to guide complete with Python code snippets that can be directly executed as Databricks notebooks.
By following this guide, you'll be well-equipped to harness the power of Great Expectations within your Databricks environment, ensuring that your data lakehouse is built on a solid foundation of high-quality data. Let's get started and explore how to combine these technologies for effective data quality management.
Installing Dependencies
Before we can begin working with Great Expectations, it's essential to install the necessary dependencies. You can do this by running the following code in your Databricks notebook:
%pip install great-expectations
Now you should be able to import Great Expectations as follows:
import great_expectations as gx
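Note that the Great Expectations API has changed noticeably between releases; the snippets in this guide use the classic context API (for example, RuntimeBatchRequest and add_or_update_datasource), which, at the time of writing, belongs to the 0.x release line. If a call fails with an attribute error, it is worth confirming which version was installed:
# Print the installed version; the examples below assume the classic (0.x) API
print(gx.__version__)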
Working with Data Context
A crucial aspect of using Great Expectations within Databricks is providing a data context. The data context serves as a central location to store Great Expectations objects, such as Expectations, checkpoints, profilers, and documentation. This storage enables you to manage, access, and share these objects across your data platform. Data contexts can persist data to various backends, including local filesystems, databases, or cloud provider storage solutions like Amazon S3.
In the following code example, we'll demonstrate how to set up a data context using the Databricks File System (DBFS) with a local filesystem backend. This configuration ensures that all Great Expectations objects are persisted to DBFS and can be easily shared across different Databricks jobs:
def init_data_context(root_dir: str) -> gx.DataContext:
    '''
    Initializes the Great Expectations data context using a file system backend.
    :param root_dir: Root path, where Great Expectations' objects are persisted
    :return: Data context
    '''
    # Define config
    data_context_config = gx.data_context.types.base.DataContextConfig(
        store_backend_defaults=gx.data_context.types.base.FilesystemStoreBackendDefaults(
            root_directory=root_dir
        )
    )
    # Init context
    context = gx.get_context(project_config=data_context_config)
    return context

context = init_data_context(root_dir="/dbfs/great_expectations/")
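If you prefer to persist the Great Expectations objects to cloud storage rather than DBFS, as mentioned above, the same pattern applies with a different store backend. The following is a minimal sketch assuming an S3 bucket; the bucket name is a placeholder, and you should verify the S3StoreBackendDefaults class against the Great Expectations version you have installed:
# Variant: persist Great Expectations objects to Amazon S3 instead of DBFS.
# "my-gx-bucket" is a placeholder bucket name -- replace it with your own.
def init_s3_data_context(bucket_name: str) -> gx.DataContext:
    s3_config = gx.data_context.types.base.DataContextConfig(
        store_backend_defaults=gx.data_context.types.base.S3StoreBackendDefaults(
            default_bucket_name=bucket_name
        )
    )
    return gx.get_context(project_config=s3_config)

# context = init_s3_data_context(bucket_name="my-gx-bucket")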
Creating a Data Source
After creating a data context, the next step in integrating Great Expectations with Databricks is to provide a datasource. Datasources in Great Expectations offer a unified API that allows you to interact with various data backends consistently, regardless of whether you're working with PostgreSQL, CSV file systems, Spark dataframes, or any other supported data backend.
A datasource is composed of two key components: the execution engine and the data connector. The execution engine is responsible for analyzing and validating the data. In our case, we'll use the SparkDFExecutionEngine, which employs the Spark engine to perform these tasks.
The data connector, on the other hand, contains the configuration needed to connect to a specific type of external data source and to access and inspect the data within it. In this case, we won't need to focus on the data connector, so we'll use the most basic option available, the RuntimeDataConnector. Every data connector also defines a set of batch identifiers, which are supplied with each batch request to identify the batch being validated.
The following code snippet demonstrates how to create a Spark datasource named spark_data_source for use with Great Expectations:
def register_spark_data_source(context: gx.DataContext):
    '''
    Registers a Spark data source used to process Spark data frames with Great Expectations.
    The data source is available as 'spark_data_source' and defines a connector 'spark_data_source_connector'.
    It defines a 'timestamp' batch identifier.
    :param context: Data context
    '''
    source = gx.datasource.Datasource(
        name="spark_data_source",
        execution_engine={
            "module_name": "great_expectations.execution_engine",
            "class_name": "SparkDFExecutionEngine"
        },
        data_connectors={
            "spark_data_source_connector": {
                "class_name": "RuntimeDataConnector",
                "batch_identifiers": ["timestamp"]
            }
        }
    )
    context.add_or_update_datasource(datasource=source)

register_spark_data_source(context)
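To confirm that the datasource was registered in the data context, you can list the configured datasources; this is a quick sanity check rather than a required step:
# Sanity check: the new datasource should appear in the context's configuration
print([ds["name"] for ds in context.list_datasources()])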
Creating a Runtime Batch Request
With the datasource in place, the next step in utilizing Great Expectations within Databricks is to create a batch request. A batch request is used to load a specific batch of data for analysis and validation. Batch identifiers, which we specified earlier in the datasource configuration, contain information that helps identify particular batches, such as timestamps or offsets.
In the following example, we demonstrate how to create a batch request using a Spark dataframe, which is cached to improve processing performance later on. For simplicity, we make use of the New York taxi trip dataset, which is automatically available in every Databricks deployment.
from pyspark.sql import DataFrame, SparkSession

def create_batch_request(df: DataFrame, df_name: str, timestamp: str) -> gx.core.batch.RuntimeBatchRequest:
    '''
    Creates a batch request from a Spark data frame.
    :param df: Spark data frame
    :param df_name: Name of the Spark data frame, e.g. the table name
    :param timestamp: Timestamp string to be passed as a batch identifier
    :return: Batch request
    '''
    runtime_batch_request = gx.core.batch.RuntimeBatchRequest(
        datasource_name="spark_data_source",
        data_connector_name="spark_data_source_connector",
        data_asset_name="spark_batch_{}_{}".format(df_name, timestamp),
        runtime_parameters={"batch_data": df},
        batch_identifiers={
            "timestamp": timestamp,
        }
    )
    return runtime_batch_request

def current_timestamp() -> str:
    '''
    Returns the current timestamp.
    :return: Timestamp string in ISO 8601 format
    '''
    import datetime
    return datetime.datetime.utcnow().isoformat()

# Get or create the Spark session (already provided in Databricks notebooks)
spark = SparkSession.builder.appName("nyc_taxi_great_expectations_checks").getOrCreate()

# Cache the table for faster runtime
#
# NOTE
# If there is a timestamp column, you may add a filter for a specific time frame.
# This allows you to rerun the checks later using the timestamp from the batch.
#
table = "samples.nyctaxi.trips"
spark.sql(f"CACHE TABLE data AS SELECT * FROM {table}")

# Get the data frame
df = spark.sql("SELECT * FROM data")

# Create the batch request
timestamp = current_timestamp()
batch_request = create_batch_request(df=df, df_name=table, timestamp=timestamp)
Building an Expectation Suite
Before we can validate a data batch using Great Expectations, we need to create expectations that define the desired characteristics of our data. Expectations are combined into an expectation suite, which serves as an overall description of the data and its properties.
There are two primary methods for creating expectations: generating them automatically with a profiler, or defining them manually.
In the following sections, we will explore both methods, providing you with the knowledge and tools necessary to create expectations tailored to your specific data quality needs.
Using a profiler
A profiler is a powerful tool that can analyze your sample data and automatically generate a set of expectations based on its characteristics. This method is particularly useful when you're just starting out with Great Expectations or when you want to quickly establish a baseline set of expectations for your data.
In the following example, we generate expectations from the batch request created in the last section and plot the resulting expectations and metrics:
# Run the default onboarding profiler on the batch request
result = context.assistants.onboarding.run(
    batch_request=batch_request,
    exclude_column_names=[],
)

# Get the suite with specific name
suite_name = "nyc_taxi_expectations_v1"
suite = result.get_expectation_suite(
    expectation_suite_name=suite_name
)

# Plot results
result.plot_expectations_and_metrics()
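Besides the plots, it can be useful to inspect the generated suite programmatically, for example to see how many expectations the profiler produced and which expectation types they use. A minimal sketch:
# Inspect the profiler-generated suite: number of expectations and their types
print(f"Generated {len(suite.expectations)} expectations")
for expectation in suite.expectations:
    print(expectation.expectation_type, expectation.kwargs.get("column"))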
Manually adding expectations
If you prefer a more hands-on approach or have specific requirements that cannot be captured automatically, you can also create expectations manually. This method allows you to define your expectations with a greater level of detail and control, ensuring that your data quality checks are tailored to your unique needs. You will find an overview of all available expectations in the Expectations Gallery. Additionally, you can create your own expectations (out of scope for this post).
In the following example, we add an expectation to our profiler-generated expectation suite, which checks that the dropoff time is later than the pickup time in our New York taxi trip example:
# Create the expectation configuration
config = gx.core.ExpectationConfiguration(
    expectation_type="expect_column_pair_values_a_to_be_greater_than_b",
    kwargs={
        "column_A": "tpep_dropoff_datetime",
        "column_B": "tpep_pickup_datetime"
    },
    meta={
        "notes": {
            "format": "markdown",
            "content": "Dropoff time must be *later than* pickup time"
        }
    }
)

# Add the expectation to the suite
suite.add_expectation(expectation_configuration=config)
Saving an expectation suite
Once you've created an expectation suite, you can easily persist it to the data context using the following method:
# Persist the expectation suite with the suite name specified above (suite_name="nyc_taxi_expectations_v1")
context.add_or_update_expectation_suite(expectation_suite=suite)
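You can verify that the suite was persisted to the store (on DBFS in our setup) by listing the suite names known to the context:
# The persisted suite should now appear in the context's expectation store
print(context.list_expectation_suite_names())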
Perform Validations using Expectation Suites
Having created an expectation suite, we can now move on to defining and running checkpoints. Checkpoints are the primary means for validating data in a production deployment of Great Expectations. They offer a convenient abstraction for bundling the validation of one or more batches of data against one or several expectation suites, as well as specifying the actions to be taken after the validation process.
Checkpoints can be enhanced with various actions, such as alerts, allowing you to receive notifications (e.g., via email or Slack) if validation fails. This feature ensures that you stay informed about the quality of your data and can quickly respond to any issues that may arise, maintaining the reliability and accuracy of your data platform.
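As an example of such an action, the sketch below attaches a Slack notification to a checkpoint configuration. It assumes you have an incoming-webhook URL for your Slack workspace (the slack_webhook value is a placeholder) and uses the SlackNotificationAction that ships with Great Expectations; the exact action configuration may vary slightly between versions, so treat this as a starting point rather than a drop-in recipe.
# Sketch: a checkpoint action that posts a Slack message when validation fails.
# The webhook URL below is a placeholder -- use your own incoming-webhook URL.
slack_action = {
    "name": "send_slack_notification_on_failure",
    "action": {
        "class_name": "SlackNotificationAction",
        "slack_webhook": "https://hooks.slack.com/services/<your-webhook-id>",
        "notify_on": "failure",
        "renderer": {
            "module_name": "great_expectations.render.renderer.slack_renderer",
            "class_name": "SlackRenderer",
        },
    },
}
# Such an action can be appended to a checkpoint's action_list when the
# checkpoint is created (see the next section).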
Creating checkpoints
The following code demonstrates how to create a checkpoint in Great Expectations:
checkpoint_name = "nyc_taxi_test_checkpoint"

# Create and persist checkpoint to reuse for multiple batches
context.add_or_update_checkpoint(
    name=checkpoint_name,
    config_version=1,
    class_name="SimpleCheckpoint",
    validations=[
        {"expectation_suite_name": suite_name}
    ]
)
By persisting the expectation suite, the datasource and the checkpoints to DBFS, you eliminate the need to repeat this process every time you want to validate your data. With the data context, you can define expectations and checkpoints for your data just once, and then schedule a Databricks notebook to regularly validate the data.
Running checkpoints
In the following code example, we demonstrate how to execute a checkpoint from the data context in Great Expectations. By incorporating the creation of a batch request and running a checkpoint within a separate notebook, you can easily schedule regular data quality checks to ensure the ongoing accuracy and reliability of your data.
# Run checkpoint
checkpoint_result = context.run_checkpoint(
    checkpoint_name=checkpoint_name,
    batch_request=batch_request
)
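Putting the pieces together, a scheduled validation notebook only needs to rebuild the batch request for fresh data and run the persisted checkpoint; everything else is loaded from the data context on DBFS. The following is a minimal sketch that reuses the helper functions defined earlier in this guide (assumed to be available in the scheduled notebook, for example via %run or by copying them) and, as one possible failure-handling choice, raises an exception so that a failed validation also fails the Databricks job:
# Sketch of a scheduled validation notebook. It assumes the helper functions
# defined earlier (init_data_context, register_spark_data_source,
# create_batch_request, current_timestamp) are available here, e.g. via %run.

# Rebuild the context from the persisted objects on DBFS
context = init_data_context(root_dir="/dbfs/great_expectations/")
# Re-register the Spark datasource (add_or_update_datasource makes this idempotent)
register_spark_data_source(context)

# Load fresh data ("spark" is provided by the Databricks notebook environment)
table = "samples.nyctaxi.trips"
df = spark.sql(f"SELECT * FROM {table}")

# Build a new batch request for this run
batch_request = create_batch_request(df=df, df_name=table, timestamp=current_timestamp())

# Run the persisted checkpoint against the new batch
checkpoint_result = context.run_checkpoint(
    checkpoint_name="nyc_taxi_test_checkpoint",
    batch_request=batch_request
)

# Optionally fail the job when validation fails, so the schedule surfaces the problem
if not checkpoint_result.success:
    raise RuntimeError(f"Great Expectations validation failed for {table}")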
Visualizing results
Running a checkpoint in Great Expectations generates data docs by default. These data docs are HTML documents that provide a visual representation of the validation results, making it easier to understand and interpret the outcome of your data quality checks.
In Databricks, you can view these data docs using the displayHTML call, which allows you to conveniently access and analyze your validation results directly within your Databricks environment.
The following code example demonstrates how to visualize the checkpoint result from the previous section:
def display_checkpoint_results(context: gx.DataContext, result: gx.checkpoint.types.checkpoint_result.CheckpointResult):
    '''
    Displays the docs generated by performing a checkpoint in a Databricks notebook.
    :param context: Data context
    :param result: Checkpoint result
    '''
    result_ids = result.list_validation_result_identifiers()
    # Iterate validation results
    for result_id in result_ids:
        docs = context.get_docs_sites_urls(resource_identifier=result_id)
        # Iterate and display docs
        for doc in docs:
            path = doc["site_url"]
            # Remove the file:// prefix from the URL
            if path.startswith("file://"):
                path = path[len("file://"):]
            # Display the HTML in the Databricks notebook
            with open(path, "r") as f:
                displayHTML(f.read())

display_checkpoint_results(context, checkpoint_result)
Conclusion
In this blog post, we've explored the process of integrating Great Expectations with Databricks to perform quality checks on your data. By combining the powerful features of the Great Expectations framework with the flexibility and scalability of Databricks, you can establish a robust data quality management system that ensures the ongoing accuracy, reliability, and trustworthiness of your data.
Throughout the post, we've provided step-by-step guidance and code snippets that demonstrate how to set up a data context, create datasources, define expectations, and run checkpoints within Databricks. Additionally, we've shown how to visualize validation results using data docs, enabling you to gain valuable insights into the quality of your data.
By following the steps outlined in this post, you'll be well-equipped to build a comprehensive data quality management system that supports your organization's data-driven goals. With consistent data quality checks in place, you can confidently rely on your data to make informed decisions, drive innovation, and maintain a competitive edge in today's data-driven world.
Further Resources