Building an End-to-End Airflow Data Pipeline with BigQuery, dbt & Soda

Dataset

The dataset for this retail project can be found at this Kaggle link.

Column Description

  • InvoiceNo: This is the invoice number, a 6-digit integral number assigned to each transaction. If the code begins with the letter ‘c’, it denotes a cancellation.
  • StockCode: A unique 5-digit integral number assigned to each distinct product, representing the product/item code.
  • Description: This column contains the name of the product/item.
  • Quantity: The number of units of each product/item in the transaction (numeric).
  • InvoiceDate: Represents the date and time of each transaction.
  • UnitPrice: This column represents the unit price of the product, denoted in sterling.
  • CustomerID: Each customer is assigned a unique 5-digit integral number.
  • Country: The name of the country where the customer resides.

Pipeline

Prerequisites

  • Docker
  • Astro CLI
  • Soda
  • Google Cloud (GC) account

Steps

IMPORTANT: Please ensure that the Dockerfile is configured to use quay.io/astronomer/astro-runtime:8.8.0 (which ships Airflow 2.6.1). If not, update it to that version and restart Airflow (run astro dev restart with the Astro CLI).
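
For reference, the first line of the Dockerfile should then look like this:

FROM quay.io/astronomer/astro-runtime:8.8.0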

  • Download the dataset from this link.
  • Save the CSV file in the directory include/dataset/online_retail.csv.
  • Update requirements.txt by adding apache-airflow-providers-google==10.3.0, then restart Airflow.
  • Create a Google Cloud Storage (GCS) bucket with a unique name <your_name>_online_retail.
  • Create a service account named airflow-online-retail.
  • Grant admin access to GCS and BigQuery.
  • Navigate to the service account → Keys → Add Key → Copy the JSON content.
  • Create a new file named service_account.json in the directory include/gcp/.

{
    "type": "service_account",
    "project_id": "your_project_id_here",
    "private_key_id": "58cfee8a937e7bfc66ae6465a848db53cf4fb919",
    "private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvgIBADA... (truncated for brevity) ...Ehq3KtZ\n-----END PRIVATE KEY-----\n",
    "client_email": "airflow-online-retail@your_project_id_here.iam.gserviceaccount.com",
    "client_id": "117189738213106346790",
    "auth_uri": "https://accounts.google.com/o/oauth2/auth",
    "token_uri": "https://oauth2.googleapis.com/token",
    "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
    "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/airflow-online-retail%40your_project_id_here.iam.gserviceaccount.com",
    "universe_domain": "googleapis.com"
}        

Navigate to Airflow → Admin → Connections

  • id: gcp
  • type: Google Cloud
  • Key Path: /usr/local/airflow/include/gcp/service_account.json
  • Test the connection, then Save (from Airflow 2.7 onwards, the Test button must be explicitly enabled).
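
You can also confirm from inside the container that the connection is registered (the id gcp is the one created above):

astro dev bash
airflow connections get gcp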

Proceed to create the DAG.

from airflow.decorators import dag, task
from datetime import datetime

from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator

@dag(
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=['retail'],
)
def retail_dag():

    upload_csv_to_gcs_task = LocalFilesystemToGCSOperator(
        task_id='upload_csv_to_gcs',
        src='your_local_file_path_here',
        dst='your_destination_path_in_GCS_here',
        bucket='your_bucket_name_here',
        gcp_conn_id='gcp',
        mime_type='text/csv',
    )

retail_dag()        

To test the task, run the following commands (replace retail_dag with the name of your DAG if it's different; this executes the upload_csv_to_gcs task for the logical date 2023-01-01):

astro dev bash
airflow tasks test retail_dag upload_csv_to_gcs 2023-01-01        

Create an empty BigQuery dataset

from airflow.decorators import dag, task
from datetime import datetime

from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyDatasetOperator

@dag(
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=['retail'],
)
def retail_dag():

    create_retail_dataset_task = BigQueryCreateEmptyDatasetOperator(
        task_id='create_retail_dataset',
        dataset_id='retail',
        gcp_conn_id='gcp',
    )

retail_dag()        

Create an Airflow task to load the CSV file from GCS into a BigQuery table named raw_invoices.

from airflow.decorators import dag, task
from datetime import datetime

from astro import sql as aql
from astro.files import File
from astro.sql.table import Table, Metadata
from astro.constants import FileType

@dag(
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=['retail'],
)
def retail_dag():

    gcs_to_raw_task = aql.load_file(
        task_id='gcs_to_raw',
        input_file=File(
            'gs://<your_name>_online_retail/raw/online_retail.csv',  # the bucket created earlier
            conn_id='gcp',
            filetype=FileType.CSV,
        ),
        output_table=Table(
            name='raw_invoices',
            conn_id='gcp',
            metadata=Metadata(schema='retail')
        ),
        use_native_support=False,
    )

retail_dag()        

Data has been successfully loaded into the warehouse!
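
As a quick sanity check (assuming the dataset and table names used above), you can run this query in the BigQuery console:

SELECT COUNT(*) AS nb_rows FROM `retail.raw_invoices`;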

Set up Soda Core

Add to requirements.txt

soda-core-bigquery==3.0.45        

Generate a configuration.yml

# include/soda/configuration.yml

data_source retail:
  type: bigquery
  connection:
    account_info_json_path: /usr/local/airflow/include/gcp/service_account.json
    auth_scopes:
      - https://www.googleapis.com/auth/bigquery
      - https://www.googleapis.com/auth/cloud-platform
      - https://www.googleapis.com/auth/drive
    project_id: 'your_project_id_here'
    dataset: retail

soda_cloud:
  host: cloud.soda.io
  api_key_id: 'your_api_key_id_here'
  api_key_secret: 'your_api_key_secret_here'        

Locate the project_id in GCP

Sign up for a Soda account

Create an API key: Profile → API Keys → Generate API Key → copy the key id and secret into configuration.yml

Test the connection

astro dev bash
soda test-connection -d retail -c include/soda/configuration.yml        

Create the first test

# include/soda/checks/sources/raw_invoices.yml

checks for raw_invoices:
  - schema:
      fail:
        when required column missing: [InvoiceNo, StockCode, Quantity, InvoiceDate, UnitPrice, CustomerID, Country]
        when wrong column type:
          InvoiceNo: string
          StockCode: string
          Quantity: integer
          InvoiceDate: string
          UnitPrice: float64
          CustomerID: float64
          Country: string        

Perform the quality check:

soda scan -d retail -c include/soda/configuration.yml include/soda/checks/sources/raw_invoices.yml

Create the check function in include/soda/check_function.py (it is imported later as include.soda.check_function):

def check(scan_name, checks_subpath=None, data_source_name='retail', project_root='include'):
    from soda.scan import Scan

    print('Running Soda Scan ...')
    config_file_path = f'{project_root}/soda/configuration.yml'
    checks_path = f'{project_root}/soda/checks'

    if checks_subpath:
        checks_path += f'/{checks_subpath}'

    scan_instance = Scan()
    scan_instance.set_verbose()
    scan_instance.add_configuration_yaml_file(config_file_path)
    scan_instance.set_data_source_name(data_source_name)
    scan_instance.add_sodacl_yaml_files(checks_path)
    scan_instance.set_scan_definition_name(scan_name)

    result = scan_instance.execute()
    print(scan_instance.get_logs_text())

    if result != 0:
        raise ValueError('Soda Scan failed')

    return result        

In the Dockerfile, create a Python virtual environment and install Soda:

RUN python -m venv soda_venv && \
    . soda_venv/bin/activate && \
    pip install --no-cache-dir soda-core-bigquery==3.0.45 && \
    pip install --no-cache-dir soda-core-scientific==3.0.45 && \
    deactivate        
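
After rebuilding the image (astro dev restart), you can quickly verify that the virtual environment works (a minimal check using the same import as the check function above):

astro dev bash
/usr/local/airflow/soda_venv/bin/python -c "from soda.scan import Scan; print('Soda OK')"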

Within the DAG, add a new task:

@task.external_python(python='/usr/local/airflow/soda_venv/bin/python')
def check_load(scan_name='check_load', checks_subpath='sources'):
    from include.soda.check_function import check

    return check(scan_name, checks_subpath)        

ExternalPython runs the task in an existing Python virtual environment whose dependencies were installed at image build time, which makes it faster to start than a virtualenv task (@task.virtualenv), where the environment is created and the dependencies are installed on every run.
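
For comparison, a virtualenv-based version of the same task might look like the sketch below (shown only to illustrate the trade-off; here soda-core-bigquery would be reinstalled on every run):

@task.virtualenv(requirements=['soda-core-bigquery==3.0.45'], system_site_packages=False)
def check_load_virtualenv(scan_name='check_load', checks_subpath='sources'):
    from include.soda.check_function import check

    return check(scan_name, checks_subpath)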

Verify the task

astro dev bash
airflow tasks test retail_dag check_load 2023-01-01

Transform

Install Cosmos and dbt

In requirements.txt:

astronomer-cosmos[dbt-bigquery]==1.0.3  # installs Cosmos together with dbt-bigquery
protobuf==3.20.0        

In the .env file:

PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python        

In the Dockerfile:

RUN python -m venv dbt_venv && \
    . dbt_venv/bin/activate && \
    pip install --no-cache-dir dbt-bigquery==1.5.3 && \
    deactivate        

Restart the development environment:

astro dev restart        

Establish the include/dbt directory structure:

# packages.yml
packages:
  - package: dbt-labs/dbt_utils
    version: 1.1.1

# dbt_project.yml
name: 'retail'
profile: 'retail'
models:
  retail:
    materialized: table

# profiles.yml

retail:
  target: dev
  outputs:
    dev:
      type: bigquery
      method: service-account
      keyfile: /usr/local/airflow/include/gcp/service_account.json
      project: your_project_id_here # Replace with the actual project id!
      dataset: retail
      threads: 1
      timeout_seconds: 300
      location: US
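
To verify that dbt can read this profile and connect to BigQuery, you can run a quick check from inside the container (using the dbt_venv created in the Dockerfile above):

astro dev bash
source /usr/local/airflow/dbt_venv/bin/activate
cd include/dbt
dbt debug --profiles-dir /usr/local/airflow/include/dbt/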

Navigate to BigQuery

Execute the following SQL query

CREATE TABLE IF NOT EXISTS `retail.country` (
 `id` INT NOT NULL,
 `iso` STRING NOT NULL,
 `name` STRING NOT NULL,
 `nicename` STRING NOT NULL,
 `iso3` STRING DEFAULT NULL,
 `numcode` INT DEFAULT NULL,
 `phonecode` INT NOT NULL
);        

-- Insert data into the retail.country table (the full country list is truncated for brevity)

INSERT INTO `retail.country` (`id`, `iso`, `name`, `nicename`, `iso3`, `numcode`, `phonecode`) VALUES
(1, 'AF', 'AFGHANISTAN', 'Afghanistan', 'AFG', 4, 93),
(2, 'AL', 'ALBANIA', 'Albania', 'ALB', 8, 355),
-- ... remaining rows omitted ...
(238, 'ZM', 'ZAMBIA', 'Zambia', 'ZMB', 894, 260),
(239, 'ZW', 'ZIMBABWE', 'Zimbabwe', 'ZWE', 716, 263);

In models/sources/sources.yml:

version: 2

sources:
  - name: retail
    database: your_project_id_here # Replace with the actual project id!
    tables:
      - name: raw_invoices
      - name: country

In models/transform/dim_customer.sql:

-- dim_customer.sql
-- Create the customer dimension table

WITH customer_cte AS (
    SELECT DISTINCT
        {{ dbt_utils.generate_surrogate_key(['CustomerID', 'Country']) }} AS customer_id,
        Country AS country
    FROM {{ source('retail', 'raw_invoices') }}
    WHERE CustomerID IS NOT NULL
)
SELECT
    t.*,
    cm.iso
FROM customer_cte t
LEFT JOIN {{ source('retail', 'country') }} cm ON t.country = cm.nicename

Remember to replace the placeholder your_project_id_here in sources.yml with your actual project ID.

In models/transform/dim_datetime.sql:

-- dim_datetime.sql
-- Create a CTE to extract date and time components

WITH datetime_cte AS (
    SELECT DISTINCT
        InvoiceDate AS datetime_id,
        CASE
            WHEN LENGTH(InvoiceDate) = 16 THEN
                -- Date format: "MM/DD/YYYY HH:MM"
                PARSE_DATETIME('%m/%d/%Y %H:%M', InvoiceDate)
            WHEN LENGTH(InvoiceDate) <= 14 THEN
                -- Date format: "MM/DD/YY HH:MM"
                PARSE_DATETIME('%m/%d/%y %H:%M', InvoiceDate)
            ELSE NULL
        END AS date_part
    FROM {{ source('retail', 'raw_invoices') }}
    WHERE InvoiceDate IS NOT NULL
)
SELECT
    datetime_id,
    date_part AS datetime,
    EXTRACT(YEAR FROM date_part) AS year,
    EXTRACT(MONTH FROM date_part) AS month,
    EXTRACT(DAY FROM date_part) AS day,
    EXTRACT(HOUR FROM date_part) AS hour,
    EXTRACT(MINUTE FROM date_part) AS minute,
    EXTRACT(DAYOFWEEK FROM date_part) AS weekday
FROM datetime_cte

In models/transform/dim_product.sql:

-- dim_product.sql
-- StockCode isn't unique: a product with the same code can have different prices
-- Create the product dimension table

SELECT DISTINCT
    {{ dbt_utils.generate_surrogate_key(['StockCode', 'Description', 'UnitPrice']) }} AS product_id,
    StockCode AS stock_code,
    Description AS description,
    UnitPrice AS price
FROM {{ source('retail', 'raw_invoices') }}
WHERE StockCode IS NOT NULL
AND UnitPrice > 0

In models/transform/fct_invoices.sql:

-- fct_invoices.sql
-- Create the fact table by joining the surrogate keys from the dimension tables

WITH fct_invoices_cte AS (
    SELECT
        InvoiceNo AS invoice_id,
        InvoiceDate AS datetime_id,
        {{ dbt_utils.generate_surrogate_key(['StockCode', 'Description', 'UnitPrice']) }} AS product_id,
        {{ dbt_utils.generate_surrogate_key(['CustomerID', 'Country']) }} AS customer_id,
        Quantity AS quantity,
        Quantity * UnitPrice AS total
    FROM {{ source('retail', 'raw_invoices') }}
    WHERE Quantity > 0
)
SELECT
    invoice_id,
    dt.datetime_id,
    dp.product_id,
    dc.customer_id,
    quantity,
    total
FROM fct_invoices_cte fi
INNER JOIN {{ ref('dim_datetime') }} dt ON fi.datetime_id = dt.datetime_id
INNER JOIN {{ ref('dim_product') }} dp ON fi.product_id = dp.product_id
INNER JOIN {{ ref('dim_customer') }} dc ON fi.customer_id = dc.customer_id

Execute the models

astro dev bash
source /usr/local/airflow/dbt_venv/bin/activate
cd include/dbt 
dbt deps
dbt run --profiles-dir /usr/local/airflow/include/dbt/

Verify on BigQuery that the tables exist with data.

Incorporate the task into the workflow

from include.dbt.cosmos_config import DBT_PROJECT_CONFIG, DBT_CONFIG
from cosmos.airflow.task_group import DbtTaskGroup
from cosmos.constants import LoadMode
from cosmos.config import RenderConfig

transform = DbtTaskGroup(
    group_id='transform',
    project_config=DBT_PROJECT_CONFIG,
    profile_config=DBT_CONFIG,
    render_config=RenderConfig(
        load_method=LoadMode.DBT_LS,
        select=['path:models/transform']
    )
)

Create DBT_CONFIG

# include/dbt/cosmos_config.py

from cosmos.config import ProfileConfig, ProjectConfig
from pathlib import Path

DBT_CONFIG = ProfileConfig(
    profile_name='retail',
    target_name='dev',
    profiles_yml_filepath=Path('/usr/local/airflow/include/dbt/profiles.yml')
)

DBT_PROJECT_CONFIG = ProjectConfig(
    dbt_project_path='/usr/local/airflow/include/dbt/',
)

Test the tasks

astro dev bash
airflow tasks list retail_dag
airflow tasks test retail_dag transform.dim_customer.dim_customer_run 2023-01-01
airflow tasks test retail_dag transform.dim_customer.dim_customer_test 2023-01-01

Check the Airflow UI: you should see a transform TaskGroup containing the models. First dbt models in place!

In include/soda/checks/transform/dim_customer.yml:

checks for dim_customer:
  # Check fails if any required column is missing or has a wrong type
  - schema:
      fail:
        when required column missing:
          [customer_id, country]
        when wrong column type:
          customer_id: string
          country: string
  # Check fails if there are any duplicate customer IDs
  - duplicate_count(customer_id) = 0:
      name: All customers are unique
  # Check fails if there are any missing customer IDs
  - missing_count(customer_id) = 0:
      name: All customers have a key

In include/soda/checks/transform/dim_datetime.yml:

checks for dim_datetime:
  # Check fails if any required column is missing or has a wrong type
  - schema:
      fail:
        when required column missing: [datetime_id, datetime]
        when wrong column type:
          datetime_id: string
          datetime: datetime
  # Check fails if any weekday values are out of range (BigQuery DAYOFWEEK returns 1-7)
  - invalid_count(weekday) = 0:
      name: All weekdays are in range 1-7
      valid min: 1
      valid max: 7
  # Check fails if there are any duplicate datetime IDs
  - duplicate_count(datetime_id) = 0:
      name: All datetimes are unique
  # Check fails if there are any missing datetime IDs
  - missing_count(datetime_id) = 0:
      name: All datetimes have a key

In include/soda/checks/transform/dim_product.yml:

checks for dim_product:
  # Check fails if any required column is missing or has a wrong type
  - schema:
      fail:
        when required column missing: [product_id, description, price]
        when wrong column type:
          product_id: string
          description: string
          price: float64
  # Check fails if there are any duplicate product IDs
  - duplicate_count(product_id) = 0:
      name: All products are unique
  # Check fails if there are any missing product IDs
  - missing_count(product_id) = 0:
      name: All products have a key
  # Check fails if there are any negative prices
  - min(price):
      fail: when < 0

In include/soda/checks/transform/fct_invoices.yml:

checks for fct_invoices:
  # Check fails if any required column is missing or has a wrong type
  - schema:
      fail:
        when required column missing:
          [invoice_id, product_id, customer_id, datetime_id, quantity, total]
        when wrong column type:
          invoice_id: string
          product_id: string
          customer_id: string
          datetime_id: string
          quantity: int
          total: float64
  # Check fails if there are any missing invoice IDs
  - missing_count(invoice_id) = 0:
      name: All invoices have a key
  # Check fails if there are any negative totals for invoices
  - failed rows:
      name: All invoices have a positive total amount
      fail query: |
        SELECT invoice_id, total
        FROM fct_invoices
        WHERE total < 0

Add a new task

@task.external_python(python='/usr/local/airflow/soda_venv/bin/python')
def check_transform(scan_name='check_transform', checks_subpath='transform'):
    from include.soda.check_function import check

    return check(scan_name, checks_subpath)
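
As before, you can test the task from inside the container:

astro dev bash
airflow tasks test retail_dag check_transform 2023-01-01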

Reports

In include/dbt/models/report/report_customer_invoices.sql:

-- report_customer_invoices.sql
-- Total invoices and revenue by customer country

SELECT
    c.country,
    c.iso,
    COUNT(fi.invoice_id) AS total_invoices,
    SUM(fi.total) AS total_revenue
FROM {{ ref('fct_invoices') }} fi
JOIN {{ ref('dim_customer') }} c ON fi.customer_id = c.customer_id
GROUP BY c.country, c.iso
ORDER BY total_revenue DESC
LIMIT 10

In include/dbt/models/report/report_product_invoices.sql:

-- report_product_invoices.sql
-- Total quantity sold by product

SELECT
    p.product_id,
    p.stock_code,
    p.description,
    SUM(fi.quantity) AS total_quantity_sold
FROM {{ ref('fct_invoices') }} fi
JOIN {{ ref('dim_product') }} p ON fi.product_id = p.product_id
GROUP BY p.product_id, p.stock_code, p.description
ORDER BY total_quantity_sold DESC
LIMIT 10

These queries generate reports on customer invoices and product invoices, respectively.

In include/dbt/models/report/report_year_invoices.sql:

-- report_year_invoices.sql
-- Total invoices and revenue by year and month

SELECT
    dt.year,
    dt.month,
    COUNT(DISTINCT fi.invoice_id) AS num_invoices,
    SUM(fi.total) AS total_revenue
FROM {{ ref('fct_invoices') }} fi
JOIN {{ ref('dim_datetime') }} dt ON fi.datetime_id = dt.datetime_id
GROUP BY dt.year, dt.month
ORDER BY dt.year, dt.month

Add a new task:

report = DbtTaskGroup(
    group_id='report',
    project_config=DBT_PROJECT_CONFIG,
    profile_config=DBT_CONFIG,
    render_config=RenderConfig(
        load_method=LoadMode.DBT_LS,
        select=['path:models/report']
    )
)

If you want to try adding data quality checks for the reports as well, create the following files in include/soda/checks/report:

# report_customer_invoices.yml
checks for report_customer_invoices:
  # Check fails if there are any missing values in the country column
  - missing_count(country) = 0:
      name: All customers have a country
  # Check fails if the total number of invoices is less than or equal to 0
  - min(total_invoices):
      fail: when <= 0

# report_product_invoices.yml
checks for report_product_invoices:
  # Check fails if there are any missing values in the stock_code column
  - missing_count(stock_code) = 0:
      name: All products have a stock code
  # Check fails if the total quantity sold is less than or equal to 0
  - min(total_quantity_sold):
      fail: when <= 0

These checks ensure the integrity of the reports generated by the SQL queries.

Add the final task

@task.external_python(python='/usr/local/airflow/soda_venv/bin/python')
def check_report(scan_name='check_report', checks_subpath='report'):
    from include.soda.check_function import check

    return check(scan_name, checks_subpath)
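
To finish, the tasks and task groups need to be ordered inside the DAG. Below is a minimal sketch of the wiring, using Airflow's chain helper and the names used throughout this article (adjust if your task or group names differ):

from airflow.models.baseoperator import chain

chain(
    upload_csv_to_gcs_task,
    create_retail_dataset_task,
    gcs_to_raw_task,
    check_load(),
    transform,
    check_transform(),
    report,
    check_report(),
)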

Congratulations! You have completed the data pipeline.

Dashboard

Modify docker-compose.override.yml:

version: '3.7'
services:
  webserver:
    environment:
      - AIRFLOW__WEBSERVER__RBAC=false
    ports:
      - "8080:8080"

This modification disables RBAC (Role-Based Access Control) in Airflow’s webserver and maps port 8080 to allow access to the Airflow UI.
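
Whichever BI or dashboard tool you plug on top, it only needs to read the report tables produced above. For example, a hypothetical revenue-by-country chart could be backed by a query like this (assuming the retail dataset and the report_customer_invoices model from earlier):

SELECT country, total_revenue
FROM `retail.report_customer_invoices`
ORDER BY total_revenue DESC;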

Stay tuned for more doses of data engineering!
