Apache Airflow Building End To End ETL Project
Photo by Djet Leoo on Unsplash


In this article, I will cover the essentials you need to know about Airflow. If you don't know what it is, I wrote an article about it that you can find in the Resources section

I will include concepts related to the tool and also walk through a practical end-to-end project so you can understand how it works

The project was inspired by Krish Naik; you can watch his videos here. The project consists of extracting data from an API, transforming it, and then loading it into a database

ETL. Image Source : Dr. Walid Soula

As always, if you find my articles interesting, don't forget to like and follow; these articles take time and effort to produce!

Let’s start the project

Creating the folder and starting your IDE

1 — Create a folder and name it however you like

2 — Open the Command Prompt from that folder:

  • Open the folder you created
  • Click on the address bar at the top of the window
  • Type “cmd” and press Enter to open the Command Prompt directly from that folder

3 — Start VSCode from the Command Prompt by typing

code .         

Astro Installation

Open your terminal to install the Astro CLI. Before that, on Windows you'll also need Hyper-V enabled, which you can do by running the following in an elevated PowerShell:

Enable-WindowsOptionalFeature -Online -FeatureName Microsoft-Hyper-V -All        

Once everything is set up, type the following command in the VSCode terminal to install the Astro CLI

winget install -e --id Astronomer.Astro        
Downloading Astro. Image Source : Dr. Walid Soula

When the installation is complete, restart VSCode. To initialize an Astro project, just run

astro dev init        
Initialization of Astro. Image Source : Dr. Walid Soula

Astro will automatically create and organize all the necessary project folders for you

Project Folders. Image Source : Dr. Walid Soula

Create our Workflows

Now, we need to create our Workflows using DAGs (Directed Acyclic Graphs). If you’re not familiar with DAGs, refer to my previous article, which covers this concept in detail (you can find it in the Resources section)

DAGs. Image Source : airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags

In a nutshell, DAGs (Directed Acyclic Graphs) are graphical representations of tasks that illustrate how they interact with each other through dependencies.

Each node in a DAG represents a task, while the edges represent dependencies between these tasks, ensuring an organized flow
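
To make this concrete, here is a minimal illustrative sketch (not the project code) with two tasks and a single edge between them; with the TaskFlow API, the dependency is created simply by passing one task's output to the next:

from airflow import DAG
from airflow.decorators import task
from airflow.utils.dates import days_ago

with DAG(dag_id="toy_dag", start_date=days_ago(1),
         schedule_interval="@daily", catchup=False) as dag:

    @task()
    def say_hello():
        return "hello"          # node 1

    @task()
    def shout(message):
        print(message.upper())  # node 2

    shout(say_hello())          # edge: say_hello -> shout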

Create a DAG:

  1. Navigate to your dags folder within your Astro project
  2. Create a new .py file to define your DAG structure

Inside this file, load the necessary libraries, adding comments to explain each library’s role:

from airflow import DAG
from airflow.providers.http.hooks.http import HttpHook # We will be reading data from an API
from airflow.providers.postgres.hooks.postgres import PostgresHook # Push my data to SQL
from airflow.decorators import task # To create tasks inside the DAG
from airflow.utils.dates import days_ago # Sets the start date to a specific number of days before today
import requests # Make HTTP requests
import json # Parsing JSON responses from APIs        

I need to specify the latitude and longitude for the API to retrieve the data. For this purpose, I’ll use the coordinates for Algeria

LATITUDE = '28.0339'
LONGITUDE = '1.6596'        

We also need to specify the connection IDs for both the API and the database we are using. I will show you how to configure them later inside Airflow. For now, write the following:

POSTGRES_CONN_ID='postgres_default' # This is the connection ID used in Apache Airflow to reference our PostgreSQL database
API_CONN_ID='open_meteo_api' # This is the connection ID used in Apache Airflow to reference our Open Meteo API        

Define Airflow DAG default parameters

default_args={
    'owner':'airflow', #Owner of the DAG
    'start_date':days_ago(1) # This sets the starting date for the scheduling of the DAG
}        
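
If you want the DAG to be a bit more resilient, default_args can also carry retry settings. A small optional sketch, not required for this project (retries and retry_delay are standard Airflow task arguments):

from datetime import timedelta

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'retries': 2,                         # retry a failed task twice
    'retry_delay': timedelta(minutes=5)   # wait 5 minutes between retries
}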

Create an instance of the DAG

with DAG(dag_id='weather_etl_pipeline', # Identifier for the DAG
         default_args=default_args, #Default settings for tasks
         schedule_interval='@daily', #Frequency of execution
         catchup=False) as dags: #Skip past missed runs        

Note regarding catchup=False: this means that if your DAG is scheduled to run daily, it won't execute any past runs that were missed before the DAG was enabled

Now let's define the tasks inside the DAG, starting with the extraction

  • I will use HttpHook to get connection details from the Airflow connection that we will configure later
  • Build an endpoint for the API
  • Make the request and verify the response

    @task()
    def extract_weather_data():
        http_hook=HttpHook(http_conn_id=API_CONN_ID,method='GET')
        endpoint=f'/v1/forecast?latitude={LATITUDE}&longitude={LONGITUDE}&current_weather=true'
        response=http_hook.run(endpoint)

        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to fetch weather data: {response.status_code}")        
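
If you want to preview the raw payload outside Airflow first, you can call the same endpoint directly with requests (already imported above). A quick standalone sketch, assuming the public Open-Meteo base URL https://api.open-meteo.com:

import requests

LATITUDE = '28.0339'   # same coordinates as above
LONGITUDE = '1.6596'

url = (
    "https://api.open-meteo.com/v1/forecast"
    f"?latitude={LATITUDE}&longitude={LONGITUDE}&current_weather=true"
)
response = requests.get(url, timeout=10)
response.raise_for_status()
print(response.json().get("current_weather"))  # the fields we keep in the transform step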

Transformation task

It's not really a heavy transformation of the data; it just pulls out the relevant fields (temperature, windspeed, etc.) from the "current_weather" object

    @task()
    def transform_weather_data(weather_data):
        current_weather = weather_data['current_weather']
        transformed_data = {
            'latitude': LATITUDE,
            'longitude': LONGITUDE,
            'temperature': current_weather['temperature'],
            'windspeed': current_weather['windspeed'],
            'winddirection': current_weather['winddirection'],
            'weathercode': current_weather['weathercode']
        }
        return transformed_data        

Load the data

I will then load the transformed data into a PostgreSQL database

  • First, it establishes a connection to the database
  • The function creates the necessary table if it doesn’t already exist, and inserts the extracted weather data into this table.

    @task()
    def load_weather_data(transformed_data):
        """Load transformed data into PostgreSQL."""
        pg_hook = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)
        conn = pg_hook.get_conn()
        cursor = conn.cursor()

        # Create
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS weather_data (
            latitude FLOAT,
            longitude FLOAT,
            temperature FLOAT,
            windspeed FLOAT,
            winddirection FLOAT,
            weathercode INT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        """)

        # Insert 
        cursor.execute("""
        INSERT INTO weather_data (latitude, longitude, temperature, windspeed, winddirection, weathercode)
        VALUES (%s, %s, %s, %s, %s, %s)
        """, (
            transformed_data['latitude'],
            transformed_data['longitude'],
            transformed_data['temperature'],
            transformed_data['windspeed'],
            transformed_data['winddirection'],
            transformed_data['weathercode']
        ))

        conn.commit()
        cursor.close()        

Our Pipeline

    ## DAG Workflow - ETL Pipeline
    weather_data = extract_weather_data()
    transformed_data = transform_weather_data(weather_data)
    load_weather_data(transformed_data)

Full Code

from airflow import DAG
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task
from airflow.utils.dates import days_ago
import requests
import json

# Latitude and longitude for the desired location (Algeria in this case)
LATITUDE = '28.0339'
LONGITUDE = '1.6596'
POSTGRES_CONN_ID='postgres_default'
API_CONN_ID='open_meteo_api'

default_args={
    'owner':'airflow',
    'start_date':days_ago(1)
}

## DAG
with DAG(dag_id='weather_etl_pipeline',
         default_args=default_args,
         schedule_interval='@daily',
         catchup=False) as dags:
    
    @task()
    def extract_weather_data():
        """Extract weather data from Open-Meteo API using Airflow Connection."""

        http_hook=HttpHook(http_conn_id=API_CONN_ID,method='GET')

        endpoint=f'/v1/forecast?latitude={LATITUDE}&longitude={LONGITUDE}&current_weather=true'

        response=http_hook.run(endpoint)

        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to fetch weather data: {response.status_code}")
        
    @task()
    def transform_weather_data(weather_data):
        """Transform the extracted weather data."""
        current_weather = weather_data['current_weather']
        transformed_data = {
            'latitude': LATITUDE,
            'longitude': LONGITUDE,
            'temperature': current_weather['temperature'],
            'windspeed': current_weather['windspeed'],
            'winddirection': current_weather['winddirection'],
            'weathercode': current_weather['weathercode']
        }
        return transformed_data
    
    @task()
    def load_weather_data(transformed_data):
        """Load transformed data into PostgreSQL."""
        pg_hook = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)
        conn = pg_hook.get_conn()
        cursor = conn.cursor()

        cursor.execute("""
        CREATE TABLE IF NOT EXISTS weather_data (
            latitude FLOAT,
            longitude FLOAT,
            temperature FLOAT,
            windspeed FLOAT,
            winddirection FLOAT,
            weathercode INT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        """)

        cursor.execute("""
        INSERT INTO weather_data (latitude, longitude, temperature, windspeed, winddirection, weathercode)
        VALUES (%s, %s, %s, %s, %s, %s)
        """, (
            transformed_data['latitude'],
            transformed_data['longitude'],
            transformed_data['temperature'],
            transformed_data['windspeed'],
            transformed_data['winddirection'],
            transformed_data['weathercode']
        ))

        conn.commit()
        cursor.close()

    weather_data= extract_weather_data()
    transformed_data=transform_weather_data(weather_data)
    load_weather_data(transformed_data)        

Create a Docker compose file

It's like an exportable environment for the Airflow project where I keep my configs, dependencies, and environment in general. Create a file inside your project folder, name it docker-compose.yml, and add the following configuration:

version: '3' # The version of the Docker Compose file format
services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: airflow
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  webserver:
    image: apache/airflow:2.6.1
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://postgres:postgres@postgres:5432/airflow
      AIRFLOW__CORE__FERNET_KEY: 'YOUR_FERNET_KEY'
    ports:
      - "8080:8080"
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs  
    depends_on:
      - postgres
    command: webserver

  scheduler:
    image: apache/airflow:2.6.1
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://postgres:postgres@postgres:5432/airflow
      AIRFLOW__CORE__FERNET_KEY: 'YOUR_FERNET_KEY'
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
    depends_on:
      - postgres
    command: scheduler

volumes:
  postgres_data:
  airflow_logs:        

Note (if needed): Replace 'YOUR_FERNET_KEY' with a secure Fernet key to enable encryption of sensitive data in your Airflow environment. You can generate a Fernet key using the following Python code:

from cryptography.fernet import Fernet
key = Fernet.generate_key()
print(key.decode())  # Use this key in your Docker Compose file        

Starting your Airflow

You can start it using the following command. Airflow will launch, and you’ll see a new container appear in your Docker setup.

astro dev start        


Note: By default, the Airflow username and password are admin/admin

As soon as you open Airflow, you will find your DAGs


You can also explore your Pipeline by clicking on the DAG


Setup Connection

Now that we’re in, let’s set up the necessary connections to run the DAG. Navigate to Admin and then select Connections. From there, you can add your connections for both:

  • postgres_default
  • open_meteo_api


Note regarding the configuration (images below):

  • Host is the name of the Postgres container in your Docker setup
  • The name, ID, port, and password of the database are in the docker-compose.yml file
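
If you prefer scripting over clicking through the UI, the same two connections can also be created programmatically from inside the Airflow environment (for example with astro dev bash). This is only a sketch; the host, credentials, and port are assumptions copied from the docker-compose.yml above, so adjust them to your setup:

from airflow.models import Connection
from airflow.utils.session import create_session

connections = [
    Connection(conn_id="open_meteo_api", conn_type="http",
               host="https://api.open-meteo.com"),            # same host as in the UI
    Connection(conn_id="postgres_default", conn_type="postgres",
               host="postgres", schema="airflow",              # container name and DB from docker-compose.yml
               login="postgres", password="postgres", port=5432),
]

with create_session() as session:
    for conn in connections:
        if not session.query(Connection).filter_by(conn_id=conn.conn_id).first():
            session.add(conn)   # only add the connection if it doesn't exist yet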


Now let’s get back to the DAG on Airflow and run it from there


If you want to explore what you extracted, select the task related to it, then click on XCom


Connect to your database

You can see your database running in Docker, but if you want to access that database, you can use DBeaver


If you want to use DBeaver, click on Connect to Database, select your database, enter the connection settings, test the connection, and then access your database.
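
If you prefer querying from Python instead of the DBeaver UI, here is a small sketch using psycopg2 with the credentials from the docker-compose.yml above (host is localhost because port 5432 is mapped to your machine); adjust if your settings differ:

import psycopg2

# Connection settings taken from docker-compose.yml (assumptions, change if needed)
conn = psycopg2.connect(
    host="localhost", port=5432,
    dbname="airflow", user="postgres", password="postgres",
)
with conn, conn.cursor() as cur:
    cur.execute("SELECT * FROM weather_data ORDER BY timestamp DESC LIMIT 5;")
    for row in cur.fetchall():
        print(row)   # latest rows loaded by the DAG
conn.close()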


If there's a specific subject you'd like me to cover, please don't hesitate to let me know! Your input will help shape the direction of my content and ensure it remains relevant and engaging.

Resources


If you found this helpful, consider sharing it and follow me, Dr. Oualid Soula, for more content like this.

Join the journey of discovery and stay ahead in the world of data science and AI! Don't miss out on the latest insights and updates: subscribe to the newsletter for free at https://lnkd.in/eNBG5dWm and become part of our growing community!
