101 Guide on Apache Airflow Operators
Apache Airflow is a tool for automating workflows, tasks, and orchestration of other programs on clusters of computers. Airflow lets teams define complex data pipelines as plain Python code, so new pipelines can be written in minutes. We'll learn about Airflow operators in this post, which you can use to create your own pipelines.
Operators carry out the instructions contained in your workflow definition file (typically a Python script). There are many Airflow operators that can help you achieve your goals. However, it can be challenging to understand the behavior of these operators without a good conceptual understanding of Airflow itself.
What are Apache Airflow Operators?
Apache Airflow is an open-source MLOps and data tool for modeling and running data pipelines. Airflow operators are the commands your DAG executes each time an operator task is triggered during a DAG run. Since every task consumes CPU time and adds latency, use tasks sparingly and avoid tasks that complete without producing any useful result.
Recommended Reading: How to Automate Data Pipelines with Airflow
If your DAG is executing steadily, tasks can be an easy way to solve a problem. However, you need to know how operators interact and where to use them for the best results. In simple terms, when you create operator objects, you generate tasks.
Recommended Reading: What are DAGs?
If you want data processed as quickly as possible, and you don't need the results right away but rather as input to a later analysis or workflow, tasks are the right tool.
Types of Airflow Operators:
Action Operator - performs an action or tells another system to perform one (e.g., BashOperator, PythonOperator)
Transfer Operator - moves data from one system to another (e.g., S3ToRedshiftOperator)
Sensor Operator - waits until a condition is met (e.g., FileSensor)
Operators play a crucial role in the Airflow process. We'll go through a few of the most popular operators later, but first, let's look at the relationship between a task and an operator.
The difference between a task and an operator can be confusing at first. The relationship is simple: a DAG contains tasks, and each task is created from an operator.
Tasks are ideally self-contained and do not rely on information from other tasks. When you instantiate an operator class, the resulting object becomes a task: calling an operator produces an operator object, and Airflow turns that object into a task in your DAG.
Defining the DAG
from datetime import datetime

from airflow import DAG

dag = DAG(
    dag_id='t0',
    schedule_interval='@daily',  # a preset or a cron expression
    start_date=datetime(2021, 1, 1),
)
Defining Tasks
t01 = op01(  # op01/op02 stand in for any operator class
    task_id='name_task_1',
    operator_params=...,
    dag=dag,
    ...
)
t02 = op02(
    task_id='name_task_2',
    operator_params=...,
    dag=dag,
    ...
)
Defining Relations between Tasks
t01 >> t02
t02 >> t03  # t03 would be defined like t01 and t02
...
# Task 1 -> Task 2 -> Task 3
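For longer chains, Airflow also ships a chain() helper that wires up the same linear dependencies in one call; a minimal sketch, assuming t01, t02, and t03 are the tasks defined above:
from airflow.models.baseoperator import chain

# Equivalent to t01 >> t02 >> t03
chain(t01, t02, t03)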
Let's have a look at some of the most popular operators:
Apache Airflow Bash Operator - Executes a bash command
The BashOperator in Apache Airflow provides a simple method to run bash commands in your workflow. This is the operator you'll want to use if your DAG runs a bash command or script.
from airflow.operators.bash import BashOperator

t1 = BashOperator(
    task_id='t1',
    dag=dag,
    bash_command='echo "Text"',
)
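Note that bash_command is Jinja-templated, so you can reference Airflow's built-in macros inside the command string. A small sketch (the task name is made up; {{ ds }} is the standard macro for the run's logical date):
# {{ ds }} expands to the logical date of the DAG run
t1_templated = BashOperator(
    task_id='t1_templated',
    dag=dag,
    bash_command='echo "Run date: {{ ds }}"',
)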
Apache Airflow Python Operator - Calls an arbitrary Python function
The Airflow PythonOperator is a basic yet effective operator that lets you run a Python callable from your DAG.
from airflow.operators.python import PythonOperator

def print_string():
    print("Test String")

t2 = PythonOperator(
    task_id="t2",
    dag=dag,
    python_callable=print_string,
)
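You can also pass arguments to the callable through op_kwargs (or positional op_args). A minimal sketch, with a hypothetical callable:
def print_repeated(text, repeat=1):
    # Print the given text the requested number of times
    for _ in range(repeat):
        print(text)

t2_args = PythonOperator(
    task_id="t2_args",
    dag=dag,
    python_callable=print_repeated,
    op_kwargs={"text": "Test String", "repeat": 2},  # forwarded to the callable
)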
Apache Airflow Email Operator - Sends an email
The EmailOperator is the most straightforward method for sending emails from Airflow. With the EmailOperator, you can send task-related emails or build an alerting system. The biggest drawback is that this operator isn't very customizable.
from airflow.operators.email import EmailOperator

t4 = EmailOperator(
    task_id='t4',
    to='[email protected]',
    subject='Alert Mail',
    html_content=""" Mail Test """,
    dag=dag,
)
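For alerting specifically, a common alternative to hand-rolled EmailOperator tasks is to let Airflow email you on failures via default_args; a sketch, assuming SMTP is configured in airflow.cfg and with a made-up recipient address:
from datetime import datetime
from airflow import DAG

default_args = {
    'email': ['[email protected]'],  # hypothetical recipient
    'email_on_failure': True,       # send a mail whenever a task fails
    'email_on_retry': False,
    'retries': 1,
}
alert_dag = DAG(
    'alerting_dag',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=datetime(2021, 1, 1),
)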
Apache Airflow PostgresOperator
The PostgresOperator defines tasks that interact with a PostgreSQL database. It can be used to create tables, remove records, insert records, and more.
import datetime

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

with DAG(
    dag_id="postgres_operator_dag",
    start_date=datetime.datetime(2021, 10, 11),
    schedule_interval="@once",
    catchup=False,
) as dag:
    t4 = PostgresOperator(
        task_id="t4",
        sql="""
            CREATE TABLE IF NOT EXISTS pet (
                table_id SERIAL PRIMARY KEY,
                name VARCHAR NOT NULL,
                table_type VARCHAR NOT NULL,
                birth_date DATE NOT NULL,
                OWNER VARCHAR NOT NULL);
        """,
    )
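The operator also supports bind parameters through its parameters argument, which keeps values out of the SQL string. A follow-up sketch that inserts a row into the table created above (the row values are made up):
populate_pet = PostgresOperator(
    task_id="populate_pet",
    dag=dag,  # reuses the DAG from the example above
    sql="""
        INSERT INTO pet (name, table_type, birth_date, OWNER)
        VALUES (%(name)s, %(type)s, %(dob)s, %(owner)s);
    """,
    parameters={"name": "Max", "type": "dog", "dob": "2019-01-01", "owner": "Jane"},
)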
Apache Airflow SSH Operator
The SSHOperator is used to execute commands on a remote host, either through an SSH connection id or an explicit ssh_hook.
from airflow.providers.ssh.operators.ssh import SSHOperator

t5 = SSHOperator(
    task_id='SSHOperator',
    ssh_conn_id='ssh_connectionid',
    command='echo "Text from SSH Operator"',
)
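If you need more control over the connection, you can build the hook yourself and hand it to the operator; a minimal sketch using the same connection id as above:
from airflow.providers.ssh.hooks.ssh import SSHHook

ssh_hook = SSHHook(ssh_conn_id='ssh_connectionid')
t5_hook = SSHOperator(
    task_id='SSHOperator_hook',
    ssh_hook=ssh_hook,  # explicit hook, used instead of ssh_conn_id
    command='uptime',
)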
Apache Airflow Docker Operator
The DockerOperator executes commands inside a Docker container. Docker is a tool for creating and managing containers: lightweight, isolated environments where you can run your code. With the Airflow DockerOperator, you can store files in a temporary directory created on the host and mounted into the container.
from airflow.providers.docker.operators.docker import DockerOperator

t6 = DockerOperator(
    task_id='docker_command',
    image='centos:latest',
    api_version='auto',
    auto_remove=True,
    command="/bin/sleep 30",
    docker_url="unix://var/run/docker.sock",  # Docker daemon socket on the host
    network_mode="bridge",
)
Apache Airflow HTTP Operator
The SimpleHttpOperator calls an endpoint on an HTTP system to perform an action. This is beneficial if you're using an API that returns a big JSON payload and you're only interested in a part of it. The related HttpSensor, shown below, polls an endpoint until a response check passes; a SimpleHttpOperator sketch follows the example.
from airflow.providers.http.sensors.http import HttpSensor

t7 = HttpSensor(
    task_id='t7',
    http_conn_id='http_default',
    endpoint='',
    request_params={},
    response_check=lambda response: "httpbin" in response.text,
    poke_interval=4,
    dag=dag,
)
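And the SimpleHttpOperator sketch mentioned above, which actually calls the endpoint rather than polling it; it uses the same http_default connection, with a made-up endpoint path:
from airflow.providers.http.operators.http import SimpleHttpOperator

t7_call = SimpleHttpOperator(
    task_id='t7_call',
    http_conn_id='http_default',
    method='GET',
    endpoint='get',  # hypothetical path on the configured host
    response_check=lambda response: response.status_code == 200,
    dag=dag,
)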
Apache Airflow Snowflake Operator
The SnowflakeOperator executes SQL commands against a Snowflake database: it can create tables, insert, merge, update, and delete records, run COPY INTO statements, and more.
from datetime import datetime

from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

dag = DAG(
    'example_snowflake',
    start_date=datetime(2021, 11, 11),
    default_args={'snowflake_conn_id': SNOWFLAKE_CONN_ID},
    tags=['example'],
    catchup=False,
)
t8 = SnowflakeOperator(
    task_id='t8',
    dag=dag,
    sql=CREATE_TABLE_SQL_STRING,
    warehouse=SNOWFLAKE_WAREHOUSE,
    database=SNOWFLAKE_DATABASE,
    schema=SNOWFLAKE_SCHEMA,
    role=SNOWFLAKE_ROLE,
)
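The sql argument also accepts a list of statements, which are executed in order; a sketch with made-up statements, reusing the placeholder constants from above:
t8_multi = SnowflakeOperator(
    task_id='t8_multi',
    dag=dag,
    sql=[
        "INSERT INTO test_table VALUES (1, 'row1');",  # illustrative statements
        "INSERT INTO test_table VALUES (2, 'row2');",
    ],
    warehouse=SNOWFLAKE_WAREHOUSE,
    database=SNOWFLAKE_DATABASE,
    schema=SNOWFLAKE_SCHEMA,
    role=SNOWFLAKE_ROLE,
)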
Apache Airflow Spark Operators
Apache Spark is a quick, scalable, general-purpose cluster computing solution. It provides Spark SQL for SQL and structured data processing, MLlib for machine learning, and much more. Airflow ships several Spark operators: SparkSqlOperator, which takes all of its configuration from the operator parameters, and SparkJDBCOperator, shown below, which moves data between Spark and JDBC databases; a SparkSqlOperator sketch follows the example.
from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator

t9 = SparkJDBCOperator(
    task_id="t9",
    cmd_type='spark_to_jdbc',
    jdbc_table="foo",
    spark_jars="${SPARK_HOME}/jars/postgresql-42.2.12.jar",
    jdbc_driver="org.postgresql.Driver",
    metastore_table="bar",
    save_mode="append",
)
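And the SparkSqlOperator sketch mentioned above, which runs a SQL query on Spark; the query and master setting are made up for illustration:
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator

t9_sql = SparkSqlOperator(
    task_id='t9_sql',
    sql='SELECT COUNT(*) FROM bar',  # hypothetical query
    master='local',
)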
Apache Airflow Sensor Operators
Sensor operators run at a set interval, succeeding when a set of criteria is met and failing if they time out. Need to wait for a file to land? Check whether a record exists in SQL? Postpone the execution of your DAG? That is exactly what Airflow sensors are for.
from datetime import datetime

from airflow import DAG
from airflow.exceptions import AirflowSensorTimeout
from airflow.sensors.filesystem import FileSensor

def _failure_callback(context):
    if isinstance(context['exception'], AirflowSensorTimeout):
        print("timed out message")

with DAG(dag_id='sensor_dag', start_date=datetime(2021, 1, 1)) as dag:
    t10 = FileSensor(
        task_id='t10',
        filepath='/tmp/input_file.csv',  # file to wait for (placeholder path)
        poke_interval=100,
        timeout=20,
        mode="reschedule",
        on_failure_callback=_failure_callback,
    )
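If a missing file should not fail the whole DAG run, sensors also accept soft_fail; a sketch with a made-up path, reusing the DAG from above:
t10_optional = FileSensor(
    task_id='t10_optional',
    dag=dag,
    filepath='/tmp/optional_input.csv',  # hypothetical file to wait for
    poke_interval=60,
    timeout=300,
    soft_fail=True,  # on timeout, mark the task skipped instead of failed
)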
Apache Airflow BigQuery Operators
Airflow provides a family of BigQuery operators: BigQueryCheckOperator executes checks against BigQuery, and BigQueryCreateEmptyTableOperator, shown below, creates an empty table in an existing dataset. A BigQueryCheckOperator sketch follows the example.
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyTableOperator

create_table = BigQueryCreateEmptyTableOperator(
    task_id="t11",
    dataset_id=DATASET_NAME,
    table_id="test_table11",
    schema_fields=[
        {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
    ],
)
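And the BigQueryCheckOperator sketch promised above: the task fails if the first row of the query result contains a falsy value (the table and dataset names reuse the example's placeholders):
from airflow.providers.google.cloud.operators.bigquery import BigQueryCheckOperator

check_table = BigQueryCheckOperator(
    task_id="check_table",
    sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.test_table11",
    use_legacy_sql=False,
)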
There are many more operators; you can view the complete list of Airflow operators and learn more in the Apache Airflow documentation.
Apache Airflow Operators Best Practices
To get the most out of these operators, you must know what they do and when it's appropriate to apply them to your particular use case. In general, Airflow operators fall into two categories: scheduling tasks and data-manipulation tasks. A scheduling operator triggers work based on some time pattern, such as a fixed interval or a cron expression. A data-manipulation operator performs a specific processing step on incoming data sets, such as breaking out tables for better queryability. For example, the DAG below runs every day at 09:00:
dag = DAG('dagname', default_args=default_args, schedule_interval='0 9 * * *')  # cron: every day at 09:00
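A minimal default_args dict like the one referenced above might look as follows (all values are illustrative):
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'retries': 2,                          # retry failed tasks twice
    'retry_delay': timedelta(minutes=5),   # wait 5 minutes between retries
    'start_date': datetime(2021, 1, 1),
}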
Learn how to Improve model health with Censius AI
Conclusion
Several MLOps tools are available, but Apache Airflow offers unique advantages, and more businesses are using it to manage their data pipelines. An Airflow operator defines a single unit of work in your pipeline: each time it is triggered, it tells Airflow what action to take, against which system, and with what parameters. We looked at what operators are and walked through several types of operators in this article. I hope you enjoyed the article, and stay safe!
This article was originally published on censius.ai and republished here because it's awesome :)