Create your first Airflow DAG

Let's start by creating a Hello World workflow, which does nothing other than send "Hello World!" to the log.

A DAG file, which is basically just a Python script, is a configuration file specifying the DAG’s structure as code.

The following are the steps to write an Airflow DAG or workflow:

  1. Creating a Python file
  2. Importing the modules
  3. Default Arguments for the DAG
  4. Instantiate a DAG
  5. Creating a callable function
  6. Creating Tasks
  7. Setting up Dependencies
  8. Verifying the final DAG code
  9. Test the Pipeline
  10. Running the DAG

Step 1: Creating a Python file

  • Create the ${AIRFLOW_HOME}/dags directory if it is not present. Scripts placed under the ${AIRFLOW_HOME}/dags directory (Python scripts, shell, SQL, etc.) are picked up for scheduling; a quick check of the configured location is shown after the directory layout below.

mkdir -p ${AIRFLOW_HOME}/dags        

  • Create a new Python file hello_world_dag.py inside the ${AIRFLOW_HOME}/dags/ directory.

HelloWorld Application Structure:

${AIRFLOW_HOME}
├── airflow.cfg
├── airflow.db
├── dags                    <- Your DAGs directory
│   └── hello_world_dag.py  <- Your DAG definition file
└── unittests.cfg        
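
If you are unsure which directory Airflow scans for DAG files, the dags_folder setting in airflow.cfg controls it. A quick way to check (the second command assumes the Airflow 2.x CLI):

# Show the configured DAGs directory from airflow.cfg
grep '^dags_folder' ${AIRFLOW_HOME}/airflow.cfg

# On Airflow 2.x the same value can also be read via the CLI
airflow config get-value core dags_folder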

Add the content from the following steps to the hello_world_dag.py file.

vi ${AIRFLOW_HOME}/dags/hello_world_dag.py

Step 2: Importing the modules

  • Import the timedelta and datetime classes from the datetime module to help us schedule the DAGs.
  • Import the DAG class from the airflow package to instantiate the DAG object.
  • Import the DummyOperator from the airflow.operators.dummy_operator module.
  • Import the PythonOperator from the airflow.operators.python_operator module.

#datetime
from datetime import timedelta, datetime

# The DAG object
from airflow import DAG

# Operators
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator        
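
Note that these are the classic, Airflow 1.x-style import paths; on Airflow 2.x they still work but print deprecation warnings. A sketch of the newer equivalents, depending on your exact Airflow version:

# Airflow 2.x-style imports; EmptyOperator replaces DummyOperator
# from Airflow 2.3 onwards (on 2.0-2.2 use airflow.operators.dummy)
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator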

Step 3: Default Arguments for the DAG

  • Default arguments are passed to a DAG as default_args dictionary.
  • This makes it easy to apply a common parameter to many operators without having to type it many times.

# initializing the default arguments
default_args = {
		'owner': 'Ranga',
		'start_date': datetime(2022, 3, 4),
		'retries': 3,
		'retry_delay': timedelta(minutes=5)
}        
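
Any argument accepted by BaseOperator can be placed in default_args and will then apply to every task in the DAG. A sketch of a few other commonly used keys (the values are illustrative and could be merged into the dictionary above; the email address is hypothetical):

# Additional default_args keys that are often useful (illustrative values)
extra_defaults = {
		'depends_on_past': False,          # do not wait on the previous run of the same task
		'email': ['alerts@example.com'],   # hypothetical alert address
		'email_on_failure': False,         # set to True to receive an email when a task fails
		'email_on_retry': False            # set to True to receive an email when a task retries
}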

Step 4: Instantiate a DAG

  • Next, we will instantiate a DAG object by passing a dag_id string, which is the unique identifier of the DAG.
  • It is recommended to keep the Python file name and the dag_id the same, so we will assign the dag_id as "hello_world_dag".

# Instantiate a DAG object
hello_world_dag = DAG('hello_world_dag',
		default_args=default_args,
		description='Hello World DAG',
		schedule_interval='* * * * *', 
		catchup=False,
		tags=['example', 'helloworld']
)        

Example usage:

-   Daily schedule:
		-   `schedule_interval='@daily'`
		-   `schedule_interval='0 0 * * *'`        

  • By default, Airflow will run tasks for all past intervals up to the current time. This behavior can be disabled by setting the catchup parameter of a DAG to False, in which case Airflow will only start executing tasks from the current interval.
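
As a side note, the same DAG can also be declared with a with block (a minimal sketch, equivalent to the code above); tasks created inside the block are attached to the DAG automatically, so the dag=... argument used in the later steps becomes unnecessary:

# Equivalent style: declare the DAG as a context manager
with DAG('hello_world_dag',
		default_args=default_args,
		description='Hello World DAG',
		schedule_interval='* * * * *',
		catchup=False,
		tags=['example', 'helloworld']
) as dag:
		pass  # tasks created inside this block are attached to the DAG automatically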

Step 5: Creating a callable function

  • We also need to create a function that will be called by the PythonOperator, as shown below:

# python callable function
def print_hello():
		return 'Hello World!'        

Step 6: Creating Tasks

  • An object instantiated from an operator is called a task. There are various types of operators available, but we will first focus on the DummyOperator and the PythonOperator.
  • A PythonOperator is used to call a Python function inside your DAG. We will create a PythonOperator object that calls a function which returns 'Hello World!' when it is called.
  • Just as a DAG object has a dag_id, a PythonOperator object has a task_id, which acts as its identifier.
  • It also has a python_callable parameter, which takes the function to be called as its input.

# Creating first task
start_task = DummyOperator(task_id='start_task', dag=hello_world_dag)

# Creating second task
hello_world_task = PythonOperator(task_id='hello_world_task', python_callable=print_hello, dag=hello_world_dag)

# Creating third task
end_task = DummyOperator(task_id='end_task', dag=hello_world_dag)        
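
If the callable needs arguments, the PythonOperator can forward them through its op_kwargs parameter. A minimal sketch (print_greeting and greeting_task are illustrative names, not part of this tutorial's DAG):

# Passing arguments to the callable via op_kwargs (illustrative example)
def print_greeting(name):
		return 'Hello ' + name + '!'

greeting_task = PythonOperator(task_id='greeting_task',
		python_callable=print_greeting,
		op_kwargs={'name': 'Airflow'},   # forwarded to print_greeting as keyword arguments
		dag=hello_world_dag)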

Step 7: Setting up Dependencies

  • Set the dependencies or the order in which the tasks should be executed.
  • We can set the dependencies of the tasks by writing the task names along with >> or << to indicate the downstream or upstream flow respectively.
  • Here is the most common way to define the dependencies between them (a few equivalent alternatives are shown after the example):

# Set the order of execution of tasks. 
start_task >> hello_world_task >> end_task        
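
For reference, the same ordering can be expressed in a couple of equivalent ways (a sketch; pick one style and use it consistently):

# Equivalent alternatives for the same ordering

# Using the upstream operator (read right to left)
end_task << hello_world_task << start_task

# Using the explicit helper methods
start_task.set_downstream(hello_world_task)
hello_world_task.set_downstream(end_task)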

Step 8: Verifying the final DAG code

After compiling all the elements of the DAG, our final code should look like this:

cat ${AIRFLOW_HOME}/dags/hello_world_dag.py

#datetime
from datetime import timedelta, datetime

# The DAG object
from airflow import DAG

# Operators
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

# initializing the default arguments
default_args = {
		'owner': 'Ranga',
		'start_date': datetime(2022, 3, 4),
		'retries': 3,
		'retry_delay': timedelta(minutes=5)
}

# Instantiate a DAG object
hello_world_dag = DAG('hello_world_dag',
		default_args=default_args,
		description='Hello World DAG',
		schedule_interval='* * * * *', 
		catchup=False,
		tags=['example', 'helloworld']
)

# python callable function
def print_hello():
		return 'Hello World!'

# Creating first task
start_task = DummyOperator(task_id='start_task', dag=hello_world_dag)

# Creating second task
hello_world_task = PythonOperator(task_id='hello_world_task', python_callable=print_hello, dag=hello_world_dag)

# Creating third task
end_task = DummyOperator(task_id='end_task', dag=hello_world_dag)

# Set the order of execution of tasks. 
start_task >> hello_world_task >> end_task        

This file creates a simple DAG with three tasks built from just two operator types: the DummyOperator, which does nothing, and the PythonOperator, which calls the print_hello function when its task is executed.

Step 9: Test the Pipeline

Check that the DAG file contains valid Python code by executing the file with Python:

python3 ${AIRFLOW_HOME}/dags/hello_world_dag.py        
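
Running the file only proves that it is valid Python; it does not prove that Airflow can import the DAG. A slightly stronger check is to load the DAGs folder through a DagBag (a minimal sketch, assuming it runs in the same virtual environment and with the same AIRFLOW_HOME as the scheduler):

# check_dags.py - verify that the DAGs folder parses without import errors
from airflow.models import DagBag

dag_bag = DagBag()                                      # parses the configured dags folder
print('Import errors:', dag_bag.import_errors)          # an empty dict means every file parsed cleanly
print('hello_world_dag loaded:', 'hello_world_dag' in dag_bag.dags)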

Step 10: Running the DAG

  • Activate the virtual environment.

source ~/install/airflow-tutorial/airflow_venv/bin/activate
export AIRFLOW_HOME=~/install/airflow-tutorial/airflow        

  • Start the airflow webserver and scheduler.

airflow webserver \
--pid ${AIRFLOW_HOME}/logs/airflow-webserver.pid \
--stdout ${AIRFLOW_HOME}/logs/airflow-webserver.out \
--stderr ${AIRFLOW_HOME}/logs/airflow-webserver.out \
-l ${AIRFLOW_HOME}/logs/airflow-webserver.log \
-p 8080 \
-D

airflow scheduler \
--pid ${AIRFLOW_HOME}/logs/airflow-scheduler.pid \
--stdout ${AIRFLOW_HOME}/logs/airflow-scheduler.out \
--stderr ${AIRFLOW_HOME}/logs/airflow-scheduler.out \
-l ${AIRFLOW_HOME}/logs/airflow-scheduler.log \
-D        
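
Instead of waiting for the scheduler to start the next interval, you can also unpause and trigger the DAG from the command line (assuming the Airflow 2.x CLI; on 1.10.x the equivalents are airflow unpause and airflow trigger_dag):

# Unpause the DAG so the scheduler picks it up, then trigger a run manually
airflow dags unpause hello_world_dag
airflow dags trigger hello_world_dag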

[Screenshot: Airflow web UI showing the hello_world_dag]

  • The DAG should run successfully. To check the graph view or tree view, you can hover over the DAG's links and select the Graph or Tree option.

[Screenshot: Graph/Tree view of the hello_world_dag run]

  • You can also view the task's execution information using the logs. To do so, simply click on the task, and you should see the following dialog box:

[Screenshot: task instance dialog box]


  • Next, click on the Log button, and you will be redirected to the task's log.

[Screenshot: hello_world_task log output]


Happy Learning.
