A beginner's guide to Apache Airflow (and Docker) - Part 2
This is the second of a two-part series discussing the installation and use of Apache Airflow on a Windows-based PC. The first part concentrated on the installation of Airflow (using Docker) and this part will focus on actually using Airflow to construct a workflow. Part 1 can be found here.
Before discussing the creation of an Airflow job, I think it will be beneficial to go over some of the key concepts behind Airflow in a bit more detail. Airflow is a big topic in its own right, so treat this as a knowledge skim. For a proper deep dive you should consult the official Airflow website, which can be found at https://airflow.apache.org.
Airflow is a free, open-source platform for authoring, scheduling and monitoring workflows. Once installed, it presents itself to the user as a web-based GUI. The vast majority of Airflow workflows are ETL data pipelines.
A workflow is a series of one or more directed acyclic graphs (DAGs) of tasks. A DAG is a collection of all the tasks you want to run in a workflow, organised in a way that reflects their relationships and dependencies. DAGs are a particular type of graph structure in which the flow of data and processing is one-way only (that is the directed part) and contains no loops (that is the acyclic part). Here is a diagram of a typical DAG:
This shows two tasks run in sequence, followed by a branch that runs three more tasks in parallel before the workflow ends.
DAGs are defined using a Python script, which represents the DAG structure (tasks, their dependencies and scheduling) as code.
One of the main components of a DAG is the operator, which is the mechanism by which tasks are run. Some of the main operators of note are listed below, followed by a short sketch of how a couple of them are used:
- BashOperator - executes a bash command
- PythonOperator - calls an arbitrary Python function
- EmailOperator - sends an email
- SimpleHttpOperator - sends an HTTP request
- MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, etc. - executes a SQL command
- Sensor - an operator that waits (polls) for a certain time for an event to happen, such as a file being created in a particular directory
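To make this concrete, here is a minimal sketch of how a couple of these operators are turned into tasks. The DAG name, task ids and commands are made up purely for illustration, and the import paths are the Airflow 1.10-style ones used by the puckel image; a full working example appears later in the article.

import airflow
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

def say_hello():
    print("Hello from a PythonOperator task")

default_args = {"start_date": airflow.utils.dates.days_ago(1)}

# Each operator instance becomes a task in the DAG it is created inside
with airflow.DAG("operator_example", default_args=default_args, schedule_interval=None) as demo_dag:
    hello_bash = BashOperator(task_id="hello_bash", bash_command="echo 'Hello from a BashOperator task'")
    hello_python = PythonOperator(task_id="hello_python", python_callable=say_hello)
    hello_bash >> hello_python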
Tasks are the workhorses of your Airflow workflow; they do the actual work you need done. Often they will be programs written in Spark, Java, Python, Bash shell scripts or any other programming language, unlike the DAG itself, which is always written in Python.
The Airflow scheduler executes your tasks while respecting any specified date, time and task dependencies. Communication between tasks in Airflow is not encouraged, but if you really need it there is a sub-system called XCom which can facilitate this.
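As a rough sketch of the idea, again using the Airflow 1.10-style API with made-up task names and a made-up value, passing a small piece of data between two Python tasks via XCom might look like this:

import airflow
from airflow.operators.python_operator import PythonOperator

def push_value(**context):
    # Push a small value to XCom under an explicit key
    context["ti"].xcom_push(key="row_count", value=42)

def pull_value(**context):
    # Pull the value pushed by the task with task_id "push_task"
    row_count = context["ti"].xcom_pull(task_ids="push_task", key="row_count")
    print("row_count received via XCom: {}".format(row_count))

default_args = {"start_date": airflow.utils.dates.days_ago(1)}

with airflow.DAG("xcom_example", default_args=default_args, schedule_interval=None) as xcom_dag:
    push_task = PythonOperator(task_id="push_task", python_callable=push_value, provide_context=True)
    pull_task = PythonOperator(task_id="pull_task", python_callable=pull_value, provide_context=True)
    push_task >> pull_task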
Airflow also has a concept of variables. They are readable from within a DAG, but they are global in scope and not designed for communicating between tasks. Variables are mostly used for settings and configuration, and because a variable's value can be a JSON document they provide a lot of flexibility.
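As a small illustration, reading variables inside a DAG file might look like the following; report_email and etl_config are hypothetical variable names that you would have created yourself via the GUI or the CLI:

from airflow.models import Variable

# A simple string variable, with a fallback if it has not been defined
report_email = Variable.get("report_email", default_var="me@example.com")

# A variable whose value is a JSON document can be deserialised directly
etl_config = Variable.get("etl_config", deserialize_json=True, default_var={})
print(etl_config.get("source_path"))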
Before we get started on coding up a workflow, there are three other important Airflow sub-systems to be aware of.
The Airflow Database
Airflow uses its own internal database to store information about your DAGs and your Airflow history (which DAGs you ran, when you ran them, and so on). The default database shipped is SQLite, but in a production setting you can, and usually should, change it to something more robust, with Postgres being the most popular choice.
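Purely as an illustration, switching the metadata database to Postgres comes down to pointing the sql_alchemy_conn setting in airflow.cfg at your Postgres instance; the connection string below is a made-up example rather than anything shipped with the puckel image:

# airflow.cfg (illustrative values only)
[core]
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@my-postgres-host:5432/airflow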
The Airflow webserver
This is the front-end GUI that Airflow presents to users. With the webserver you can graphically check the running of DAGs, trigger DAGs on an ad-hoc or scheduled basis, check the logs of previous runs, create or delete Airflow variables and much more.
The Airflow scheduler
This is what actually runs your DAGs and tasks. It makes sure they run when they are supposed to and in the correct order. The schedules that can be applied to DAGs are similar to those available to the UNIX cron command. Tasks are executed by the scheduler using executors. The default executor shipped is the Sequential Executor, which runs tasks one after the other on a single node. This is fine for testing, but for production systems you would normally change this to one of the other executors that can scale to multiple nodes and run jobs in parallel, such as the Celery Executor.
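Again purely as an illustration, the executor is selected with a single setting in airflow.cfg; note that the Celery Executor also needs a message broker such as Redis or RabbitMQ to be configured, which is beyond the scope of this article:

# airflow.cfg (illustrative values only)
[core]
# Default is SequentialExecutor; LocalExecutor and CeleryExecutor scale further
executor = CeleryExecutor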
I’ve only scratched the surface of how Apache Airflow works and of its architecture, but I think we have enough knowledge to look at writing our first DAG. Once again I urge you to visit the Airflow home page at https://airflow.apache.org, where you can find loads more info. Let's now look more closely at DAGs themselves.
Anatomy of an Airflow DAG
An Airflow DAG, which is described via a Python script, will normally consist of the following sections.
External module imports
This is where you import any and all dependencies required for your workflow. There are a couple of standard ones that are always imported regardless, and then you add any others that you need on top of that.
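For example, a DAG that only needs the Bash and Python operators might start with imports along these lines (the module paths are the Airflow 1.10-style ones used by the puckel image; newer Airflow releases have moved some of them):

import airflow
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta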
Default Arguments
This is a Python dictionary that defines various arguments that are available to the DAG. Examples of these would be the start_date of the DAG or the number of times a failed task should be retried.
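An illustrative default_args dictionary is shown below; the retries and retry_delay keys are the ones that control whether and how a failed task is retried:

import airflow
from datetime import timedelta

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": airflow.utils.dates.days_ago(1),
    "retries": 1,                          # retry a failed task once
    "retry_delay": timedelta(minutes=5),   # wait 5 minutes between attempts
}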
DAG Instantiation
This is where you name your DAG and tell it when to run and how often. The schedule_interval argument is used for this and is normally any valid cron-like expression, although you can use one of the following presets if you prefer.
+----------+---------------------+------------------+
| PRESET   | MEANING             | CRON EQUIVALENT  |
+----------+---------------------+------------------+
| None     | Don't schedule      | N/A              |
| @once    | Schedule once only  | N/A              |
| @hourly  | Run once an hour    | 0 * * * *        |
| @daily   | Run once a day      | 0 0 * * *        |
| @weekly  | Run once a week     | 0 0 * * 0        |
| @monthly | Run once a month    | 0 0 1 * *        |
| @yearly  | Run once a year     | 0 0 1 1 *        |
+----------+---------------------+------------------+
One peculiar aspect of scheduling in Airflow to be aware of is that, if a schedule interval is defined, a run is triggered at the end of its schedule interval, not at the beginning. So, for example, if you schedule a job to run monthly, the run that covers a given month will be triggered at (or just after) the end of that month rather than at its start, which may or may not be what you intend.
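To make the scheduling behaviour concrete, here is a small sketch of a DAG using the @daily preset (the DAG name is made up and default_args is the kind of dictionary described in the previous section). With a start_date of one day ago, the run covering yesterday is triggered as soon as the DAG is enabled, and each subsequent run fires just after the day it covers has ended:

import airflow

default_args = {
    "owner": "airflow",
    "start_date": airflow.utils.dates.days_ago(1),
}

# Runs once a day, just after midnight, for the day that has just finished
dag = airflow.DAG(
    "daily_example",
    default_args=default_args,
    schedule_interval="@daily",
)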
Define the tasks
This is where the tasks are defined: which operator each task will use, along with any parameters the operator needs to do its job.
Task dependencies
This is where you set out which tasks are to be run, in which order, and the dependencies of each task. So, for example, if your DAG consists of two tasks, it's likely you'll want task 1 to run first and ensure task 2 only runs after task 1 has successfully completed.
Task dependencies can be input in two ways.
1) Using the double chevron operator
e.g. t1 >> t2 or t2 << t1
Task t2 will run when task t1 successfully completes.
2) Using the set_downstream and/or set_upstream methods.
e.g. t1.set_downstream(t2) or t2.set_upstream(t1)
Again, task t2 will run when task t1 successfully completes.
To show that you want to run tasks in parallel, you place the parallel tasks in a Python list.
e.g. t1 >> [t2, t3] or t1.set_downstream([t2, t3])
Tasks t2 and t3 run in parallel after task t1 has completed successfully.
An example DAG/workflow
Our example workflow is going to do four simple tasks. Each will use a separate Airflow operator.
1) Indicate the start/stop of the workflow (Dummy operator)
2) Wait for a file to appear in a specified location on our drive (Sensor operator)
3) Rename the file to something else (Bash operator)
4) Read the renamed file using Python and output the records read (Python operator)
We require just one final piece of Docker setup before we look at the DAG in detail. We are going to set up a bind mount so that the DAG sub-folder on our Windows PC is mounted onto a directory in the Docker container; any changes we make to files on one system then show up in the other, and vice versa. This means we can create the file that the sensor operator looks for on our local Windows system rather than connecting to the Docker container to create it. We can set up a mount by running the following in a Command Prompt or PowerShell window.
docker run --rm -v /c/Users/thoma/airflow/dags:/usr/local/airflow -it puckel/docker-airflow bash
Please substitute your own paths in the above command. After you run it you will be in a Unix bash shell. If you run an ls command from here you should see the same files in the Unix directory that you see in your Windows path, and changes to files in either system will be reflected in the other.
The final DAG
The final DAG is shown below. I have put line numbers in to make it easier to explain what the different parts are doing, but obviously they are not part of the real file.
1  import airflow
2  from airflow.contrib.sensors.file_sensor import FileSensor
3  from airflow.operators.dummy_operator import DummyOperator
4  from airflow.operators.python_operator import PythonOperator
5  from airflow.operators.bash_operator import BashOperator
6  from datetime import date, datetime, time, timedelta
7  import datetime
8  from datetime import date, timedelta
9  def read_file(filename):
10     myfile = open(filename, 'r')
11     line_count = 0
12     for i in range(4):
13         line_count += 1
14         line = myfile.readline()
15         print("{}: {}".format(line_count, line.strip()))
16     myfile.close()
17
18 default_args = {
19     "owner": "Tom",
20     "depends_on_past": False,
21     "start_date": airflow.utils.dates.days_ago(1),
22 }
23
24 with airflow.DAG("example", default_args=default_args, schedule_interval=None) as dag:
25     start_task = DummyOperator(task_id="start")
26     stop_task = DummyOperator(task_id="stop")
27     wait_for_file_task = FileSensor(task_id="wait_for_file", poke_interval=2, filepath="/usr/local/airflow/dags/db.txt", timeout=20)
28     rename_file_task = BashOperator(task_id='rename_file', bash_command='cd /usr/local/airflow/dags; mv db.txt db_data.txt')
29     read_file_task = PythonOperator(task_id='read_file', python_callable=read_file, op_kwargs={'filename': '/usr/local/airflow/dags/db_data.txt'})
30
31     start_task >> wait_for_file_task >> rename_file_task >> read_file_task >> stop_task
As we discuss the DAG it will probably be useful if you refer back to the Anatomy of an Airflow DAG section.
Lines 1–8 show the various imports that are required for the DAG to work. Mostly these are the operators that kick off the tasks.
Lines 9–16 contain the Python function that will be used by the PythonOperator. All it does is read a file, prefix each record with a line number and print both out. This output will appear in our DAG log file.
Lines 18-22 are where we set our default arguments. Basically it's just a Python dictionary, and the arguments are available to all operators in the DAG. There are a number of different arguments you can include here; I've only shown three of the most common: the owner of the job, the start date and the depends_on_past directive. Setting this last parameter to False means a task does not need a successful previous run before it can run again.
Lines 24–29 are where we instantiate the DAG and define the tasks. Here we have named the workflow and defined the schedule_interval as None, meaning this job will be triggered manually; normally a job would be run on a timed interval using a cron-like expression.
Line 31 is where we define the task dependencies. In this case it's a simple linear sequence running the five tasks one after the other.
Now, let's look at the task definition section more closely.
Lines 25 & 26. These tasks just act as book-ends to our sequence, a start and a stop. They are not strictly required, but I think they are useful from an aesthetic and DAG-monitoring point of view.
Line 27. This is our file-watcher task using the FileSensor operator. We specify the full path and filename of the file we are watching for, how often we check for it (poke_interval=2, i.e. every 2 seconds in this case) and how long in total we should wait for the file to appear (timeout=20, i.e. 20 seconds in this case). If the file is already there, the task ends successfully straight away and the next task runs. If the file does not appear within the timeout period, the task fails and each subsequent task in the DAG fails with an upstream_failed error message.
Line 28. Assuming the previous file-watcher task has run OK, we use the BashOperator to rename the file. The BashOperator is one of the most useful and versatile of all the operators, in that it can run any arbitrary command that you could normally run in a Unix Bash shell. This includes executables of programs written in a variety of languages such as Java, Spark, Python and C++, as well as regular Bash shell commands like the one in our example.
Line 29. After we have renamed the file, we open it up for reading using a Python function and print out its contents. This output will also appear in our Airflow logs, which can be useful for debugging purposes.
Running DAGs
One of the main benefits of Airflow is the ease with which workflows can be run, monitored and checked. Assuming you have created your DAG, open up the Airflow GUI and you should see a screen like below.
On the left-hand side near the top, under the "i" in the black circle heading, there is a toggle button. If it's showing as off, click on it to turn it on, otherwise the DAG won't run when you trigger it. To run the DAG, you can trigger it manually. To do this, click on the DAG name. The screen will change and you will see additional menu items near the top, like this:
If you click on the Trigger Dag item the DAG will begin to run.
Monitoring DAGs
To see graphically what your workflow looks like, click on the DAG name from the Airflow home screen, then click on the Graph View menu item. You should see a screen like this, which gives a good graphical representation of your workflow.
To drill down into individual tasks click on the Tree View menu item and you should see something like this.
This shows immediately when and which tasks have run. The little boxes next to each task show their status and are colour coded. What you want to see, hopefully, is a sea of dark green squares, as this means the tasks have run successfully.
Checking log files
An important part of monitoring tasks within a workflow is checking the log files of tasks that have run or are in the process of running. You can do this from the above screen. If you left-click on any of the coloured boxes beside a task name, a pop-up screen like this will appear.
From here you can simply click on the View Log button and the log file will appear on-screen for you to check like this.
Well, I think that's about all I have to say on Airflow (and Docker). We covered a lot, and it's a big topic, but I hope I have given you enough information to use this as a springboard to go on and learn more about this useful tool. One last thing: when installing Airflow you can configure it to pre-load a set of useful example DAGs into its database, which you can then view via the GUI. Depending on how you installed Airflow, do one of the following to see the example DAGs.
If using docker-compose, change the line in the file docker-compose-LocalExecutor.yml (or whatever you named it) from
- LOAD_EX=n to - LOAD_EX=y
If using the docker run command use this:
docker run -d -p 8080:8080 -e LOAD_EX=y puckel/docker-airflow
Finally, here are a few Docker commands I've found useful whilst researching this article:
# Get a list of container IDs running on your system
docker container list

# Kill a specified container
docker container kill 'container_id'

# Run a bash shell in your container as user airflow
docker run --rm -ti puckel/docker-airflow bash

# Run a bash shell in your container as the superuser
docker run -u 0 --rm -ti puckel/docker-airflow bash
That last command can be useful if you find yourself needing to install other UNIX commands and programs in your container, such as the vim text editor.
e.g.
$ apt update
$ apt install vim