Creating an ETL Data Pipeline Using Airflow With DockerOperator
My ETL data pipeline that ranks MSAs by "invest-ability"

What are the best MSAs (Metropolitan Statistical Areas) to invest in real estate right now? Is it Austin, TX? Tampa, FL? Detroit, MI? The ETL pipeline described in this article is the data engineering needed to answer this question.

My name is Austin Wolff, and I'm a Real Estate Data Scientist who analyzes the best markets and locations for real estate investment. My analyses have previously been read by multiple billion-dollar firms.

New York, NY

In this article I'm going to share the technical data engineering behind the ETL pipeline that makes this possible.

First, I will explain the extraction (E) and transformation (T) of the data before I dive into using Airflow and Docker to orchestrate the pipeline.

Part 1: Extraction

The data used for analysis comes from two different sources/APIs: the Bureau of Labor Statistics (BLS) and the US Census Bureau's American Community Survey (ACS). The BLS provides the job data I need with roughly a two-month lag (for example, in March the most recent data will be January's, and in April the most recent data will be February's).

The BLS data is accessed and downloaded using my download_bls_data.py script (a snippet of the Python code is featured below).

...
# Request the job-data series from the BLS v2 timeseries API
headers = {'Content-type': 'application/json'}
data = json.dumps({"seriesid": series_ids,
                   "startyear": str(start_year),
                   "endyear": str(end_year),
                   "registrationkey": bls_key,
                   "calculations": True})
p = requests.post('https://api.bls.gov/publicAPI/v2/timeseries/data/',
                  data=data, headers=headers)
json_data = json.loads(p.text)
...        
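The response comes back as nested JSON. Here's a minimal sketch of flattening it into rows, assuming the standard v2 response layout (the full parsing logic lives in the private script):

# Flatten the nested BLS JSON into one row per (series, month) observation
rows = []
for series in json_data["Results"]["series"]:
    series_id = series["seriesID"]
    for obs in series["data"]:
        rows.append({
            "series_id": series_id,
            "year": int(obs["year"]),
            "period": obs["period"],        # e.g. "M01" for January
            "value": float(obs["value"]),
        })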

The Census provides the demographic data I need (rent/income/price) once a year, near the end of the year. Unfortunately, this data will always be at least a year behind. The Census data is accessed through their API and downloaded through a similar script of mine.
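That script is also private, but a minimal sketch of an ACS query looks something like the following (the survey year, variable code, and geography string are illustrative; B25064_001E is the ACS code for median gross rent):

import os
import requests

# Example ACS 5-year query: median gross rent for every MSA
url = "https://api.census.gov/data/2021/acs/acs5"
params = {
    "get": "NAME,B25064_001E",
    "for": "metropolitan statistical area/micropolitan statistical area:*",
    "key": os.environ["CENSUS_KEY"],
}
rows = requests.get(url, params=params).json()
header, records = rows[0], rows[1:]   # first row of the response is the header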

Part 2: Transformation

Finally, the data is passed through my ranking algorithm, which essentially applies this equation to each MSA:

Score(MSA) = Σᵢ (weightᵢ × zᵢ)

The demographic weight summation: each demographic's weight is multiplied by that demographic's normalized trend coefficient, and the products are summed.

Where "weight" is the weight you want to apply to a particular demographic (so, for example, you can give "job growth" more importance than "price growth" if that fit your personal investment thesis), and "z" is the normalized trend coefficient for a given dataset. (I could further explain the explicit details behind this variable, but this is not the article for that. For now, think of "z" as a measure of the trend line of a given demographic: An MSA with a higher trend of job growth will have a higher z-number.)

Now I can simply sort all MSAs by their total score from highest to lowest, giving me an ordered list of MSAs by "invest-ability."
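A minimal sketch of that scoring-and-sorting step (the weights, column names, and DataFrame layout here are illustrative, not my production code):

import pandas as pd

# Example weights per demographic -- tune these to your own investment thesis
weights = {"job_growth": 0.4, "rent_growth": 0.25,
           "income_growth": 0.2, "price_growth": 0.15}

def rank_msas(msa_z: pd.DataFrame, weights: dict) -> pd.Series:
    """msa_z is indexed by MSA, one column of normalized trend coefficients (z) per demographic."""
    score = sum(weight * msa_z[demo] for demo, weight in weights.items())
    return score.sort_values(ascending=False)   # highest "invest-ability" first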

Running Airflow Using Docker (With Docker Operators)

Now that I had my Extract and Transform scripts (more on the Load script in a minute), it was time to build the first DAG (Directed Acyclic Graph) using Airflow.

I used a modified, lightweight version of the docker-compose.yaml provided by Apache that works on my resource-limited Mac (running Docker on macOS can be very resource-intensive if you have 8GB of memory or less, since Docker needs at least 4GB to run the full Airflow image).

I needed to add this volume to the docker-compose.yaml file in order to run DockerOperator tasks from inside a Docker container:

version: '3'
x-airflow-common:
  ...
  volumes:
    ...
    - /var/run/docker.sock:/var/run/docker.sock

And then, to get this to work on macOS, I further needed to add the following socat TCP proxy under "services", or else I would get a permission-denied error (the DockerOperator tasks connect to it via docker_url="tcp://docker-proxy:2375", as you'll see in the DAG code below):

services:
  docker-proxy:
    image: bobrik/socat
    command: "TCP4-LISTEN:2375,fork,reuseaddr UNIX-CONNECT:/var/run/docker.sock"
    ports:
      - "2376:2375"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

Now with the docker-compose.yaml file ready, I also created separate Docker Images for the download and ranking scripts, since they used different dependencies:

docker build -t download_msa_image .        
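The ranking image gets the same treatment (assuming its own directory and Dockerfile); the tag matches the image name referenced in the DAG below:

docker build -t rank_msa_image .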

With the Airflow Image and individual task Images built, it was time to code the DAG.

Coding The ETL Pipeline With An Airflow DAG

Like all great DAGs, mine was simple:

Download BLS Data >> Download Census Data >> Rank MSAs

The DAG should be self-explanatory: download the BLS job data, then download the Census demographic data, then rank the MSAs based on the weighting algorithm I described above. The actual code in the Docker Images is private, but I can share the code for the DAG below:

""
This dag downloads and ranks the MSAs by
the data provided by the BLS (job growth)
and the Census (median rent, income, and 
unit value).
"""
# Import necessary libraries
import os
from airflow import DAG, Dataset
from airflow.decorators import task, dag
from airflow.providers.docker.operators.docker import DockerOperator
from docker.types import Mount
from datetime import datetime


# Define docker images to use
download_image = "download_msa_image:latest"
rank_image = "rank_msa_image:latest"


# Create the Dataset objects
rank_all_demos = Dataset("datasets/ranked_msa_datasets/rank_all_demos.csv")


# Define keyword arguments to use for all
# DockerOperator tasks
dockerops_kwargs = {
    'docker_url':"tcp://docker-proxy:2375",
    'network_mode':"bridge",
    'auto_remove':True,
    'xcom_all':True,
    'retrieve_output':True,
    'mount_tmp_dir':False,
    'mounts':[Mount(
                source=f"{os.environ['YOUR_LOCAL_REPO']}/includes/datasets/",
                target="/datasets/",
                type="bind"
                )],
    'retries':1
}


# Create DAG
@dag(start_date=datetime(2023, 1, 1), schedule="@monthly", catchup=False)
def download_and_rank_msa():


    ### CREATE TASKS


    # Task 1: Download BLS data
    download_bls = DockerOperator(
        task_id="download_bls",
        container_name="task__download_bls",
        image=download_image,
        environment={
            'BLS_KEY': os.environ['BLS_KEY'],
        },
        command="python3 download_bls_data.py",
        **dockerops_kwargs
    )


    # Task 2: Download census data
    download_census_msa = DockerOperator(
        task_id="download_census_msa",
        container_name="task__download_census_msa",
        image=download_image,
        environment={
            'CENSUS_KEY': os.environ['CENSUS_KEY'],
        },
        command="python3 download_census_msa_data.py",
        **dockerops_kwargs
    )


    # Task 3: Rank joined_data and save outputs to datasets/ranked_msa_datasets
    rank_msa = DockerOperator(
        task_id="rank_msa",
        container_name="task__rank_msa",
        image=rank_image,
        command="python3 rank_msa.py",
        outlets=[rank_all_demos],
        **dockerops_kwargs
    )


    download_bls >> download_census_msa >> rank_msa
    
dag = download_and_rank_msa()

As you can see in Task 3 (with task_id="rank_msa"), I've set the task's outlet to the Dataset "rank_all_demos" (defined before the DAG). Once this file is updated, the second DAG will be triggered; that second DAG is the "L" (Load) part of the ETL pipeline.

Part 3: Load

Now that the dataset is ranked, you can load any part of this data wherever you'd like (Snowflake, AWS RDS, S3, etc.). For now, I simply wanted to create graph visualizations of market trends and save them to my local repo (the cheapest option, but by no means the only one).

Note: I chose to load this data via a second, one-task DAG that creates and saves the graphs. This task could have been added to the first DAG, but I separated them to keep each DAG modular. Plus, if in the future I wanted to output the graphs to S3 or any other data lake, I could simply make a separate DAG that would also be connected to the first DAG via the Dataset trigger.
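Such a future Load task could be as simple as pushing the ranked CSV to S3 (a sketch, assuming boto3 credentials are configured and using a placeholder bucket name):

import boto3

# Upload the ranked CSV produced by the rank_msa task to S3
s3 = boto3.client("s3")
s3.upload_file(
    Filename="datasets/ranked_msa_datasets/rank_all_demos.csv",
    Bucket="my-msa-rankings",            # placeholder bucket name
    Key="ranked/rank_all_demos.csv",
)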

Here is the code for my one-task graph-creation DAG:

"""
This dag will graph the top 10 MSAs and save the output
locally. It will be triggered when the 
"ranked_msa_datasets" are updated.
"""
# Import necessary libraries
import os
from airflow import DAG, Dataset
from airflow.decorators import task, dag
from airflow.providers.docker.operators.docker import DockerOperator
from docker.types import Mount
from datetime import datetime


# Define docker images to use
graph_image = "graph_data_image:latest"


# Define keyword arguments to use for all
# DockerOperator tasks
dockerops_kwargs = {
    'docker_url':"tcp://docker-proxy:2375",
    'network_mode':"bridge",
    'auto_remove':True,
    'xcom_all':True,
    'retrieve_output':True,
    'mount_tmp_dir':False,
    'mounts':[Mount(
                source=f"{os.environ['YOUR_LOCAL_REPO']}/includes/datasets/",
                target="/datasets/",
                type="bind"
                ), 
             Mount(
                source=f"{os.environ['YOUR_LOCAL_REPO']}/graphs/",
                target="/graphs/",
                type="bind"
                )],
    'retries':0
}


# Create the Dataset object
rank_all_demos = Dataset("datasets/ranked_msa_datasets/rank_all_demos.csv")


# Create DAG -- Triggered when the Dataset is updated
@dag(start_date=datetime(2023, 1, 1), 
    schedule=[rank_all_demos], 
    catchup=False)
def graph_msa():


    # Graph top 10 MSAs, all demographics, and
    # save locally
    graph_each_demo = DockerOperator(
        task_id="graph_each_demo",
        container_name="task__graph_each_demo",
        image=graph_image,
        command="python3 graph_data.py",
        **dockerops_kwargs
    )


    graph_each_demo
    
dag = graph_msa()        
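Before relying on the schedules, either DAG can be smoke-tested from the command line with Airflow's built-in test command (the service name below comes from the official compose file and may differ in a modified one; newer Airflow versions also make the execution date optional):

# Run from inside one of the Airflow containers
docker compose exec airflow-scheduler airflow dags test download_and_rank_msa 2023-01-01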

"Airflow Using Docker" Conclusion

The first "download_and_rank_msa" DAG is scheduled to run monthly, as this is when we can expect to receive job growth updates from the BLS. Seeing as one wouldn't want their Docker container running permanently for a once-a-month DAG run, future iterations of this project may include porting this project to an AWS EC2 instance and using AWS Instance Scheduler.

MSA Ranking Conclusion

As of January 2023, the Dallas-Fort Worth, TX MSA was #1 across multiple weight variations. Other top 5 MSAs included:

Charlotte, NC

Atlanta, GA

Nashville, TN

Charleston, SC

If you'd like to see the full MSA analysis, check out my real estate articles on BiggerPockets.

Final Conclusion

Thank you for reading. If you're a recruiter and would like to see the private repo which includes all code for this ETL pipeline, don't hesitate to reach out.
