Change Data Capture (CDC) using AWS Glue in a Batch ETL Process
Kushwanth Chowdary Kandala
Senior Generative AI/ML Data Scientist | Network Copilot | Multimodel Ragger | AI/ML Engg Solutions
In AWS Glue, job bookmarks are a feature that helps track the state of data processing in your ETL (Extract, Transform, Load) jobs. Job bookmarks are not the same as CDC (Change Data Capture), but they serve a related purpose.
Here's a brief overview of both concepts:
- Job bookmarks: AWS Glue persists state about the data a job has already processed (for example, which files or partitions it has read), so subsequent runs pick up only new data instead of reprocessing everything.
- CDC: a pattern for identifying and capturing the rows that were inserted, updated, or deleted in a source system so those changes can be tracked, logged, and propagated downstream.
While job bookmarks can be used to implement a form of incremental processing and avoid unnecessary reprocessing of data, they do not inherently capture or track changes in data. If you need to implement a full CDC solution, you would typically use specific CDC tools or techniques, such as comparing timestamps, change flags, or versioning.
In summary, job bookmarks in AWS Glue are more about job state management and optimizing ETL job execution, while CDC is about capturing and managing changes to data. Depending on your use case, you might use job bookmarks alongside other techniques to achieve the desired outcome, but they are not a CDC solution in themselves.
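For reference, here is a minimal sketch of how job bookmarks are typically wired into a Glue job: bookmarks are enabled on the job (for example with the --job-bookmark-option job-bookmark-enable job parameter), each source gets a transformation_ctx so Glue can remember what it has already read, and job.commit() records the bookmark state. The database and table names are placeholders.
import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
# Bookmarks are enabled on the job itself (--job-bookmark-option job-bookmark-enable);
# job.init()/job.commit() plus a transformation_ctx on each source make them effective.
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glueContext = GlueContext(SparkContext())
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# The transformation_ctx is the handle Glue uses to remember which data
# this source has already processed in previous runs.
source_dyf = glueContext.create_dynamic_frame.from_catalog(
    database="your_database",
    table_name="your_source_table",
    transformation_ctx="source_bookmark",
)
# ... transform and write the data ...
# Committing the job persists the bookmark state for the next run.
job.commit()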
Change Data Capture (CDC) tools:
Change Data Capture (CDC) tools are designed to identify and capture changes made to data so that these changes can be tracked, logged, and replicated to other systems. Popular options include AWS Database Migration Service (DMS), Debezium, Oracle GoldenGate, and Qlik Replicate.
When choosing a CDC tool, consider factors such as the databases you are using, the scale of your data, the required latency, and your overall architecture. Additionally, the specific features, ease of use, and compatibility with your existing technology stack are important considerations. Always refer to the documentation of the respective tools for the most up-to-date information.
This article covers four simple ways to handle different cases of source data changes in a batch ETL process, depending on whether a unique key and a last-updated timestamp are available.
CASE 1: The source has a last-updated timestamp column ("your_timestamp_column") and a unique identifier ("your_key_column").
Implementing Change Data Capture (CDC) using AWS Glue involves comparing the current state of your data with a previous state to identify changes. Below is a simplified example using Python and PySpark in an AWS Glue ETL script. This script assumes you are comparing data in two tables, such as "source" and "destination," and identifies new or updated records.
import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext
from pyspark.sql import functions as F
# Create a GlueContext and initialize the job
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Define your source and destination tables
source_table = "your_source_table"
destination_table = "your_destination_table"
# Create DynamicFrames for source and destination data
source_dyf = glueContext.create_dynamic_frame.from_catalog(database="your_database", table_name=source_table)
destination_dyf = glueContext.create_dynamic_frame.from_catalog(database="your_database", table_name=destination_table)
# Convert to Spark DataFrames so the comparison can be expressed as column conditions
source_df = source_dyf.toDF().alias("src")
destination_df = destination_dyf.toDF().alias("dst")
# Left join source to destination on the unique key
joined_df = source_df.join(
    destination_df,
    F.col("src.your_key_column") == F.col("dst.your_key_column"),
    "left_outer",
)
# Identify changes: rows that are new (no match in the destination)
# or whose source timestamp is newer than the destination copy
changes_df = joined_df.filter(
    F.col("dst.your_key_column").isNull()
    | (F.col("src.your_timestamp_column") > F.col("dst.your_timestamp_column"))
)
# Keep only the source-side columns and convert back to a DynamicFrame
final_dyf = DynamicFrame.fromDF(changes_df.select("src.*"), glueContext, "final_dyf")
# Write the changes to the destination
glueContext.write_dynamic_frame.from_catalog(frame=final_dyf, database="your_database", table_name="your_cdc_table")
# Commit the changes
job.commit()
This is a simplified example; depending on your use case and data structure, you might need to adapt the script. For large datasets, consider more efficient and scalable options, such as reading only the partitions that can contain new or changed data.
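One common way to do that on a large, partitioned catalog table is to push a partition filter down into the read itself so the job only scans recent partitions. A minimal sketch, assuming the source table is partitioned by a date column; the partition name and value are placeholders.
# Only read partitions that can contain new or changed data,
# instead of scanning the whole table on every run.
source_dyf = glueContext.create_dynamic_frame.from_catalog(
    database="your_database",
    table_name="your_source_table",
    push_down_predicate="partition_date >= '2024-01-01'",
)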
CASE 2: The source has a unique identifier ("your_key_column") but no last-updated timestamp column.
Implementing Change Data Capture (CDC) without a specific timestamp column in the source and destination tables can be more challenging, as the typical approach involves comparing the data based on some form of last modified timestamp or incremental key. Here's a simplified example using AWS Glue without relying on a timestamp column.
import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext
from pyspark.sql import functions as F
# Create a GlueContext and initialize the job
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Define your source and destination tables
source_table = "your_source_table"
destination_table = "your_destination_table"
# Create DynamicFrames for source and destination data
source_dyf = glueContext.create_dynamic_frame.from_catalog(database="your_database", table_name=source_table)
destination_dyf = glueContext.create_dynamic_frame.from_catalog(database="your_database", table_name=destination_table)
# Convert to Spark DataFrames and left join on the unique key
source_df = source_dyf.toDF().alias("src")
destination_df = destination_dyf.toDF().alias("dst")
joined_df = source_df.join(
    destination_df,
    F.col("src.your_key_column") == F.col("dst.your_key_column"),
    "left_outer",
)
# Without a timestamp, compare every non-key column to detect updates
non_key_cols = [c for c in source_df.columns if c != "your_key_column"]
diff_condition = F.lit(False)
for c in non_key_cols:
    # eqNullSafe treats NULL == NULL as equal, so only real differences are flagged
    diff_condition = diff_condition | ~F.col(f"src.{c}").eqNullSafe(F.col(f"dst.{c}"))
# Keep new records (no match in destination) and records whose values changed
changes_df = joined_df.filter(F.col("dst.your_key_column").isNull() | diff_condition)
# Keep only the source-side columns and convert back to a DynamicFrame
final_dyf = DynamicFrame.fromDF(changes_df.select("src.*"), glueContext, "final_dyf")
# Write the changes to the destination
glueContext.write_dynamic_frame.from_catalog(frame=final_dyf, database="your_database", table_name="your_cdc_table")
# Commit the changes
job.commit()
In this example, the source and destination are joined on the key column, and a record is treated as new when its key is missing from the destination, or as updated when any of its non-key columns differ.
Keep in mind that without a specific timestamp, detecting updates relies on changes in the data itself. If your source and destination tables are large, consider using more efficient techniques or tools for CDC, and always test the solution thoroughly to ensure it meets your requirements.
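If comparing every column through a join becomes unwieldy, a more compact (though still full-scan) technique is a set difference between the two tables: rows present in the source but not in the destination are, by definition, new or changed. A minimal sketch, assuming both tables share the same columns and the frames from the example above.
# Rows that exist in the source but not (value-for-value) in the destination
# are new or updated; exceptAll keeps duplicate rows, unlike subtract.
source_df = source_dyf.toDF()
destination_df = destination_dyf.toDF()
changed_df = source_df.exceptAll(destination_df.select(*source_df.columns))
changed_dyf = DynamicFrame.fromDF(changed_df, glueContext, "changed_dyf")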
The resolveChoice transformation in AWS Glue is used to handle choices made during dynamic frame casting or resolving column types. It allows you to handle cases where column types are ambiguous or need to be explicitly defined.
In the CASE 3 example below, resolveChoice is used to cast the column "your_key_column" to a long type before the frames are compared. Here's the relevant part of the code:
source_dyf = source_dyf.resolveChoice(
    specs=[("your_key_column", "cast:long")]
)
Here's what the parameters of resolveChoice do: specs is a list of (column, action) tuples, where the action (here cast:long) tells Glue how to resolve that column's type; the separate choice parameter instead applies a single resolution action to every ambiguous column and cannot be combined with specs.
In simpler terms, the resolveChoice transformation is telling AWS Glue to cast "your_key_column" in the source_dyf dynamic frame to a long type. This can be necessary when dealing with mixed types or when a column's type needs to be explicitly defined for further processing.
This is just one use case of resolveChoice. Depending on your specific data and requirements, you might need to use it differently. Always refer to the AWS Glue documentation for the most accurate and detailed information on transformations and their parameters.
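For contrast, here is a small sketch of the frame-wide form, which my reading of the Glue documentation suggests resolves every ambiguous column with one action rather than targeting a single column.
# Split every ambiguous column into one column per observed type,
# e.g. a column that is sometimes int and sometimes string
# becomes your_column_int and your_column_string.
resolved_dyf = changes_dyf.resolveChoice(choice="make_cols")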
CASE 3: The source has a unique identifier ("your_key_column"), no last-updated timestamp column, and changes are tracked with a change flag.
Implementing Change Data Capture (CDC) with upserts (inserts and updates) without a specific timestamp column in the source and destination tables is more complex. In this scenario, you might need to rely on some form of change tracking, such as a change flag or a versioning mechanism. Below is a simplified example using AWS Glue.
import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext
from pyspark.sql import functions as F
# Create a GlueContext and initialize the job
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Define your source and destination tables
source_table = "your_source_table"
destination_table = "your_destination_table"
# Create DynamicFrames for source and destination data
source_dyf = glueContext.create_dynamic_frame.from_catalog(database="your_database", table_name=source_table)
destination_dyf = glueContext.create_dynamic_frame.from_catalog(database="your_database", table_name=destination_table)
# Resolve any ambiguous type on the key column before comparing the frames
source_dyf = source_dyf.resolveChoice(specs=[("your_key_column", "cast:long")])
# Convert to Spark DataFrames and left join on the unique key
source_df = source_dyf.toDF().alias("src")
destination_df = destination_dyf.toDF().alias("dst")
joined_df = source_df.join(
    destination_df,
    F.col("src.your_key_column") == F.col("dst.your_key_column"),
    "left_outer",
)
# Identify changes: new keys, or rows whose non-key values differ from the destination
non_key_cols = [c for c in source_df.columns if c != "your_key_column"]
diff_condition = F.lit(False)
for c in non_key_cols:
    diff_condition = diff_condition | ~F.col(f"src.{c}").eqNullSafe(F.col(f"dst.{c}"))
changes_df = joined_df.filter(F.col("dst.your_key_column").isNull() | diff_condition)
# Add a change flag to indicate insert (I) or update (U) and keep only the source columns
upsert_df = changes_df.select(
    "src.*",
    F.when(F.col("dst.your_key_column").isNull(), F.lit("I")).otherwise(F.lit("U")).alias("change_flag"),
)
# Write the changes to the destination
upsert_dyf = DynamicFrame.fromDF(upsert_df, glueContext, "upsert_dyf")
glueContext.write_dynamic_frame.from_catalog(frame=upsert_dyf, database="your_database", table_name="your_cdc_table")
# Commit the changes
job.commit()
In this example, each changed record carries a change_flag of "I" for a new key or "U" for an existing key whose values changed, so the downstream load can decide whether to insert or update the row.
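Downstream, the change flag lets the load step treat inserts and updates differently. A minimal sketch of how the flagged frame could be split, reusing upsert_df from above; the target handling is illustrative, since plain S3-backed catalog tables are append-only and true updates need a target such as a JDBC database or a transactional table format.
# Split the changes by flag so each group can be loaded appropriately.
inserts_df = upsert_df.filter(F.col("change_flag") == "I").drop("change_flag")
updates_df = upsert_df.filter(F.col("change_flag") == "U").drop("change_flag")
# New rows can simply be appended to the target.
inserts_dyf = DynamicFrame.fromDF(inserts_df, glueContext, "inserts_dyf")
glueContext.write_dynamic_frame.from_catalog(
    frame=inserts_dyf, database="your_database", table_name="your_cdc_table")
# Updated rows must be applied by a target that supports updates
# (for example via JDBC or a table format with merge/upsert support).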
CASE 4: The source has neither a unique key nor a last-updated timestamp column.
Handling Change Data Capture (CDC) and upserts without a timestamp column or a unique identifier (key) in the source and destination tables can be challenging. In such cases, you might need to rely on other mechanisms, such as hashing the entire record or using a combination of fields to identify changes. Keep in mind that these methods might not be as reliable as having a dedicated timestamp column or a unique key.
Here's a simplified example using AWS Glue where we use a hash of the entire record to identify changes.
import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext
from pyspark.sql import functions as F
# Create a GlueContext and initialize the job
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Define your source and destination tables
source_table = "your_source_table"
destination_table = "your_destination_table"
# Create DynamicFrames for source and destination data
source_dyf = glueContext.create_dynamic_frame.from_catalog(database="your_database", table_name=source_table)
destination_dyf = glueContext.create_dynamic_frame.from_catalog(database="your_database", table_name=destination_table)
source_df = source_dyf.toDF()
destination_df = destination_dyf.toDF()
# Hash the entire record so the hash can act as a surrogate identity for each row
# (assumes the source and destination tables share the same columns)
def with_record_hash(df):
    cols = [F.coalesce(F.col(c).cast("string"), F.lit("")) for c in sorted(df.columns)]
    return df.withColumn("record_hash", F.sha2(F.concat_ws("||", *cols), 256))
source_hashed = with_record_hash(source_df)
destination_hashed = with_record_hash(destination_df)
# Rows whose hash does not appear in the destination are new or changed
changes_df = source_hashed.join(
    destination_hashed.select("record_hash"), on="record_hash", how="left_anti"
)
# Drop the helper hash column before writing
final_df = changes_df.drop("record_hash")
# Write the changes to the destination
final_dyf = DynamicFrame.fromDF(final_df, glueContext, "final_dyf")
glueContext.write_dynamic_frame.from_catalog(frame=final_dyf, database="your_database", table_name="your_cdc_table")
# Commit the changes
job.commit()
In this example, each record is reduced to a SHA-256 hash of all of its columns, and a source row is treated as new or changed when its hash does not exist in the destination.
Keep in mind that this is a workaround and might not be suitable for all scenarios. If possible, consider modifying your data model to include a timestamp or a unique identifier to make change tracking more robust. Always thoroughly test any CDC implementation to ensure it meets your requirements.
Conclusion:
As the last case shows, the ETL process becomes considerably more complicated when there is no unique key and no last-updated timestamp.
Without a timestamp, detecting updates relies on comparing the data itself, and when the source and destination tables are large, consider more efficient techniques or dedicated CDC tools.