Airflow Basics: Installation & Guide
Divine Sam
Data Engineer @NxtWave | Python | SQL | Data Analysis | Spark | Databricks | GCP | DBT | BigQuery | Looker
What is Apache Airflow?
Apache Airflow, or Airflow, is an open-source tool and framework for running your data pipelines in production. As an industry-leading data workflow management tool, Apache Airflow leverages Python to allow data practitioners to define their data pipelines as code. Airflow adds the ability to schedule pipeline execution and observe performance, making it a centralized hub for all of your data workflows. Whether you’re preparing training data for a model, or persisting data in a data lake, Airflow adds the functionality to make your data pipelines production-ready.
Airflow was first created at Airbnb by Maxime Beauchemin in 2014 and later joined the Apache Software Foundation’s Incubator program in March 2016, before being announced as a top-level project in 2019. According to Airflow’s 2022 survey, Airflow is downloaded millions of times per month, and thousands of companies, both large and small, rely on the tool.
Key Features of Airflow
Airflow’s framework, as well as its architecture, has several key features that make it unique. First, let’s dig a little deeper into the most important features of Airflow’s framework.
Features of Airflow’s Framework
The simplest unit of the Airflow framework is the task. Tasks can be thought of as individual operations or, for most data teams, as the steps in a data pipeline.
A traditional ETL workflow has three tasks: extracting, transforming, and loading data. Dependencies define the relationships between tasks. Going back to our ETL example, the “load” task depends on the “transform” task, which, in turn, depends on the “extract” task. The combination of tasks and dependencies creates DAGs, or directed acyclic graphs. DAGs represent data pipelines in Airflow, and they are easier to understand visually than to define in words. So instead, let’s take a look at a diagram of a basic ETL pipeline:
The DAG above has three tasks, with two dependencies. It’s considered a DAG because there are no loops (or cycles) between tasks. Here, the arrows show the directed nature of the process; first, the extract task is run, followed by the transform and load tasks. With DAGs, it’s easy to see a distinct start and end to the process, even if the logic is complex, like the DAG shown below:
In this DAG, the logic is more complex. There is branching based on a condition, and a few tasks run in parallel. However, the graph is still directed, and there are no cyclic dependencies between tasks. Now, let’s take a look at a process that is not a DAG:
In this diagram, there is a distinct loop between the transform and validate tasks. Because of that cycle, this workflow is not a DAG, and in some cases it could run forever if there is no way to break out of the loop.
When building data pipelines, even outside of Airflow, it’s best practice to stay away from creating workflows that can’t be represented as DAGs, as you may lose key features, such as determinism or idempotency.
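To make the idea concrete, here is a minimal sketch of how the linear ETL pipeline above could be expressed as an Airflow DAG, using EmptyOperator as a placeholder for real work (the dag_id and task_ids here are purely illustrative):

from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(dag_id="basic_etl_example", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    # Placeholder tasks standing in for real extract/transform/load logic
    extract = EmptyOperator(task_id="extract")
    transform = EmptyOperator(task_id="transform")
    load = EmptyOperator(task_id="load")

    # The >> operator defines the dependencies: extract runs first, then transform, then load
    extract >> transform >> load

If you tried to close a loop here (for example, load >> extract), Airflow would reject the file when parsing it, because the graph would no longer be acyclic.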
Features of Airflow’s Architecture
To schedule DAGs, execute tasks, and provide visibility into data pipeline execution details, Airflow leverages a Python-based architecture made up of four main components: the scheduler, the executor, the metadata database, and the web server.
Whether running Airflow locally or in a production environment, each of these components must be up and running for Airflow to function properly.
The scheduler is responsible for (you probably guessed it) scheduling DAGs. To schedule a DAG, a start date and schedule interval for the DAG must be provided when the DAG is written as Python code.
Once a DAG is scheduled, tasks within those DAGs need to be executed, which is where the executor comes in. The executor does not run the logic within each task; it just allocates the task to be run by whatever resources are configured to do so. The metadata database stores information about DAG runs, such as whether or not the DAG and its associated tasks ran successfully.
The metadata database also stores information such as user-defined variables and connections, which help when building production-grade data pipelines. Finally, the web server serves Airflow’s user interface.
This user interface, or UI, provides data teams with a central tool to manage their pipeline execution. In the Airflow UI, data teams can view the state of their DAGs, manually re-run DAGs, store variables and connections, and so much more. The Airflow UI provides central visibility into data ingestion and delivery processes, helping to keep data teams informed and aware of their data pipeline performance.
Installing Apache Airflow
There are a number of ways to install Apache Airflow. We’ll cover one of the easiest ways, which worked well for me: installing Airflow with the Astro CLI (Command Line Interface).
Installing Airflow with the Astro CLI
Astronomer, a managed Airflow provider, provides a number of free tools to help make working with Airflow easier. One of these tools is the Astro CLI.
The Astro CLI makes it easy to create and manage everything you need to run Airflow. To get started, you’ll first have to install the CLI.
Prerequisites
By default, the Astro CLI uses Docker as its container management engine for running Airflow locally, so make sure Docker (for example, Docker Desktop on macOS) is installed and running before you continue.
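If you want to double-check that Docker is available before installing the CLI, you can run the following standard Docker commands (not Astro-specific) in your terminal:

docker --version
docker info

If docker info returns an error, start Docker Desktop (or your Docker daemon) first.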
Installation
To install the latest version of the Astro CLI, run the following command in your terminal:
brew install astro
To install a specific version of the Astro CLI, specify the version you want at the end of the command. For example, to install Astro CLI version 1.25.0, you would run the following command (this isn’t required; the command above already installs the latest version):
brew install [email protected]
If you specify only a major version, this command installs the latest minor or patch version available for the major version. For a list of all available versions, see the CLI release notes.
Confirmation
To verify that the correct Astro CLI version was installed, run:
astro version
MANDATORY STEP AFTER INSTALLING ASTRO CLI:
You will need to create an empty directory where you wish to work on the project.
Navigate to your newly created empty directory in your terminal using the command below:
cd path/to/your/directory
Once the Astro CLI is installed, configuring an entire Airflow project (in the newly created directory) takes only one command:
astro dev init
This will configure all the resources needed for an Airflow project in your current working directory. Your current working directory will then look something like this:
.
├── dags/
├── include/
├── plugins/
├── tests/
├── airflow_settings.yaml
├── Dockerfile
├── packages.txt
└── requirements.txt
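A couple of these files are worth calling out: requirements.txt is where you list extra Python packages to install into the Airflow environment, and packages.txt is for OS-level packages. Since the example DAG later in this guide imports pandas, make sure pandas is available; if it isn’t already included in your Astro Runtime image, adding a single line to requirements.txt is enough (the pin below is just an example):

pandas==2.1.4

If you make this change while the project is already running, restart it (astro dev restart) so the image is rebuilt with the new dependency.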
Once the project has been created, to start the project, run:
astro dev start
If you stumble across an error like this:
Error: error building, (re)creating or starting project containers: Error response from daemon: Ports are not available: exposing port TCP 127.0.0.1:5432 -> 0.0.0.0:0: listen tcp 127.0.0.1:5432: bind: address already in use
Don't panic, there is a solution for everything! Follow the steps below:
1. Find the process that is using port 5432:
lsof -i :5432
2. Stop the conflicting process:
kill <PID> (Don't include the angle brackets)
Now you can continue with "astro dev start" in your directory terminal.
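Alternatively, if the process on port 5432 is something you’d rather keep running (for example, a local PostgreSQL install), the Astro CLI can map its Postgres container to a different host port instead. To the best of my knowledge, this is configured with:

astro config set postgres.port 5433

The port 5433 is just an example; any free port works. A similar conflict can occur on port 8080 for the webserver, which can likewise be remapped with astro config set webserver.port 8081.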
After about a minute, you can open the Airflow UI in your browser at http://localhost:8080/. Unless you’ve configured otherwise, the default local credentials are admin for both the username and the password.
Now, you’re ready to write your first DAG!
Writing Your First Airflow DAG
Now that Airflow has been installed, you’re ready to write your first DAG. First, create a file called sample_dag.py in the dags/ directory of the Airflow project you just created. Using your favourite text editor or IDE (personally, I prefer VS Code), open the sample_dag.py file. To start, let’s instantiate the DAG.
from airflow import DAG
from datetime import datetime
with DAG(
    dag_id="weather_etl",
    start_date=datetime(year=2024, month=1, day=1, hour=9, minute=0),
    schedule="@daily",
    catchup=True,
    max_active_runs=1,
    render_template_as_native_obj=True
) as dag:
    ...
Above, we use the DAG class from the airflow module, in conjunction with the with context manager, to define a DAG.
A start_date, schedule, and value for catchup are provided. With schedule set to "@daily", this DAG runs once per day (the @daily preset corresponds to midnight UTC; to trigger runs at 9:00 AM UTC specifically, you could swap in a cron expression such as "0 9 * * *"). Since catchup is set to True, the DAG will backfill a run for each day between January 1, 2024 and the day it is first triggered, and max_active_runs=1 ensures that only one DAG run can be active at a time.
Now, let’s add a few tasks! First, we’ll create a task to mock extracting data from an API. Check out the code below:
...
# Import the PythonOperator
from airflow.operators.python import PythonOperator
...
def extract_data_callable():
    # Print message, return a response
    print("Extracting data from a weather API")
    return {
        "date": "2023-01-01",
        "location": "NYC",
        "weather": {
            "temp": 33,
            "conditions": "Light snow and wind"
        }
    }

extract_data = PythonOperator(
    dag=dag,
    task_id="extract_data",
    python_callable=extract_data_callable
)
Next, we’ll want to create a task to transform the data returned by the extract_data task. This can be done with the following code. Here, we’re using an Airflow feature called XComs to retrieve data from the previous task.
Since render_template_as_native_obj is set to True, these values are shared between tasks as native Python objects rather than strings. The raw data from the extract_data task is then passed to the transform_data_callable as a keyword argument. This data is then transformed and returned, where it will be used by the load_data task in a similar manner.
...
# Import pandas
import pandas as pd
...
def transform_data_callable(raw_data):
    # Transform response to a list
    transformed_data = [
        [
            raw_data.get("date"),
            raw_data.get("location"),
            raw_data.get("weather").get("temp"),
            raw_data.get("weather").get("conditions")
        ]
    ]
    return transformed_data

transform_data = PythonOperator(
    dag=dag,
    task_id="transform_data",
    python_callable=transform_data_callable,
    op_kwargs={"raw_data": "{{ ti.xcom_pull(task_ids='extract_data') }}"}
)
def load_data_callable(transformed_data):
    # Load the data to a DataFrame, set the columns
    loaded_data = pd.DataFrame(transformed_data)
    loaded_data.columns = [
        "date",
        "location",
        "weather_temp",
        "weather_conditions"
    ]
    print(loaded_data)

load_data = PythonOperator(
    dag=dag,
    task_id="load_data",
    python_callable=load_data_callable,
    op_kwargs={"transformed_data": "{{ ti.xcom_pull(task_ids='transform_data') }}"}
)
...
Finally, dependencies are set between the extract_data, transform_data, and load_data tasks to create a basic ETL DAG:
...
extract_data >> transform_data >> load_data
The final product will look like this!
from airflow import DAG
from datetime import datetime
from airflow.operators.python import PythonOperator
import pandas as pd

with DAG(
    dag_id="weather_etl",
    start_date=datetime(year=2024, month=1, day=1, hour=9, minute=0),
    schedule="@daily",
    catchup=True,
    max_active_runs=1,
    render_template_as_native_obj=True
) as dag:
    def extract_data_callable():
        # Print message, return a response
        print("Extracting data from a weather API")
        return {
            "date": "2023-01-01",
            "location": "NYC",
            "weather": {
                "temp": 33,
                "conditions": "Light snow and wind"
            }
        }

    extract_data = PythonOperator(
        dag=dag,
        task_id="extract_data",
        python_callable=extract_data_callable
    )

    def transform_data_callable(raw_data):
        # Transform response to a list
        transformed_data = [
            [
                raw_data.get("date"),
                raw_data.get("location"),
                raw_data.get("weather").get("temp"),
                raw_data.get("weather").get("conditions")
            ]
        ]
        return transformed_data

    transform_data = PythonOperator(
        dag=dag,
        task_id="transform_data",
        python_callable=transform_data_callable,
        op_kwargs={"raw_data": "{{ ti.xcom_pull(task_ids='extract_data') }}"}
    )

    def load_data_callable(transformed_data):
        # Load the data to a DataFrame, set the columns
        loaded_data = pd.DataFrame(transformed_data)
        loaded_data.columns = [
            "date",
            "location",
            "weather_temp",
            "weather_conditions"
        ]
        print(loaded_data)

    load_data = PythonOperator(
        dag=dag,
        task_id="load_data",
        python_callable=load_data_callable,
        op_kwargs={"transformed_data": "{{ ti.xcom_pull(task_ids='transform_data') }}"}
    )

    # Set dependencies between tasks
    extract_data >> transform_data >> load_data
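With the file saved under dags/, the scheduler should pick up the new DAG shortly, and weather_etl will appear in the Airflow UI, where you can unpause and trigger it. If you prefer the command line, the Astro CLI can (to my knowledge) proxy Airflow CLI commands into the running containers, so a one-off test run of the whole DAG should look something like this (the date is just an example logical date):

astro dev run dags test weather_etl 2024-01-01

Either way, check the logs of the load_data task; you should see a small DataFrame printed with the date, location, temperature, and conditions columns.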