Create your first Airflow DAG
Let's start by creating a Hello World workflow, which does nothing other than send "Hello World!" to the log.
A DAG file, which is basically just a Python script, is a configuration file specifying the DAG’s structure as code.
The following are the step-by-step instructions for writing an Airflow DAG or workflow:
Step 1: Creating a Python file
mkdir -p ${AIRFLOW_HOME}/dags
HelloWorld Application Structure:
${AIRFLOW_HOME}
├── airflow.cfg
├── airflow.db
├── dags <- Your DAGs directory
│   └── hello_world_dag.py <- Your DAG definition file
└── unittests.cfg
Add the following content, step by step, to the hello_world_dag.py file.
vi ${AIRFLOW_HOME}/dags/hello_world_dag.py
Step 2: Importing the modules
#datetime
from datetime import timedelta, datetime
# The DAG object
from airflow import DAG
# Operators
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
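Note: depending on the Airflow version you are running, the same operators may also be importable from newer module paths. The lines below are an optional, version-dependent alternative rather than part of the tutorial code.
# Newer-style imports (Airflow 2.x); use whichever paths match your installed version
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator  # successor of DummyOperator in recent releases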
Step 3: Default Arguments for the DAG
# initializing the default arguments
default_args = {
    'owner': 'Ranga',
    'start_date': datetime(2022, 3, 4),
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}
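The default_args dictionary accepts any BaseOperator argument and applies it to every task in the DAG. A few more commonly used keys are shown below purely for illustration; they are not required for this tutorial.
# illustrative extras that could be added to default_args (optional)
default_args.update({
    'depends_on_past': False,    # do not wait for the previous run of the same task
    'email_on_failure': False,   # set to True (and add an 'email' key) only if SMTP is configured
})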
Step 4: Instantiate a DAG
# Instantiate a DAG object
hello_world_dag = DAG('hello_world_dag',
                      default_args=default_args,
                      description='Hello World DAG',
                      schedule_interval='* * * * *',
                      catchup=False,
                      tags=['example', 'helloworld']
                      )
Example usage:
- Daily schedule:
- `schedule_interval='@daily'`
- `schedule_interval='0 0 * * *'`
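An equivalent way to write Step 4, which you may see in other examples, is the context-manager style; tasks defined inside the with block are attached to the DAG automatically, so the dag argument on each operator can be omitted. The sketch below (using the daily schedule) is shown only for comparison.
# Equivalent context-manager style
with DAG('hello_world_dag',
         default_args=default_args,
         description='Hello World DAG',
         schedule_interval='@daily',
         catchup=False,
         tags=['example', 'helloworld']) as dag:
    ...  # define tasks here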
Step 5: Creating a callable function
# python callable function
def print_hello():
    return 'Hello World!'
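If the callable declares **kwargs, Airflow (2.x) passes the runtime context, such as the logical date and task instance, into it. The variant below is illustrative only and is not part of the tutorial DAG.
# variant that reads the runtime context passed in by Airflow
def print_hello_with_context(**context):
    print(f"Hello World! Logical date: {context['ds']}")
    return 'Hello World!'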
Step 6: Creating Tasks
# Creating first task
start_task = DummyOperator(task_id='start_task', dag=hello_world_dag)
# Creating second task
hello_world_task = PythonOperator(task_id='hello_world_task', python_callable=print_hello, dag=hello_world_dag)
# Creating third task
end_task = DummyOperator(task_id='end_task', dag=hello_world_dag)
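The PythonOperator can also pass arguments to its callable through op_args / op_kwargs. The hypothetical task below is shown only to illustrate the parameter; it is not part of the tutorial DAG.
# illustrative only: passing keyword arguments to the callable
greet_task = PythonOperator(task_id='greet_task',
                            python_callable=lambda name: print(f'Hello, {name}!'),
                            op_kwargs={'name': 'Airflow'},
                            dag=hello_world_dag)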
Step 7: Setting up Dependencies
# Set the order of execution of tasks.
start_task >> hello_world_task >> end_task
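The bitshift operators are shorthand for set_downstream / set_upstream; the following lines are equivalent to the one above.
# equivalent explicit form
start_task.set_downstream(hello_world_task)
hello_world_task.set_downstream(end_task)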
Step 8: Verifying the final DAG code
After putting together all the elements of the DAG, our final code should look like this:
cat ${AIRFLOW_HOME}/dags/hello_world_dag.py
#datetime
from datetime import timedelta, datetime
# The DAG object
from airflow import DAG
# Operators
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
# initializing the default arguments
default_args = {
    'owner': 'Ranga',
    'start_date': datetime(2022, 3, 4),
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}
# Instantiate a DAG object
hello_world_dag = DAG('hello_world_dag',
                      default_args=default_args,
                      description='Hello World DAG',
                      schedule_interval='* * * * *',
                      catchup=False,
                      tags=['example', 'helloworld']
                      )
# python callable function
def print_hello():
    return 'Hello World!'
# Creating first task
start_task = DummyOperator(task_id='start_task', dag=hello_world_dag)
# Creating second task
hello_world_task = PythonOperator(task_id='hello_world_task', python_callable=print_hello, dag=hello_world_dag)
# Creating third task
end_task = DummyOperator(task_id='end_task', dag=hello_world_dag)
# Set the order of execution of tasks.
start_task >> hello_world_task >> end_task
This file creates a simple DAG with three tasks built from two operator types: the DummyOperator, which does nothing, and the PythonOperator, which calls the print_hello function when its task is executed.
Step 9: Test the Pipeline
Check that the DAG file contains valid Python code by executing it with the Python interpreter:
python3 ${AIRFLOW_HOME}/dags/hello_world_dag.py
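Beyond this syntax check, the Airflow CLI can confirm that the scheduler can parse the DAG and can run a single task in isolation (assuming the virtual environment from the earlier setup is active):
# list parsed DAGs and run one task without the scheduler (Airflow 2.x CLI)
airflow dags list
airflow tasks test hello_world_dag hello_world_task 2022-03-04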
Step 10: Running the DAG
source ~/install/airflow-tutorial/airflow_venv/bin/activate
export AIRFLOW_HOME=~/install/airflow-tutorial/airflow
airflow webserver \
    --pid ${AIRFLOW_HOME}/logs/airflow-webserver.pid \
    --stdout ${AIRFLOW_HOME}/logs/airflow-webserver.out \
    --stderr ${AIRFLOW_HOME}/logs/airflow-webserver.out \
    -l ${AIRFLOW_HOME}/logs/airflow-webserver.log \
    -p 8080 \
    -D
airflow scheduler \
    --pid ${AIRFLOW_HOME}/logs/airflow-scheduler.pid \
    --stdout ${AIRFLOW_HOME}/logs/airflow-scheduler.out \
    --stderr ${AIRFLOW_HOME}/logs/airflow-scheduler.out \
    -l ${AIRFLOW_HOME}/logs/airflow-scheduler.log \
    -D
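Once the webserver and scheduler are up, the DAG can be enabled and triggered either from the UI (the toggle and play buttons) or from the CLI, for example:
# enable the DAG and start a run from the command line (Airflow 2.x CLI)
airflow dags unpause hello_world_dag
airflow dags trigger hello_world_dag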
Happy Learning.