AQI Forecasting Workflow

A Pegasus workflow system for processing air quality sensor data from OpenAQ/SAGE to detect anomalies and analyze pollutant trends. It also provides LSTM-based AQI forecasting, predicting air quality for the next 24 hours from historical patterns in OpenAQ data.

Overview

The AQI forecasting workflow extends the base air quality analysis with machine learning capabilities:

  • Fetches historical data: 90 days (configurable) of air quality measurements for training
  • Trains LSTM models: Per-location PyTorch-based LSTM models for time-series prediction
  • Generates forecasts: 24-hour (configurable) AQI predictions with confidence intervals
  • Visualizes results: Plots showing historical data, forecasts, and confidence bounds

Data Sources

This workflow supports two data sources:

  • OpenAQ API v3: Global air quality monitoring stations (default).
  • SAGE Continuum: Edge sensor data via sage_data_client or local JSONL exports.

OpenAQ API v3

The workflow uses the OpenAQ API v3 to fetch air quality measurements from monitoring stations worldwide.

API Endpoints

| Endpoint | Purpose |
| --- | --- |
| `https://api.openaq.org/v3/locations` | Search for monitoring locations |
| `https://api.openaq.org/v3/locations/{id}` | Get location details |
| `https://api.openaq.org/v3/locations/{id}/sensors` | Get sensors at a location |
| `https://api.openaq.org/v3/sensors/{id}/days` | Fetch daily aggregated measurements |
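As a sketch of how the daily-measurements endpoint above is called, the helper below assembles the URL, headers, and query parameters. It assumes the v3 API reads the key from an `X-API-Key` header and accepts `date_from`/`date_to` query parameters; the sensor ID used is hypothetical, and `fetch_openaq_catalog.py` may structure its requests differently.

```python
import os

BASE = "https://api.openaq.org/v3"

def build_daily_request(sensor_id, date_from, date_to, limit=1000):
    """Build the URL, headers, and query params for a /sensors/{id}/days call.

    Assumes the v3 API authenticates via an X-API-Key header and accepts
    date_from/date_to query parameters; check the OpenAQ docs for specifics.
    """
    url = f"{BASE}/sensors/{sensor_id}/days"
    headers = {"X-API-Key": os.environ.get("OPENAQ_API_KEY", "")}
    params = {"date_from": date_from, "date_to": date_to, "limit": limit}
    return url, headers, params

# Execute with any HTTP client, e.g.:
#   import requests
#   url, headers, params = build_daily_request(673, "2024-01-01", "2024-03-31")
#   resp = requests.get(url, headers=headers, params=params, timeout=30)
#   rows = resp.json()["results"]
url, headers, params = build_daily_request(673, "2024-01-01", "2024-03-31")
```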

Parameters Fetched

The fetch_openaq_catalog.py script retrieves the following pollutant measurements:

| Parameter | Description | OpenAQ ID | Typical Unit |
| --- | --- | --- | --- |
| pm25 | Fine particulate matter (≤2.5 μm) | 2 | µg/m³ |
| pm10 | Coarse particulate matter (≤10 μm) | 1 | µg/m³ |
| o3 | Ozone | 5 | µg/m³ or ppm |
| no2 | Nitrogen Dioxide | 3 | µg/m³ or ppm |
| so2 | Sulfur Dioxide | 4 | µg/m³ or ppm |
| co | Carbon Monoxide | 7 | µg/m³ or ppm |

Data Characteristics

  • Source: Global network of air quality monitoring stations
  • Temporal Resolution: Daily aggregates (min, max, median, mean, std)
  • Coverage: Historical data from stations worldwide
  • API Key: Required - register at https://explore.openaq.org/register

For more details, see the OpenAQ API documentation.

Customizing Parameters

To change or add parameters, modify the fetch_openaq_catalog.py script:

1. Edit the default parameters list in the `fetch_openaq_catalog()` function (around line 117):

   ```python
   if parameters is None:
       parameters = ['pm25', 'pm10', 'o3', 'no2', 'so2', 'co']
   ```

2. Update the parameter map if adding new parameters (around line 120):

   ```python
   parameter_map = {
       'pm25': 2,    # PM2.5
       'pm10': 1,    # PM10
       'o3': 5,      # Ozone
       'no2': 3,     # Nitrogen Dioxide
       'so2': 4,     # Sulfur Dioxide
       'co': 7       # Carbon Monoxide
       # Add new parameters here with their OpenAQ IDs
   }
   ```

3. Or use command-line arguments to select specific parameters:

   ```shell
   ./fetch_openaq_catalog.py --location-ids 2178 --parameters pm25 pm10 o3 --start-date 2024-01-01
   ```

Finding OpenAQ Parameter IDs: Use the OpenAQ Explorer (https://explore.openaq.org/) or API to discover available parameters and their IDs for specific locations.

SAGE Continuum

SAGE data can be fetched directly using sage_data_client (recommended) or via a local JSONL file downloaded with curl. For details on the platform, see https://sagecontinuum.org/.

Direct query (recommended):

./workflow_generator.py \
    --data-source sage \
    --sage-vsn W045 \
    --sage-plugin registry.sagecontinuum.org/seanshahkarami/air-quality:0.3.0 \
    --sage-names env.air_quality.conc \
    --start-date 2026-01-14 \
    --end-date 2026-01-15 \
    --output workflow_forecast.yml

JSONL input (offline):

./workflow_generator.py \
    --data-source sage \
    --sage-input sage/input_data.json \
    --sage-vsn W045 \
    --sage-plugin registry.sagecontinuum.org/seanshahkarami/air-quality:0.3.0 \
    --start-date 2026-01-14 \
    --end-date 2026-01-15 \
    --output workflow_forecast.yml

Notes:

  • SAGE runs the base pipeline (extract → analyze → anomaly detection) and skips the forecast pipeline by default.
  • Use --sage-names to filter sensor streams (e.g., env.air_quality.conc, env.pm10).
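For the offline JSONL path, records can be filtered by VSN and measurement name before feeding them into the pipeline. The sketch below assumes each record carries `name`, `value`, `timestamp`, and a `meta` dict with a `vsn` key, mirroring the shape of `sage_data_client` query results; check your export's actual schema before relying on these field names.

```python
import json

def load_sage_jsonl(lines, vsn=None, names=None):
    """Parse SAGE JSONL records, optionally filtering by VSN and name.

    Field names (name, value, timestamp, meta.vsn) are assumptions based
    on typical sage_data_client output; verify against your export.
    """
    records = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        rec = json.loads(line)
        if vsn and rec.get("meta", {}).get("vsn") != vsn:
            continue
        if names and rec.get("name") not in names:
            continue
        records.append(rec)
    return records

# Two illustrative records; only the first matches the filters below
sample = [
    '{"timestamp": "2026-01-14T00:00:00Z", "name": "env.air_quality.conc", "value": 12.5, "meta": {"vsn": "W045"}}',
    '{"timestamp": "2026-01-14T00:00:00Z", "name": "env.pm10", "value": 20.1, "meta": {"vsn": "W999"}}',
]
rows = load_sage_jsonl(sample, vsn="W045", names={"env.air_quality.conc"})
```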

Architecture

Workflow Structure

Base Pipeline (runs in parallel):
  Data Fetch → Extract Timeseries → Analyze → Detect Anomalies → Merge

Forecast Pipeline:
  Data Fetch → Extract Timeseries ─────────┐
                                           ↓
          Fetch Historical Data → Prepare Features → Train LSTM → Generate Forecast → Visualize

DAG Visualization

The following diagram shows the workflow DAG for a single location:

Air Quality Workflow DAG

Components

| Component | Purpose | Memory | Container |
| --- | --- | --- | --- |
| fetch_historical_data.py | Fetch 90 days of historical AQI data | 2 GB | Forecast |
| prepare_features.py | Feature engineering for LSTM | 2 GB | Forecast |
| train_forecast_model.py | Train PyTorch LSTM model | 4 GB | Forecast |
| generate_forecast.py | Generate 24-hour predictions | 2 GB | Forecast |
| visualize_forecast.py | Create forecast visualizations | 2 GB | Forecast |

Running on ACCESS

The easiest way to run this workflow is using the provided Jupyter notebook on an ACCESS resource with Pegasus and HTCondor pre-configured:

Notebook: Access-Airquality-workflow.ipynb

The notebook walks through the complete workflow: configuring parameters, generating the Pegasus DAG, submitting to HTCondor, monitoring execution, and examining results with inline visualizations.

Running on FABRIC

The workflow can also be run on the FABRIC testbed by deploying a distributed Pegasus/HTCondor cluster across FABRIC sites.

Deploy a Pegasus/HTCondor Cluster

You can provision a cluster using either of the following notebooks:

| Option | Link | Description |
| --- | --- | --- |
| FABRIC Artifact (Recommended) | Pegasus-FABRIC Artifact | Pre-configured notebook from the FABRIC Artifacts repository |
| Jupyter Examples | pegasus-fabric.ipynb | Notebook from the official FABRIC Jupyter examples |

Both notebooks provision the following cluster architecture:

  • Submit Node -- Central Manager running HTCondor scheduler and Pegasus WMS
  • Worker Nodes -- Distributed execution points across multiple FABRIC sites
  • FABNetv4 Networking -- Private L3 network connecting all nodes

Setup Steps

  1. Log into the FABRIC JupyterHub
  2. Upload or clone one of the Pegasus-FABRIC notebooks above
  3. Configure your desired sites and node specifications
  4. Run the notebook to provision the cluster
  5. Clone this repository on the submit node
  6. Run the workflow using the CLI or the Access notebook

Prerequisites

Python Dependencies

pip install -r requirements.txt

Quick Start

1. Build Docker Container

cd Docker
docker build -f AirQuality_Forecast_Dockerfile -t kthare10/airquality-forecast:latest .
docker push kthare10/airquality-forecast:latest  # If using with Pegasus

2. Set OpenAQ API Key (Only needed when using OpenAQ Data Source)

The workflow requires an OpenAQ API key to fetch historical data:

# Set API key (required!)
export OPENAQ_API_KEY='your-api-key-here'

# Get your API key at: https://explore.openaq.org/register

3. Generate Workflow

# Basic usage
./workflow_generator.py \
    --location-ids 2178 \
    --start-date 2024-01-15 \
    --end-date 2024-01-16 \
    --output workflow_forecast.yml

# SAGE usage (live query)
./workflow_generator.py \
    --data-source sage \
    --sage-vsn W045 \
    --sage-plugin registry.sagecontinuum.org/seanshahkarami/air-quality:0.3.0 \
    --sage-names env.air_quality.conc \
    --start-date 2026-01-14 \
    --end-date 2026-01-15 \
    --output workflow_forecast.yml

# Advanced options
./workflow_generator.py \
    --location-ids 2178 1490 \
    --start-date 2024-01-15 \
    --historical-days 90 \
    --forecast-horizon 48 \
    --parameters pm25 pm10 o3 so2 \
    --execution-site condorpool \
    --output workflow_forecast.yml

4. Submit Workflow

pegasus-plan --submit -s condorpool -o local workflow_forecast.yml

5. Monitor Workflow

pegasus-status /path/to/submit/directory
pegasus-analyzer /path/to/submit/directory

Command-Line Options

workflow_generator.py

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| --location-ids | int+ | Required for OpenAQ | OpenAQ location IDs to analyze |
| --start-date | YYYY-MM-DD | Required | Analysis start date |
| --end-date | YYYY-MM-DD | start_date + 1 day | Analysis end date |
| --historical-days | int | 90 | Days of historical data for training |
| --forecast-horizon | int | 24 | Hours to forecast ahead |
| --parameters | str+ | All 6 | Pollutants: pm25, pm10, o3, no2, so2, co |
| --execution-site | str | condorpool | HTCondor execution site |
| -o, --output | str | workflow_forecast.yml | Output YAML file |
| --data-source | str | openaq | Data source: openaq or sage |
| --sage-input | str | none | Path to SAGE JSONL file (offline mode) |
| --sage-vsn | str | none | Filter SAGE data by VSN |
| --sage-plugin | str | none | Filter SAGE data by plugin |
| --sage-names | str+ | none | Filter SAGE data by measurement names |
| --skip-forecast | flag | false | Skip LSTM forecast pipeline |

LSTM Model Architecture

Model Configuration

Input: (batch, 168 timesteps, ~24 features)
  ↓
LSTM Layer 1 (hidden_size=128, dropout=0.2)
  ↓
LSTM Layer 2 (hidden_size=128, dropout=0.2)
  ↓
Fully Connected Layer (hidden_size → 24 predictions)
  ↓
Output: (batch, 24 hourly AQI predictions)
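The stack above maps directly onto a small PyTorch module. The sketch below uses the layer sizes from this README (two LSTM layers, hidden size 128, a 24-hour output head); the actual module layout in train_forecast_model.py may differ.

```python
import torch
import torch.nn as nn

class AQIForecastLSTM(nn.Module):
    """Two-layer LSTM matching the configuration above: 24 input features,
    hidden size 128, dropout 0.2 between layers, 24-hour output head.
    A sketch based on the README's stated sizes, not the repo's exact code."""

    def __init__(self, input_size=24, hidden_size=128, num_layers=2,
                 horizon=24, dropout=0.2):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers,
                            batch_first=True, dropout=dropout)
        self.fc = nn.Linear(hidden_size, horizon)

    def forward(self, x):
        # x: (batch, lookback, input_size); keep the final timestep's state
        out, _ = self.lstm(x)
        return self.fc(out[:, -1, :])  # (batch, horizon)

model = AQIForecastLSTM()
preds = model(torch.zeros(4, 168, 24))  # (batch=4, 168-step lookback)
```

Note that `nn.LSTM` applies the dropout between stacked layers, which is why a single two-layer call reproduces the diagram's two dropout annotations.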

Features

The model's per-timestep input combines three feature groups:

Raw Features (7):

  • AQI value
  • PM2.5, PM10, O3, NO2, SO2, CO concentrations

Temporal Features (6, cyclic encoded):

  • Hour of day (sin/cos)
  • Day of week (sin/cos)
  • Month (sin/cos)

Statistical Features (12):

  • Rolling mean (6h, 12h, 24h windows)
  • Rolling std (6h, 12h, 24h windows)
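The temporal and statistical groups above can be derived from a raw AQI series in a few lines of pandas. Column names here are illustrative; prepare_features.py may use different ones.

```python
import numpy as np
import pandas as pd

def add_features(df):
    """Add the cyclic temporal and rolling statistical features described
    above. Assumes df has a DatetimeIndex at hourly resolution and an
    'aqi' column; names are illustrative, not the repo's exact schema."""
    out = df.copy()
    t = out.index
    # Cyclic encoding keeps midnight adjacent to 23:00, Sunday to Monday, etc.
    out["hour_sin"] = np.sin(2 * np.pi * t.hour / 24)
    out["hour_cos"] = np.cos(2 * np.pi * t.hour / 24)
    out["dow_sin"] = np.sin(2 * np.pi * t.dayofweek / 7)
    out["dow_cos"] = np.cos(2 * np.pi * t.dayofweek / 7)
    out["month_sin"] = np.sin(2 * np.pi * (t.month - 1) / 12)
    out["month_cos"] = np.cos(2 * np.pi * (t.month - 1) / 12)
    # Rolling mean/std over the 6h, 12h, and 24h windows listed above
    for window in (6, 12, 24):
        out[f"aqi_mean_{window}h"] = out["aqi"].rolling(window, min_periods=1).mean()
        out[f"aqi_std_{window}h"] = out["aqi"].rolling(window, min_periods=1).std()
    return out

idx = pd.date_range("2024-01-01", periods=48, freq="h")
feats = add_features(pd.DataFrame({"aqi": np.arange(48.0)}, index=idx))
```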

Training Configuration

| Parameter | Value | Description |
| --- | --- | --- |
| Lookback | 168 hours (7 days) | Historical window for input |
| Horizon | 24 hours | Prediction length |
| Batch Size | 32 | Training batch size |
| Epochs | 100 | Maximum epochs (with early stopping) |
| Patience | 10 | Early stopping patience |
| Learning Rate | 0.001 | Adam optimizer initial LR |
| Loss Function | MSE | Mean Squared Error |
| Validation Split | 20% | Train/validation split ratio |
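The epochs/patience pair above works as a standard early-stopping loop. The sketch below shows the control flow in framework-agnostic Python; `step` is a hypothetical caller-supplied function that runs one training epoch and returns the validation loss.

```python
def train_with_early_stopping(step, max_epochs=100, patience=10):
    """Generic early-stopping loop matching the table above (100 max
    epochs, patience 10). `step(epoch)` runs one epoch and returns the
    validation loss; it stands in for the real training code."""
    best_loss = float("inf")
    best_epoch = 0
    for epoch in range(max_epochs):
        val_loss = step(epoch)
        if val_loss < best_loss:
            best_loss, best_epoch = val_loss, epoch
            # the real workflow would save the model checkpoint here
        elif epoch - best_epoch >= patience:
            break  # no improvement for `patience` epochs: stop
    return best_epoch, best_loss

# Simulated loss curve: improves for six epochs, then plateaus
losses = [1.0, 0.8, 0.6, 0.5, 0.45, 0.44] + [0.44] * 50
best_epoch, best_loss = train_with_early_stopping(lambda e: losses[e])
```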

Output Files

Directory Structure

output/
├── timeseries/
│   └── <location>/
│       └── <location>_timeseries.json        # Recent AQI measurements
├── historical/
│   └── <location>/
│       ├── <location>_historical.csv          # 90 days of training data
│       └── <location>_coverage.json           # Data completeness info
├── features/
│   └── <location>/
│       ├── <location>_train.npz               # Feature matrices (X, y)
│       └── <location>_train_scaler.json       # Normalization parameters
├── models/
│   └── <location>/
│       ├── <location>_lstm_checkpoint.pt      # Trained model weights
│       └── <location>_training_info.json      # Training metadata
├── forecasts/
│   └── <location>/
│       ├── <location>_forecast.json           # 24-hour predictions
│       ├── <location>_forecast.png            # Visualization
│       └── <location>_forecast_summary.json   # Statistics
├── analysis/                                   # Base workflow outputs
│   └── <location>/
│       ├── <location>_analysis.png
│       └── <location>_statistics.json
└── anomalies/                                  # Base workflow outputs
    └── <location>/
        └── <location>_anomalies.json

Forecast JSON Format

{
  "location": "London N. Kensington",
  "forecast_generated": "2024-01-15T10:00:00Z",
  "forecast_start": "2024-01-16T00:00:00Z",
  "forecast_horizon_hours": 24,
  "model_info": {
    "architecture": "LSTM",
    "input_size": 24,
    "hidden_size": 128,
    "num_layers": 2
  },
  "predictions": [
    {
      "datetime": "2024-01-16T00:00:00Z",
      "predicted_aqi": 45.2,
      "confidence_interval_lower": 35.6,
      "confidence_interval_upper": 54.8,
      "predicted_category": "Good"
    },
    // ... 23 more hours
  ]
}
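Downstream consumers can work with this format using only the standard library. The snippet below loads a trimmed version of the record above and picks out the peak predicted hour; the two sample predictions are illustrative values in the documented shape.

```python
import json

# A trimmed forecast record in the format shown above (illustrative values)
forecast_json = """{
  "location": "London N. Kensington",
  "forecast_horizon_hours": 24,
  "predictions": [
    {"datetime": "2024-01-16T00:00:00Z", "predicted_aqi": 45.2, "predicted_category": "Good"},
    {"datetime": "2024-01-16T01:00:00Z", "predicted_aqi": 52.7, "predicted_category": "Moderate"}
  ]
}"""

forecast = json.loads(forecast_json)
# Find the hour with the highest predicted AQI
peak = max(forecast["predictions"], key=lambda p: p["predicted_aqi"])
print(f'{forecast["location"]}: peak AQI {peak["predicted_aqi"]} '
      f'({peak["predicted_category"]}) at {peak["datetime"]}')
```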

Visualization

The forecast visualization includes:

  1. Historical AQI (last 7 days): Solid blue line with markers
  2. Forecast (next 24 hours): Dashed orange line with markers
  3. Confidence Intervals: Shaded orange region (95% CI)
  4. AQI Categories: Background color bands (Good=green, Moderate=yellow, etc.)
  5. Separation Line: Vertical line marking forecast start
  6. Legend: Model info and horizon
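The category bands in item 4 follow the standard US EPA AQI scale. A minimal lookup, assuming the workflow uses the standard breakpoints (its scripts may define their own mapping):

```python
# Standard US EPA AQI category breakpoints (upper bound, label)
AQI_CATEGORIES = [
    (50, "Good"),
    (100, "Moderate"),
    (150, "Unhealthy for Sensitive Groups"),
    (200, "Unhealthy"),
    (300, "Very Unhealthy"),
    (500, "Hazardous"),
]

def aqi_category(aqi):
    """Map a numeric AQI to its EPA category label."""
    for upper, name in AQI_CATEGORIES:
        if aqi <= upper:
            return name
    return "Hazardous"  # values above 500 stay in the top band
```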

AI Interactions

This workflow was developed collaboratively with Claude. Below are the key prompts used during development.

Initial Setup:

  • "Can you help me create an air quality workflow similar to the Orcasound workflow, using https://openaq.org/ as the data source?"

Debugging and Fixes:

  • "Workflow failed with: Job fetch_historical_fetch_hist_Del_Norte errors: Transfer output files failure... Can you help me fix this?"
  • "Fetch tasks are failing with error: Failed to fetch historical data: OpenAQ API key not found. Can you help?"
  • "Prepare task fails with error: Could not convert pm25 to numeric. Can you help?"

Troubleshooting

Common Issues

Issue: Insufficient data coverage warning

  • Cause: Location has <70% data completeness
  • Solution: Increase --historical-days or choose location with better coverage
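The 70% threshold above can be checked locally before submitting a workflow. This sketch mirrors the kind of completeness check behind the `<location>_coverage.json` file; the actual script's definition of coverage may differ.

```python
import pandas as pd

def coverage_fraction(timestamps, start, end, freq="h"):
    """Fraction of expected readings present in [start, end).

    An illustrative completeness metric, not necessarily the one
    fetch_historical_data.py computes.
    """
    expected = pd.date_range(start, end, freq=freq, inclusive="left")
    present = pd.DatetimeIndex(timestamps).unique()
    return len(expected.intersection(present)) / len(expected)

# 24 expected hourly slots, 18 present: 75% coverage, below the 70%?
# No: 0.75 clears the threshold, so this window would not warn.
ts = pd.date_range("2024-01-01", periods=18, freq="h")
frac = coverage_fraction(ts, "2024-01-01", "2024-01-02")
```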

Issue: Training failed with NaN loss

  • Cause: Insufficient data or poor feature quality
  • Solution: Check data coverage, try different location, or adjust normalization

Issue: Model checkpoint not found

  • Cause: Training job failed or didn't complete
  • Solution: Check Pegasus logs, verify training completed successfully

Issue: Container pull failed

  • Cause: Docker image not available or Singularity issues
  • Solution: Build and push Docker image, or check Singularity configuration

Debugging Tips

# Check workflow status
pegasus-status /path/to/submit/dir

# View job logs
pegasus-analyzer /path/to/submit/dir

# Check specific job output
cat /path/to/submit/dir/*/*.out
cat /path/to/submit/dir/*/*.err

# Verify historical data coverage
cat output/historical/<location>/<location>_coverage.json

# Check training progress
cat output/models/<location>/<location>_training_info.json

Citation

If you use this workflow in your research, please cite:

@misc{airquality-workflow,
  title={Air Quality Forecasting Workflow using Pegasus WMS},
  year={2025},
  publisher={GitHub},
  url={https://github.com/pegasus-isi/airquality-workflow}
}

Support

For issues or questions, please open an issue on the GitHub repository.

Authors

Komal Thareja (kthare10@renci.org)

P.S.: Built with the assistance of Claude, Anthropic's AI assistant.
