Airflow + PostgreSQL + WSL

Airflow is a software service that schedules and executes workflows asynchronously and, when configured to, across distributed workers. Workflows are built from several different step types (operators), such as command-line scripts, Python function executions, etc. This kind of service underpins many business operations: database ETL jobs, data engineering pipelines, machine learning pipelines, and more.

WSL (Windows Subsystem for Linux) is a virtualized host for Linux operating systems (such as Ubuntu) running on top of Windows. For details on setting up WSL + Ubuntu on Windows, check out my other article on the topic.


This article assumes you already have WSL + Ubuntu set up. The scenario here is that you want to run Airflow locally on your Windows development machine, either to build a solution using Airflow + Python + PostgreSQL or just to learn by getting hands-on experience with these tools.

Below we will also see a short Python project that implements an Airflow DAG which reads data from a PostgreSQL database into files and then uploads those files with an Amazon S3 API call, so you can test out the Airflow environment you set up.

The script below is the series of steps (mostly Linux commands) that sets up a newly created WSL Ubuntu image for the scenario described above. There are many blog posts, articles, and official documentation pages that explain how to set this up, but they all failed for me because each made compatibility assumptions that did not match this exact environment (WSL + Ubuntu) or these versions of Airflow and PostgreSQL. After working through many of those sources and reading several help forums, I assembled and tested the steps below to document a repeatable process for reliably setting up this kind of environment.

# Setup Airflow WSL & sample SQL to S3 Project


# Update APT
sudo apt update && sudo apt upgrade


# If a failure occurs use the following line to try again
sudo dpkg --configure -a && sudo apt upgrade


# Install pip for Python 3
sudo apt install python3-pip


# Install virtual environment package
sudo pip3 install virtualenv


# Create virtual environment
virtualenv airflow_env


# Activate virtual environment
source airflow_env/bin/activate


# Setup a home directory for Airflow
mkdir airflow_home


sudo nano ~/.bashrc
# Add the following two exports to the end of the file
# (Ctrl+O to Write Out the changes and Ctrl+X to exit)
export AIRFLOW_HOME=~/airflow_home
# Airflow installs scripts under the user's ~/.local/bin path
# and warns if it is not on the PATH
export PATH=$HOME/.local/bin:$PATH


# EXIT Terminal and re-enter OR use
source ~/.bashrc
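

# Quick check (optional): the variable should now print the new home path
echo $AIRFLOW_HOME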


# Get the Python version and make sure the Airflow install
# constraint matches the Python major.minor version, e.g. 3.10
cd $AIRFLOW_HOME
python3 --version
pip install 'apache-airflow[postgres]==2.6.3' --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-3.10.txt"
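

# Optional variation (my addition, mirroring the official install docs):
# derive the constraint URL from the interpreter so the versions always match
AIRFLOW_VERSION=2.6.3
PYTHON_VERSION="$(python3 -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')"
pip install "apache-airflow[postgres]==${AIRFLOW_VERSION}" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"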






# Use SysVinit (service) commands instead of systemd
# to start PostgreSQL (WSL comes with SysVinit)
# https://linuxhandbook.com/system-has-not-been-booted-with-systemd/


# Install PostgreSQL 15
# (because 14 locks up on CREATE DATABASE in WSL)
sudo apt install libpq-dev
sudo sh -c 'echo "deb https://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list'
wget -qO- https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo tee /etc/apt/trusted.gpg.d/pgdg.asc &>/dev/null
sudo apt update && sudo apt upgrade
sudo apt install postgresql postgresql-client -y
sudo apt update && sudo apt upgrade
pip install psycopg2
sudo apt update && sudo apt upgrade
# Validate the installation worked
psql --version


# Start the PostgreSQL service
sudo service postgresql start
# Go into the PostgreSQL interactive console to
# issue setup commands
sudo -u postgres psql


# Set up a user called 'airflow' which Airflow will
# use to talk to the PostgreSQL metadata database
# and which you can also use for creating and using
# additional databases for your project
CREATE USER airflow PASSWORD 'airflow';


# Create the airflow metadata database and setup all permissions
CREATE DATABASE airflow;
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO airflow;
ALTER USER airflow SET search_path = public;
GRANT ALL PRIVILEGES ON DATABASE airflow TO airflow;
ALTER USER airflow CREATEDB;
\c airflow postgres
GRANT ALL ON SCHEMA public TO airflow;
exit
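

# Optional sanity check (not required): confirm the new role can connect
psql -h localhost -U airflow -d airflow -c '\conninfo'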
cd $AIRFLOW_HOME


# Temporarily start the scheduler to force it
# to generate the airflow.cfg file
airflow scheduler
# Ctrl-C to kill airflow scheduler for now


nano airflow.cfg
# Look for the executor and sql_alchemy_conn
# variables and set them to the values below
executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost/airflow
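

# Optional check (my addition): confirm Airflow resolves the new settings
# (in Airflow 2.6 the connection string lives in the [database] section)
airflow config get-value core executor
airflow config get-value database sql_alchemy_conn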


# Now it is time to initialize Airflow
# with its PostgreSQL metadata database
cd $AIRFLOW_HOME
airflow db init
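

# Optional (my addition): verify Airflow can reach the metadata database
airflow db check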


# Make the directory to hold dag python files
mkdir dags


# Create an admin user you can use to log
# into the Airflow web UI
airflow users create --username admin --password admin --firstname YOUR_NAME --lastname YOUR_NAME --role Admin --email YOUR_EMAIL
# Verify the user got created
airflow users list


# Now exit this WSL Airflow terminal


# Open a new WSL terminal and start the
# PostgreSQL service and the Airflow scheduler
source airflow_env/bin/activate && cd $AIRFLOW_HOME && sudo service postgresql start && airflow scheduler


# Open a new WSL terminal and
# start the Airflow webserver
source airflow_env/bin/activate && cd $AIRFLOW_HOME && airflow webserver


# You can now open a browser in Windows and
# go to http://localhost:8080/ to log into the Airflow admin UI


Test Airflow DAG Project

Once you have a working WSL Airflow environment as described above, you can start writing code. The easiest setup I could find is Microsoft VSCode with the Microsoft-provided WSL and Python extensions installed. Navigate inside your WSL Ubuntu to the path where you set $AIRFLOW_HOME and created your "dags" folder, then run "code ." (that is 'code' followed by a period) and press enter. This launches VSCode in Windows pointed at your WSL environment, so you can create and edit files there and get IntelliSense from the Python you set up in WSL.
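
For reference, launching it from a WSL terminal looks roughly like this (assuming the $AIRFLOW_HOME and dags folder created during the setup above):

cd $AIRFLOW_HOME/dags
code .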

How to set up VSCode with WSL Ubuntu

In this project location you create Python files containing Airflow code in the dags folder; they automatically show up in the Airflow web UI and execute on the schedule you define.
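
A quick way to confirm the scheduler has picked up a new DAG file is the Airflow CLI, run from a terminal with the airflow_env virtual environment activated:

airflow dags list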

This test project uses another service called Min.io, which provides an object store like Amazon S3 along with an S3-compatible API. This is very convenient because you can develop all of your code locally and not worry about setting up network connections to Amazon AWS until you are ready to deploy to production.

The snippet below shows the steps to set up Min.io and add the Amazon provider package to Airflow.

# Setup for the test project


# Install Min.io


wget https://dl.min.io/server/minio/release/linux-amd64/archive/minio_20230711212934.0.0_amd64.deb -O minio.deb
sudo dpkg -i minio.deb


# Start Min.io


mkdir ~/minio
minio server ~/minio --console-address :9090


# You can now open a browser in Windows and
# go to http://localhost:9090/ to log into the Min.io admin UI
# If you cannot log in to Min.io and you are using a VPN, turn off the VPN
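

# Note (my addition): the console login is the Min.io root user. If you did not set
# MINIO_ROOT_USER / MINIO_ROOT_PASSWORD before running "minio server", recent builds
# fall back to the default minioadmin / minioadmin credentials. To use your own,
# export both variables and restart the server, for example:
# export MINIO_ROOT_USER=YOUR_MINIO_USER
# export MINIO_ROOT_PASSWORD=YOUR_MINIO_PASSWORD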


# Add Amazon services to Airflow
source airflow_env/bin/activate && cd $AIRFLOW_HOME
pip install apache-airflow-providers-amazon        
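
The DAG shown later references two Airflow connections, postgress_test_conn and minio, and it uploads into a Min.io bucket named airflow; none of these exist yet. The bucket can be created in the Min.io web console, and one way to create the connections is with the Airflow CLI. The snippet below is only a sketch: the Min.io access key and secret are placeholders you generate in the Min.io console, 9000 is the default Min.io S3 API port, the sample database name test matches the next step, and depending on your amazon provider version the S3 endpoint override may need a different extras key than endpoint_url.

airflow connections add postgress_test_conn \
    --conn-type postgres --conn-host localhost --conn-port 5432 \
    --conn-schema test --conn-login airflow --conn-password airflow
airflow connections add minio \
    --conn-type aws \
    --conn-login YOUR_MINIO_ACCESS_KEY --conn-password YOUR_MINIO_SECRET_KEY \
    --conn-extra '{"endpoint_url": "http://localhost:9000"}'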

Now you can create a new database in the PostgreSQL instance running in your WSL environment using a DB management tool (PostgreSQL uses port 5432 by default) and import some sample data to test this code out with.
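
If you just want something quick to query, the sketch below does it from the WSL terminal; the test database name and the test_data table layout are guesses based on the query in the DAG below, so adjust them to whatever sample data you actually import:

sudo -u postgres psql -c "CREATE DATABASE test OWNER airflow;"
psql -h localhost -U airflow -d test -c "CREATE TABLE test_data (id SERIAL PRIMARY KEY, timestamp TIMESTAMP NOT NULL, value DOUBLE PRECISION);"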

I have written file read/write code so many times in my life that it has become an extremely boring ritual of looking up the exact syntax and typing the same code. This time, instead of spending the time on an internet search to get the exact syntax, I used that same amount of time (or less) asking ChatGPT to write it for me, then added a few manual edits to the code.

The resulting DAG code is shown below.


import logging
from datetime import datetime, timedelta
import csv
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook




default_args = {
    'owner': 'admin',
    'retries': 5,
    'retry_delay': timedelta(minutes=10)
}


def upload_data():
    # Fetch a batch of data to a file
    hook = PostgresHook(postgres_conn_id="postgress_test_conn")
    conn = hook.get_conn()
    cursor = conn.cursor()
    start_datetime = get_start_datetime()
    end_datetime = start_datetime + timedelta(minutes=60)
    cursor.execute("SELECT * FROM test_data WHERE timestamp >= TO_TIMESTAMP('" + start_datetime.strftime('%Y%m%d:%H:%M:%S') + "','YYYYMMDD:HH24:MI:SS') AND timestamp < TO_TIMESTAMP('" + end_datetime.strftime('%Y%m%d:%H:%M:%S') + "','YYYYMMDD:HH24:MI:SS')")
    with open(f"data_to_upload/test_data_{start_datetime.strftime('%Y-%m-%d %H-%M-%S')}.csv", "w") as f:
        csv_writer = csv.writer(f)
        csv_writer.writerow([i[0] for i in cursor.description])
        csv_writer.writerows(cursor)
    cursor.close()
    conn.close()
    logging.info(f"Success!! ({start_datetime.strftime('%Y%m%d:%H:%M:%S')}) -> ({end_datetime.strftime('%Y%m%d:%H:%M:%S')})")
    # Upload file to Min.io S3 API
    s3_hook = S3Hook(aws_conn_id="minio")
    s3_hook.load_file(
        filename=f"data_to_upload/test_data_{start_datetime.strftime('%Y-%m-%d %H-%M-%S')}.csv",
        key=f"test_data_{start_datetime.strftime('%Y-%m-%d %H-%M-%S')}.csv",
        bucket_name="airflow",
        replace=True
    )
    # Save end date to be used as the start date for the next run
    save_current_transfer_datetime(end_datetime)


def write_datetime_to_file(file_path, datetime_value):
    with open(file_path, 'w') as file:
        file.write(datetime_value.isoformat())


def read_datetime_from_file(file_path):
    if not file_exists(file_path):
        return datetime(2023, 1, 1)  # Return start datetime

    with open(file_path, 'r') as file:
        datetime_str = file.read().strip()

        try:
            parsed_datetime = datetime.fromisoformat(datetime_str)
            return parsed_datetime.replace(microsecond=0)  # Return parsed datetime without microseconds
        except ValueError:
            return datetime(2023, 1, 1)  # Return start datetime if parsing fails


def file_exists(file_path):
    try:
        with open(file_path, 'r') as file:
            return True
    except FileNotFoundError:
        return False


def get_start_datetime():
    previous_datetime = read_datetime_from_file('cache_previous_enddate.txt')
    return previous_datetime


def save_current_transfer_datetime(current_datetime):
    write_datetime_to_file('cache_previous_enddate.txt', current_datetime)


with DAG (
    dag_id="dag_test_upload_data",
    default_args=default_args,
    start_date=datetime(2023,1,1),
    schedule_interval="*/60 * * * *" # run every 60 minutes
) as dag:
    task1 = PythonOperator(
        task_id="task_test_upload_data",
        python_callable=upload_data
    )
    task1
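
With the connections and sample data in place, you can exercise the DAG once from the command line instead of waiting for the schedule. This is a sketch using the standard airflow dags test command; the date is just a sample logical date, and the data_to_upload folder must exist in the directory the task runs from because the code above writes its CSV files there using a relative path:

source airflow_env/bin/activate && cd $AIRFLOW_HOME
mkdir -p data_to_upload
airflow dags test dag_test_upload_data 2023-01-01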