Airflow Task - Part 3
Airflow Task

Airflow Task - Part 3

In this article, I'll discuss passing arguments to DAGs and Python operator tasks.

So first we'll look at some common arguments used to create the DAG object for our tasks. Then we'll look at how to pass arguments to Python functions in our Python operator, including the dictionary with our DAG context so?we can access it from within the task itself.

At the beginning of this example, I've created example.py.?

No alt text provided for this image

I import a few functions, so from datetime, I import datetime and timedelta, and then from pprint, I import pprint. the pretty print function, so we can print a Python dictionary in a nice format that we can read in the output.

Then on line 4, I import airflow, and I import DAG and PythonOperator which I'm going to use in this example.

Then on line 9, I start by declaring a function called task_retry. And I also declare two more functions, task_failure and?task_success.

Each of these takes a context variable with the application context.

So, it's a dictionary with information about the running tasks. Now, these are call-back functions called for when the task has to retry, when it fails and when the DAG succeeds, and in the code below, I'll show you where we assign these two their respective callbacks. So now on line 18, I create a dictionary called args.


args = {

??????????? 'owner':'Ehsan',

??????????? 'depends_on_past':False,

??????????? 'start_date': datetime(2021, 8, 1),

??????????? 'end_date': datetime(2021, 9, 1),

??????????? 'email':['[email protected]'],

??????????? 'email_on_failure': False,

??????????? 'email_on_retry': False,

??????????? 'retries': 3,

??????????? 'retry_delay': timedelta(minutes=1),

??????????? 'priority_weight': 10,

??????????? 'execution_timeout': timedelta(seconds=30),

??????????? 'on_failure_callback': task_failure,

??????????? 'on_success_callback': task_success,

??????????? 'on_retry_callback': task_retry,

?????????}        

So, it has an owner, so the owner username is called Ehsan, which is quite?common in our airflow tasks if we're only in a single user system.

Then I have depends_on_pass. This command is a parameter, an argument that we pass through our DAG to say, whether or not to run tasks sequentially. So, in a synchronized fashion waiting for the previous task's schedule to succeed before continuing with the next one.

We have a start date and end date which are pretty self-explanatory. But their datetime object's saying when we want the tasks to begin and when we want them to end. So, the task for this DAG will run between 2021-8-1, or August 1, 2021, until September 1st, 2021.

We can specify an email, an email on failure and an email on retry, if we like an email to be sent whenever there's a failure and whenever there's a retry. We can say the number of retries we'd like to perform if a task fails.

So, in this case, I've set retries to be 3. We can specify a retry delay, saying how quickly we'd like to retry?if it does fail and here, we indicate a timedelta where minutes=1.

If this task or this DAG does fail, then it will retry every minute?until it succeeds or until it does these three retries and gives up.

In line 28, we have a priority weight option, so when execution gets backed up, the priority weight comes in to play, and it's compared with other tasks to decide who goes next.

Then in line 29, we have an execution timeout which takes a timedelta for how long we allow this execution to take, if our DAG takes longer than 30 seconds to complete, it will fail, and then we set up our callbacks for on failure, on success, and on retry.

So, the on-failure callback is set to our task_failure, on-success callback set to task_success, and on-retry callback set to task_retry. So, once we set the arguments, we can create our DAG. So, I have a DAG on line 35, and it has a DAG idea of My_Task.

Its default_args parameter is set to be equal to args, the ones we've just defined. We specify this schedule interval, and this is the cron notation of our schedule. In this case, I have a */5 * * * *. So that's cron speak for executing every five minutes.

And then another important variable is catchup, this tells our DAG whether or not to backfill any missed executions, if we're performing incremental executions, it might be important to run any missed tasks that we're scheduled to run, if we missed some scheduled tasks, it will go back, perform the incremental loads from those tasks and catch us up to date.

And in this case, I don't want to do that so I set catchup=False.

Now let's have a look at our Python operator function. I define a regular Python function called hello_world_task and it has two parameters; *args and **kwargs. All this does is do a pprint, a pretty print of the args and?a pprint of the kwargs and returns Hello World, Task.

So, we'll be able to inspect the logs, the output and have a look and?see what the args and kwargs contained in this function.

And then a line 47, I create the task with the task ID of python_hello.

Then I specify the python call back to be our hello task function. Provide contacts, now this is an important parameter and if we set it equal to true it provides the function with the entire DAG context with important information about the running tasks and the running DAG itself. The op_args is the arguments that we pass so?this is a list with hello and exclamation mark and that will be available in our args variable inside of hello_world_task.

If we want to set the kwargs, we pass a dictionary.

op_kwargs is equal to the dictionary with the recipient set to hello and sender set to airflow. But when we set op_kwargs and provide context to be equal to true, it will mix those variables together into a single dictionary inside of kwargs, inside of our hello_world_task function.

we'll have to look through our pretty print result of kwargs to find the recipient and sender in the list of variables printed, and then on line 53, we set dag=dag for our python operator and that completes the task.

now I'll tab over to the Chrome web browser and our airflow interface.

I'll click on the DAGs option in the toolbar, we'll refresh the DAGs list and if I scroll down to My_Task, it's currently running.

So now go to the quick links and select Graph View.

airflow

This will get us to where we need to go. It shows the python_hello in the graph view as highlighted in green, because its most recent execution was successful.

airflow

So, if I select it, it opens up a model dialog window, and?I'm going to select View Log because I want to see the results of the most recently run instance.

if I look in the logs, I'll scroll into the logs and I'll scroll down until I get to a place where I see hello and exclamation mark printed, that was the output from our function, and then it prints the kwargs variable, you can see there's the end date, and there are all sorts of information about the DAG, the start date and?end date, the execution date so there's all this information.

Some of it, which we provided to our DAG when we created it. Some of it's created by airflow itself, and some of it's what we said in the op_kwargs like the recipient world and the sender airflow. That all got mixed into this kwargs variable.

And then if I scroll down, I see done, return value was hello_world_task, so that's the string that was returned by our function, and?then we have Task Run Successfully!!

That was a string we printed by the task_success callback that we created, and that concludes this demonstration of including arguments in airflow tasks.







??Robin Ayoub

AI Training Data | NLP | Prompt Engineering | Multilingual Speech-to-Text Transcription | Chatbot | Conversational AI | Machine translation | Human in the loop AI integration

4 个月

Ehsan, Very interesting, thanks for sharing!

回复

要查看或添加评论,请登录

Ehsan Hemati的更多文章

社区洞察

其他会员也浏览了