Automating Weather Data Processing with Airflow, Docker, and Python

Here's the latest project from the "Mad Scientist," Fidel Vetino. In this project I'll demonstrate how to automate fetching, processing, and storing weather data using Apache Airflow, Docker, and Python. Step by step, we'll set up a Docker environment, define an Airflow DAG, write Python scripts that call a weather API, and store the results in a PostgreSQL database. With this framework, you can implement a robust data pipeline for handling weather data efficiently. Let's get started:


Step 1: Set Up the Docker Environment

1.1 Create docker-compose.yml

Create a file named docker-compose.yml in the root directory of your project with the following content:

yaml

version: '3.7'

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  webserver:
    image: apache/airflow:2.4.3
    depends_on:
      - postgres
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
      AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    volumes:
      - ./dags:/opt/airflow/dags
      - ./scripts:/opt/airflow/scripts
      - ./requirements.txt:/requirements.txt
    ports:
      - "8080:8080"
    # install the mounted requirements.txt before initializing the metadata database
    command: >
      bash -c "pip install --user -r /requirements.txt &&
               airflow db init &&
               airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email [email protected] &&
               airflow webserver"
    healthcheck:
      # the official apache/airflow image ships curl; probe the webserver's /health endpoint
      test: ["CMD-SHELL", "curl --fail http://localhost:8080/health"]
      interval: 30s
      retries: 3
      start_period: 30s
      timeout: 30s

  scheduler:
    image: apache/airflow:2.4.3
    depends_on:
      - webserver
      - postgres
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    volumes:
      - ./dags:/opt/airflow/dags
      - ./scripts:/opt/airflow/scripts
      - ./requirements.txt:/requirements.txt
    command: bash -c "pip install --user -r /requirements.txt && airflow scheduler"

volumes:
  postgres_data:
        

1.2 Create requirements.txt

Create a file named requirements.txt in the root directory of your project with the following content:

txt

apache-airflow==2.4.3
psycopg2-binary
requests
pandas        

Step 2: Define the Airflow DAG

Create a folder named dags in the root directory of your project. Inside the dags folder, create a Python script named weather_dag.py.

2.1 Create weather_dag.py

python

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import subprocess


def run_script(path):
    # Run a helper script in a subprocess; check=True raises if the script
    # exits with a non-zero status, so the Airflow task fails (and retries).
    subprocess.run(['python', path], check=True)


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'weather_pipeline',
    default_args=default_args,
    description='A simple weather data pipeline',
    schedule_interval=timedelta(days=1),
    catchup=False,  # do not backfill runs all the way back to the start_date
)

fetch_weather = PythonOperator(
    task_id='fetch_weather',
    python_callable=lambda: run_script('/opt/airflow/scripts/fetch_weather.py'),
    dag=dag,
)

process_data = PythonOperator(
    task_id='process_data',
    python_callable=lambda: run_script('/opt/airflow/scripts/process_data.py'),
    dag=dag,
)

store_data = PythonOperator(
    task_id='store_data',
    python_callable=lambda: run_script('/opt/airflow/scripts/store_data.py'),
    dag=dag,
)

fetch_weather >> process_data >> store_data
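
Before enabling the DAG, it can help to confirm that the file parses cleanly. The sketch below loads the dags folder with Airflow's DagBag and is meant to be run inside one of the Airflow containers (for example via docker exec); the /opt/airflow/dags path matches the volume mount in docker-compose.yml.

python

# Quick parse check for the DAG file; run inside an Airflow container.
from airflow.models import DagBag

dag_bag = DagBag(dag_folder='/opt/airflow/dags', include_examples=False)
assert not dag_bag.import_errors, dag_bag.import_errors   # surface import problems
print(sorted(dag_bag.dag_ids))                            # should list 'weather_pipeline'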

Step 3: Write Python Scripts

Create a folder named scripts in the root directory of your project. Inside the scripts folder, create three Python scripts: fetch_weather.py, process_data.py, and store_data.py.

3.1 Create fetch_weather.py

python

import requests
import json

def fetch_weather_data():
    api_key = 'your_api_key'  # replace with your OpenWeatherMap API key
    city = 'London'
    # Note: OpenWeatherMap returns temperatures in Kelvin unless units=metric is added.
    url = f'https://api.openweathermap.org/data/2.5/weather?q={city}&appid={api_key}'
    response = requests.get(url, timeout=30)
    response.raise_for_status()  # fail loudly on HTTP errors so the Airflow task can retry
    data = response.json()
    with open('/opt/airflow/scripts/weather_data.json', 'w') as f:
        json.dump(data, f)

if __name__ == "__main__":
    fetch_weather_data()
        


Make sure to replace 'your_api_key' with your actual API key from OpenWeatherMap.
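
Rather than hardcoding the key, you may prefer to read it from an environment variable. The sketch below assumes a variable named OPENWEATHER_API_KEY (an illustrative name, not part of the project above), which you would pass into the containers via docker-compose.yml or an .env file.

python

# A minimal sketch (not the script above): read the API key from an environment
# variable named OPENWEATHER_API_KEY instead of hardcoding it.
import os

api_key = os.environ.get('OPENWEATHER_API_KEY')
if not api_key:
    raise RuntimeError('OPENWEATHER_API_KEY is not set')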

3.2 Create process_data.py

python

import json
import pandas as pd

def process_weather_data():
    with open('/opt/airflow/scripts/weather_data.json', 'r') as f:
        data = json.load(f)
    
    processed_data = {
        'city': data['name'],
        'temperature': data['main']['temp'],
        'humidity': data['main']['humidity'],
        'weather': data['weather'][0]['description'],
    }
    
    df = pd.DataFrame([processed_data])
    df.to_csv('/opt/airflow/scripts/processed_weather_data.csv', index=False)

if __name__ == "__main__":
    process_weather_data()        
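
One optional refinement, shown below as a sketch rather than as part of process_data.py: OpenWeatherMap reports temperatures in Kelvin unless you add units=metric to the request, so you may want to convert before writing the CSV and stamp each row with the time it was processed.

python

# Sketch of an optional tweak to the processing step: convert Kelvin to Celsius
# and record when the row was produced. Not wired into process_data.py above.
from datetime import datetime, timezone

def kelvin_to_celsius(kelvin: float) -> float:
    return round(kelvin - 273.15, 2)

extra = {
    'temperature_c': kelvin_to_celsius(294.15),               # 21.0
    'processed_at': datetime.now(timezone.utc).isoformat(),
}
print(extra)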

3.3 Create store_data.py

python

import pandas as pd
from sqlalchemy import create_engine

def store_weather_data():
    df = pd.read_csv('/opt/airflow/scripts/processed_weather_data.csv')
    
    engine = create_engine('postgresql+psycopg2://airflow:airflow@postgres/airflow')
    # pandas creates the 'weather' table automatically on the first run, then appends rows
    df.to_sql('weather', engine, if_exists='append', index=False)

if __name__ == "__main__":
    store_weather_data()
        

Step 4: Run the Project

4.1 Install Docker

Ensure Docker is installed and running on your local machine. You can download and install Docker from the official Docker website.

4.2 Build and Run Docker Containers

Run the following command in the root directory of your project to build and start the Docker containers:

sh

docker-compose up --build        

4.3 Access Airflow UI

After the containers are up and running, open your browser and go to:

http://localhost:8080

Log in with the admin/admin user created by the webserver startup command.

4.4 Enable the DAG

In the Airflow UI, you will see the weather_pipeline DAG. Toggle the switch next to the DAG name to enable it.

4.5 Trigger the DAG

To manually trigger the DAG from the UI (a REST API alternative is sketched after these steps):

  1. Click on the weather_pipeline DAG.
  2. Click on the "Trigger DAG" button (a play button) in the top right corner.
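
If you prefer to script this step, the sketch below unpauses and triggers the DAG through Airflow's stable REST API. It assumes the basic-auth API backend is enabled (for example by setting AIRFLOW__API__AUTH_BACKENDS=airflow.api.auth.backend.basic_auth on the webserver service) and reuses the admin/admin user created above.

python

# A hedged sketch: unpause and trigger weather_pipeline via the Airflow REST API.
# Assumes the basic-auth API backend is enabled and the webserver is on localhost:8080.
import requests

BASE = 'http://localhost:8080/api/v1'
AUTH = ('admin', 'admin')  # the user created in docker-compose.yml

# Unpause the DAG (equivalent to flipping the toggle in the UI).
resp = requests.patch(f'{BASE}/dags/weather_pipeline',
                      json={'is_paused': False}, auth=AUTH, timeout=30)
resp.raise_for_status()

# Trigger a new DAG run (equivalent to the "Trigger DAG" button).
run = requests.post(f'{BASE}/dags/weather_pipeline/dagRuns',
                    json={'conf': {}}, auth=AUTH, timeout=30)
run.raise_for_status()
print(run.json()['dag_run_id'])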

Step 5: Verify Data in PostgreSQL

Once the DAG has run, verify that the data has been stored in the PostgreSQL database.

5.1 Connect to PostgreSQL

You can connect to PostgreSQL using the psql command-line tool or a graphical tool like pgAdmin. Here’s how to connect using psql from your terminal:

sh

docker exec -it <container_id_of_postgres> psql -U airflow -d airflow        

You can find the container ID by running:

sh

docker ps        

Look for the container running the postgres service.

5.2 Query the Data

Once connected to the PostgreSQL database, run the following SQL query to check the stored weather data:

sql

SELECT * FROM weather;        

This query will display all the rows in the weather table, showing the weather data fetched and processed by the Airflow pipeline.
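
If you prefer Python over psql, here is a minimal read-back sketch run from the host machine; it uses the 5432 port published in docker-compose.yml (inside the containers the hostname would be postgres instead of localhost).

python

# Minimal verification from the host: read the weather table back with pandas.
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('postgresql+psycopg2://airflow:airflow@localhost:5432/airflow')
print(pd.read_sql('SELECT * FROM weather;', engine))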

My Final Notes: Automating data processing tasks such as weather data retrieval and storage is essential for streamlining workflows and ensuring data accuracy. With the combination of Airflow, Docker, and Python, users can easily orchestrate complex data pipelines and handle dependencies effectively. By leveraging these tools, organizations can save time and resources while maintaining data integrity and reliability.

Make sure you implement security measures; here's my basic framework:

  1. API Key Handling: Ensure that sensitive information like API keys are handled securely. Avoid hardcoding them directly into scripts or committing them to version control systems. Instead, consider using environment variables or secure key management solutions.
  2. Database Credentials: Similarly, database credentials should be managed securely. Avoid exposing passwords or sensitive connection information in code or configuration files. Use environment variables or secure storage solutions to manage and access credentials (a minimal sketch follows this list).
  3. Container Security: When using Docker containers, ensure that you use secure base images from trusted sources. Regularly update your images and dependencies to patch any known vulnerabilities. Limit the exposure of sensitive data within containers and minimize the attack surface.
  4. Network Security: Restrict network access to your Docker containers and databases. Utilize firewalls and network policies to control inbound and outbound traffic. Avoid exposing unnecessary ports or services to the public internet.
  5. Airflow Security: Secure your Airflow web server by configuring authentication and authorization mechanisms. Utilize strong passwords for Airflow users and restrict access to sensitive functionality and APIs.
  6. Data Encryption: Consider encrypting sensitive data both at rest and in transit. Use SSL/TLS encryption for network communication, and encrypt data stored in databases or on disk.
  7. Audit Logging: Implement comprehensive logging mechanisms to track user actions, system events, and access to sensitive resources. Regularly review logs for suspicious activities and potential security incidents.
  8. Regular Audits and Updates: Conduct regular security audits of your system to identify and address any vulnerabilities or weaknesses. Stay informed about security best practices and updates for all the tools and libraries used in your project.
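
As a minimal sketch of points 2 and 6, the snippet below builds the SQLAlchemy connection string from an environment variable and requests a TLS-encrypted database connection. WEATHER_DB_URL is an assumed variable name for illustration, and sslmode=require only works once the Postgres server is configured for SSL, which the compose file above does not do.

python

# Illustrative only: pull the connection string from the environment and ask
# psycopg2 for a TLS-encrypted connection. WEATHER_DB_URL is an assumed name;
# sslmode=require presumes the Postgres server has SSL enabled.
import os
from sqlalchemy import create_engine

db_url = os.environ['WEATHER_DB_URL']  # e.g. postgresql+psycopg2://user:pass@host/db
engine = create_engine(db_url, connect_args={'sslmode': 'require'})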

This will help protect your system from potential security threats and ensure the confidentiality, integrity, and availability of your data and resources.


Thank you so much for taking the time to check out my hands-on project.

Your support means a lot!

Fidel Vetino (the Mad Scientist)

Tech Innovator & Solution Engineer




#AI / #GenAI / #LLM / #ML / #machine_learning / #artificialintelligence / #cybersecurity / #itsecurity / #facebook / #accenture / #twitter / #ibm / #dell / #intel / #emc2 / #salesforce / #linux / #freebsd / #unix / #memory / #sap / #walmart / #apps / #software / #technology / #io / #pipeline / #florida / #tampatech / #engineering / #sql / #database / #cloudcomputing / #data / #vulnerabilities / #soap / #rest / #graphQL / #rust / / #technews / #strategies / #data_governance / #data resilience / #hack / #hackathon / #techcommunity / #opensource / #blockchain / #moon2mars / #nasa / #Aerospace / #spacex / #mars / #orbit / #AWS / #oracle / #microsoft / #GCP / #Azure / #ERP / #spark / #snowflake / #SAP / #AI / #GenAI / #python / #Databricks
