Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelog/486.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Added support for CMIP7 datasets including database tables, dataset adapters, and ESGF integration.
CMIP7 datasets can now be ingested and queried, with a proxy mechanism that translates CMIP7 requests to CMIP6 queries.
64 changes: 62 additions & 2 deletions packages/climate-ref-core/src/climate_ref_core/cmip6_to_cmip7.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,11 +299,16 @@ class CMIP7Metadata:

This captures the additional/modified attributes needed for CMIP7 format.
Based on CMIP7 Global Attributes V1.0 (DOI: 10.5281/zenodo.17250297).

Mandatory attributes per spec:
- mip_era, region, drs_specs, data_specs_version, product, license_id
- temporal_label, vertical_label, horizontal_label, area_label
- branding_suffix (derived from labels)
"""

# Required new attributes
# Required new attributes (mandatory per CMIP7 V1.0 spec)
mip_era: str = "CMIP7"
region: str = "glb"
region: str = "glb" # lowercase per spec
drs_specs: str = "MIP-DRS7"
data_specs_version: str = "MIP-DS7.1.0.0"
product: str = "model-output"
Expand Down Expand Up @@ -609,3 +614,58 @@ def create_cmip7_path(attrs: dict[str, Any], version: str | None = None) -> str:
version_str,
]
return "/".join(str(c) for c in components)


def create_cmip7_instance_id(attrs: dict[str, Any]) -> str:
    """
    Create a CMIP7 instance_id following MIP-DRS7 spec.

    The instance_id uniquely identifies a dataset using the DRS components
    separated by periods. Each component is read from ``attrs``; when an
    attribute is absent, ``None`` or a float NaN (as produced for missing
    values in rows taken from a DataFrame), the component's fallback value
    is used instead so that the id never contains a literal ``"None"`` or
    ``"nan"`` segment.

    Parameters
    ----------
    attrs
        Dictionary containing CMIP7 attributes

    Returns
    -------
    str
        The CMIP7 instance_id

    Examples
    --------
    >>> attrs = {
    ...     "drs_specs": "MIP-DRS7",
    ...     "mip_era": "CMIP7",
    ...     "activity_id": "CMIP",
    ...     "institution_id": "CCCma",
    ...     "source_id": "CanESM6-MR",
    ...     "experiment_id": "historical",
    ...     "variant_label": "r2i1p1f1",
    ...     "region": "glb",
    ...     "frequency": "mon",
    ...     "variable_id": "tas",
    ...     "branding_suffix": "tavg-h2m-hxy-u",
    ...     "grid_label": "g13s",
    ...     "version": "v20250622",
    ... }
    >>> create_cmip7_instance_id(attrs)
    'MIP-DRS7.CMIP7.CMIP.CCCma.CanESM6-MR.historical.r2i1p1f1.glb.mon.tas.tavg-h2m-hxy-u.g13s.v20250622'
    """
    # Imported locally so this function does not depend on the module's import block.
    import math

    # (attribute name, fallback value), listed in the order the components
    # appear in the instance_id.
    drs_components: tuple[tuple[str, str], ...] = (
        ("drs_specs", "MIP-DRS7"),
        ("mip_era", "CMIP7"),
        ("activity_id", "CMIP"),
        ("institution_id", ""),
        ("source_id", ""),
        ("experiment_id", ""),
        ("variant_label", ""),
        ("region", "glb"),
        ("frequency", "mon"),
        ("variable_id", ""),
        ("branding_suffix", ""),
        ("grid_label", "gn"),
        ("version", "v1"),
    )

    def _component(name: str, fallback: str) -> str:
        value = attrs.get(name, fallback)
        # A key that is present but unset must not leak "None"/"nan" into the id.
        if value is None or (isinstance(value, float) and math.isnan(value)):
            return fallback
        return str(value)

    return ".".join(_component(name, fallback) for name, fallback in drs_components)
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@

from climate_ref_core.esgf.base import ESGFRequest, IntakeESGFMixin
from climate_ref_core.esgf.cmip6 import CMIP6Request
from climate_ref_core.esgf.cmip7 import CMIP7Request
from climate_ref_core.esgf.fetcher import ESGFFetcher
from climate_ref_core.esgf.obs4mips import Obs4MIPsRequest

__all__ = [
"CMIP6Request",
"CMIP7Request",
"ESGFFetcher",
"ESGFRequest",
"IntakeESGFMixin",
Expand Down
291 changes: 291 additions & 0 deletions packages/climate-ref-core/src/climate_ref_core/esgf/cmip7.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
"""
CMIP7 dataset request implementation.

