-
Notifications
You must be signed in to change notification settings - Fork 210
refactor: add shared pubsub test fixtures and wait_for polling helper #1298
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,90 @@ | ||
| """Shared fixtures and helpers for pubsub tests.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from collections.abc import AsyncIterator | ||
| from contextlib import asynccontextmanager | ||
| import dataclasses | ||
| from typing import Any | ||
|
|
||
| import pytest | ||
| import trio | ||
|
|
||
| from libp2p.abc import IHost | ||
| from libp2p.pubsub.gossipsub import GossipSub | ||
| from libp2p.pubsub.pubsub import Pubsub | ||
| from tests.utils.factories import PubsubFactory | ||
| from tests.utils.pubsub.utils import dense_connect | ||
|
|
||
|
|
||
| @dataclasses.dataclass(frozen=True, slots=True) | ||
| class GossipSubHarness: | ||
| """Typed wrapper around a batch of GossipSub-backed pubsub instances.""" | ||
|
|
||
| pubsubs: tuple[Pubsub, ...] | ||
|
|
||
| @property | ||
| def hosts(self) -> tuple[IHost, ...]: | ||
| return tuple(ps.host for ps in self.pubsubs) | ||
|
|
||
| @property | ||
| def routers(self) -> tuple[GossipSub, ...]: | ||
| result: list[GossipSub] = [] | ||
| for ps in self.pubsubs: | ||
| r = ps.router | ||
| assert isinstance(r, GossipSub), f"Expected GossipSub, got {type(r)}" | ||
| result.append(r) | ||
| return tuple(result) | ||
|
|
||
| def __len__(self) -> int: | ||
| return len(self.pubsubs) | ||
|
|
||
|
|
||
@asynccontextmanager
async def gossipsub_nodes(n: int, **kwargs: Any) -> AsyncIterator[GossipSubHarness]:
    """
    Build a harness of *n* pubsub nodes backed by GossipSub routers.

    Extra keyword arguments are forwarded verbatim to
    ``PubsubFactory.create_batch_with_gossipsub``.

    Usage::

        async with gossipsub_nodes(3, heartbeat_interval=0.5) as h:
            h.pubsubs  # tuple[Pubsub, ...]
            h.hosts  # tuple[IHost, ...]
            h.routers  # tuple[GossipSub, ...]
    """
    async with PubsubFactory.create_batch_with_gossipsub(n, **kwargs) as batch:
        yield GossipSubHarness(pubsubs=batch)
|
|
||
|
|
||
@asynccontextmanager
async def connected_gossipsub_nodes(
    n: int, *, connect_timeout: float = 5.0, **kwargs: Any
) -> AsyncIterator[GossipSubHarness]:
    """
    Create *n* GossipSub nodes with dense connectivity.

    Rather than sleeping a fixed amount after ``dense_connect`` (which is
    timing-dependent and can be flaky or needlessly slow), this waits
    deterministically until every pubsub instance has registered at least
    one peer, bounded by *connect_timeout* seconds.  On timeout,
    ``trio.TooSlowError`` propagates out of ``trio.fail_after``.

    :param n: number of nodes to create.
    :param connect_timeout: upper bound (seconds) on the peer-registration wait.
    :param kwargs: forwarded to the underlying pubsub factory.
    """
    async with gossipsub_nodes(n, **kwargs) as harness:
        await dense_connect(harness.hosts)
        if n > 1:
            # Pubsub peer registration is asynchronous relative to host
            # connection, so poll until every instance actually observes a
            # peer — this makes "connected" a real guarantee instead of a
            # timing assumption.
            # NOTE(review): assumes ``Pubsub.peers`` is the per-instance peer
            # map populated on stream setup — confirm against Pubsub internals.
            with trio.fail_after(connect_timeout):
                while any(not ps.peers for ps in harness.pubsubs):
                    await trio.sleep(0.01)
        yield harness
|
|
||
|
|
||
@asynccontextmanager
async def subscribed_mesh(
    topic: str, n: int, *, settle_time: float = 1.0, **kwargs: Any
) -> AsyncIterator[GossipSubHarness]:
    """
    Yield *n* densely connected GossipSub nodes, each subscribed to *topic*.

    Sleeps *settle_time* seconds after subscribing so the mesh can form
    before the harness is handed to the caller.
    """
    async with connected_gossipsub_nodes(n, **kwargs) as harness:
        for node in harness.pubsubs:
            await node.subscribe(topic)
        await trio.sleep(settle_time)
        yield harness
|
|
||
|
|
||
@pytest.fixture
async def connected_gossipsub_pair() -> AsyncIterator[GossipSubHarness]:
    """Fixture yielding two densely connected GossipSub nodes (default config)."""
    async with connected_gossipsub_nodes(2) as pair:
        yield pair
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,114 @@ | ||
| """Polling helpers for pubsub test synchronization.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from collections.abc import Callable | ||
| import inspect | ||
| import logging | ||
| from typing import TYPE_CHECKING | ||
|
|
||
| import trio | ||
|
|
||
| if TYPE_CHECKING: | ||
| from tests.utils.pubsub.dummy_account_node import DummyAccountNode | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
async def wait_for(
    predicate: Callable[[], object],
    *,
    timeout: float = 10.0,
    poll_interval: float = 0.02,
    fail_msg: str = "",
) -> None:
    """
    Repeatedly evaluate *predicate* until it returns a truthy value.

    Both synchronous and asynchronous predicates are accepted.  A predicate
    that raises is treated as if it had returned a falsy value; should the
    overall *timeout* expire, the most recent such exception is chained as
    the cause of the raised ``TimeoutError``.

    :param predicate: zero-argument callable (sync or async) to poll.
    :param timeout: maximum seconds to keep polling.
    :param poll_interval: seconds to sleep between attempts.
    :param fail_msg: custom message for the ``TimeoutError``; a default
        including the elapsed time is used when empty.
    """
    is_coro = inspect.iscoroutinefunction(predicate)
    started = trio.current_time()
    pending_exc: Exception | None = None

    while True:
        try:
            if is_coro:
                outcome = await predicate()  # type: ignore[misc]
            else:
                outcome = predicate()
        except Exception as exc:
            # Treat a raising predicate as "not yet true" but remember the
            # exception so a timeout can surface it.
            pending_exc = exc
        else:
            if outcome:
                return

        elapsed = trio.current_time() - started
        if elapsed > timeout:
            message = fail_msg or f"wait_for timed out after {elapsed:.2f}s"
            timeout_err = TimeoutError(message)
            if pending_exc is None:
                raise timeout_err
            raise timeout_err from pending_exc

        await trio.sleep(poll_interval)
|
|
||
|
|
||
async def wait_for_convergence(
    nodes: tuple[DummyAccountNode, ...],
    check: Callable[[DummyAccountNode], bool],
    timeout: float = 10.0,
    poll_interval: float = 0.02,
    log_success: bool = False,
    raise_last_exception_on_timeout: bool = True,
) -> None:
    """
    Poll until every node in *nodes* satisfies *check*.

    Returns once all nodes pass.  On timeout a ``TimeoutError`` is raised,
    unless *raise_last_exception_on_timeout* is ``True`` and some node's
    *check* raised — then an ``AssertionError`` chained to that exception
    is raised instead.

    Preserves the API of the original inline helper from
    ``test_dummyaccount_demo.py``.
    """
    started = trio.current_time()

    # Most recent exception raised by *check*, and the index of the node
    # that triggered it (for the timeout diagnostics).
    recent_exc: Exception | None = None
    recent_exc_index: int | None = None

    while True:
        pending: list[int] = []
        for idx, node in enumerate(nodes):
            try:
                passed = check(node)
            except Exception as exc:
                passed = False
                recent_exc = exc
                recent_exc_index = idx
            if not passed:
                pending.append(idx)

        if not pending:
            # All nodes converged.
            if log_success:
                logger.debug(
                    "Converged in %.3fs with %d nodes",
                    trio.current_time() - started,
                    len(nodes),
                )
            return

        elapsed = trio.current_time() - started
        if elapsed > timeout:
            if raise_last_exception_on_timeout and recent_exc is not None:
                node_hint = (
                    f" (node index {recent_exc_index})"
                    if recent_exc_index is not None
                    else ""
                )
                raise AssertionError(
                    f"Convergence failed{node_hint}: {recent_exc}"
                ) from recent_exc

            raise TimeoutError(
                f"Convergence timeout after {elapsed:.2f}s. "
                f"Failed nodes: {pending}. "
                f"(Hint: run with -s and pass log_success=True for timing logs)"
            )

        await trio.sleep(poll_interval)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`connected_gossipsub_nodes()` uses a fixed `trio.sleep(0.1)` after `dense_connect()`. This makes the fixture timing-dependent and can still be flaky (or unnecessarily slow) since pubsub peer registration is asynchronous; the codebase already has event-based helpers like `Pubsub.wait_for_peer()` intended to replace arbitrary sleeps. Suggest replacing the fixed sleep with a deterministic wait (e.g., wait until each pubsub observes at least one peer when `n > 1`, or otherwise expose a configurable settle/wait strategy) so the fixture actually guarantees "connected" semantics.