Airflow : The Data Pipeline Framework

Focus areas for this post:

  1. What is Airflow?
  2. How useful Airflow is in the WMS (workflow management system) space.
  3. An example of an Airflow DAG.

Airflow:

Airflow is a Python-based, open-source data pipeline framework. It was developed at Airbnb and later open-sourced through the Apache Software Foundation. Airflow is designed around the concept of DAGs (directed acyclic graphs), where each DAG is a collection of tasks that you want to run. You can read more about Airflow DAGs here: https://airflow.apache.org/concepts.html?highlight=what%20dag


Airflow as a WMS:

Airflow makes it easier to develop and manage your data pipeline workflows with its rich command-line utilities and web UI, which make it easy to visualize pipelines running in production, monitor running processes, and troubleshoot issues when needed.


Here are some features of Airflow in terms of a WMS:


  • It helps you automate and schedule workflows (run manually or triggered on a schedule) in order to perform any action or task.
  • Airflow is developed in Python, but we can execute programs written in other languages as well. For example, your first task might run a shell script that generates some data, and a second task could then transform that data using Python.
  • Email notifications can be easily configured in Airflow.


Installation and setup:

  • Please follow this link for the installation guide:

https://airflow.apache.org/installation.html



Example:


In this example we have created a DAG which executes three tasks one after another, as follows:


Task-1: Call a sleep function for 5 seconds.

Task-2: Change the date format of a column in a CSV file.

Task-3: Send an email after all the tasks are completed.

We will be creating two Python files here: 

1. example_dag.py

2. transform_data.py

example_dag.py:

This file should be placed in the dags folder in your Airflow home directory.

example_dag.py is the main DAG file for Airflow; it contains the definition of your DAG.

Here is the full example_dag.py:

import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
import time

# import your python files to be used in the DAG
from projectfiles import transform_data as td

# Default arguments applied to every task in the DAG
args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}

dag = DAG(
    dag_id='example_dag',
    default_args=args,
    schedule_interval=None,
)

def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)

# Task 1: sleep for 5 seconds
sleeping_task = PythonOperator(
    task_id='sleep_for_5_seconds',
    python_callable=my_sleeping_function,
    op_kwargs={'random_base': 5},
    dag=dag,
)

# Task 2: change the date format of a column in the CSV file
date_format_change_task = PythonOperator(
    task_id='date_format_task',
    #provide_context=True,
    python_callable=td.date_format,
    dag=dag,
)

# Task 3: send a completion email
email_task = EmailOperator(
    task_id='send_email',
    to=['[email protected]'],
    subject='Airflow Alert',
    html_content=""" <h3>Job has been completed successfully</h3> """,
    dag=dag,
)

# Task ordering: sleep -> date format change -> email
sleeping_task.set_downstream(date_format_change_task)
date_format_change_task.set_downstream(email_task)



I have defined three tasks in this DAG file:

1. sleeping_task

2. date_format_change_task

3. email_task

Let's walk through this file in detail:

import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
import time

# import your python files to be used in the DAG
from projectfiles import transform_data as td


  • First we need to import airflow and other supporting modules like DAG.
  • As per our requirement we will be executing a Python function to perform the date-format change task, so we imported the PythonOperator module. This can vary according to your requirement; for example, if you want to execute a shell script, then you need to import the BashOperator module instead (see the sketch after this list).
  • We also imported the EmailOperator module to send an email after task completion.
  • transform_data is a Python file which contains the function/transformation code to perform. I will be attaching this code later in this blog.
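
As an aside, here is a minimal sketch of what a shell-script task could look like using BashOperator; the task_id and bash_command below are illustrative only and not part of this example's DAG:

from airflow.operators.bash_operator import BashOperator

# Hypothetical task that runs a shell command to generate some data
generate_data_task = BashOperator(
    task_id='generate_data',               # illustrative task id
    bash_command='echo "data generated"',  # any shell command or script path
    dag=dag,
)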


args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}
  • These are the default arguments for the DAG. We can set defaults like start_date, a default email address, retry behavior, and many more; they apply to every task in the DAG unless overridden (a fuller example follows below).
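
For illustration, a fuller args dictionary could look like the following; the email address is a placeholder and the retry values are arbitrary choices:

from datetime import timedelta

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['[email protected]'],      # placeholder address
    'email_on_failure': True,             # email when a task fails
    'retries': 1,                         # retry each failed task once
    'retry_delay': timedelta(minutes=5),  # wait 5 minutes between retries
}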

dag = DAG(
    dag_id='example_dag',
    default_args=args,
    schedule_interval=None,
)



  • This code defines the DAG itself, such as its dag_id (this ID is displayed in the Airflow UI).
  • We are using the default args defined in the previous step.
  • schedule_interval is for scheduling your DAG. I have given None here because I will be triggering my DAG manually; a scheduled DAG would use a cron expression or preset instead (see the sketch below).
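
For example, if we wanted this DAG to run daily at 6 AM instead of only on manual triggers, the definition could look like this (the cron expression is illustrative; presets such as '@daily' also work):

dag = DAG(
    dag_id='example_dag',
    default_args=args,
    schedule_interval='0 6 * * *',  # cron expression: run every day at 06:00
)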


def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)


  • This is a simple Python sleep function which will be used in one of the task definitions.
sleeping_task = PythonOperator(
    task_id='sleep_for_5_seconds',
    python_callable=my_sleeping_function,
    op_kwargs={'random_base': 5},
    dag=dag,
)


  • Here we are creating the sleeping task; it calls the my_sleeping_function defined in the previous step, passing random_base=5 through op_kwargs.
  • The task id must be unique for each task in a DAG.
date_format_change_task = PythonOperator(
    task_id='date_format_task',
    #provide_context=True,
    python_callable=td.date_format,
    dag=dag,
)
  • This task calls the date_format() function, which is defined in the other Python file (transform_data.py).
  • It is responsible for changing the date format of a column present in the data file.
email_task = EmailOperator(
    task_id='send_email',
    to=['[email protected]'],
    subject='Airflow Alert',
    html_content=""" <h3>Job has been completed successfully</h3> """,
    dag=dag,
)


  • email_task is defined to send the completion email.
  • For this task to work, we need to update the [smtp] section of the airflow.cfg file, where we configure our email settings as follows.

[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = youremailaddress
smtp_password = yourpassword
# 587 is Gmail's STARTTLS port
smtp_port = 587
smtp_mail_from = youremailaddress


sleeping_task.set_downstream(date_format_change_task)
date_format_change_task.set_downstream(email_task)


  • These two lines define the dependencies between the tasks inside the DAG.
  • Here they say: sleeping_task will execute first, then date_format_task, and email_task will be executed thereafter (an equivalent one-line form is shown below).
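
Equivalently, Airflow also supports bitshift composition, which declares the same chain in a single line:

sleeping_task >> date_format_change_task >> email_task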

transform_data.py

This file should be placed under the projectfiles folder. I created this folder under the dags folder for this example only; how you manage your project structure is entirely your choice.

  • Here I am attaching the code used in example_dag.py for your reference, but the transformation could be anything, as this is just an example to show Airflow's functionality.
import pandas as pd

def date_format():
    read_file = pd.read_csv('~/airflow/dags/projectfiles/datafile.csv', sep=',')
    # Changing date format to 'yyyy-mm-dd'
    read_file['transaction_date'] = pd.to_datetime(read_file['transaction_date']).dt.strftime('%Y-%m-%d')
    read_file.to_csv('~/airflow/dags/projectfiles/output.csv', index=None)
    print("Data is transformed")


Once we have created these files, start the Airflow webserver and scheduler, then open “localhost:8080” in a web browser.
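
For reference, with a default Airflow 1.x setup these are the usual commands (run each in its own terminal; initdb is only needed the first time):

airflow initdb              # initialize the metadata database (first run only)
airflow webserver -p 8080   # serve the web UI at localhost:8080
airflow scheduler           # start the scheduler that executes tasks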



You can trigger the DAG manually using the “Trigger Dag” button in the UI, or from the command line.
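
The command-line equivalent in Airflow 1.x is:

airflow trigger_dag example_dag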



You can check the status of your DAG in the Graph View.


I hope this article helps you start your first DAG with Airflow.


