Apache Airflow for Data Engineering Pipelines, Part 2 - The Airflow Workflow
In the previous article, I gave a basic explanation of what Airflow is and how it works, and I also walked through the dashboard.
In this article, I'll demonstrate how to create and instantiate a DAG in Airflow. I'm using VSCode to create my Airflow DAG, called hello_world.py.
So, it starts like any other Python file; there's nothing special to it. On the first line, I have from datetime import timedelta. We typically start with our own imports, the ones particular to our own functions, then a blank line, and then the Airflow imports. So, on line 3, I import airflow. Then from airflow.models I import DAG.
Then, on lines 5-7, I import three operators: the PythonOperator, the BashOperator, and the DummyOperator.
And the DummyOperator does just what you'd expect: nothing. It's just an empty operator. Then on line 9, I have args equal to a dictionary with owner set to Ehsan and start_date set to airflow.utils.dates.days_ago(1). This sets the start date for when this DAG is going to be executed.
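The default-args dictionary can be sketched on its own. The article uses airflow.utils.dates.days_ago(1); as a stand-in that runs without Airflow installed, a plain datetime one day in the past does the same job of making the DAG eligible to run immediately.

```python
from datetime import datetime, timedelta

# Sketch of the default-args dictionary described above.
# datetime.now() - timedelta(days=1) is a stand-in for
# airflow.utils.dates.days_ago(1): a start_date in the past
# means the DAG can start right away once enabled.
args = {
    "owner": "Ehsan",
    "start_date": datetime.now() - timedelta(days=1),
}
```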
Now, this is useful if we want a DAG to start at some point in the future: we enable it, set a future date, and it will only start on that date. If we'd like a DAG to start right away, we can just set some date in the past or the current date, and it will start immediately. Then on line 14, I create the DAG: dag = DAG, with DAG in uppercase.
And I pass several arguments to this. The first is usually the dag_id, a unique name we set for this DAG, in this case hello_world. We set default_args equal to our args variable, so the arguments we're passing into it. Then we have schedule_interval, which we set equal to the cron-notation schedule for this DAG.
If you want to know more about triggers and time scheduling in Airflow, you can visit the link below:
On line 18, I set catchup=False, so it's not going to try to rerun this script for every interval from the start time until now. Catchup is useful if we have some incremental loads or updates where we're extracting based on time: if we want to backfill, we can set catchup=True and it will catch up on all past schedules that it missed.
And then another useful parameter is the dagrun_timeout, which we can set so a run times out after a certain interval, in this case timedelta(minutes=60). So, we'll force this run to time out if it gets stuck or unresponsive.
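Putting the pieces so far together, the top of the DAG file sketched from this walkthrough might look like the following. The exact cron string isn't quoted in the article, so the hourly "0 * * * *" here is an assumption; the import paths are the Airflow 1.x ones the article's other names suggest.

```python
from datetime import timedelta

import airflow
from airflow.models import DAG

# Default arguments applied to every task in the DAG.
args = {
    "owner": "Ehsan",
    "start_date": airflow.utils.dates.days_ago(1),
}

# DAG instantiation as described in the article: a unique dag_id,
# our default args, a cron schedule ("0 * * * *" is an assumed
# placeholder), no catchup of missed intervals, and a 60-minute
# timeout so a stuck run is forced to fail.
dag = DAG(
    dag_id="hello_world",
    default_args=args,
    schedule_interval="0 * * * *",
    catchup=False,
    dagrun_timeout=timedelta(minutes=60),
)
```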
Then on line 22, I create my first operator, i.e. my first task: start equals a DummyOperator with a task_id of dummy and dag=dag. Every operator, regardless of what it is, has a task_id and a dag that it's associated with.
On line 27, I have bash_hello = BashOperator. It has a task_id of bash_hello and a bash_command, which is the bash command it's going to run; any valid bash is fine here: echo Hello World! From BashOperator. And then we assign it to the dag. Then on line 33, I define a Python function. So, if we're using a PythonOperator, we usually back it with a Python function.
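The two tasks described so far can be sketched like this, assuming the dag object created earlier in the file and Airflow 1.x-era import paths (the operators moved to new modules in Airflow 2):

```python
# Airflow 1.x-style import paths; these moved in Airflow 2.
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

# 'dag' is the DAG object created earlier in the file.
# The empty starting task: it does nothing, it just anchors the graph.
start = DummyOperator(
    task_id="dummy",
    dag=dag,
)

# Runs an arbitrary bash command; any valid bash is fine here.
bash_hello = BashOperator(
    task_id="bash_hello",
    bash_command='echo "Hello World! From BashOperator"',
    dag=dag,
)
```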
So, on line 33 I have def print_hello_world with recipient as its parameter. Then on line 34, I create a message equal to hello, with open and close braces for string formatting, so the recipient is inserted at that point by .format(recipient). Then I print the message and I also return the message. Returning it is another way of getting it into the logs: anything that gets returned from the function called via PythonOperator gets printed to the log. So, I return the message.
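The callable itself is plain Python, so it can be sketched and run on its own; the exact message text below is reconstructed from the log output described later in the article, so the precise wording is an assumption.

```python
def print_hello_world(recipient):
    # The {} placeholder is filled in by .format(), so the recipient
    # lands inside the message. The exact wording is assumed from the
    # log output described in the article.
    message = "Hello, {}! From PythonOperator".format(recipient)
    print(message)    # printed output shows up in the task log
    return message    # returned values are also printed to the log
```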
Then on line 41, I have python_hello = PythonOperator. It has a task_id of python_hello, and then the python_callable, which is very important: it's the name of our print_hello_world function.
But I don't put the parentheses and the argument in here; the arguments have to be passed separately. To do that, I can use op_args (a list) or op_kwargs (K-W-A-R-G-S, a dictionary). In this case, I'll use op_kwargs and pass recipient set to World. So, this dictionary passes the recipient World to our print_hello_world function, the python_callable.
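That task can be sketched as follows, again assuming the dag object and print_hello_world function defined earlier in the file, and the Airflow 1.x import path:

```python
from airflow.operators.python_operator import PythonOperator  # Airflow 1.x path

# 'dag' and 'print_hello_world' are defined earlier in the file.
python_hello = PythonOperator(
    task_id="python_hello",
    python_callable=print_hello_world,  # the function name, no parentheses
    op_kwargs={"recipient": "World"},   # keyword arguments for the callable
    dag=dag,
)
```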
And we set dag equal to dag as usual. And then the last part is usually to define our graph.
So, we start with the bitshift operator to set dependencies: start >> bash_hello makes start upstream of bash_hello. And then on line 49, start is made upstream of python_hello as well. That defines this hello_world DAG.
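The graph definition at the bottom of the file can be sketched as:

```python
# The bitshift operator sets task dependencies: the task on the left
# runs upstream of the task on the right.
start >> bash_hello
start >> python_hello
```

An equivalent shorthand would be start >> [bash_hello, python_hello], which fans out from start to both tasks in one line.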
Now I'll Alt+Tab over to the Chrome web browser. In my DAGs listing, I'll select DAGs from the navbar to reload the page. The list is alphabetical, so if I scroll down, I should find hello_world. Now what I'll do is enable it, and then I want to kick it off right away.
So, I'll go over to the Trigger Dag icon and select it so it'll trigger our hello_world DAG. If I scroll down, it shows that our DAG is running.
And now what I'd like to do is go into the graph view of our DAG. So here in the graph view, I see that my tasks are highlighted in dark green, which means success.
So, if I select one of the tasks, say bash_hello, and select View Log, it shows the log of this task.
And if we scroll down, it shows lots of details about this task. In the output section, there's an info message with the output: Hello World! From BashOperator.
And then it gives me the return code of the operation: return code 0. Now I can go back to our graph view, select python_hello, and View Log for this instance.
And if I scroll down, I see where it was run.
So, I see the info message Hello, World! From PythonOperator, and then another info message: Done. Returned value was: Hello, World! From PythonOperator. One is the printed message and the other is the value returned from the function. Then we can go back to our DAGs listing to see the details of hello_world; it shows the last run at the date and time we just ran it.
And that concludes this demonstration on how to create and instantiate a DAG in Airflow.