Airflow + PostgreSQL + WSL
Gerald Gibson
Principal Engineer @ Salesforce | Hyperscale, Machine Learning, Patented Inventor
Airflow is a software service that provides asynchronous and distributed execution of workflows. Several different "step types" are available for these workflows, such as command line scripts, Python function executions, etc. This type of service is used for many kinds of business operations such as database ETLs, data engineering pipelines, machine learning pipelines, etc.
WSL (Windows Subsystem for Linux) is a virtual host for Linux operating systems (such as Ubuntu) running on top of Windows. For details on setting up WSL + Ubuntu on Windows, check out my other article here.
This article assumes you already have WSL + Ubuntu set up. The scenario here is that you want to run Airflow locally on your Windows development machine, either to build a solution using Airflow + Python + PostgreSQL or just to learn by getting hands-on experience with these tools.
Below we will also see a short Python project that implements an Airflow DAG that reads data from a PostgreSQL database into files and then uploads those files with an Amazon S3 API call, to test out the Airflow environment you set up.
The script below is the series of steps (mostly Linux commands) that sets up a newly created WSL Ubuntu image for the scenario described above. Even though there are many blog posts, articles, and official documentation sources that explain how to set this up, they all failed for me because each made compatibility assumptions that did not match my exact environment (WSL + Ubuntu) or version of Airflow or PostgreSQL. After searching through many of those sources and reading several help forums, I was able to assemble and test the steps below to document a repeatable process for reliably setting up this kind of environment.
# Setup Airflow WSL & sample SQL to S3 Project
# Update APT
sudo apt update && sudo apt upgrade
# If a failure occurs use the following line to try again
sudo dpkg --configure -a && sudo apt upgrade
# Install pip
sudo apt install python3-pip
# Install virtual environment package
sudo pip3 install virtualenv
# Create virtual environment
virtualenv airflow_env
# Activate virtual environment
source airflow_env/bin/activate
# Setup a home directory for Airflow
mkdir airflow_home
sudo nano ~/.bashrc
# Add the following two exports to the end of the file
# (Ctrl+O to Write Out the changes and Ctrl+X to exit)
export AIRFLOW_HOME=~/airflow_home
# Airflow installs scripts in the ~/.local/bin path
# and warns if it is not on the PATH
export PATH=$HOME/.local/bin:$PATH
# EXIT Terminal and re-enter OR use
source ~/.bashrc
# Get the Python version and make sure the airflow install
# constraint matches the Python Major.Minor version, e.g. 3.10
cd $AIRFLOW_HOME
python3 --version
pip install 'apache-airflow[postgres]==2.6.3' --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-3.10.txt"
# Use Sysvinit commands instead of Systemd
# (WSL comes with Sysvinit) to start PostgreSQL
# https://linuxhandbook.com/system-has-not-been-booted-with-systemd/
# Install Postgres 15
# (because 14 locks up on CREATE DATABASE in WSL)
sudo apt install libpq-dev
sudo sh -c 'echo "deb https://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list'
wget -qO- https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo tee /etc/apt/trusted.gpg.d/pgdg.asc &>/dev/null
sudo apt update && sudo apt upgrade
sudo apt install postgresql postgresql-client -y
sudo apt update && sudo apt upgrade
pip install psycopg2
sudo apt update && sudo apt upgrade
# Validate the installation worked
psql --version
# Start the PostgreSQL service
sudo service postgresql start
# Go into the PostgreSQL interactive console to
# issue setup commands
sudo -u postgres psql
# Setup a user called 'airflow' which Airflow will
# use to talk to the PostgreSQL metadata database
# and you can use for creating and using additional
# databases for your project
CREATE USER airflow PASSWORD 'airflow';
# Create the airflow metadata database and setup all permissions
CREATE DATABASE airflow;
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO airflow;
ALTER USER airflow SET search_path = public;
GRANT ALL PRIVILEGES ON DATABASE airflow TO airflow;
ALTER USER airflow CREATEDB;
\c airflow postgres
GRANT ALL ON SCHEMA public TO airflow;
exit
cd $AIRFLOW_HOME
# Temporarily start the scheduler to force it
# to generate the airflow.cfg file
airflow scheduler
# Ctrl-C to kill airflow scheduler for now
nano airflow.cfg
# Look for the executor and sql_alchemy_conn
# variables and set them to the values below
executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost/airflow
# Now it is time to initialize Airflow
# with its PostgreSQL metadata database
cd $AIRFLOW_HOME
airflow db init
# Make the directory to hold dag python files
mkdir dags
# Create an admin user you can use to log
# into the Airflow web UI
airflow users create --username admin --password admin --firstname YOUR_NAME --lastname YOUR_NAME --role Admin --email YOUR_EMAIL
# Verify the user got created
airflow users list
# Now exit this WSL Airflow terminal
# Open new WSL Airflow terminal and start
# PostgreSQL service and Airflow scheduler
source airflow_env/bin/activate && cd $AIRFLOW_HOME && sudo service postgresql start && airflow scheduler
# Open new WSL Airflow terminal and
# start Airflow webserver
source airflow_env/bin/activate && cd $AIRFLOW_HOME && airflow webserver
# You can now open a browser in Windows and
# go to http://localhost:8080/ to log into the Airflow admin UI
Test Airflow DAG Project
Once you have a working WSL Airflow environment as described above, you can start writing code. The easiest setup I could find for this is to use Microsoft VSCode with the WSL and Python extensions provided by Microsoft. Navigate to the path inside your WSL Ubuntu where you set your $AIRFLOW_HOME and created your "dags" folder, then simply run "code ." (that is 'code' and a period) and press enter. This launches VSCode in Windows pointed at your WSL environment, so you can create and edit files there and have Intellisense reference the Python you set up.
In this project location you create Python files in the dags folder containing Airflow code; they will automatically show up inside the Airflow web UI and execute on the schedule you set up.
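For example, here is a minimal sketch of a DAG file (a hypothetical dags/hello_wsl.py, separate from the project below) that the scheduler would pick up; the dag_id, task_id, and schedule are just illustrative.
# dags/hello_wsl.py - a hypothetical minimal DAG used only to confirm the
# scheduler is picking up files from the dags folder
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
def say_hello():
    print("Hello from Airflow on WSL")
with DAG(
    dag_id="dag_hello_wsl",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    PythonOperator(task_id="task_say_hello", python_callable=say_hello)
Once the scheduler parses the file (usually within a minute or two), the DAG appears in the web UI where you can unpause and trigger it.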
This test project uses another service called Min.io, which provides an object store like Amazon S3 along with an S3-compatible API. This is very nice since you can develop all your code locally and not worry about setting up network connections to Amazon AWS until you are ready to deploy to production.
The snippet below shows the steps to set up Min.io and add the Amazon provider package to Airflow.
# Setup for the test project
# Install Min.io
wget https://dl.min.io/server/minio/release/linux-amd64/archive/minio_20230711212934.0.0_amd64.deb -O minio.deb
sudo dpkg -i minio.deb
# Start Min.io
mkdir ~/minio
minio server ~/minio --console-address :9090
# You can now open a browser in Windows and
# go to http://localhost:9090/ to log into the Min.io admin UI
# If you cannot login to minio and you are using a VPN then turn off the VPN
# Add Amazon services to Airflow
source airflow_env/bin/activate && cd $AIRFLOW_HOME
pip install apache-airflow-providers-amazon
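One thing the steps above do not cover is creating the two Airflow connections the DAG below relies on: a PostgreSQL connection with id postgress_test_conn and an S3-compatible connection with id minio. You can add them in the Airflow web UI under Admin > Connections, or with a small script like the hedged sketch below; the Min.io access key, secret key, database name ("test"), and the endpoint_url extra are assumptions that depend on your local setup and the Amazon provider version.
# create_connections.py - a hypothetical helper script; run it inside the
# activated airflow_env so it writes to the same metadata database
import json
from airflow import settings
from airflow.models import Connection
session = settings.Session()
# Connection used by PostgresHook(postgres_conn_id="postgress_test_conn") below;
# "test" is an assumed name for the project database
pg_conn = Connection(
    conn_id="postgress_test_conn",
    conn_type="postgres",
    host="localhost",
    schema="test",
    login="airflow",
    password="airflow",
    port=5432,
)
# Connection used by S3Hook(aws_conn_id="minio"); the access/secret keys come
# from the Min.io console and endpoint_url points the S3 client at Min.io
minio_conn = Connection(
    conn_id="minio",
    conn_type="aws",
    login="YOUR_MINIO_ACCESS_KEY",
    password="YOUR_MINIO_SECRET_KEY",
    extra=json.dumps({"endpoint_url": "http://localhost:9000"}),
)
for conn in (pg_conn, minio_conn):
    if not session.query(Connection).filter(Connection.conn_id == conn.conn_id).first():
        session.add(conn)
session.commit()
Also create a bucket named airflow in the Min.io console, since the DAG below uploads into it.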
Now you can create a new database in the PostgreSQL instance running in your WSL environment using a DB management tool (PostgreSQL listens on port 5432 by default) and import some sample data to test this code with.
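If you do not have sample data handy, here is a minimal sketch, assuming a database named "test" owned by the airflow user; the test_data table matches what the DAG below queries, while the exact columns and values are just illustrative.
# make_test_data.py - a hypothetical helper to seed a test_data table, assuming
# a database named "test" owned by the airflow user already exists
import random
from datetime import datetime, timedelta
import psycopg2
conn = psycopg2.connect(host="localhost", dbname="test",
                        user="airflow", password="airflow")
conn.autocommit = True
cur = conn.cursor()
cur.execute("""
    CREATE TABLE IF NOT EXISTS test_data (
        id SERIAL PRIMARY KEY,
        timestamp TIMESTAMP NOT NULL,
        value DOUBLE PRECISION
    )
""")
# One row per minute for the first day of 2023 so the hourly DAG has batches to pick up
start = datetime(2023, 1, 1)
rows = [(start + timedelta(minutes=i), random.random()) for i in range(24 * 60)]
cur.executemany("INSERT INTO test_data (timestamp, value) VALUES (%s, %s)", rows)
cur.close()
conn.close()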
I have written file read/write code so many times in my life that it has become an extremely boring ritual of looking up the exact syntax and typing the same code. This time I decided that instead of spending the time doing an internet search to get the exact syntax, I would use that same amount of time (or less) asking ChatGPT to do it for me (with a few manual edits added to the code).
This "chat" I show here:
import logging
from datetime import datetime, timedelta
import csv
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
default_args = {
    'owner': 'admin',
    'retries': 5,
    'retry_delay': timedelta(minutes=10)
}
def upload_data():
    # Fetch a batch of data to a file
    hook = PostgresHook(postgres_conn_id="postgress_test_conn")
    conn = hook.get_conn()
    cursor = conn.cursor()
    start_datetime = get_start_datetime()
    end_datetime = start_datetime + timedelta(minutes=60)
    cursor.execute("SELECT * FROM test_data WHERE timestamp >= TO_TIMESTAMP('" + start_datetime.strftime('%Y%m%d:%H:%M:%S') + "','YYYYMMDD:HH24:MI:SS') AND timestamp < TO_TIMESTAMP('" + end_datetime.strftime('%Y%m%d:%H:%M:%S') + "','YYYYMMDD:HH24:MI:SS')")
    # Relative paths resolve against the directory the scheduler was started from
    # ($AIRFLOW_HOME above); the data_to_upload folder must already exist there
    with open(f"data_to_upload/test_data_{start_datetime.strftime('%Y-%m-%d %H-%M-%S')}.csv", "w") as f:
        csv_writer = csv.writer(f)
        csv_writer.writerow([i[0] for i in cursor.description])
        csv_writer.writerows(cursor)
    cursor.close()
    conn.close()
    logging.info(f"Success!! ({start_datetime.strftime('%Y%m%d:%H:%M:%S')}) -> ({end_datetime.strftime('%Y%m%d:%H:%M:%S')})")
    # Upload file to Min.io S3 API
    s3_hook = S3Hook(aws_conn_id="minio")
    s3_hook.load_file(
        filename=f"data_to_upload/test_data_{start_datetime.strftime('%Y-%m-%d %H-%M-%S')}.csv",
        key=f"test_data_{start_datetime.strftime('%Y-%m-%d %H-%M-%S')}.csv",
        bucket_name="airflow",
        replace=True
    )
    # Save end date to be used as the start date for the next run
    save_current_transfer_datetime(end_datetime)
def write_datetime_to_file(file_path, datetime_value):
    with open(file_path, 'w') as file:
        file.write(datetime_value.isoformat())

def read_datetime_from_file(file_path):
    if not file_exists(file_path):
        return datetime(2023,1,1)  # Return start datetime

    with open(file_path, 'r') as file:
        datetime_str = file.read().strip()

        try:
            parsed_datetime = datetime.fromisoformat(datetime_str)
            return parsed_datetime.replace(microsecond=0)  # Return parsed datetime without microseconds
        except ValueError:
            return datetime(2023,1,1)  # Return start datetime if parsing fails

def file_exists(file_path):
    try:
        with open(file_path, 'r') as file:
            return True
    except FileNotFoundError:
        return False

def get_start_datetime():
    previous_datetime = read_datetime_from_file('cache_previous_enddate.txt')
    return previous_datetime

def save_current_transfer_datetime(current_datetime):
    write_datetime_to_file('cache_previous_enddate.txt', current_datetime)
with DAG(
    dag_id="dag_test_upload_data",
    default_args=default_args,
    start_date=datetime(2023,1,1),
    schedule_interval="*/60 * * * *"  # run every 60 minutes
) as dag:
    task1 = PythonOperator(
        task_id="task_test_upload_data",
        python_callable=upload_data
    )
    task1
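If you want to run the DAG once without waiting on the scheduler, one option (assuming Airflow 2.5+, which the 2.6.3 install above satisfies) is to append a small hook like this sketch to the bottom of the DAG file and execute the file directly with python from the activated virtualenv:
# Optional: runs a single in-process DAG run for debugging
# (python dag_test_upload_data.py inside the activated airflow_env)
if __name__ == "__main__":
    dag.test()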