Building an End-to-End Airflow Data Pipeline with BigQuery, dbt & Soda
Krishna Yogi Kolluru
Data Science Architect | ML | GenAI | Speaker | ex-Microsoft | ex- Credit Suisse | IIT - NUS Alumni | AWS & Databricks Certified Data Engineer | T2 Skilled worker
Dataset
The dataset for this retail project can be found at this Kaggle link.
Column Description
Pipeline
Prerequisites
Steps
IMPORTANT: Make sure the Dockerfile uses quay.io/astronomer/astro-runtime:8.8.0 (or use Airflow 2.6.1). If it does not, update it to that version and restart Airflow (run astro dev restart with the Astro CLI).
Save the GCP service account key as include/gcp/service_account.json; it should look like this:
{
"type": "service_account",
"project_id": "your_project_id_here",
"private_key_id": "58cfee8a937e7bfc66ae6465a848db53cf4fb919",
"private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvgIBADA... (truncated for brevity) ...Ehq3KtZ\n-----END PRIVATE KEY-----\n",
"client_email": "airflow-online-retail@your_project_id_here.iam.gserviceaccount.com",
"client_id": "117189738213106346790",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/airflow-online-retail%40your_project_id_here.iam.gserviceaccount.com",
"universe_domain": "googleapis.com"
}
Navigate to Airflow → Admin → Connections
Proceed to create the DAG.
from airflow.decorators import dag, task
from datetime import datetime

from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator


@dag(
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=['retail'],
)
def retail_dag():

    upload_csv_to_gcs_task = LocalFilesystemToGCSOperator(
        task_id='upload_csv_to_gcs',
        src='your_local_file_path_here',
        dst='your_destination_path_in_GCS_here',
        bucket='your_bucket_name_here',
        gcp_conn_id='gcp',
        mime_type='text/csv',
    )


retail_dag()
To test the task, run the following commands:
astro dev bash
airflow tasks test retail_dag upload_csv_to_gcs 2023-01-01
This executes the upload_csv_to_gcs task of your DAG for the specified date (2023-01-01 in this case). Make sure to replace retail_dag with the name of your DAG if it's different.
Create an empty Dataset
from airflow.decorators import dag, task
from datetime import datetime

from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyDatasetOperator


@dag(
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
    tags=['retail'],
)
def retail_dag():

    create_retail_dataset_task = BigQueryCreateEmptyDatasetOperator(
        task_id='create_retail_dataset',
        dataset_id='retail',
        gcp_conn_id='gcp',
    )


retail_dag()
Create an airflow task to load the provided file into a BigQuery table named raw_invoices.
from airflow.decorators import dag, task
from datetime import datetime

from astro import sql as aql
from astro.files import File
from astro.sql.table import Table, Metadata
from astro.constants import FileType


@dag(
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
    tags=['retail'],
)
def retail_dag():

    gcs_to_raw_task = aql.load_file(
        task_id='gcs_to_raw',
        input_file=File(
            'gs://marclamberti_online_retail/raw/online_retail.csv',
            conn_id='gcp',
            filetype=FileType.CSV,
        ),
        output_table=Table(
            name='raw_invoices',
            conn_id='gcp',
            metadata=Metadata(schema='retail')
        ),
        use_native_support=False,
    )


retail_dag()
Data has been successfully loaded into the warehouse!
Add to requirements.txt
soda-core-bigquery==3.0.45
Generate a configuration.yml
# include/soda/configuration.yml
data_source retail:
  type: bigquery
  connection:
    account_info_json_path: /usr/local/airflow/include/gcp/service_account.json
    auth_scopes:
    - https://www.googleapis.com/auth/bigquery
    - https://www.googleapis.com/auth/cloud-platform
    - https://www.googleapis.com/auth/drive
    project_id: 'your_project_id_here'
    dataset: retail

soda_cloud:
  host: cloud.soda.io
  api_key_id: 'your_api_key_id_here'
  api_key_secret: 'your_api_key_secret_here'
Locate the project_id in GCP
Sign up for a Soda account
Create an API key: Profile → API Keys → Generate API Key → Copy the API key id and secret
Test the connection
astro dev bash
soda test-connection -d retail -c include/soda/configuration.yml
Create the first test
# include/soda/checks/sources/raw_invoices.yml
checks for raw_invoices:
  - schema:
      fail:
        when required column missing:
          [InvoiceNo, StockCode, Quantity, InvoiceDate, UnitPrice, CustomerID, Country]
        when wrong column type:
          InvoiceNo: string
          StockCode: string
          Quantity: integer
          InvoiceDate: string
          UnitPrice: float64
          CustomerID: float64
          Country: string
Perform the quality check:
soda scan -d retail -c include/soda/configuration.yml include/soda/checks/sources/raw_invoices.yml
Create the check function
# include/soda/check_function.py
def check(scan_name, checks_subpath=None, data_source_name='retail', project_root='include'):
    from soda.scan import Scan

    print('Running Soda Scan ...')
    config_file_path = f'{project_root}/soda/configuration.yml'
    checks_path = f'{project_root}/soda/checks'

    if checks_subpath:
        checks_path += f'/{checks_subpath}'

    scan_instance = Scan()
    scan_instance.set_verbose()
    scan_instance.add_configuration_yaml_file(config_file_path)
    scan_instance.set_data_source_name(data_source_name)
    scan_instance.add_sodacl_yaml_files(checks_path)
    scan_instance.set_scan_definition_name(scan_name)

    result = scan_instance.execute()
    print(scan_instance.get_logs_text())

    if result != 0:
        raise ValueError('Soda Scan failed')

    return result
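To try the helper outside of Airflow, you can call it directly from a Python shell opened with astro dev bash, assuming soda-core-bigquery is installed in that environment; the scan name below is just an arbitrary example.
# Ad-hoc run of the helper from the project root ('adhoc_raw_invoices' is a hypothetical scan name)
from include.soda.check_function import check

check(scan_name='adhoc_raw_invoices', checks_subpath='sources')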
Create the Python virtual environment and install Soda:
RUN python -m venv soda_venv && \
. soda_venv/bin/activate && \
pip install --no-cache-dir soda-core-bigquery==3.0.45 && \
pip install --no-cache-dir soda-core-scientific==3.0.45 && \
deactivate
Within the DAG, add a new task:
@task.external_python(python='/usr/local/airflow/soda_venv/bin/python')
def check_load(scan_name='check_load', checks_subpath='sources'):
    from include.soda.check_function import check

    return check(scan_name, checks_subpath)
ExternalPython reuses an existing Python virtual environment with pre-installed dependencies, which makes it faster than the Virtualenv decorator, where dependencies are reinstalled on every run.
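For comparison only, here is a rough sketch of the same check written with the Virtualenv decorator; it is not used in this pipeline because the environment is rebuilt and Soda reinstalled on every run:
# Illustrative sketch: the Virtualenv variant recreates its environment at each run
@task.virtualenv(
    requirements=['soda-core-bigquery==3.0.45'],  # installed on every task execution
    system_site_packages=False,
)
def check_load_with_virtualenv(scan_name='check_load', checks_subpath='sources'):
    from include.soda.check_function import check

    return check(scan_name, checks_subpath)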
Verify the task
astro dev bash
airflow tasks test retail check_load 2023-01-01
Transform
Install Cosmos — DBT
In requirements.txt:
astronomer-cosmos[dbt-bigquery]==1.0.3  # installs Cosmos, dbt, and the Google Cloud (BigQuery) adapter
protobuf==3.20.0
In the environment configuration (.env file):
PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python
In the Dockerfile:
RUN python -m venv dbt_venv && \
. dbt_venv/bin/activate && \
pip install --no-cache-dir dbt-bigquery==1.5.3 && \
deactivate
Restart the development environment:
astro dev restart
Set up the include/dbt directory structure, starting with packages.yml:
# include/dbt/packages.yml
packages:
  - package: dbt-labs/dbt_utils
    version: 1.1.1
# dbt_project.yml
name: 'retail'
profile: 'retail'

models:
  retail:
    materialized: table
# profiles.yml
retail:
  target: dev
  outputs:
    dev:
      type: bigquery
      method: service-account
      keyfile: /usr/local/airflow/include/gcp/service_account.json
      project: your_project_id_here # Replace with your GCP project ID
      dataset: retail
      threads: 1
      timeout_seconds: 300
      location: US
Navigate to BigQuery
Execute the following SQL query
CREATE TABLE IF NOT EXISTS `retail.country` (
`id` INT NOT NULL,
`iso` STRING NOT NULL,
`name` STRING NOT NULL,
`nicename` STRING NOT NULL,
`iso3` STRING DEFAULT NULL,
`numcode` INT DEFAULT NULL,
`phonecode` INT NOT NULL
);
-- Insert data into the retail.country table
INSERT INTO `retail.country` (`id`, `iso`, `name`, `nicename`, `iso3`, `numcode`, `phonecode`) VALUES
(1, 'AF', 'AFGHANISTAN', 'Afghanistan', 'AFG', 4, 93),
(2, 'AL', 'ALBANIA', 'Albania', 'ALB', 8, 355),
-- ... (rows omitted for brevity) ...
(238, 'ZM', 'ZAMBIA', 'Zambia', 'ZMB', 894, 260),
(239, 'ZW', 'ZIMBABWE', 'Zimbabwe', 'ZWE', 716, 263);
In models/sources/sources.yml:
version: 2

sources:
  - name: retail
    database: your_project_id_here # Replace with the actual project id!
    tables:
      - name: raw_invoices
      - name: country
In models/transform/dim_customer.sql:
-- dim_customer.sql
-- Create the dimension table
WITH customer_cte AS (
  SELECT DISTINCT
    {{ dbt_utils.generate_surrogate_key(['CustomerID', 'Country']) }} as customer_id,
    Country AS country
  FROM {{ source('retail', 'raw_invoices') }}
  WHERE CustomerID IS NOT NULL
)
SELECT
  t.*,
  cm.iso
FROM customer_cte t
LEFT JOIN {{ source('retail', 'country') }} cm ON t.country = cm.nicename
Replace the placeholder your_project_id_here with the actual project ID.
In models/transform/dim_datetime.sql:
-- dim_datetime.sql
-- Create a Common Table Expression (CTE) to extract date and time components
WITH datetime_cte AS (
  SELECT DISTINCT
    InvoiceDate AS datetime_id,
    CASE
      WHEN LENGTH(InvoiceDate) = 16 THEN
        -- Date format: "MM/DD/YYYY HH:MM"
        PARSE_DATETIME('%m/%d/%Y %H:%M', InvoiceDate)
      WHEN LENGTH(InvoiceDate) <= 14 THEN
        -- Date format: "MM/DD/YY HH:MM"
        PARSE_DATETIME('%m/%d/%y %H:%M', InvoiceDate)
      ELSE
        NULL
    END AS date_part
  FROM {{ source('retail', 'raw_invoices') }}
  WHERE InvoiceDate IS NOT NULL
)
SELECT
  datetime_id,
  date_part AS datetime,
  EXTRACT(YEAR FROM date_part) AS year,
  EXTRACT(MONTH FROM date_part) AS month,
  EXTRACT(DAY FROM date_part) AS day,
  EXTRACT(HOUR FROM date_part) AS hour,
  EXTRACT(MINUTE FROM date_part) AS minute,
  EXTRACT(DAYOFWEEK FROM date_part) AS weekday
FROM datetime_cte
In models/transform/dim_product.sql:
-- dim_product.sql
-- StockCode isn't unique: a product with the same id can have different prices
-- Create the dimension table
SELECT DISTINCT
  {{ dbt_utils.generate_surrogate_key(['StockCode', 'Description', 'UnitPrice']) }} as product_id,
  StockCode AS stock_code,
  Description AS description,
  UnitPrice AS price
FROM {{ source('retail', 'raw_invoices') }}
WHERE StockCode IS NOT NULL
AND UnitPrice > 0
In models/transform/fct_invoices.sql:
-- fct_invoices.sql
-- Create the fact table by joining the relevant keys from the dimension tables
WITH fct_invoices_cte AS (
  SELECT
    InvoiceNo AS invoice_id,
    InvoiceDate AS datetime_id,
    {{ dbt_utils.generate_surrogate_key(['StockCode', 'Description', 'UnitPrice']) }} as product_id,
    {{ dbt_utils.generate_surrogate_key(['CustomerID', 'Country']) }} as customer_id,
    Quantity AS quantity,
    Quantity * UnitPrice AS total
  FROM {{ source('retail', 'raw_invoices') }}
  WHERE Quantity > 0
)
SELECT
  invoice_id,
  dt.datetime_id,
  dp.product_id,
  dc.customer_id,
  quantity,
  total
FROM fct_invoices_cte fi
INNER JOIN {{ ref('dim_datetime') }} dt ON fi.datetime_id = dt.datetime_id
INNER JOIN {{ ref('dim_product') }} dp ON fi.product_id = dp.product_id
INNER JOIN {{ ref('dim_customer') }} dc ON fi.customer_id = dc.customer_id
Execute the models
astro dev bash
source /usr/local/airflow/dbt_venv/bin/activate
cd include/dbt
dbt deps
dbt run --profiles-dir /usr/local/airflow/include/dbt/
Verify on BigQuery that the tables exist with data.
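If you prefer checking from code instead of the BigQuery console, a quick sanity check is possible with the google-cloud-bigquery client; this is an optional sketch that assumes the library is available and the service account key is readable from where you run it.
# Optional sanity check: list the tables in the retail dataset
from google.cloud import bigquery

client = bigquery.Client.from_service_account_json('include/gcp/service_account.json')
for table in client.list_tables('retail'):
    print(table.table_id)  # expect raw_invoices, country, the dim_ tables and fct_invoices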
Incorporate the task into the workflow
from include.dbt.cosmos_config import DBT_PROJECT_CONFIG, DBT_CONFIG
from cosmos.airflow.task_group import DbtTaskGroup
from cosmos.constants import LoadMode
from cosmos.config import ProjectConfig, RenderConfig

transform = DbtTaskGroup(
    group_id='transform',
    project_config=DBT_PROJECT_CONFIG,
    profile_config=DBT_CONFIG,
    render_config=RenderConfig(
        load_method=LoadMode.DBT_LS,
        select=['path:models/transform']
    )
)
Create DBT_CONFIG
# include/dbt/cosmos_config.py
from cosmos.config import ProfileConfig, ProjectConfig
from pathlib import Path

DBT_CONFIG = ProfileConfig(
    profile_name='retail',
    target_name='dev',
    profiles_yml_filepath=Path('/usr/local/airflow/include/dbt/profiles.yml')
)

DBT_PROJECT_CONFIG = ProjectConfig(
    dbt_project_path='/usr/local/airflow/include/dbt/',
)
Test a task
astro dev bash
airflow tasks list retail
airflow tasks test retail transform.dim_customer.dim_customer_run 2023-01-01
airflow tasks test retail transform.dim_customer.dim_customer_test 2023-01-01
Check the Airflow UI: you should see a transform TaskGroup containing the models. First dbt models in place!
In include/soda/checks/transform/dim_customer.yml:
checks for dim_customer:
  # Check fails if any required column is missing or has a wrong type
  - schema:
      fail:
        when required column missing:
          [customer_id, country]
        when wrong column type:
          customer_id: string
          country: string
  # Check fails if there are any duplicate customer IDs
  - duplicate_count(customer_id) = 0:
      name: All customers are unique
  # Check fails if there are any missing customer IDs
  - missing_count(customer_id) = 0:
      name: All customers have a key
In include/soda/checks/transform/dim_datetime.yml:
checks for dim_datetime:
  # Check fails if any required column is missing or has a wrong type
  - schema:
      fail:
        when required column missing: [datetime_id, datetime]
        when wrong column type:
          datetime_id: string
          datetime: datetime
  # Check fails if any weekday values are out of range (BigQuery's DAYOFWEEK returns 1-7)
  - invalid_count(weekday) = 0:
      name: All weekdays are in range 1-7
      valid min: 1
      valid max: 7
  # Check fails if there are any duplicate datetime IDs
  - duplicate_count(datetime_id) = 0:
      name: All datetimes are unique
  # Check fails if there are any missing datetime IDs
  - missing_count(datetime_id) = 0:
      name: All datetimes have a key
In include/soda/checks/transform/dim_product.yml:
checks for dim_product:
  # Check fails if any required column is missing or has a wrong type
  - schema:
      fail:
        when required column missing: [product_id, description, price]
        when wrong column type:
          product_id: string
          description: string
          price: float64
  # Check fails if there are any duplicate product IDs
  - duplicate_count(product_id) = 0:
      name: All products are unique
  # Check fails if there are any missing product IDs
  - missing_count(product_id) = 0:
      name: All products have a key
  # Check fails if there are any negative prices
  - min(price):
      fail: when < 0
In include/soda/checks/transform/fct_invoices.yml:
checks for fct_invoices:
  # Check fails if any required column is missing or has a wrong type
  - schema:
      fail:
        when required column missing:
          [invoice_id, product_id, customer_id, datetime_id, quantity, total]
        when wrong column type:
          invoice_id: string
          product_id: string
          customer_id: string
          datetime_id: string
          quantity: int
          total: float64
  # Check fails if there are any missing invoice IDs
  - missing_count(invoice_id) = 0:
      name: All invoices have a key
  # Check fails if there are any negative totals for invoices
  - failed rows:
      name: All invoices have a positive total amount
      fail query: |
        SELECT invoice_id, total
        FROM fct_invoices
        WHERE total < 0
Add a new task
@task.external_python(python='/usr/local/airflow/soda_venv/bin/python')
def check_transform(scan_name='check_transform', checks_subpath='transform'):
    from include.soda.check_function import check

    return check(scan_name, checks_subpath)
Reports
In include/dbt/models/report/report_customer_invoices.sql:
-- report_customer_invoices.sql
-- Report: total invoices and revenue by customer country
SELECT
  c.country,
  c.iso,
  COUNT(fi.invoice_id) AS total_invoices,
  SUM(fi.total) AS total_revenue
FROM {{ ref('fct_invoices') }} fi
JOIN {{ ref('dim_customer') }} c ON fi.customer_id = c.customer_id
GROUP BY c.country, c.iso
ORDER BY total_revenue DESC
LIMIT 10
In include/dbt/models/report/report_product_invoices.sql:
-- report_product_invoices.sql
-- Report: total quantity sold by product
SELECT
  p.product_id,
  p.stock_code,
  p.description,
  SUM(fi.quantity) AS total_quantity_sold
FROM {{ ref('fct_invoices') }} fi
JOIN {{ ref('dim_product') }} p ON fi.product_id = p.product_id
GROUP BY p.product_id, p.stock_code, p.description
ORDER BY total_quantity_sold DESC
LIMIT 10
These queries generate reports on customer invoices and product invoices, respectively.
In include/dbt/models/report/report_year_invoices.sql:
-- report_year_invoices.sql
-- Report: total invoices and revenue by year and month
SELECT
  dt.year,
  dt.month,
  COUNT(DISTINCT fi.invoice_id) AS num_invoices,
  SUM(fi.total) AS total_revenue
FROM {{ ref('fct_invoices') }} fi
JOIN {{ ref('dim_datetime') }} dt ON fi.datetime_id = dt.datetime_id
GROUP BY dt.year, dt.month
ORDER BY dt.year, dt.month
Add a new task:
report = DbtTaskGroup(
    group_id='report',
    project_config=DBT_PROJECT_CONFIG,
    profile_config=DBT_CONFIG,
    render_config=RenderConfig(
        load_method=LoadMode.DBT_LS,
        select=['path:models/report']
    )
)
If you want to try:
In include/soda/checks/report:
# report_customer_invoices.yml
checks for report_customer_invoices:
  # Check fails if there are any missing values in the country column
  - missing_count(country) = 0:
      name: All customers have a country
  # Check fails if the total number of invoices is less than or equal to 0
  - min(total_invoices):
      fail: when <= 0

# report_product_invoices.yml
checks for report_product_invoices:
  # Check fails if there are any missing values in the stock_code column
  - missing_count(stock_code) = 0:
      name: All products have a stock code
  # Check fails if the total quantity sold is less than or equal to 0
  - min(total_quantity_sold):
      fail: when <= 0
These checks ensure the integrity of the reports generated by the SQL queries.
Add the final task
@task.external_python(python='/usr/local/airflow/soda_venv/bin/python')
def check_report(scan_name='check_report', checks_subpath='report'):
    from include.soda.check_function import check

    return check(scan_name, checks_subpath)
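All that remains is wiring the pieces together inside the DAG. Below is a minimal sketch of the dependency chain, assuming the tasks and task groups from the snippets above (upload_csv_to_gcs_task, create_retail_dataset_task, gcs_to_raw_task, the three check tasks, and the transform and report groups) are defined inside a single DAG function:
from airflow.models.baseoperator import chain

# Inside the DAG function, after all tasks and task groups have been defined
chain(
    upload_csv_to_gcs_task,
    create_retail_dataset_task,
    gcs_to_raw_task,
    check_load(),
    transform,
    check_transform(),
    report,
    check_report(),
)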
Congratulations! You have completed the data pipeline.
Dashboard
Modify docker-compose.override.yml:
version: '3.7'
services:
  webserver:
    environment:
      - AIRFLOW__WEBSERVER__RBAC=false
    ports:
      - "8080:8080"
This modification disables RBAC (Role-Based Access Control) in Airflow’s webserver and maps port 8080 to allow access to the Airflow UI.
Stay tuned for more doses of data engineering!