Automating Weather Data Processing with Airflow, Docker, and Python
Here's another "Mad Scientist" Fidel Vetino latest project; on this project I'll demonstrate how to automate the process of fetching, processing, and storing weather data using Apache Airflow, Docker, and Python. Through a series of steps, we'll set up a Docker environment, define an Airflow DAG, write Python scripts to interact with weather APIs, and store the data in a PostgreSQL database. From my framework, users can implement a robust data pipeline for handling weather data efficiently. Let's get started:
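Before diving in, here is the project layout the steps below assume (the root folder name is arbitrary; adjust paths if your layout differs):
txt
weather-pipeline/
├── docker-compose.yml
├── requirements.txt
├── dags/
│   └── weather_dag.py
└── scripts/
    ├── fetch_weather.py
    ├── process_data.py
    └── store_data.py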
Step 1: Set Up the Docker Environment
1.1 Create docker-compose.yml
Create a file named docker-compose.yml in the root directory of your project with the following content:
yaml
version: '3.7'

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  webserver:
    image: apache/airflow:2.4.3
    depends_on:
      - postgres
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
      AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    volumes:
      - ./dags:/opt/airflow/dags
      - ./scripts:/opt/airflow/scripts
      - ./requirements.txt:/requirements.txt
    ports:
      - "8080:8080"
    command: >
      bash -c "airflow db init &&
      airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email [email protected] &&
      airflow webserver"
    healthcheck:
      test: ["CMD-SHELL", "[ -f /opt/airflow/airflow-webserver.pid ] && ps -p $$(cat /opt/airflow/airflow-webserver.pid)"]
      interval: 30s
      retries: 3
      start_period: 30s
      timeout: 30s

  scheduler:
    image: apache/airflow:2.4.3
    depends_on:
      - webserver
      - postgres
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    volumes:
      - ./dags:/opt/airflow/dags
      - ./scripts:/opt/airflow/scripts
      - ./requirements.txt:/requirements.txt
    command: airflow scheduler

volumes:
  postgres_data:
1.2 Create requirements.txt
Create a file named requirements.txt in the root directory of your project with the following content:
txt
apache-airflow==2.4.3
psycopg2-binary
requests
pandas
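One detail worth flagging: the docker-compose.yml above mounts requirements.txt into the containers, but nothing installs it by itself. Since the apache/airflow image already ships with Airflow, one convenient option (intended for local development only, and assuming you stay on the official image) is its _PIP_ADDITIONAL_REQUIREMENTS variable, which pip-installs the listed packages at container startup. You would add it to the environment section of both the webserver and scheduler services:
yaml
    environment:
      # Dev-only convenience of the official apache/airflow image:
      # these packages are pip-installed every time the container starts.
      _PIP_ADDITIONAL_REQUIREMENTS: "psycopg2-binary requests pandas"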
Step 2: Define the Airflow DAG
Create a folder named dags in the root directory of your project. Inside the dags folder, create a Python script named weather_dag.py.
2.1 Create weather_dag.py
python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import os

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'weather_pipeline',
    default_args=default_args,
    description='A simple weather data pipeline',
    schedule_interval=timedelta(days=1),
)

# Each task shells out to one of the scripts mounted under /opt/airflow/scripts.
fetch_weather = PythonOperator(
    task_id='fetch_weather',
    python_callable=lambda: os.system('python /opt/airflow/scripts/fetch_weather.py'),
    dag=dag,
)

process_data = PythonOperator(
    task_id='process_data',
    python_callable=lambda: os.system('python /opt/airflow/scripts/process_data.py'),
    dag=dag,
)

store_data = PythonOperator(
    task_id='store_data',
    python_callable=lambda: os.system('python /opt/airflow/scripts/store_data.py'),
    dag=dag,
)

fetch_weather >> process_data >> store_data
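Once the containers from Step 4 are running, you can sanity-check an individual task without waiting for the daily schedule. Assuming the service names from the docker-compose.yml above, a quick smoke test from the host looks like this:
sh
# Run a single task for a given logical date, bypassing the scheduler
docker-compose exec webserver airflow tasks test weather_pipeline fetch_weather 2023-01-01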
Step 3: Write Python Scripts
Create a folder named scripts in the root directory of your project. Inside the scripts folder, create three Python scripts: fetch_weather.py, process_data.py, and store_data.py.
3.1 Create fetch_weather.py
python
import requests
import json

def fetch_weather_data():
    api_key = 'your_api_key'
    city = 'London'
    url = f'https://api.openweathermap.org/data/2.5/weather?q={city}&appid={api_key}'
    response = requests.get(url)
    data = response.json()
    with open('/opt/airflow/scripts/weather_data.json', 'w') as f:
        json.dump(data, f)

if __name__ == "__main__":
    fetch_weather_data()
Make sure to replace 'your_api_key' with your actual API key from OpenWeatherMap.
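Hardcoding the key is fine for a quick demo, but a slightly safer variant (my suggestion, not part of the original script) reads it from an environment variable and asks OpenWeatherMap for metric units, since the default response reports temperature in Kelvin:
python
import os
import requests

def fetch_weather_data():
    # Assumes OPENWEATHER_API_KEY is exported in the container environment
    api_key = os.environ['OPENWEATHER_API_KEY']
    city = 'London'
    url = (
        'https://api.openweathermap.org/data/2.5/weather'
        f'?q={city}&appid={api_key}&units=metric'  # metric => Celsius
    )
    response = requests.get(url, timeout=10)
    response.raise_for_status()  # fail loudly on a bad key or network error
    return response.json()
You would then pass OPENWEATHER_API_KEY through the environment section of the compose services, the same way the other Airflow variables are set.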
3.2 Create process_data.py
python
import json
import pandas as pd

def process_weather_data():
    with open('/opt/airflow/scripts/weather_data.json', 'r') as f:
        data = json.load(f)
    processed_data = {
        'city': data['name'],
        'temperature': data['main']['temp'],
        'humidity': data['main']['humidity'],
        'weather': data['weather'][0]['description'],
    }
    df = pd.DataFrame([processed_data])
    df.to_csv('/opt/airflow/scripts/processed_weather_data.csv', index=False)

if __name__ == "__main__":
    process_weather_data()
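The processing script assumes the fetch step saved a successful response. If the API call failed (for example with a bad key), OpenWeatherMap typically returns an error payload without the main and weather fields, and the dictionary lookups above will raise a KeyError. A small defensive check, assuming the same file path, could look like this:
python
import json

def load_weather_json(path='/opt/airflow/scripts/weather_data.json'):
    """Load the raw API payload and fail with a clear message if it looks like an error."""
    with open(path, 'r') as f:
        data = json.load(f)
    if 'main' not in data or 'weather' not in data:
        # Error payloads usually carry a 'message' field explaining what went wrong
        raise ValueError(f"Unexpected API payload: {data.get('message', data)}")
    return data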
3.3 Create store_data.py
python
import pandas as pd
from sqlalchemy import create_engine

def store_weather_data():
    df = pd.read_csv('/opt/airflow/scripts/processed_weather_data.csv')
    engine = create_engine('postgresql+psycopg2://airflow:airflow@postgres/airflow')
    df.to_sql('weather', engine, if_exists='append', index=False)

if __name__ == "__main__":
    store_weather_data()
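Note that df.to_sql creates the weather table on the first run if it does not exist, inferring column types from the DataFrame. If you would rather define the schema explicitly, a minimal table matching the columns produced by process_data.py might look roughly like this (a sketch, not required for the pipeline to work):
sql
CREATE TABLE IF NOT EXISTS weather (
    city        TEXT,
    temperature DOUBLE PRECISION,
    humidity    BIGINT,
    weather     TEXT
);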
Step 4: Run the Project
4.1 Install Docker
Ensure Docker is installed and running on your local machine. You can download and install Docker from the official Docker website.
4.2 Build and Run Docker Containers
Run the following command in the root directory of your project to build and start the Docker containers:
sh
docker-compose up --build
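The first start can take a minute or two while the metadata database is initialized and the admin user is created. If you prefer to run the stack in the background and watch it come up, the standard docker-compose flags apply:
sh
docker-compose up --build -d        # start in the background
docker-compose logs -f webserver    # follow the webserver logs
docker-compose ps                   # check container status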
4.3 Access Airflow UI
After the containers are up and running, open your browser and go to:
http://localhost:8080
Log in with the admin / admin credentials created by the webserver startup command.
4.4 Enable the DAG
In the Airflow UI, you will see the weather_pipeline DAG. Toggle the switch next to the DAG name to enable it.
4.5 Trigger the DAG
To manually trigger the DAG, open the DAGs page in the Airflow UI, click the play (trigger) button next to weather_pipeline, and confirm the run.
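You can also trigger a run from the command line inside the webserver container (service name as defined in docker-compose.yml):
sh
docker-compose exec webserver airflow dags trigger weather_pipeline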
Step 5: Verify Data in PostgreSQL
Once the DAG has run, verify that the data was stored in the PostgreSQL database.
5.1 Connect to PostgreSQL
You can connect to PostgreSQL using the psql command-line tool or a graphical tool like pgAdmin. Here’s how to connect using psql from your terminal:
sh
docker exec -it <container_id_of_postgres> psql -U airflow -d airflow
You can find the container ID by running:
sh
docker ps
Look for the container running the postgres service.
5.2 Query the Data
Once connected to the PostgreSQL database, run the following SQL query to check the stored weather data:
sql
SELECT * FROM weather;
This query will display all the rows in the weather table, showing the weather data fetched and processed by the Airflow pipeline.
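If you want to double-check the table structure that pandas created, or just count the rows added so far, the following also works from the same psql session:
sql
-- \d is a psql meta-command that shows the inferred column types
\d weather

-- Each successful DAG run appends one row
SELECT COUNT(*) FROM weather;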
Additional Details
Complete Docker Setup
docker-compose.yml
yaml
version: '3.7'

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  webserver:
    image: apache/airflow:2.4.3
    depends_on:
      - postgres
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
      AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    volumes:
      - ./dags:/opt/airflow/dags
      - ./scripts:/opt/airflow/scripts
      - ./requirements.txt:/requirements.txt
    ports:
      - "8080:8080"
    command: >
      bash -c "airflow db init &&
      airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email [email protected] &&
      airflow webserver"
    healthcheck:
      test: ["CMD-SHELL", "[ -f /opt/airflow/airflow-webserver.pid ] && ps -p $$(cat /opt/airflow/airflow-webserver.pid)"]
      interval: 30s
      retries: 3
      start_period: 30s
      timeout: 30s

  scheduler:
    image: apache/airflow:2.4.3
    depends_on:
      - webserver
      - postgres
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    volumes:
      - ./dags:/opt/airflow/dags
      - ./scripts:/opt/airflow/scripts
      - ./requirements.txt:/requirements.txt
    command: airflow scheduler

volumes:
  postgres_data:
requirements.txt
txt
apache-airflow==2.4.3
psycopg2-binary
requests
pandas
Complete Airflow DAG
dags/weather_dag.py
python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import os

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'weather_pipeline',
    default_args=default_args,
    description='A simple weather data pipeline',
    schedule_interval=timedelta(days=1),
)

# Each task shells out to one of the scripts mounted under /opt/airflow/scripts.
fetch_weather = PythonOperator(
    task_id='fetch_weather',
    python_callable=lambda: os.system('python /opt/airflow/scripts/fetch_weather.py'),
    dag=dag,
)

process_data = PythonOperator(
    task_id='process_data',
    python_callable=lambda: os.system('python /opt/airflow/scripts/process_data.py'),
    dag=dag,
)

store_data = PythonOperator(
    task_id='store_data',
    python_callable=lambda: os.system('python /opt/airflow/scripts/store_data.py'),
    dag=dag,
)

fetch_weather >> process_data >> store_data
Complete Python Scripts
scripts/fetch_weather.py
python
import requests
import json

def fetch_weather_data():
    api_key = 'your_api_key'
    city = 'London'
    url = f'https://api.openweathermap.org/data/2.5/weather?q={city}&appid={api_key}'
    response = requests.get(url)
    data = response.json()
    with open('/opt/airflow/scripts/weather_data.json', 'w') as f:
        json.dump(data, f)

if __name__ == "__main__":
    fetch_weather_data()
scripts/process_data.py
python
import json
import pandas as pd

def process_weather_data():
    with open('/opt/airflow/scripts/weather_data.json', 'r') as f:
        data = json.load(f)
    processed_data = {
        'city': data['name'],
        'temperature': data['main']['temp'],
        'humidity': data['main']['humidity'],
        'weather': data['weather'][0]['description'],
    }
    df = pd.DataFrame([processed_data])
    df.to_csv('/opt/airflow/scripts/processed_weather_data.csv', index=False)

if __name__ == "__main__":
    process_weather_data()
scripts/store_data.py
python
import pandas as pd
from sqlalchemy import create_engine

def store_weather_data():
    df = pd.read_csv('/opt/airflow/scripts/processed_weather_data.csv')
    engine = create_engine('postgresql+psycopg2://airflow:airflow@postgres/airflow')
    df.to_sql('weather', engine, if_exists='append', index=False)

if __name__ == "__main__":
    store_weather_data()
My Final Notes: Automating data processing tasks such as weather data retrieval and storage is essential for streamlining workflows and ensuring data accuracy. With the combination of Airflow, Docker, and Python, users can easily orchestrate complex data pipelines and handle dependencies effectively. By leveraging these tools, organizations can save time and resources while maintaining data integrity and reliability.
Make sure you implement security measures; this will help protect your system from potential security threats and ensure the confidentiality, integrity, and availability of your data and resources.
Thank you so much for taking the time to check out my hands-on project.
Your support means a lot!
Fidel Vetino (the Mad Scientist)
Tech Innovator & Solution Engineer
#AI / #GenAI / #LLM / #ML / #machine_learning / #artificialintelligence / #cybersecurity / #itsecurity / #facebook / #accenture / #twitter / #ibm / #dell / #intel / #emc2 / #salesforce / #linux / #freebsd / #unix / #memory / #sap / #walmart / #apps / #software / #technology / #io / #pipeline / #florida / #tampatech / #engineering / #sql / #database / #cloudcomputing / #data / #vulnerabilities / #soap / #rest / #graphQL / #rust / #technews / #strategies / #data_governance / #data_resilience / #hack / #hackathon / #techcommunity / #opensource / #blockchain / #moon2mars / #nasa / #Aerospace / #spacex / #mars / #orbit / #AWS / #oracle / #microsoft / #GCP / #Azure / #ERP / #spark / #snowflake / #SAP / #AI / #GenAI / #python / #Databricks