This module provides a CMIP7Request class that wraps CMIP6 data from ESGF
and converts it to CMIP7 format for testing diagnostics with CMIP7-style data
before actual CMIP7 data becomes available on ESGF.
"""

from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING, Any, cast

import pandas as pd
import xarray as xr
from loguru import logger

from climate_ref_core.cmip6_to_cmip7 import (
convert_cmip6_dataset,
create_cmip7_filename,
create_cmip7_instance_id,
create_cmip7_path,
get_branding_suffix,
get_frequency_from_table,
get_realm_from_table,
)
from climate_ref_core.esgf.cmip6 import CMIP6Request

if TYPE_CHECKING:
import xarray as xr


def _convert_file(
    cmip6_path: Path,
    output_dir: Path,
    rename_variables: bool = False,
) -> tuple[Path, dict[str, Any]] | None:
    """
    Convert a single CMIP6 file to CMIP7 format.

    The converted dataset is written below ``output_dir`` using the directory
    and file name produced by ``create_cmip7_path`` / ``create_cmip7_filename``.
    An existing output file is reused rather than rewritten. New files are
    written to a temporary sibling and renamed into place, so a failed or
    interrupted write cannot leave a truncated file that later calls would
    mistake for a finished conversion.

    Parameters
    ----------
    cmip6_path
        Path to the CMIP6 file
    output_dir
        Root directory for converted CMIP7 files
    rename_variables
        If True, rename variables to CMIP7 branded format

    Returns
    -------
    tuple[Path, dict[str, Any]] | None
        Tuple of (cmip7_path, cmip7_attrs) or None if the file could not be
        opened or converted (the failure is logged as a warning)
    """
    # Imported locally so this function does not depend on the module's import block.
    import os

    try:
        ds: xr.Dataset = xr.open_dataset(cmip6_path)
    except Exception as e:
        logger.warning(f"Failed to open {cmip6_path}: {e}")
        return None

    try:
        ds_cmip7 = convert_cmip6_dataset(ds, rename_variables=rename_variables)

        # Build output path using CMIP7 DRS
        # TODO: extract time range from data for filename
        # NOTE(review): with time_range=None the file name presumably carries no
        # time span, so several time-chunk files of one dataset would resolve to
        # the same path and only the first would be written -- confirm against
        # create_cmip7_filename.
        cmip7_subpath = create_cmip7_path(ds_cmip7.attrs)
        cmip7_filename = create_cmip7_filename(ds_cmip7.attrs, time_range=None)
        cmip7_dir = output_dir / cmip7_subpath
        cmip7_dir.mkdir(parents=True, exist_ok=True)

        cmip7_path = cmip7_dir / cmip7_filename

        # Only write if file doesn't already exist
        if cmip7_path.exists():
            logger.debug(f"CMIP7 file already exists: {cmip7_path}")
        else:
            # Per-process temporary name in the same directory, so the final
            # rename stays on one filesystem and writers do not share a file.
            tmp_path = cmip7_path.with_name(f"{cmip7_path.name}.{os.getpid()}.tmp")
            try:
                ds_cmip7.to_netcdf(tmp_path)
                tmp_path.replace(cmip7_path)
            except BaseException:
                # Never leave a partial file behind, even on interruption.
                tmp_path.unlink(missing_ok=True)
                raise
            logger.debug(f"Wrote CMIP7 file: {cmip7_path}")

        return cmip7_path, dict(ds_cmip7.attrs)

    except Exception as e:
        logger.warning(f"Failed to convert {cmip6_path}: {e}")
        return None
    finally:
        ds.close()


class CMIP7Request:
    """
    CMIP7 dataset request that wraps CMIP6 data with conversion.

    Fetches CMIP6 data from ESGF and converts to CMIP7 format.
    Converted files are written to disk with CMIP7 directory structure.

    Parameters
    ----------
    slug
        Unique identifier for this request
    facets
        ESGF search facets (e.g., source_id, variable_id, experiment_id)
    output_dir
        Directory to write converted CMIP7 files (a ``str`` is accepted and
        converted to a ``Path``).
        Defaults to ~/.cache/climate-ref/cmip7
    remove_ensembles
        If True, keep only one ensemble member per model
    time_span
        Optional time range filter (start, end) in YYYY-MM format
    rename_variables
        If True, rename variables to CMIP7 branded format (e.g., tas -> tas_tavg-h2m-hxy-u)

    Examples
    --------
    >>> from climate_ref_core.esgf import CMIP7Request
    >>> request = CMIP7Request(
    ...     slug="tas-historical",
    ...     facets={
    ...         "source_id": "ACCESS-ESM1-5",
    ...         "experiment_id": "historical",
    ...         "variable_id": "tas",
    ...         "member_id": "r1i1p1f1",
    ...         "table_id": "Amon",
    ...     },
    ...     time_span=("2000-01", "2014-12"),
    ... )
    >>> df = request.fetch_datasets()
    """

    source_type = "CMIP7"

    def __init__(  # noqa: PLR0913
        self,
        slug: str,
        facets: dict[str, Any],
        output_dir: Path | None = None,
        remove_ensembles: bool = False,
        time_span: tuple[str, str] | None = None,
        rename_variables: bool = False,
    ):
        self.slug = slug
        self.facets = facets
        # Coerce to Path so a string argument still supports the ``/`` joins
        # performed when the converted files are written.
        self.output_dir = (
            Path(output_dir) if output_dir else Path.home() / ".cache" / "climate-ref" / "cmip7"
        )
        self.rename_variables = rename_variables
        self.time_span = time_span

        # Internal CMIP6 request for actual fetching
        self._cmip6_request = CMIP6Request(
            slug=f"{slug}_cmip6_source",
            facets=facets,
            remove_ensembles=remove_ensembles,
            time_span=time_span,
        )

    def __repr__(self) -> str:
        return f"CMIP7Request(slug={self.slug!r}, facets={self.facets!r})"

    @staticmethod
    def _unique_paths(paths: list[str]) -> list[str]:
        """Return ``paths`` without repeated entries, keeping first-seen order."""
        return list(dict.fromkeys(paths))

    def fetch_datasets(self) -> pd.DataFrame:
        """
        Fetch CMIP6 data from ESGF and convert to CMIP7 format.

        Each CMIP6 dataset row yields at most one CMIP7 row. Rows without
        files, or whose files all fail to convert, are dropped. Output paths
        that occur more than once within a dataset are listed only once and a
        warning is logged.

        Returns
        -------
        pd.DataFrame
            DataFrame containing CMIP7 metadata and file paths.
            Contains columns: key, files, path, source_type, and all CMIP7 metadata fields.
            If the CMIP6 query returns nothing, that empty frame is returned as-is;
            if no file could be converted, an empty DataFrame is returned.
        """
        # Fetch CMIP6 data
        cmip6_df = self._cmip6_request.fetch_datasets()

        if cmip6_df.empty:
            logger.warning(f"No CMIP6 datasets found for {self.slug}")
            return cmip6_df

        converted_rows = []

        for _, row in cmip6_df.iterrows():
            files_list = row.get("files", [])
            if not files_list:
                continue

            cmip7_files: list[str] = []
            cmip7_attrs: dict[str, Any] | None = None

            for _file_path in files_list:
                # Convert each CMIP6 file to CMIP7-like
                file_path = Path(_file_path)
                if not file_path.exists():
                    logger.warning(f"CMIP6 file does not exist: {file_path}")
                    continue

                result = _convert_file(file_path, self.output_dir, self.rename_variables)
                if result is None:
                    continue

                cmip7_path, attrs = result
                cmip7_files.append(str(cmip7_path))
                # Use attributes from first successful conversion
                if cmip7_attrs is None:
                    cmip7_attrs = attrs

            # Several input files can resolve to one output path (the output
            # file name is built without a time range); list each path once
            # and surface the collision instead of hiding it.
            unique_files = self._unique_paths(cmip7_files)
            n_collisions = len(cmip7_files) - len(unique_files)
            if n_collisions:
                logger.warning(
                    f"{n_collisions} CMIP6 file(s) for {self.slug} mapped to an "
                    "already-used CMIP7 path and are not listed separately"
                )

            if unique_files and cmip7_attrs is not None:
                converted_rows.append(self._build_cmip7_metadata(row, unique_files, cmip7_attrs))

        if not converted_rows:
            logger.warning(f"No CMIP7 files converted for {self.slug}")
            return pd.DataFrame()

        return pd.DataFrame(converted_rows)

    def _build_cmip7_metadata(
        self,
        cmip6_row: pd.Series,
        cmip7_files: list[str],
        cmip7_attrs: dict[str, Any],
    ) -> dict[str, Any]:
        """
        Build CMIP7 metadata row from CMIP6 source and converted attributes.

        Based on CMIP7 Global Attributes V1.0 (DOI: 10.5281/zenodo.17250297).

        The CMIP6 row is the starting point: CMIP6-only fields are removed,
        CMIP7 fields are layered on top (taken from ``cmip7_attrs`` where
        available, otherwise from fallbacks), and ``key`` is set to the CMIP7
        instance_id computed from the finished row.

        Parameters
        ----------
        cmip6_row
            Original CMIP6 metadata row
        cmip7_files
            List of converted CMIP7 file paths (must be non-empty; the first
            entry becomes ``path``)
        cmip7_attrs
            Attributes from the converted CMIP7 dataset

        Returns
        -------
        dict[str, Any]
            CMIP7 metadata dictionary
        """
        table_id = cmip6_row.get("table_id", "Amon")
        variable_id = cmip6_row.get("variable_id", "tas")
        branding_suffix = get_branding_suffix(variable_id)
        branding_suffix_str = str(branding_suffix)

        # Start with CMIP6 row as base
        result: dict[str, Any] = cast(dict[str, Any], cmip6_row.to_dict())

        # Remove CMIP6-only fields
        for field in ["member_id", "table_id", "grid", "sub_experiment", "sub_experiment_id"]:
            result.pop(field, None)

        # Override with CMIP7-specific values per V1.0 spec
        result.update(
            {
                "path": cmip7_files[0],  # Primary path
                "files": cmip7_files,
                # Mandatory CMIP7 fields
                "mip_era": "CMIP7",
                "realm": get_realm_from_table(table_id),
                "frequency": get_frequency_from_table(table_id),
                "region": cmip7_attrs.get("region", "glb"),  # lowercase per spec
                "branding_suffix": branding_suffix_str,
                "branded_variable": f"{variable_id}_{branding_suffix_str}",
                "drs_specs": cmip7_attrs.get("drs_specs", "MIP-DRS7"),
                "data_specs_version": cmip7_attrs.get("data_specs_version", "MIP-DS7.1.0.0"),
                "product": cmip7_attrs.get("product", "model-output"),
                "license_id": cmip7_attrs.get("license_id", "CC-BY-4.0"),
                # Branding suffix components (dataset attributes win over the lookup)
                "temporal_label": cmip7_attrs.get("temporal_label", branding_suffix.temporal_label),
                "vertical_label": cmip7_attrs.get("vertical_label", branding_suffix.vertical_label),
                "horizontal_label": cmip7_attrs.get("horizontal_label", branding_suffix.horizontal_label),
                "area_label": cmip7_attrs.get("area_label", branding_suffix.area_label),
                # Variant indices as recorded on the converted dataset
                "realization_index": cmip7_attrs.get("realization_index", ""),
                "initialization_index": cmip7_attrs.get("initialization_index", ""),
                "physics_index": cmip7_attrs.get("physics_index", ""),
                "forcing_index": cmip7_attrs.get("forcing_index", ""),
                # Optional fields
                "nominal_resolution": cmip7_attrs.get("nominal_resolution", ""),
                "tracking_id": cmip7_attrs.get("tracking_id", ""),
                # Parent fields
                "branch_time_in_child": cmip7_attrs.get("branch_time_in_child"),
                "branch_time_in_parent": cmip7_attrs.get("branch_time_in_parent"),
                "parent_activity_id": cmip7_attrs.get("parent_activity_id", ""),
                "parent_experiment_id": cmip7_attrs.get("parent_experiment_id", ""),
                "parent_mip_era": cmip7_attrs.get("parent_mip_era", ""),
                "parent_source_id": cmip7_attrs.get("parent_source_id", ""),
                "parent_time_units": cmip7_attrs.get("parent_time_units", ""),
                "parent_variant_label": cmip7_attrs.get("parent_variant_label", ""),
                "external_variables": cmip7_attrs.get("external_variables", ""),
            }
        )

        # Generate CMIP7 instance_id
        result["key"] = create_cmip7_instance_id(result)

        return result
Loading