
Conversation

@enssow enssow commented Dec 11, 2025

Description

Draft PR to showcase how to enable sharding (experimentation underway to optimise performance)

Issue Number

DRAFT
Closes #1384

Checklist before asking for review

  • I have performed a self-review of my code
  • My changes comply with basic sanity checks:
    • I have fixed formatting issues with ./scripts/actions.sh lint
    • I have run unit tests with ./scripts/actions.sh unit-test
    • I have documented my code and I have updated the docstrings.
    • I have added unit tests, if relevant
  • I have tried my changes with data and code:
    • I have run the integration tests with ./scripts/actions.sh integration-test
    • (bigger changes) I have run a full training and I have written in the comment the run_id(s): launch-slurm.py --time 60
    • (bigger changes and experiments) I have shared a hedgedoc in the github issue with all the configurations and runs for these experiments
  • I have informed and aligned with people impacted by my change:
    • for config changes: the MatterMost channels and/or a design doc
    • for changes of dependencies: the MatterMost software development channel

@github-actions github-actions bot added the infra (Issues related to infrastructure) and performance (Work related to performance improvements) labels Dec 11, 2025
@enssow enssow marked this pull request as draft December 11, 2025 14:22
@enssow enssow mentioned this pull request Dec 11, 2025

enssow commented Dec 19, 2025

  • Specify the type of store to create during inference with uv run --offline inference --from_run_id zs581tqh --samples 1 --streams_output ERA5 --zarr-store local or uv run --offline inference --from_run_id zs581tqh --samples 1 --streams_output ERA5 --zarr-store zip
  • The code to run evaluation stays the same, with no extra config sections required (the type of store is detected from the extension), e.g. uv run evaluate --config ../test_evaluate_zip.yml (I tested the latest code with runs v6l27pog (zip) and bvu0y897 (local))
  • Backwards compatible: opening an older store emits ZarrUserWarning: Both zarr.json (Zarr format 3) and .zgroup (Zarr format 2) metadata objects exist at file:///p/scratch/weatherai/shared_work/results/v8pqzh4y/validation_chkpt00000_rank0000.zarr. Zarr format 3 will be used. (raised from /p/project1/weatherai/owens1/WeatherGenerator/.venv/lib/python3.12/site-packages/zarr/core/group.py:568), but it does plot (tested with run v8pqzh4y)
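The extension-based detection mentioned in the second bullet could be sketched roughly like this (a minimal sketch; the `StoreType` enum and `detect_store_type` helper are illustrative names, not the PR's actual code):

```python
from enum import Enum
from pathlib import Path

class StoreType(Enum):
    """Hypothetical one-to-one mapping between file extension and store type."""
    LOCAL = "zarr"
    ZIP = "zip"

def detect_store_type(path: Path) -> StoreType:
    """Infer the store type from the path's extension, e.g. '.zarr' -> LOCAL."""
    ext = path.suffix.lstrip(".")
    for store_type in StoreType:
        if store_type.value == ext:
            return store_type
    raise ValueError(f"unsupported store extension: {ext!r}")

# e.g. detect_store_type(Path("results/run.zarr")) is StoreType.LOCAL
```

The actual store object (e.g. zarr's LocalStore or ZipStore) would then be constructed from the detected type, keeping the extension literals in a single place.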


enssow commented Dec 19, 2025

Integration test fails; I need to investigate this further!


clessig commented Dec 20, 2025

> Latest version can read and write with the zipstore format but needs tidying up (produces lots of print statements to narrow down the issues when trying to run the evaluation from a .yml file)

What is the performance impact of running evaluation with the zipstore?


@tjhunter tjhunter left a comment


Some small style comments. Otherwise, it looks ready to try out on a larger scale.

While testing the logic, I moved the creation of the zarr store directly into inference. Otherwise, I am personally confused about when we write during inference and when we just calculate validation losses.

@grassesi : do you know why we don't write data when calling validation during training?

"""Convert raw dask arrays into chunked dask-aware xarray dataset."""
chunks = (chunk_nsamples, *self.data.shape[1:])

chunks = (chunk_nsamples, *(max(x // 4, 1) for x in self.data.shape[1:]))
Collaborator

where is this formula coming from? Someone else should be able to update it later.

Contributor Author

extracted out into a scale factor parameter now for easier updating

self.data_root: zarr.Group | None = None
self._store: LocalStore | None = None
# determine type using extension
ext = self._store_path.suffix[1:]
Collaborator

you are repeating yourself

if self.type == "zip":
    if self.create:
        _logger.info("Creating zipstore")
        self._store = ZipStore(self._store_path, mode="a")
Collaborator

Append is more permissive, but there is a cost in opening and closing the store. I would rather have the store opened for the full duration of the write than for each chunk.

Contributor Author

mode = "w" works with the new develop code, before I had some issues with overwriting so was using append - thanks for fixing this :)


clessig commented Dec 29, 2025

> Some small style comments. Otherwise, it looks ready to try out on a larger scale.
>
> While testing the logic, I moved the creation of the zarr store directly into inference. Otherwise, I am personally confused about when we write during inference and when we just calculate validation losses.
>
> @grassesi : do you know why we don't write data when calling validation during training?

We don't write during validation since it slows down the validation, generates additional files/storage, and is rarely used. There's a config parameter that controls this:

log_validation: 0

(in the process of being renamed).
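As a config fragment this could look like the following sketch (only `log_validation: 0` appears in the thread; the surrounding section name and the semantics of nonzero values are assumptions):

```yaml
# run configuration (section name illustrative)
training:
  log_validation: 0  # 0 disables writing validation output during training
```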

@github-actions github-actions bot added the initiative (Large piece of work covering multiple sprints) label Dec 31, 2025

enssow commented Dec 31, 2025

> Latest version can read and write with the zipstore format but needs tidying up (produces lots of print statements to narrow down the issues when trying to run the evaluation from a .yml file)
>
> What is the performance impact of running evaluation with the zipstore?

Hi Christian, sorry I missed this before the holidays - the latest tests are all in the hedgedoc: https://gitlab.jsc.fz-juelich.de/hedgedoc/3X10vtdVQumQZy4qpBiHWg - there seems to be a small improvement when using the ZipStore to evaluate.

@enssow enssow marked this pull request as ready for review January 2, 2026 15:22

@grassesi grassesi left a comment


Great work, but I would like some structural changes: if possible, trainer.Trainer should not be concerned with managing the ZarrIO context. In addition to the changes I requested below, I would also like the ZarrIO.enter method to use polymorphism instead of switching behavior based on self.type. I would also like the number of occurrences of the literals indicating file extension/store type (there is a one-to-one mapping) to be reduced (e.g. "zip" now occurs in 7 different places all over the code, and similarly with "zarr"/"local"). I have implemented these two points in #1553, feel free to just merge it if you like it.
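The polymorphism being requested could be sketched like this (class and method names are illustrative, not the actual #1553 implementation; the string returns stand in for constructing zarr's LocalStore/ZipStore):

```python
from abc import ABC, abstractmethod
from pathlib import Path

class ZarrIO(ABC):
    """Each store flavour becomes a subclass, so __enter__ no longer
    needs to branch on self.type."""

    def __init__(self, store_path: Path) -> None:
        self.store_path = store_path

    @abstractmethod
    def open_store(self):
        """Create/open the underlying zarr store."""

    @classmethod
    def from_path(cls, store_path: Path) -> "ZarrIO":
        # the single place where the extension -> class mapping lives
        subclasses = {".zarr": LocalZarrIO, ".zip": ZipZarrIO}
        return subclasses[store_path.suffix](store_path)

class LocalZarrIO(ZarrIO):
    def open_store(self):
        return f"LocalStore({self.store_path})"  # stand-in for zarr.storage.LocalStore

class ZipZarrIO(ZarrIO):
    def open_store(self):
        return f"ZipStore({self.store_path})"  # stand-in for zarr.storage.ZipStore

# e.g. ZarrIO.from_path(Path("out.zip")) is a ZipZarrIO instance
```

Centralising the mapping in `from_path` also addresses the repeated "zip"/"zarr" literals.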

Comment on lines +470 to +472
if SHARDING_ENABLED and chunks != "auto":
shards = (SHARD_N_SAMPLES, *((SCALE_FACTOR + 1) * x for x in chunks[1:]))
group.create_array(name, data=array, chunks=chunks, shards=shards)
Contributor

would be nice if you can encapsulate this conditional:

shards = _get_shards(chunks)
group.create_array(name, data=array, chunks=chunks, shards=shards)

def _get_shards(chunks: tuple[int, ...]) -> tuple[int, ...] | None:
    ...

Comment on lines 226 to +229
)
stream_dict = reader.get_stream(stream)
if not stream_dict:
return run_id, stream, {}
Contributor

Is this change required for the new zarr zip store? I am also surprised that this is needed. From the code I would have thought that reader.get_stream always returns at least an empty dict.

Contributor Author

It was inherited from an older merge with develop. I'll remove it.

Contributor

Can you remove this file from your PR, since you did not make any real changes to pyproject.toml?

Comment on lines +51 to 52
zarr_store=args.zarr_store,
)
Contributor

See my comment on utils/cli.py. I think it should be a proper option instead (with a default value in default_config.yaml etc.)

Comment on lines +477 to +478
elapsed = timeit.default_timer() - start_time
_logger.debug(f"writing array: {name} took {elapsed:.2f}")
Contributor

I don't think we want that timing logic in develop.

Comment on lines -253 to +282
-def as_xarray(self, chunk_nsamples=CHUNK_N_SAMPLES) -> xr.DataArray:
+def as_xarray(
+    self, chunk_nsamples=CHUNK_N_SAMPLES, shard_nsamples=SHARD_N_SAMPLES
+) -> xr.DataArray:
     """Convert raw dask arrays into chunked dask-aware xarray dataset."""
-    chunks = (chunk_nsamples, *self.data.shape[1:])
-
+    chunks = (chunk_nsamples, *(max(x // SCALE_FACTOR, 1) for x in self.data.shape[1:]))
+    if SHARDING_ENABLED:
+        shards = (shard_nsamples, *((SCALE_FACTOR + 1) * x for x in chunks[1:]))
+        _logger.info(f"sharding enabled with shards: {shards} and chunks: {chunks}")
+    else:
+        shards = None
+        _logger.info(f"sharding disabled, using chunks: {chunks}")
     # maybe do dask conversion earlier? => usefull for parallel writing?
-    data = da.from_zarr(self.data, chunks=chunks)  # dont call compute to lazy load
+    data = da.from_zarr(
+        self.data, chunks=chunks, shards=shards
+    )  # dont call compute to lazy load
Contributor

looks good :)

Comment on lines 378 to 380
# warnings.warn(f"Zarr2 conflict: {last_msg}",
# DeprecationWarning
# )
Contributor

please remove if not important

Comment on lines 365 to 368
with warnings.catch_warnings(record=True) as caught:
self._store = LocalStore(self._store_path)
self.data_root = zarr.group(store=self._store)
# Raise DeprecationWarning only if a ZarrUserWarning was raised
Contributor

What is the cause of these warnings? Maybe add a short comment on why we want to ignore them here.
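The pattern under discussion, with the explanatory comment the reviewer asks for, could look like this (a self-contained sketch: `ZarrUserWarning` is stood in by a local class here, whereas in the PR it comes from zarr, and the function name is illustrative):

```python
import warnings

class ZarrUserWarning(UserWarning):
    """Stand-in for zarr's ZarrUserWarning."""

def open_legacy_store():
    # Pre-existing stores carry both Zarr v2 (.zgroup) and v3 (zarr.json)
    # metadata, which makes zarr emit a ZarrUserWarning; format 3 wins, so
    # the warning is benign and we only record it instead of surfacing it.
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        # the real code opens the store here; this warn() simulates zarr
        warnings.warn("Both zarr.json and .zgroup exist", ZarrUserWarning)
    return [w for w in caught if issubclass(w.category, ZarrUserWarning)]

# e.g. open_legacy_store() returns a list with exactly one recorded warning
```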

@github-project-automation github-project-automation bot moved this to In Progress in WeatherGen-dev Jan 6, 2026
* make zarrio subclasses

* store string literals for output storage in enum.


Successfully merging this pull request may close these issues.

zarr3 compaction
