Skip to content

Add nats-key-value package#844

Open
caspervonb wants to merge 11 commits intomainfrom
add-key-value-package
Open

Add nats-key-value package#844
caspervonb wants to merge 11 commits intomainfrom
add-key-value-package

Conversation

@caspervonb
Copy link
Copy Markdown
Collaborator

No description provided.

@synadia-claude-reviewer
Copy link
Copy Markdown

synadia-claude-reviewer bot commented Mar 26, 2026

Claude finished @caspervonb's task in 4m 13s —— View job


PR Review: Add nats-key-value package

  • Gather context and read PR diff
  • Review nats-key-value/src/nats/kv/__init__.py
  • Review nats-key-value/src/nats/kv/errors.py
  • Review nats-key-value/tests/test_kv.py
  • Review nats-key-value/pyproject.toml
  • Post inline comments on issues found
  • Post overall review summary

Good overall structure — the package layout, dataclasses, and test coverage modelled on Go's kv_test.go are solid. Four correctness/resource issues found:

Bugs / correctness:

  1. Dead code in watch() (line 541–544) — both branches of the if len(keys) == 1 and keys == ">" block set consumer_config["filter_subject"] to the same value. The condition is a no-op. If multi-key or multi-subject filtering was intended, the logic needs to be completed.

  2. Silent exception swallowing in _get() (line 294) — except Exception: pass during header decoding means a malformed or unexpected header silently treats the message as a PUT. A DELETE/PURGE marker could be misread as a live value.

  3. String-based error detection for BucketExistsError (line 726) — "already in use" in str(e).lower() is fragile. Every other CAS check in this file uses e.error_code; this should use the stream-already-exists error code (10058) for consistency and reliability.

Resource management:

  1. Missing try/finally in purge_deletes, keys, history — if an exception is raised inside the async for loop, watcher.stop() is never called and the ephemeral consumer leaks. All three methods need try/finally blocks.

  2. KeyWatcher lacks async context manager support — without __aenter__/__aexit__, callers who forget to call stop() silently leak consumers (server cleans them up after 5 minutes via inactive_threshold). Adding the protocol makes resource cleanup idiomatic.

if entry.operation in (KeyValueOp.DELETE, KeyValueOp.PURGE):
delete_markers.append(entry)

await watcher.stop()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

watcher.stop() is not called if an exception is raised during the async for loop (e.g. from _js._api.stream_purge). The consumer subscription leaks. Same pattern applies in keys() (line ~665) and history() (line ~692). Use try/finally:

watcher = await self.watch_all()
try:
    async for entry in watcher:
        ...
finally:
    await watcher.stop()

@@ -0,0 +1,846 @@
"""NATS Key-Value Store package."""
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer nats.key_value

@caspervonb caspervonb marked this pull request as ready for review April 14, 2026 16:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant