Skip to content

Commit 97706bb

Browse files
authored
Preprocess/era5 land (#78)
* initial commit * flake8 error * update the final test * add variable parameter to process model * add docstring to process() * add variable to base * update merge files * update merge files * update era5_land tests to work with variable names * add year functionality to preprocessor * Update preprocess.py * add type ignore to fix mypy errors * fix tests because merged file doesn't have variable name * update black * update
1 parent 90e7b43 commit 97706bb

4 files changed

Lines changed: 303 additions & 0 deletions

File tree

scripts/preprocess.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
ERA5HourlyPreprocessor,
1414
BokuNDVIPreprocessor,
1515
KenyaASALMask,
16+
ERA5LandPreprocessor,
1617
)
1718

1819
from src.preprocess.admin_boundaries import KenyaAdminPreprocessor
@@ -54,6 +55,25 @@ def process_era5POS_2018():
5455
)
5556

5657

58+
def process_era5_land(variable: str):
    """Run the ERA5-Land preprocessor for a single variable.

    :param variable: name of a downloaded ERA5-Land variable, i.e. a
        directory under ``data/raw/reanalysis-era5-land`` created by the
        ERA5LandExporter.
    """
    # Resolve the data directory: the script may be run either from the
    # repository root ("ml_drought") or from the scripts/ folder.
    if Path(".").absolute().as_posix().split("/")[-1] == "ml_drought":
        data_path = Path("data")
    else:
        data_path = Path("../data")
    # Reference grid produced by the CHIRPS preprocessor.
    regrid_path = data_path / "interim/chirps_preprocessed/chirps_kenya.nc"
    assert regrid_path.exists(), f"{regrid_path} not available"

    processor = ERA5LandPreprocessor(data_path)

    processor.preprocess(
        subset_str="kenya",
        # BUGFIX: the reference grid was asserted to exist but never used
        # (regrid=None), leaving the output on its native grid. Pass the
        # CHIRPS grid so the data is actually regridded.
        regrid=regrid_path,
        resample_time="M",
        upsampling=False,
        variable=variable,
    )
75+
76+
5777
def process_gleam():
5878
data_path = get_data_path()
5979

src/preprocess/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@
22
from .chirps import CHIRPSPreprocessor
33
from .planetOS import PlanetOSPreprocessor
44
from .gleam import GLEAMPreprocessor
5+
from .era5_land import ERA5LandPreprocessor
56
from .seas5 import S5Preprocessor
67
from .era5 import ERA5MonthlyMeanPreprocessor, ERA5HourlyPreprocessor
78
from .esa_cci import ESACCIPreprocessor
89
from .srtm import SRTMPreprocessor
910
from .admin_boundaries import KenyaAdminPreprocessor, KenyaASALMask
1011
from .boku_ndvi import BokuNDVIPreprocessor
1112

13+
1214
__all__ = [
1315
"VHIPreprocessor",
1416
"CHIRPSPreprocessor",
@@ -22,4 +24,5 @@
2224
"KenyaAdminPreprocessor",
2325
"BokuNDVIPreprocessor",
2426
"KenyaASALMask",
27+
"ERA5LandPreprocessor",
2528
]

src/preprocess/era5_land.py

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
from pathlib import Path
2+
import xarray as xr
3+
import multiprocessing
4+
from functools import partial
5+
from typing import Optional, List
6+
from shutil import rmtree
7+
8+
from .base import BasePreProcessor
9+
10+
11+
class ERA5LandPreprocessor(BasePreProcessor):
    """Preprocess the raw ERA5-Land netcdf files downloaded by the
    ERA5LandExporter to ``data/raw/reanalysis-era5-land``.
    """

    # name of the raw-data folder written by the exporter
    dataset = "reanalysis-era5-land"

    @staticmethod
    def create_filename(
        netcdf_filepath: Path, subset_name: Optional[str] = None
    ) -> str:
        """Build the interim filename from the raw file's path components.

        The raw files are laid out as ``.../<variable>/<year>/<months>.nc``
        so variable / year / months are recovered from the path itself.

        :param netcdf_filepath: path to a raw ERA5-Land ``.nc`` file
        :param subset_name: optional region suffix (e.g. ``"kenya"``)
        :return: filename of the form ``<year>_<months>_<variable>[_<subset>].nc``
        """
        var_name = netcdf_filepath.parts[-3]
        months = netcdf_filepath.parts[-1][:-3]  # strip the ".nc" suffix
        year = netcdf_filepath.parts[-2]

        stem = f"{year}_{months}_{var_name}"
        if subset_name is not None:
            stem = f"{stem}_{subset_name}"
        return f"{stem}.nc"

    def _preprocess_single(
        self,
        netcdf_filepath: Path,
        subset_str: Optional[str] = "kenya",
        regrid: Optional[xr.Dataset] = None,
    ) -> None:
        """ Preprocess a single netcdf file (run in parallel if
        `parallel_processes` arg > 1)

        Process:
        -------
        * rename latitude/longitude -> lat/lon
        * chop region of interest (ROI)
        * regrid to same spatial grid as a reference dataset (`regrid`)
        * Save the output file to new folder / filename
        """
        print(f"Processing {netcdf_filepath.name}")

        # 1. read in the dataset and normalise the dimension names
        ds = xr.open_dataset(netcdf_filepath).rename(
            {"longitude": "lon", "latitude": "lat"}
        )

        # 2. chop out EastAfrica (raw data has latitude descending,
        # hence inverse_lat)
        if subset_str is not None:
            ds = self.chop_roi(ds, subset_str, inverse_lat=True)

        # 3. optionally regrid onto the reference grid
        if regrid is not None:
            ds = self.regrid(ds, regrid)

        # create_filename already handles subset_name=None, so no need
        # for a conditional here
        filename = self.create_filename(netcdf_filepath, subset_name=subset_str)
        print(f"Saving to {self.interim}/{filename}")
        ds.to_netcdf(self.interim / filename)

        print(f"Done for ERA5-Land {netcdf_filepath.name}")

    def preprocess(
        self,
        subset_str: Optional[str] = "kenya",
        regrid: Optional[Path] = None,
        resample_time: Optional[str] = "M",
        upsampling: bool = False,
        parallel_processes: int = 1,
        variable: Optional[str] = None,
        years: Optional[List[int]] = None,
        cleanup: bool = True,
    ) -> None:
        """Preprocess all of the ERA5-Land .nc files to produce
        one subset file.

        Arguments
        ----------
        :param: subset_str: Optional[str] = 'kenya'
            Whether to subset Kenya when preprocessing
        :param: regrid: Optional[Path] = None
            If a Path is passed, the ERA5-Land files will be regridded to have
            the same grid as the dataset at that Path. If None, no regridding
            happens
        :param: resample_time: str = 'M'
            If not None, defines the time length to which the data will be resampled
        :param: upsampling: bool = False
            If true, tells the class the time-sampling will be upsampling. In this case,
            nearest instead of mean is used for the resampling
        :param: variable: Optional[str] = None
            the variable that you want to preprocess. If None then will
            process ALL variables that have been downloaded to the
            `data/raw/reanalysis-era5-land` by the ERA5LandExporter
        :param: parallel_processes: int = 1
            If > 1, run the preprocessing in parallel
        :param: years: Optional[List[int]] = None
            preprocess a subset of the years from the raw data
        :param: cleanup: bool = True
            If true, delete interim files created by the class

        Note:
        ----
        - the raw data is downloaded at annual resolution by default
        """
        print(f"Reading data from {self.raw_folder}. Writing to {self.interim}")
        nc_files = self.get_filepaths()
        if years is not None:
            # BUGFIX: int() cannot be applied to a Path object — use the
            # *name* of the parent directory (the year folder, see
            # create_filename for the raw layout) for the comparison.
            nc_files = [f for f in nc_files if int(f.parents[0].name) in years]

        # run for one variable or all variables?
        if variable is not None:
            variables = [d.name for d in (self.raw_folder / self.dataset).iterdir()]
            assert variable in variables, (
                f"Expect the variable provided to be in {variables}"
            )
            print(f"Running preprocessor for var: {variable}")
            nc_files = [f for f in nc_files if f.parents[1].name == variable]

        # keep the Path argument and the loaded reference dataset in
        # separate names (the original rebound `regrid` from Path to Dataset)
        reference_grid: Optional[xr.Dataset] = None
        if regrid is not None:
            reference_grid = self.load_reference_grid(regrid)

        # parallel processing ?
        if parallel_processes <= 1:  # sequential
            for file in nc_files:
                self._preprocess_single(file, subset_str, reference_grid)
        else:
            # context manager ensures the worker pool is always closed
            with multiprocessing.Pool(processes=parallel_processes) as pool:
                outputs = pool.map(
                    partial(
                        self._preprocess_single,
                        subset_str=subset_str,
                        regrid=reference_grid,
                    ),
                    nc_files,
                )
            print("\nOutputs (errors):\n\t", outputs)

        # merge and resample files
        # NOTE: `variable` is deliberately NOT forwarded — the merged file is
        # written without a variable name in its filename
        self.merge_files(
            subset_str=subset_str,
            resample_time=resample_time,
            upsampling=upsampling,
        )

        if cleanup:
            rmtree(self.interim)

tests/preprocess/test_era5_land.py

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
import xarray as xr
2+
import numpy as np
3+
from datetime import datetime
4+
from pathlib import Path
5+
6+
from src.preprocess import ERA5LandPreprocessor
7+
from src.utils import get_kenya
8+
9+
from ..utils import _make_dataset
10+
11+
12+
class TestERA5LandPreprocessor:
    @staticmethod
    def _make_era5_dataset(
        size, lonmin=33.75, lonmax=42.25, latmin=6.0, latmax=-5.0, add_times=True
    ):
        # Same as make_chirps_dataset, except already truncated
        # since we can just download Kenya from the cds api
        lat_len, lon_len = size
        # create the coordinate vectors
        longitudes = np.linspace(lonmin, lonmax, lon_len)
        latitudes = np.linspace(latmin, latmax, lat_len)

        # BUGFIX: the data array is built (lat, lon) so the dims must be
        # latitude-first; the previous longitude-first order only worked by
        # accident for square arrays and attached the latitude coordinate
        # to the longitude axis.
        dims = ["latitude", "longitude"]
        coords = {"latitude": latitudes, "longitude": longitudes}

        if add_times:
            size = (2, size[0], size[1])
            dims.insert(0, "time")
            coords["time"] = [datetime(2019, 1, 1), datetime(2019, 1, 2)]
        t2m = np.random.randint(100, size=size)

        return xr.Dataset({"t2m": (dims, t2m)}, coords=coords)

    def test_init(self, tmp_path):
        # constructing the preprocessor should create the interim /
        # preprocessed folder structure
        ERA5LandPreprocessor(tmp_path)

        assert (tmp_path / "interim/reanalysis-era5-land_interim").exists()
        assert (tmp_path / "interim/reanalysis-era5-land_preprocessed").exists()

    @staticmethod
    def test_make_filename():
        # raw layout is <dataset>/<variable>/<year>/<months>.nc
        path = Path("reanalysis-era5-land" "/2m_temperature/1979_2019/01_12.nc")

        name = ERA5LandPreprocessor.create_filename(path, "kenya")
        expected_name = "1979_2019_01_12_2m_temperature_kenya.nc"
        assert name == expected_name, f"{name} generated, expected {expected_name}"

    @staticmethod
    def test_get_filenames(tmp_path):
        (tmp_path / "raw/reanalysis-era5-land/" "2m_temperature/1979_2019").mkdir(
            parents=True
        )

        test_file = (
            tmp_path / "raw/reanalysis-era5-land" "/2m_temperature/1979_2019.01_12.nc"
        )
        test_file.touch()

        processor = ERA5LandPreprocessor(tmp_path)

        files = processor.get_filepaths()
        assert files[0] == test_file, f"Expected {test_file} to be retrieved"

    def test_preprocess(self, tmp_path):
        # set up a single raw file in the exporter's folder layout
        (tmp_path / "raw/reanalysis-era5-land/" "2m_temperature/1979_2019").mkdir(
            parents=True
        )
        data_path = (
            tmp_path / "raw/reanalysis-era5-land/" "2m_temperature/1979_2019/01_12.nc"
        )
        dataset = self._make_era5_dataset(size=(100, 100))
        dataset.to_netcdf(path=data_path)

        # build a 20x20 reference grid over Kenya to regrid onto
        kenya = get_kenya()
        regrid_dataset, _, _ = _make_dataset(
            size=(20, 20),
            latmin=kenya.latmin,
            latmax=kenya.latmax,
            lonmin=kenya.lonmin,
            lonmax=kenya.lonmax,
        )

        regrid_path = tmp_path / "regridder.nc"
        regrid_dataset.to_netcdf(regrid_path)

        processor = ERA5LandPreprocessor(tmp_path)
        processor.preprocess(
            subset_str="kenya",
            regrid=regrid_path,
            parallel_processes=1,
            variable="2m_temperature",
        )

        expected_out_path = (
            tmp_path / "interim/reanalysis-era5"
            "-land_preprocessed/reanalysis-era5-land_kenya.nc"
        )
        assert (
            expected_out_path.exists()
        ), f"Expected processed file to be saved to {expected_out_path}"

        # check the subsetting happened correctly
        out_data = xr.open_dataset(expected_out_path)
        expected_dims = ["lat", "lon", "time"]
        assert len(list(out_data.dims)) == len(expected_dims)
        for dim in expected_dims:
            assert dim in list(
                out_data.dims
            ), f"Expected {dim} to be in the processed dataset dims"

        lons = out_data.lon.values
        assert (lons.min() >= kenya.lonmin) and (
            lons.max() <= kenya.lonmax
        ), "Longitudes not correctly subset"

        lats = out_data.lat.values
        assert (lats.min() >= kenya.latmin) and (
            lats.max() <= kenya.latmax
        ), "Latitudes not correctly subset"

        # regridded onto the 20x20 reference grid
        assert out_data.t2m.values.shape[1:] == (20, 20)

        # cleanup=True (the default) should remove the interim folder
        # (plain string: the previous f-string had no placeholders)
        assert (
            not processor.interim.exists()
        ), "Interim era5 folder should have been deleted"

0 commit comments

Comments (0)