Apache Airflow: Building an End-to-End ETL Project
In this article, I will cover the essentials you need to know about Airflow. If you don’t know what it is, I wrote an article about it that you can find in the Resources section
I will cover the concepts related to the tool and also walk through a practical end-to-end project so you can understand how it works
The project was inspired by Krish Naik; you can watch his videos Here. The project consists of extracting data from an API, transforming it, and then loading it into a database
As always, if you find my articles interesting, don’t forget to like and follow; these articles take time and effort to make!
Let’s start the project
Creating the folder and starting your IDE
1 — Create a folder and name it however you like
2 — Open the Command Prompt from that folder:
3 — Start VSCode from the Command Prompt by typing
code .
Astro Installation
Open your terminal to install the Astro CLI. Before that, on Windows, you’ll also need Hyper-V enabled, which you can do from an elevated PowerShell:
Enable-WindowsOptionalFeature -Online -FeatureName Microsoft-Hyper-V -All
Once everything is set up, in the VSCode terminal, type the following command to install the Astro CLI:
winget install -e --id Astronomer.Astro
When the installation is complete, restart VSCode. To start an Astro project just write
astro dev init
Astro will automatically create and organize all the necessary project folders for you
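For reference, this is roughly the structure you should end up with (the exact contents can vary slightly depending on your Astro CLI version):
dags/                  # Where your DAG files (including ours, later) will live
include/               # Any extra files you want to ship with the project
plugins/               # Custom or community Airflow plugins
tests/                 # Example tests for your DAGs
Dockerfile             # The Astro Runtime image the project runs on
requirements.txt       # Python packages to install
packages.txt           # OS-level packages to install
airflow_settings.yaml  # Local-only connections, variables, and pools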
Create our Workflows
Now, we need to create our Workflows using DAGs (Directed Acyclic Graphs). If you’re not familiar with DAGs, refer to my previous article, which covers this concept in detail (you can find it in the Resources section)
In a nutshell, DAGs (Directed Acyclic Graphs) are graphical representations of tasks that illustrate how they interact with each other through dependencies.
Each node in a DAG represents a task, while the edges represent dependencies between these tasks, ensuring an organized flow
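To make this concrete, here is a minimal, self-contained sketch (separate from our project, with purely illustrative task names) of a DAG where three toy tasks are chained through their data dependencies:
from airflow import DAG
from airflow.decorators import task
from airflow.utils.dates import days_ago

with DAG(dag_id='toy_dag', start_date=days_ago(1), schedule_interval='@daily', catchup=False):

    @task()
    def extract():
        return "raw data"

    @task()
    def transform(raw):
        return raw.upper()

    @task()
    def load(clean):
        print(clean)

    # Each call is a node; passing one task's output to the next creates the edges (dependencies)
    load(transform(extract()))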
Create a DAG:
Create a new Python file inside the dags folder that Astro generated (name it however you like). Inside this file, load the necessary libraries, adding comments to explain each library’s role:
from airflow import DAG
from airflow.providers.http.hooks.http import HttpHook # We will be reading data from an API
from airflow.providers.postgres.hooks.postgres import PostgresHook # Push my data to SQL
from airflow.decorators import task # To create tasks inside the DAG
from airflow.utils.dates import days_ago # Sets the start date to a specific number of days before today
import requests # Make HTTP requests
import json # Parsing JSON responses from APIs
I need to specify the latitude and longitude for the API to retrieve the data. For this purpose, I’ll use the coordinates for Algeria
LATITUDE = '28.0339'
LONGITUDE = '1.6596'
We also need to specify the connection IDs for both the API and the database we are using. I will show you how to set them up inside Airflow later. For now, write the following:
POSTGRES_CONN_ID='postgres_default' # This is the connection ID used in Apache Airflow to reference our PostgreSQL database
API_CONN_ID='open_meteo_api' # This is the connection ID used in Apache Airflow to reference our Open Meteo API
Define Airflow DAG default parameters
default_args = {
    'owner': 'airflow',  # Owner of the DAG
    'start_date': days_ago(1)  # This sets the starting date for the scheduling of the DAG
}
Create an instance of the DAG
with DAG(dag_id='weather_etl_pipeline',  # Identifier for the DAG
         default_args=default_args,      # Default settings for tasks
         schedule_interval='@daily',     # Frequency of execution
         catchup=False) as dags:         # Skip past missed runs
Note regarding catchup=False: it means that if your DAG is scheduled to run daily, it won’t execute any past runs that were missed before the DAG was enabled
Now let’s write the tasks inside the DAG, starting with the extraction
@task()
def extract_weather_data():
    """Extract weather data from the Open-Meteo API using the Airflow connection."""
    http_hook = HttpHook(http_conn_id=API_CONN_ID, method='GET')
    endpoint = f'/v1/forecast?latitude={LATITUDE}&longitude={LONGITUDE}&current_weather=true'
    response = http_hook.run(endpoint)
    if response.status_code == 200:
        return response.json()
    else:
        raise Exception(f"Failed to fetch weather data: {response.status_code}")
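For context, the JSON returned by this endpoint wraps the fields we care about in a current_weather object, roughly shaped like this (shown here as a Python dict; the values are purely illustrative):
{
    "latitude": 28.0,
    "longitude": 1.75,
    "current_weather": {
        "temperature": 31.4,   # in °C
        "windspeed": 14.2,     # in km/h
        "winddirection": 200,  # in degrees
        "weathercode": 1       # WMO weather interpretation code
    }
}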
Transformation task
It’s not really a transformation of the data; it just pulls out the relevant fields (like temperature, windspeed, etc.) from the “current_weather” section of the API response
@task()
def transform_weather_data(weather_data):
    current_weather = weather_data['current_weather']
    transformed_data = {
        'latitude': LATITUDE,
        'longitude': LONGITUDE,
        'temperature': current_weather['temperature'],
        'windspeed': current_weather['windspeed'],
        'winddirection': current_weather['winddirection'],
        'weathercode': current_weather['weathercode']
    }
    return transformed_data
Load the data
I will then load the transformed data into a PostgreSQL database
@task()
def load_weather_data(transformed_data):
    """Load transformed data into PostgreSQL."""
    pg_hook = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)
    conn = pg_hook.get_conn()
    cursor = conn.cursor()
    # Create the table if it doesn't exist yet
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS weather_data (
            latitude FLOAT,
            longitude FLOAT,
            temperature FLOAT,
            windspeed FLOAT,
            winddirection FLOAT,
            weathercode INT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
    """)
    # Insert the transformed record
    cursor.execute("""
        INSERT INTO weather_data (latitude, longitude, temperature, windspeed, winddirection, weathercode)
        VALUES (%s, %s, %s, %s, %s, %s)
    """, (
        transformed_data['latitude'],
        transformed_data['longitude'],
        transformed_data['temperature'],
        transformed_data['windspeed'],
        transformed_data['winddirection'],
        transformed_data['weathercode']
    ))
    conn.commit()
    cursor.close()
Our Pipeline
## DAG Workflow - ETL Pipeline
weather_data = extract_weather_data()
transformed_data = transform_weather_data(weather_data)
load_weather_data(transformed_data)
Full Code
from airflow import DAG
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task
from airflow.utils.dates import days_ago
import requests
import json

# Latitude and longitude for the desired location (Algeria in this case)
LATITUDE = '28.0339'
LONGITUDE = '1.6596'
POSTGRES_CONN_ID = 'postgres_default'
API_CONN_ID = 'open_meteo_api'

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1)
}

## DAG
with DAG(dag_id='weather_etl_pipeline',
         default_args=default_args,
         schedule_interval='@daily',
         catchup=False) as dags:

    @task()
    def extract_weather_data():
        """Extract weather data from Open-Meteo API using Airflow Connection."""
        http_hook = HttpHook(http_conn_id=API_CONN_ID, method='GET')
        endpoint = f'/v1/forecast?latitude={LATITUDE}&longitude={LONGITUDE}&current_weather=true'
        response = http_hook.run(endpoint)
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to fetch weather data: {response.status_code}")

    @task()
    def transform_weather_data(weather_data):
        """Transform the extracted weather data."""
        current_weather = weather_data['current_weather']
        transformed_data = {
            'latitude': LATITUDE,
            'longitude': LONGITUDE,
            'temperature': current_weather['temperature'],
            'windspeed': current_weather['windspeed'],
            'winddirection': current_weather['winddirection'],
            'weathercode': current_weather['weathercode']
        }
        return transformed_data

    @task()
    def load_weather_data(transformed_data):
        """Load transformed data into PostgreSQL."""
        pg_hook = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)
        conn = pg_hook.get_conn()
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS weather_data (
                latitude FLOAT,
                longitude FLOAT,
                temperature FLOAT,
                windspeed FLOAT,
                winddirection FLOAT,
                weathercode INT,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
        """)
        cursor.execute("""
            INSERT INTO weather_data (latitude, longitude, temperature, windspeed, winddirection, weathercode)
            VALUES (%s, %s, %s, %s, %s, %s)
        """, (
            transformed_data['latitude'],
            transformed_data['longitude'],
            transformed_data['temperature'],
            transformed_data['windspeed'],
            transformed_data['winddirection'],
            transformed_data['weathercode']
        ))
        conn.commit()
        cursor.close()

    ## DAG Workflow - ETL Pipeline
    weather_data = extract_weather_data()
    transformed_data = transform_weather_data(weather_data)
    load_weather_data(transformed_data)
Create a Docker compose file
It’s like an exportable environment for the Airflow project, where I have my configs, dependencies, and environment in general. This is the configuration (create a file inside your project folder and name it docker-compose.yml):
version: '3'  # The version of the Docker Compose file format
services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: airflow
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
  webserver:
    image: apache/airflow:2.6.1
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://postgres:postgres@postgres:5432/airflow
      AIRFLOW__CORE__FERNET_KEY: 'YOUR_FERNET_KEY'
    ports:
      - "8080:8080"
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
    depends_on:
      - postgres
    command: webserver
  scheduler:
    image: apache/airflow:2.6.1
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://postgres:postgres@postgres:5432/airflow
      AIRFLOW__CORE__FERNET_KEY: 'YOUR_FERNET_KEY'
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
    depends_on:
      - postgres
    command: scheduler
volumes:
  postgres_data:
  airflow_logs:
Note (If needed) : Replace ‘YOUR_FERNET_KEY’ with a secure Fernet key to enable encryption of sensitive data in your Airflow environment. You can generate a Fernet key using the following Python code:
from cryptography.fernet import Fernet
key = Fernet.generate_key()
print(key.decode()) # Use this key in your Docker Compose file
Starting your Airflow
You can start it using the following command. Airflow will launch, and you’ll see a new container appear in your Docker setup.
astro dev start
Note: By default, the Airflow username and password are admin / admin
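If you want to double-check that everything is up, you can list the containers Astro started (this assumes the Astro CLI is on your PATH; a plain docker ps works too):
astro dev ps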
As soon as you open Airflow, you will find your DAGs
You can also explore your pipeline by clicking on the DAG
Setup Connection
Now that we’re in, let’s set up the connections needed to run the DAG. Navigate to Admin and then select Connections. From there, you can add your connections for both the API and the database:
Note regarding the configuration (images below):
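If you prefer the command line over the UI, the same two connections can be created with the Airflow CLI. This is only a sketch: it assumes the public Open-Meteo host and the Postgres credentials from the Compose file above, and it has to be run inside the Airflow container (for example via astro dev bash):
airflow connections add 'open_meteo_api' --conn-type 'http' --conn-host 'https://api.open-meteo.com'
airflow connections add 'postgres_default' --conn-type 'postgres' --conn-host 'postgres' --conn-login 'postgres' --conn-password 'postgres' --conn-schema 'airflow' --conn-port 5432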
Now let’s get back to the DAG on Airflow and run it from there
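You can also trigger it from the terminal instead of the UI (a sketch, assuming the project was started with the Astro CLI, which forwards the command to the Airflow CLI inside the container):
astro dev run dags trigger weather_etl_pipeline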
If you want to explore what you extracted, select the task in question and then click on XCom
Connect to your database
You can see your database running in Docker, but if you want to access that database, you can use DBeaver
If you want to use DBeaver, click on Connect to Database, select your database, enter the connection settings, test the connection, and then access your database.
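As a sketch, with the Compose file above the connection settings would typically be: host localhost, port 5432, database airflow, username postgres, password postgres (adjust if you changed them). Once connected, and assuming the DAG has run at least once, a quick query confirms the loaded rows:
SELECT * FROM weather_data ORDER BY timestamp DESC LIMIT 5;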
If there’s a specific subject you’d like me to cover, please don’t hesitate to let me know! Your input will help shape the direction of my content and ensure it remains relevant and engaging
Resources
If you found this helpful, consider sharing it and following me, Dr. Oualid Soula, for more content like this.
Join the journey of discovery and stay ahead in the world of data science and AI! Don't miss out on the latest insights and updates - subscribe to the newsletter for free at https://lnkd.in/eNBG5dWm and become part of our growing community!