A Pegasus workflow system for processing air quality sensor data from OpenAQ/SAGE to detect anomalies and analyze pollutant trends. It also provides LSTM-based AQI forecasting, predicting air quality for the next 24 hours from historical patterns in OpenAQ data.
The AQI forecasting workflow extends the base air quality analysis with machine learning capabilities:
- Fetches historical data: 90 days (configurable) of air quality measurements for training
- Trains LSTM models: Per-location PyTorch-based LSTM models for time-series prediction
- Generates forecasts: 24-hour (configurable) AQI predictions with confidence intervals
- Visualizes results: Plots showing historical data, forecasts, and confidence bounds
This workflow supports two data sources:
- OpenAQ API v3: Global air quality monitoring stations (default).
- SAGE Continuum: Edge sensor data via `sage_data_client` or local JSONL exports.
The workflow uses the OpenAQ API v3 to fetch air quality measurements from monitoring stations worldwide.
| Endpoint | Purpose |
|---|---|
| `https://api.openaq.org/v3/locations` | Search for monitoring locations |
| `https://api.openaq.org/v3/locations/{id}` | Get location details |
| `https://api.openaq.org/v3/locations/{id}/sensors` | Get sensors at a location |
| `https://api.openaq.org/v3/sensors/{id}/days` | Fetch daily aggregated measurements |
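As a minimal sketch of calling one of these endpoints: OpenAQ v3 authenticates via the `X-API-Key` header, and the example below only uses the `limit` query parameter; see the API docs for the full parameter set.

```python
# Minimal sketch: search OpenAQ v3 locations.
# Assumes the OPENAQ_API_KEY environment variable is set (see setup below).
import os
import requests

resp = requests.get(
    "https://api.openaq.org/v3/locations",
    headers={"X-API-Key": os.environ["OPENAQ_API_KEY"]},
    params={"limit": 5},
)
resp.raise_for_status()
for loc in resp.json()["results"]:
    print(loc["id"], loc["name"])
```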
The `fetch_openaq_catalog.py` script retrieves the following pollutant measurements:
| Parameter | Description | OpenAQ ID | Typical Unit |
|---|---|---|---|
| `pm25` | Fine particulate matter (≤2.5 μm) | 2 | µg/m³ |
| `pm10` | Coarse particulate matter (≤10 μm) | 1 | µg/m³ |
| `o3` | Ozone | 5 | µg/m³ or ppm |
| `no2` | Nitrogen Dioxide | 3 | µg/m³ or ppm |
| `so2` | Sulfur Dioxide | 4 | µg/m³ or ppm |
| `co` | Carbon Monoxide | 7 | µg/m³ or ppm |
- Source: Global network of air quality monitoring stations
- Temporal Resolution: Daily aggregates (min, max, median, mean, std)
- Coverage: Historical data from stations worldwide
- API Key: Required - register at https://explore.openaq.org/register
For more details, see the OpenAQ API documentation.
To change or add parameters, modify the `fetch_openaq_catalog.py` script:

- Edit the default parameters list in the `fetch_openaq_catalog()` function (around line 117):

```python
if parameters is None:
    parameters = ['pm25', 'pm10', 'o3', 'no2', 'so2', 'co']
```

- Update the parameter map if adding new parameters (around line 120):

```python
parameter_map = {
    'pm25': 2,  # PM2.5
    'pm10': 1,  # PM10
    'o3': 5,    # Ozone
    'no2': 3,   # Nitrogen Dioxide
    'so2': 4,   # Sulfur Dioxide
    'co': 7     # Carbon Monoxide
    # Add new parameters here with their OpenAQ IDs
}
```

- Or use command-line arguments to select specific parameters:

```bash
./fetch_openaq_catalog.py --location-ids 2178 --parameters pm25 pm10 o3 --start-date 2024-01-01
```

Finding OpenAQ Parameter IDs: Use the OpenAQ Explorer (https://explore.openaq.org/) or the API to discover available parameters and their IDs for specific locations, as in the sketch below.
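For example, a hedged sketch of listing parameter IDs straight from the API, assuming the v3 `/parameters` endpoint:

```python
# Illustrative sketch: list OpenAQ parameters and their numeric IDs.
import os
import requests

resp = requests.get(
    "https://api.openaq.org/v3/parameters",
    headers={"X-API-Key": os.environ["OPENAQ_API_KEY"]},
)
resp.raise_for_status()
for p in resp.json()["results"]:
    print(p["id"], p["name"])
```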
SAGE data can be fetched directly using `sage_data_client` (recommended) or via a local JSONL file downloaded with curl. For details on the platform, see https://sagecontinuum.org/.
Direct query (recommended):
./workflow_generator.py \
--data-source sage \
--sage-vsn W045 \
--sage-plugin registry.sagecontinuum.org/seanshahkarami/air-quality:0.3.0 \
--sage-names env.air_quality.conc \
--start-date 2026-01-14 \
--end-date 2026-01-15 \
--output workflow_forecast.ymlJSONL input (offline):
./workflow_generator.py \
--data-source sage \
--sage-input sage/input_data.json \
--sage-vsn W045 \
--sage-plugin registry.sagecontinuum.org/seanshahkarami/air-quality:0.3.0 \
--start-date 2026-01-14 \
--end-date 2026-01-15 \
--output workflow_forecast.ymlNotes:
- SAGE runs the base pipeline (extract → analyze → anomaly detection) and skips the forecast pipeline by default.
- Use `--sage-names` to filter sensor streams (e.g., `env.air_quality.conc`, `env.pm10`).
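For comparison with the CLI above, a minimal `sage_data_client` query sketch; the filter values mirror the example flags, so adjust them to your node and plugin:

```python
import sage_data_client

# Returns a pandas DataFrame with timestamp, name, value, and meta.* columns.
df = sage_data_client.query(
    start="2026-01-14T00:00:00Z",
    end="2026-01-15T00:00:00Z",
    filter={
        "vsn": "W045",
        "plugin": "registry.sagecontinuum.org/seanshahkarami/air-quality:0.3.0",
        "name": "env.air_quality.conc",
    },
)
print(df.head())
```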
Base Pipeline (runs in parallel):

```
Data Fetch → Extract Timeseries → Analyze → Detect Anomalies → Merge
```

Forecast Pipeline:

```
Data Fetch → Extract Timeseries ─────────┐
                                         ↓
Fetch Historical Data → Prepare Features → Train LSTM → Generate Forecast → Visualize
```
The following diagram shows the workflow DAG for a single location:
| Component | Purpose | Memory | Container |
|---|---|---|---|
| `fetch_historical_data.py` | Fetch 90 days of historical AQI data | 2 GB | Forecast |
| `prepare_features.py` | Feature engineering for LSTM | 2 GB | Forecast |
| `train_forecast_model.py` | Train PyTorch LSTM model | 4 GB | Forecast |
| `generate_forecast.py` | Generate 24-hour predictions | 2 GB | Forecast |
| `visualize_forecast.py` | Create forecast visualizations | 2 GB | Forecast |
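As an illustrative sketch (not the repository's actual generator code), the forecast chain above could be wired with the Pegasus 5 Python API, where shared files define the DAG edges; the file names here are placeholders:

```python
from Pegasus.api import File, Job, Workflow

# infer_dependencies lets Pegasus derive edges from shared files.
wf = Workflow("airquality-forecast", infer_dependencies=True)

# Illustrative file names; the real workflow derives these per location.
historical = File("location_historical.csv")
features = File("location_train.npz")
model = File("location_lstm_checkpoint.pt")
forecast = File("location_forecast.json")
plot = File("location_forecast.png")

wf.add_jobs(
    Job("prepare_features").add_inputs(historical).add_outputs(features),
    Job("train_forecast_model").add_inputs(features).add_outputs(model),
    Job("generate_forecast").add_inputs(model).add_outputs(forecast),
    Job("visualize_forecast").add_inputs(forecast).add_outputs(plot),
)
wf.write("workflow_forecast.yml")
```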
The easiest way to run this workflow is using the provided Jupyter notebook on an ACCESS resource with Pegasus and HTCondor pre-configured:
Notebook: Access-Airquality-workflow.ipynb
The notebook walks through the complete workflow: configuring parameters, generating the Pegasus DAG, submitting to HTCondor, monitoring execution, and examining results with inline visualizations.
The workflow can also be run on the FABRIC testbed by deploying a distributed Pegasus/HTCondor cluster across FABRIC sites.
You can provision a cluster using either of the following notebooks:
| Option | Link | Description |
|---|---|---|
| FABRIC Artifact (Recommended) | Pegasus-FABRIC Artifact | Pre-configured notebook from the FABRIC Artifacts repository |
| Jupyter Examples | pegasus-fabric.ipynb | Notebook from the official FABRIC Jupyter examples |
Both notebooks provision the following cluster architecture:
- Submit Node -- Central Manager running HTCondor scheduler and Pegasus WMS
- Worker Nodes -- Distributed execution points across multiple FABRIC sites
- FABNetv4 Networking -- Private L3 network connecting all nodes
- Log into the FABRIC JupyterHub
- Upload or clone one of the Pegasus-FABRIC notebooks above
- Configure your desired sites and node specifications
- Run the notebook to provision the cluster
- Clone this repository on the submit node
- Run the workflow using the CLI or the Access notebook
```bash
pip install -r requirements.txt
```

```bash
cd Docker
docker build -f AirQuality_Forecast_Dockerfile -t kthare10/airquality-forecast:latest .
docker push kthare10/airquality-forecast:latest   # If using with Pegasus
```

The workflow requires an OpenAQ API key to fetch historical data:

```bash
# Set API key (required!)
export OPENAQ_API_KEY='your-api-key-here'
# Get your API key at: https://explore.openaq.org/register
```

```bash
# Basic usage
./workflow_generator.py \
--location-ids 2178 \
--start-date 2024-01-15 \
--end-date 2024-01-16 \
--output workflow_forecast.yml
# SAGE usage (live query)
./workflow_generator.py \
--data-source sage \
--sage-vsn W045 \
--sage-plugin registry.sagecontinuum.org/seanshahkarami/air-quality:0.3.0 \
--sage-names env.air_quality.conc \
--start-date 2026-01-14 \
--end-date 2026-01-15 \
--output workflow_forecast.yml
# Advanced options
./workflow_generator.py \
--location-ids 2178 1490 \
--start-date 2024-01-15 \
--historical-days 90 \
--forecast-horizon 48 \
--parameters pm25 pm10 o3 so2 \
--execution-site condorpool \
    --output workflow_forecast.yml
```

```bash
pegasus-plan --submit -s condorpool -o local workflow_forecast.yml
```

```bash
pegasus-status /path/to/submit/directory
pegasus-analyzer /path/to/submit/directory
```

| Option | Type | Default | Description |
|---|---|---|---|
| `--location-ids` | int+ | Required for OpenAQ | OpenAQ location IDs to analyze |
| `--start-date` | YYYY-MM-DD | Required | Analysis start date |
| `--end-date` | YYYY-MM-DD | start_date + 1 day | Analysis end date |
| `--historical-days` | int | 90 | Days of historical data for training |
| `--forecast-horizon` | int | 24 | Hours to forecast ahead |
| `--parameters` | str+ | All 6 | Pollutants: pm25, pm10, o3, no2, so2, co |
| `--execution-site` | str | condorpool | HTCondor execution site |
| `-o, --output` | str | workflow_forecast.yml | Output YAML file |
| `--data-source` | str | openaq | Data source: openaq or sage |
| `--sage-input` | str | none | Path to SAGE JSONL file (offline mode) |
| `--sage-vsn` | str | none | Filter SAGE data by VSN |
| `--sage-plugin` | str | none | Filter SAGE data by plugin |
| `--sage-names` | str+ | none | Filter SAGE data by measurement names |
| `--skip-forecast` | flag | false | Skip LSTM forecast pipeline |
```
Input: (batch, 168 timesteps, ~24 features)
        ↓
LSTM Layer 1 (hidden_size=128, dropout=0.2)
        ↓
LSTM Layer 2 (hidden_size=128, dropout=0.2)
        ↓
Fully Connected Layer (hidden_size → 24 predictions)
        ↓
Output: (batch, 24 hourly AQI predictions)
```

The model uses 24 features per timestep:
Raw Features (7):
- AQI value
- PM2.5, PM10, O3, NO2, SO2, CO concentrations
Temporal Features (6, cyclic encoded):
- Hour of day (sin/cos)
- Day of week (sin/cos)
- Month (sin/cos)
Statistical Features (12):
- Rolling mean (6h, 12h, 24h windows)
- Rolling std (6h, 12h, 24h windows)
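A hedged sketch of how the cyclic and rolling features could be computed with pandas; the column names are illustrative, not the workflow's actual schema:

```python
import numpy as np
import pandas as pd

def add_features(df: pd.DataFrame) -> pd.DataFrame:
    # df: hourly rows with a DatetimeIndex and an 'aqi' column (illustrative).
    out = df.copy()
    # Cyclic sin/cos encoding keeps hour 23 adjacent to hour 0, etc.
    for name, period, values in [
        ("hour", 24, out.index.hour),
        ("dow", 7, out.index.dayofweek),
        ("month", 12, out.index.month),
    ]:
        out[f"{name}_sin"] = np.sin(2 * np.pi * values / period)
        out[f"{name}_cos"] = np.cos(2 * np.pi * values / period)
    # Rolling statistics over 6/12/24-hour windows.
    for w in (6, 12, 24):
        out[f"aqi_mean_{w}h"] = out["aqi"].rolling(w).mean()
        out[f"aqi_std_{w}h"] = out["aqi"].rolling(w).std()
    return out.dropna()
```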
| Parameter | Value | Description |
|---|---|---|
| Lookback | 168 hours (7 days) | Historical window for input |
| Horizon | 24 hours | Prediction length |
| Batch Size | 32 | Training batch size |
| Epochs | 100 | Maximum epochs (with early stopping) |
| Patience | 10 | Early stopping patience |
| Learning Rate | 0.001 | Adam optimizer initial LR |
| Loss Function | MSE | Mean Squared Error |
| Validation Split | 20% | Train/validation split ratio |
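Putting the architecture and training configuration together, a minimal PyTorch sketch under the hyperparameters above; it is an illustration, not the repository's `train_forecast_model.py`:

```python
import torch
import torch.nn as nn

class AQILSTM(nn.Module):
    def __init__(self, input_size=24, hidden_size=128, num_layers=2, horizon=24):
        super().__init__()
        # dropout=0.2 applies between the two stacked LSTM layers.
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers,
                            batch_first=True, dropout=0.2)
        self.fc = nn.Linear(hidden_size, horizon)

    def forward(self, x):            # x: (batch, 168, 24)
        out, _ = self.lstm(x)
        return self.fc(out[:, -1])   # last timestep → (batch, 24)

def train(model, train_loader, val_loader, epochs=100, patience=10):
    opt = torch.optim.Adam(model.parameters(), lr=0.001)
    loss_fn = nn.MSELoss()
    best_val, stale = float("inf"), 0
    for epoch in range(epochs):
        model.train()
        for X, y in train_loader:
            opt.zero_grad()
            loss_fn(model(X), y).backward()
            opt.step()
        model.eval()
        with torch.no_grad():
            val = sum(loss_fn(model(X), y).item() for X, y in val_loader)
        if val < best_val:
            best_val, stale = val, 0
            torch.save(model.state_dict(), "lstm_checkpoint.pt")  # illustrative name
        else:
            stale += 1
            if stale >= patience:    # early stopping
                break
```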
```
output/
├── timeseries/
│   └── <location>/
│       └── <location>_timeseries.json        # Recent AQI measurements
├── historical/
│   └── <location>/
│       ├── <location>_historical.csv         # 90 days of training data
│       └── <location>_coverage.json          # Data completeness info
├── features/
│   └── <location>/
│       ├── <location>_train.npz              # Feature matrices (X, y)
│       └── <location>_train_scaler.json      # Normalization parameters
├── models/
│   └── <location>/
│       ├── <location>_lstm_checkpoint.pt     # Trained model weights
│       └── <location>_training_info.json     # Training metadata
├── forecasts/
│   └── <location>/
│       ├── <location>_forecast.json          # 24-hour predictions
│       ├── <location>_forecast.png           # Visualization
│       └── <location>_forecast_summary.json  # Statistics
├── analysis/                                 # Base workflow outputs
│   └── <location>/
│       ├── <location>_analysis.png
│       └── <location>_statistics.json
└── anomalies/                                # Base workflow outputs
    └── <location>/
        └── <location>_anomalies.json
```
```json
{
  "location": "London N. Kensington",
  "forecast_generated": "2024-01-15T10:00:00Z",
  "forecast_start": "2024-01-16T00:00:00Z",
  "forecast_horizon_hours": 24,
  "model_info": {
    "architecture": "LSTM",
    "input_size": 24,
    "hidden_size": 128,
    "num_layers": 2
  },
  "predictions": [
    {
      "datetime": "2024-01-16T00:00:00Z",
      "predicted_aqi": 45.2,
      "confidence_interval_lower": 35.6,
      "confidence_interval_upper": 54.8,
      "predicted_category": "Good"
    },
    // ... 23 more hours
  ]
}
```

The forecast visualization includes:
- Historical AQI (last 7 days): Solid blue line with markers
- Forecast (next 24 hours): Dashed orange line with markers
- Confidence Intervals: Shaded orange region (95% CI)
- AQI Categories: Background color bands (Good=green, Moderate=yellow, etc.)
- Separation Line: Vertical line marking forecast start
- Legend: Model info and horizon
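A minimal matplotlib sketch of that layout, assuming the forecast JSON shown earlier has been loaded into `hist` and `fc` DataFrames (illustrative names, not the workflow's actual variables):

```python
import matplotlib.pyplot as plt

fig, ax = plt.subplots(figsize=(12, 5))
# Historical AQI: solid blue line with markers.
ax.plot(hist["datetime"], hist["aqi"], "b-o", label="Historical AQI")
# Forecast: dashed orange line with markers, plus shaded 95% CI.
ax.plot(fc["datetime"], fc["predicted_aqi"], "--o", color="tab:orange",
        label="Forecast")
ax.fill_between(fc["datetime"], fc["confidence_interval_lower"],
                fc["confidence_interval_upper"], color="tab:orange",
                alpha=0.3, label="95% CI")
# Vertical line marking the forecast start.
ax.axvline(fc["datetime"].iloc[0], color="gray", linestyle=":")
ax.set_ylabel("AQI")
ax.legend(title="LSTM, 24 h horizon")
fig.savefig("forecast.png")
```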
This workflow was developed collaboratively with Claude. Below are the key prompts used during development.
Initial Setup:
- "Can you help me create an air quality workflow similar to the Orcasound workflow, using https://openaq.org/ as the data source?"
Feature Development:
- "Suggest ML ideas for the Air Quality Workflow."
- "Add LSTM-based prediction to the workflow. Can we visualize these predictions?"
- "The workflow currently fetches data from OpenAQ. Can you add support to fetch data from SAGE Continuum: https://portal.sagecontinuum.org/? API: https://sagecontinuum.org/docs/tutorials/accessing-data#using-sage-data-client"
Debugging and Fixes:
- "Workflow failed with: Job
fetch_historical_fetch_hist_Del_Norteerrors:Transfer output files failure...Can you help me fix this?" - "Fetch tasks are failing with error:
Failed to fetch historical data: OpenAQ API key not found.Can you help?" - "Prepare task fails with error:
Could not convert pm25 to numeric.Can you help?"
Issue: Insufficient data coverage warning
- Cause: Location has <70% data completeness
- Solution: Increase `--historical-days` or choose a location with better coverage
Issue: Training failed with NaN loss
- Cause: Insufficient data or poor feature quality
- Solution: Check data coverage, try different location, or adjust normalization
Issue: Model checkpoint not found
- Cause: Training job failed or didn't complete
- Solution: Check Pegasus logs, verify training completed successfully
Issue: Container pull failed
- Cause: Docker image not available or Singularity issues
- Solution: Build and push Docker image, or check Singularity configuration
```bash
# Check workflow status
pegasus-status /path/to/submit/dir

# View job logs
pegasus-analyzer /path/to/submit/dir

# Check specific job output
cat /path/to/submit/dir/*/*.out
cat /path/to/submit/dir/*/*.err

# Verify historical data coverage
cat output/historical/<location>/<location>_coverage.json

# Check training progress
cat output/models/<location>/<location>_training_info.json
```

- OpenAQ API: https://docs.openaq.org/
- PyTorch Documentation: https://pytorch.org/docs/
- Pegasus WMS: https://pegasus.isi.edu/
- EPA AQI Standards: https://www.airnow.gov/aqi/aqi-basics/
- Sage: https://sagecontinuum.org/
If you use this workflow in your research, please cite:
```bibtex
@misc{airquality-workflow,
  title={Air Quality Forecasting Workflow using Pegasus WMS},
  year={2025},
  publisher={GitHub},
  url={https://github.com/pegasus-isi/airquality-workflow}
}
```
For issues or questions:
- GitHub Issues: https://github.com/your-repo/issues
- FABRIC Support: https://learn.fabric-testbed.net/
Komal Thareja (kthare10@renci.org)
P.S.: Built with the assistance of Claude, Anthropic's AI assistant.
