Airflow Basics: Installation & Guide
Credits: DataCamp: Getting Started with Apache Airflow


What is Apache Airflow?

Apache Airflow, or Airflow, is an open-source tool and framework for running your data pipelines in production. As an industry-leading data workflow management tool, Apache Airflow leverages Python to allow data practitioners to define their data pipelines as code. Airflow adds the ability to schedule pipeline execution and observe performance, making it a centralized hub for all of your data workflows. Whether you’re preparing training data for a model, or persisting data in a data lake, Airflow adds the functionality to make your data pipelines production-ready.

Airflow was first created at Airbnb by Maxime Beauchemin in 2014 and later joined the Apache Software Foundation’s Incubator program in March 2016, before being announced as a top-level project in 2019. According to Airflow’s 2022 survey, Airflow is downloaded millions of times per month, and thousands of companies, both large and small, rely on the tool.

Key Features of Airflow

Airflow’s framework and architecture have several key features that make the tool unique. First, let’s dig a little deeper into the most important features of Airflow’s framework.

Features of Airflow’s Framework

The simplest unit of the Airflow framework is the task. Tasks can be thought of as operations or, for most data teams, as steps in a data pipeline.

A traditional ETL workflow has three tasks: extracting, transforming, and loading data. Dependencies define the relationships between tasks. Going back to our ETL example, the “load” task depends on the “transform” task, which, in turn, depends on the “extract” task. The combination of tasks and dependencies creates DAGs, or directed acyclic graphs. DAGs represent data pipelines in Airflow, and they are easier to grasp visually than to define in words. Let’s take a look at a diagram of a basic ETL pipeline:

The DAG above has three tasks, with two dependencies. It’s considered a DAG because there are no loops (or cycles) between tasks. Here, the arrows show the directed nature of the process; first, the extract task is run, followed by the transform and load tasks. With DAGs, it’s easy to see a distinct start and end to the process, even if the logic is complex, like the DAG shown below:

In this DAG, the logic is more complex. There is branching based on a condition, and a few tasks run in parallel. However, the graph is still directed, and there are no cyclic dependencies between tasks. Now, let’s take a look at a process that is not a DAG:

In this diagram, there is a distinct loop between the transform and validate tasks. Because of that cycle, this workflow is not a DAG, and it may run forever if there is no way to break out of the loop.

When building data pipelines, even outside of Airflow, it’s best practice to stay away from creating workflows that can’t be represented as DAGs, as you may lose key features, such as determinism or idempotency.
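
To make this concrete in code, the sketch below expresses the same three-task ETL structure as an Airflow DAG. It is a minimal, illustrative example: the dag_id and task names are placeholders, and EmptyOperator stands in for real extract, transform, and load logic.

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from datetime import datetime

with DAG(
    dag_id="basic_etl_sketch",        # illustrative name
    start_date=datetime(2024, 1, 1),
    schedule=None                     # no schedule; trigger manually
) as dag:
    # Placeholder tasks standing in for real extract/transform/load logic
    extract = EmptyOperator(task_id="extract")
    transform = EmptyOperator(task_id="transform")
    load = EmptyOperator(task_id="load")

    # The >> operator defines the directed dependencies: extract -> transform -> load
    extract >> transform >> load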

Features of Airflow’s Architecture

To schedule DAGs, execute tasks, and provide visibility into data pipeline execution details, Airflow leverages a Python-based architecture made up of the components below:

  • Scheduler
  • Executor
  • Metadata database
  • Webserver (UI)

Whether running Airflow locally or in a production environment, each of these components must be up and running for Airflow to function properly.

The scheduler is responsible for (you probably guessed it) scheduling DAGs. To schedule a DAG, a start date and schedule interval for the DAG must be provided when the DAG is written as Python code.

Once a DAG is scheduled, tasks within those DAGs need to be executed, which is where the executor comes in. The executor does not run the logic within each task; it just allocates the task to be run by whatever resources are configured to do so. The metadata database stores information about DAG runs, such as whether or not the DAG and its associated tasks ran successfully.
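
Before moving on to the metadata database in more detail, a quick note on how the executor is chosen: it is a deployment setting rather than something you write in DAG code. As a hedged illustration, a simple local setup might select the LocalExecutor in airflow.cfg (or via the equivalent AIRFLOW__CORE__EXECUTOR environment variable):

# airflow.cfg (excerpt)
[core]
# The executor Airflow uses to run task instances
executor = LocalExecutor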

The metadata database also stores information such as user-defined variables and connections, which help when building production-grade data pipelines. Finally, the webserver provides Airflow’s user interface.

This user interface, or UI, provides data teams with a central tool to manage their pipeline execution. In the Airflow UI, data teams can view the state of their DAGs, manually re-run DAGs, store variables and connections, and much more. The Airflow UI provides central visibility into data ingestion and delivery processes, helping to keep data teams informed and aware of their data pipeline performance.
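
Variables and connections stored in the metadata database (and manageable from the UI) can be read directly in task code. A minimal sketch, assuming a variable named api_key and a connection with the id weather_api have already been created:

from airflow.models import Variable
from airflow.hooks.base import BaseHook

# Read a user-defined variable from the metadata database
api_key = Variable.get("api_key")

# Read connection details (host, login, password, ...) by connection id
conn = BaseHook.get_connection("weather_api")
print(conn.host)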

Installing Apache Airflow

There are a number of ways to install Apache Airflow. We’ll cover one of the easiest ways, which worked for me: installing Airflow with the Astro CLI (Command Line Interface).

Installing Airflow with the Astro CLI

Astronomer, a managed Airflow provider, provides a number of free tools to help make working with Airflow easier. One of these tools is the Astro CLI.

The Astro CLI makes it easy to create and manage everything you need to run Airflow. To get started, you’ll first have to install the CLI.

Prerequisites

By default, the Astro CLI uses Docker as its container management engine for running Airflow locally, so make sure Docker is installed and running before you continue.
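
A quick, optional sanity check: docker info only reports server details when the Docker daemon is actually running, so it’s an easy way to confirm Docker is ready before continuing.

docker info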

Installation

To install the latest version of the Astro CLI with Homebrew, run the following command in your terminal:

brew install astro
        

To install a specific version of the Astro CLI, specify the version you want at the end of the command. For example, to install Astro CLI version 1.25.0, you would run the following command (not strictly required; the command above already installs the latest version):

brew install astro@1.25.0
        

If you specify only a major version, this command installs the latest minor or patch version available for the major version. For a list of all available versions, see the CLI release notes.

Confirmation

To verify that the correct Astro CLI version was installed, run:

astro version        


MANDATORY STEP AFTER INSTALLING ASTRO CLI:

You will need to create an empty directory where you wish to work on the project.

Navigate to your newly created empty directory in your terminal using the command below.

cd path/to/your/directory        

Once the Astro CLI is installed, configuring an entire Airflow project (in the newly created directory) takes only one command:

astro dev init        

This will configure all the resources needed for an Airflow project in your current working directory, which will then look something like this:

.
├── dags/
├── include/
├── plugins/
├── tests/
├── airflow_settings.yaml
├── Dockerfile
├── packages.txt
└── requirements.txt
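
Two of these files are worth calling out: Python dependencies for your DAGs go in requirements.txt, and OS-level packages go in packages.txt. For example, if a library you import is not already bundled in the Astro Runtime image, you would pin it in requirements.txt (the package and version below are purely illustrative):

# requirements.txt (example entry)
scikit-learn==1.4.0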
        

Once the project has been created, start it by running:

astro dev start         

If you stumble across an error like this:

Error: error building, (re)creating or starting project containers: Error response from daemon: Ports are not available: exposing port TCP 127.0.0.1:5432 -> 0.0.0.0:0: listen tcp 127.0.0.1:5432: bind: address already in use        

Don't panic, there is a solution for everything! Follow the steps below:

  1. Identify the conflicting process:

  • Open a new terminal and run the command below to see which processes are using port 5432:

lsof -i :5432         

  • This will list the process ID (PID) and details of the process using port 5432.

  2. Stop the conflicting process:

  • Once the process is identified (e.g., a database server), stop it gracefully.
  • Use the PID obtained from lsof with the following command:

kill <PID>  (Don't include the angle brackets)        

Now you can re-run "astro dev start" from your project directory.
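
Alternatively, if you'd rather leave whatever is using port 5432 running (for example, a local Postgres install you still need), the Astro CLI can be pointed at a different host port for its own Postgres container. A sketch, assuming a reasonably recent CLI version:

astro config set postgres.port 5435
astro dev start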

After about a minute, you can open the Airflow UI in your browser at http://localhost:8080/.

Now, you’re ready to write your first DAG!

Writing Your First Airflow DAG

Now that Airflow has been installed, you’re ready to write your first DAG. Create a file called sample_dag.py in the dags/ directory of the Airflow project you just created. Using your favourite text editor or IDE (personally, I prefer VS Code), open the sample_dag.py file. First, let’s instantiate the DAG.

from airflow import DAG
from datetime import datetime

with DAG(
    dag_id="weather_etl",
    start_date=datetime(year=2024, month=1, day=1, hour=9, minute=0),
    schedule="@daily",
    catchup=True,
    max_active_runs=1,
    render_template_as_native_obj=True
) as dag:
    ...

Above, we use the DAG class from the airflow module, together with the with context manager, to define a DAG.

A start_date, schedule interval, and value for catchup are provided. With the "@daily" schedule, this DAG will run once per day (at midnight UTC). Since catchup is set to True, Airflow will backfill a run for each day between January 1, 2024 and the day the DAG is first triggered, and max_active_runs=1 ensures that only one DAG run executes at a time.

Now, let’s add a few tasks! First, we’ll create a task to mock extracting data from an API. Check out the code below:

...

# Import the PythonOperator
from airflow.operators.python import PythonOperator

...

    def extract_data_callable():
        # Print message, return a response
        print("Extracting data from an weather API")
        return {
            "date": "2023-01-01",
            "location": "NYC",
            "weather": {
                "temp": 33,
                "conditions": "Light snow and wind"
            }
        }


    extract_data = PythonOperator(
        dag=dag,
        task_id="extract_data",
        python_callable=extract_data_callable
    )

        

Next, we’ll want to create a task to transform the data returned by the extract_data task. This can be done with the following code. Here, we’re using an Airflow feature called XComs to retrieve data from the previous task.

Since render_template_as_native_obj is set to True, these values are shared as Python objects rather than strings. The raw data from the extract_data task is then passed to the transform_data_callable as a keyword argument. This data is then transformed and returned, where it will be used by the load_data task in a similar manner.

 ...

# Import pandas
import pandas as pd

...

    def transform_data_callable(raw_data):
        # Transform response to a list
        transformed_data = [
            [
                raw_data.get("date"),
                raw_data.get("location"),
                raw_data.get("weather").get("temp"),
                raw_data.get("weather").get("conditions")
            ]
        ]
        return transformed_data


    transform_data = PythonOperator(
        dag=dag,
        task_id="transform_data",
        python_callable=transform_data_callable,
        op_kwargs={"raw_data": "{{ ti.xcom_pull(task_ids='extract_data') }}"}
    )

    def load_data_callable(transformed_data):
        # Load the data to a DataFrame, set the columns
        loaded_data = pd.DataFrame(transformed_data)
        loaded_data.columns = [
            "date",
            "location",
            "weather_temp",
            "weather_conditions"
        ]
        print(loaded_data)


    load_data = PythonOperator(
        dag=dag,
        task_id="load_data",
        python_callable=load_data_callable,
        op_kwargs={"transformed_data": "{{ ti.xcom_pull(task_ids='transform_data') }}"}
    )

...        

Finally, dependencies are set between tasks. The code here sets dependencies between the extract_data, transform_data, and load_data tasks to create a basic ETL DAG.

...

extract_data >> transform_data >> load_data        
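
As an aside, the same dependencies can be declared with Airflow's chain helper, which some teams prefer once the list of tasks gets long; the snippet below is equivalent to the >> expression above.

from airflow.models.baseoperator import chain

# Equivalent to: extract_data >> transform_data >> load_data
chain(extract_data, transform_data, load_data)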

The final product will look like this!

from airflow import DAG
from datetime import datetime
from airflow.operators.python import PythonOperator
import pandas as pd

with DAG(
    dag_id="weather_etl",
    start_date=datetime(year=2024, month=1, day=1, hour=9, minute=0),
    schedule="@daily",
    catchup=True,
    max_active_runs=1,
    render_template_as_native_obj=True
) as dag:
    def extract_data_callable():
        # Print message, return a response
        print("Extracting data from an weather API")
        return {
            "date": "2023-01-01",
            "location": "NYC",
            "weather": {
                "temp": 33,
                "conditions": "Light snow and wind"
            }
        }


    extract_data = PythonOperator(
        dag=dag,
        task_id="extract_data",
        python_callable=extract_data_callable
    )


    def transform_data_callable(raw_data):
        # Transform response to a list
        transformed_data = [
            [
                raw_data.get("date"),
                raw_data.get("location"),
                raw_data.get("weather").get("temp"),
                raw_data.get("weather").get("conditions")
            ]
        ]
        return transformed_data


    transform_data = PythonOperator(
        dag=dag,
        task_id="transform_data",
        python_callable=transform_data_callable,
        op_kwargs={"raw_data": "{{ ti.xcom_pull(task_ids='extract_data') }}"}
    )


    def load_data_callable(transformed_data):
        # Load the data to a DataFrame, set the columns
        loaded_data = pd.DataFrame(transformed_data)
        loaded_data.columns = [
            "date",
            "location",
            "weather_temp",
            "weather_conditions"
        ]
        print(loaded_data)


    load_data = PythonOperator(
        dag=dag,
        task_id="load_data",
        python_callable=load_data_callable,
        op_kwargs={"transformed_data": "{{ ti.xcom_pull(task_ids='transform_data') }}"}
    )

    # Set dependencies between tasks
    extract_data >> transform_data >> load_data        
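
With the file saved in the project's dags/ directory, the scheduler should pick the DAG up within a minute or so, and you can unpause and trigger it from the UI. If you'd rather exercise it from the command line, one option (a sketch, using the Astro CLI project set up earlier) is Airflow's dags test command, which runs every task for a single logical date in one process:

astro dev run dags test weather_etl 2024-01-01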

