diff --git a/.gitignore b/.gitignore index 525f5696d..592c540b7 100644 --- a/.gitignore +++ b/.gitignore @@ -196,6 +196,7 @@ _build/ # Attack simulation test results tests/security/attack_simulation/results/ libp2p-forge +libp2p-metrics # OSO health report generated outputs reports/*.json diff --git a/docs/examples.metrics.rst b/docs/examples.metrics.rst new file mode 100644 index 000000000..b509406a9 --- /dev/null +++ b/docs/examples.metrics.rst @@ -0,0 +1,183 @@ +Metrics Demo +============ + +This example demonstrates how to run multiple libp2p services (Ping, Pubsub/Gossipsub, Kad-DHT) in a single node and observer +their behaviour through Prometheus + Grafana metrics dashboards. + +.. code-block:: console + + $ python -m pip install libp2p + Collecting libp2p + ... + Successfully installed libp2p-x.x.x + + $ metrics-demo + Host multiaddr: /ip4/172.16.68.73/tcp/41173/p2p/12D3KooWD2DFvDs4wekLWU8sAUJJgivbRbiiKkX9yQ3kGhuCwCqL + Gossipsub and Pubsub services started !! + DHT service started with DHTMode.SERVER mode + Starting command executor loop... + + Prometheus metrics visible at: http://localhost:8000 + + To start prometheus and grafana dashboards, from another terminal: + PROMETHEUS_PORT=9001 GRAFANA_PORT=7001 docker compose up + + After this: + Prometheus dashboard will be visible at: http://localhost:9001 + Grafana dashboard will be visible at: http://localhost:7001 + + Entering intractive mode, type commands below. + + Available commands: + - connect - Connect to another peer + ... + +Now in this way a node can be started, now start another node in a different terminal +and make a connection between so that they can communicate: + +.. code-block:: console + + $ metrics-demo + $ connect /ip4/172.16.68.73/tcp/41173/p2p/12D3KooWD2DFvDs4wekLWU8sAUJJgivbRbiiKkX9yQ3kGhuCwCqL + Connected to 12D3KooWD2DFvDs4wekLWU8sAUJJgivbRbiiKkX9yQ3kGhuCwCqL + +Now we can communicate between the 2 nodes via Ping, Gossipsub and Kad-DHT. 
Before that we have to
start the prometheus and grafana dashboards. For this create a `docker-compose.yml` file like this:

.. code-block:: console

    services:
      prometheus:
        image: prom/prometheus:latest
        container_name: prometheus
        volumes:
          - ./prometheus.yml:/etc/prometheus/prometheus.yml:ro
        ports:
          - "${PROMETHEUS_PORT}:9090"
        extra_hosts:
          - "host.docker.internal:host-gateway"

      grafana:
        image: grafana/grafana:latest
        container_name: grafana
        ports:
          - "${GRAFANA_PORT}:3000"
        depends_on:
          - prometheus

And a `prometheus.yml` file like this:

.. code-block:: console

    global:
      scrape_interval: 5s

    scrape_configs:
      - job_name: "libp2p-python"
        static_configs:
          - targets:
            - "host.docker.internal:8000"


And run it like this:

.. code-block:: console

    PROMETHEUS_PORT=9001 GRAFANA_PORT=7001 docker compose up

A similar file is present in the `py-libp2p/libp2p/metrics` directory as well, so either create a new docker-compose
file or run it from the above path. This basically starts a prometheus and grafana server on your localhost,
with which the metrics can be viewed in graph format.
Remember that the dashboards will be created for the node whose prometheus metric-server is running on port `8000`.

Now see how to communicate between the 2 nodes via Pubsub/Gossipsub, Ping and Kad-DHT.

PING
====

The following metrics are exposed in this service:
- ping: Round-trip time sending a `ping` and receiving a `pong`
- ping_failure: Failure while sending a ping or receiving a ping

.. code-block:: console

    $ ping /ip4/172.16.68.73/tcp/41173/p2p/12D3KooWD2DFvDs4wekLWU8sAUJJgivbRbiiKkX9yQ3kGhuCwCqL 15
    [401, 419, 428, 353, 354, 353, 369, 371, 353, 380, 352, 343, 378, 324, 412]

The output will be the RTTs taken for each ping/pong exchange to complete.
The updated metrics can be visualized in the dashboards. 
+

Pubsub/Gossipsub
================

The following metrics are exposed in this service:
- gossipsub_received_total: Messages successfully received
- gossipsub_publish_total: Messages to be published
- gossipsub_subopts_total: Messages notifying peer subscriptions
- gossipsub_control_total: Received control messages
- gossipsub_message_bytes: Message size in bytes

To communicate via gossipsub, join the same topics on both the nodes and publish messages
on that topic to get it received on both sides.

.. code-block:: console

    $ join pubsub-chat
    Subscribed to pubsub-chat
    Starting receive loop

Do this on both the terminals. Then publish a message from one side, and see it received on the other side.

.. code-block:: console

    $ publish pubsub-chat hello-from-pubsub!

See the updated metrics in the dashboards.

KAD-DHT
=======

The following metrics are exposed in this service:
- kad_inbound_total: Total inbound requests received
- kad_inbound_find_node: Total inbound FIND_NODE requests received
- kad_inbound_get_value: Total inbound GET_VALUE requests received
- kad_inbound_put_value: Total inbound PUT_VALUE requests received
- kad_inbound_get_providers: Total inbound GET_PROVIDERS requests received
- kad_inbound_add_provider: Total inbound ADD_PROVIDER requests received

To interact between the 2 nodes via kad-dht, we have 2 ways:
- `PUT_VALUE` in one node, and `GET_VALUE` in another
- `ADD_PROVIDER` in one node, and `GET_PROVIDERS` in another

.. code-block:: console

    $ put /exp/fa kad-dht-value
    Stored value: kad-dht-value with key: /exp/fa

    # From another terminal
    $ get /exp/fa
    Retrieved value: kad-dht-value

.. 
code-block:: console + + $ advertize content-id + Advertised as provider for content: content-id + + # From another terminal + $ get_provider content-id + Found 1 providers: [] + +SWARM-CONNECTION-EVENTS +======================= + +Other than the above 3 services, the incoming/outgoing connection cycle is also monitored via the +following metrics: +- swarm_incoming_conn: Incoming connection received by libp2p-swarm +- swarm_incoming_conn_error: Incoming connection failure in libp2p-swarm +- swarm_dial_attempt: Dial attempts made by libp2p-swarm +- swarm_dial_attempt_error: Outgoing connection failure in libp2p-swarm + +The full source code for this example is below: + +.. literalinclude:: ../examples/metrics/runner.py + :language: python + :linenos: diff --git a/docs/examples.rst b/docs/examples.rst index ab45d54c2..4a7be1025 100644 --- a/docs/examples.rst +++ b/docs/examples.rst @@ -33,3 +33,4 @@ Examples examples.autotls examples.perf examples.path_handling + examples.metrics diff --git a/docs/libp2p.metrics.rst b/docs/libp2p.metrics.rst new file mode 100644 index 000000000..8a1b31797 --- /dev/null +++ b/docs/libp2p.metrics.rst @@ -0,0 +1,53 @@ +libp2p.metrics package +====================== + +Submodules +---------- + +libp2p.metrics.gossipsub module +------------------------------- + +.. automodule:: libp2p.metrics.gossipsub + :members: + :undoc-members: + :show-inheritance: + +libp2p.metrics.kad_dht module +----------------------------- + +.. automodule:: libp2p.metrics.kad_dht + :members: + :undoc-members: + :show-inheritance: + +libp2p.metrics.metrics module +----------------------------- + +.. automodule:: libp2p.metrics.metrics + :members: + :undoc-members: + :show-inheritance: + +libp2p.metrics.ping module +-------------------------- + +.. automodule:: libp2p.metrics.ping + :members: + :undoc-members: + :show-inheritance: + +libp2p.metrics.swarm module +--------------------------- + +.. 
automodule:: libp2p.metrics.swarm + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: libp2p.metrics + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/libp2p.rst b/docs/libp2p.rst index fb2ab82b0..21a0cdb3f 100644 --- a/docs/libp2p.rst +++ b/docs/libp2p.rst @@ -28,6 +28,7 @@ Subpackages libp2p.tools libp2p.transport libp2p.utils + libp2p.metrics Submodules ---------- diff --git a/examples/metrics/README.md b/examples/metrics/README.md new file mode 100644 index 000000000..38bcbf2b7 --- /dev/null +++ b/examples/metrics/README.md @@ -0,0 +1,167 @@ +## Metrics Demo + +This example demonstrates how to run multiple libp2p services (Ping, Pubsub/Gossipsub, Kad-DHT) in a single node and observer +their behaviour through Prometheus + Grafana metrics dashboards. + +```bash +$ python -m pip install libp2p +Collecting libp2p +... +Successfully installed libp2p-x.x.x + +$ metrics-demo +Host multiaddr: /ip4/172.16.68.73/tcp/41173/p2p/12D3KooWD2DFvDs4wekLWU8sAUJJgivbRbiiKkX9yQ3kGhuCwCqL +Gossipsub and Pubsub services started !! +DHT service started with DHTMode.SERVER mode +Starting command executor loop... + +Prometheus metrics visible at: http://localhost:8000 + +To start prometheus and grafana dashboards, from another terminal: +PROMETHEUS_PORT=9001 GRAFANA_PORT=7001 docker compose up + +After this: +Prometheus dashboard will be visible at: http://localhost:9001 +Grafana dashboard will be visible at: http://localhost:7001 + +Entering intractive mode, type commands below. + +Available commands: +- connect - Connect to another peer +... 
+
```

Now in this way a node can be started, now start another node in a different terminal
and make a connection between them so that they can communicate:

```bash
$ metrics-demo
$ connect /ip4/172.16.68.73/tcp/41173/p2p/12D3KooWD2DFvDs4wekLWU8sAUJJgivbRbiiKkX9yQ3kGhuCwCqL
Connected to 12D3KooWD2DFvDs4wekLWU8sAUJJgivbRbiiKkX9yQ3kGhuCwCqL
```

Now we can communicate between the 2 nodes via Ping, Gossipsub and Kad-DHT. Before that we have to
start the prometheus and grafana dashboards. For this create a `docker-compose.yml` file like this:

```yml
  services:
    prometheus:
      image: prom/prometheus:latest
      container_name: prometheus
      volumes:
        - ./prometheus.yml:/etc/prometheus/prometheus.yml:ro
      ports:
        - "${PROMETHEUS_PORT}:9090"
      extra_hosts:
        - "host.docker.internal:host-gateway"

    grafana:
      image: grafana/grafana:latest
      container_name: grafana
      ports:
        - "${GRAFANA_PORT}:3000"
      depends_on:
        - prometheus
```

And run it like this:

```bash
PROMETHEUS_PORT=9001 GRAFANA_PORT=7001 docker compose up
```

A similar file is present in the `py-libp2p/libp2p/metrics` directory as well, so either create a new docker-compose
file or run it from the above path. This basically starts a prometheus and grafana server on your localhost,
with which the metrics can be viewed in graph format.

Now see how to communicate between the 2 nodes via Pubsub/Gossipsub, Ping and Kad-DHT.

### PING

The following metrics are exposed in this service:

- ping: Round-trip time sending a `ping` and receiving a `pong`
- ping_failure: Failure while sending a ping or receiving a ping

```bash
$ ping /ip4/172.16.68.73/tcp/41173/p2p/12D3KooWD2DFvDs4wekLWU8sAUJJgivbRbiiKkX9yQ3kGhuCwCqL 15
[401, 419, 428, 353, 354, 353, 369, 371, 353, 380, 352, 343, 378, 324, 412]
```

The output will be the RTTs taken for each ping/pong exchange to complete.
The updated metrics can be visualized in the dashboards. 
+

### Pubsub/Gossipsub

The following metrics are exposed in this service:

- gossipsub_received_total: Messages successfully received
- gossipsub_publish_total: Messages to be published
- gossipsub_subopts_total: Messages notifying peer subscriptions
- gossipsub_control_total: Received control messages
- gossipsub_message_bytes: Message size in bytes

To communicate via gossipsub, join the same topics on both the nodes and publish messages
on that topic to get it received on both sides.

```bash
$ join pubsub-chat
Subscribed to pubsub-chat
Starting receive loop
```

Do this on both the terminals. Then publish a message from one side, and see it received on the other side.

```bash
$ publish pubsub-chat hello-from-pubsub!
```

See the updated metrics in the dashboards.

### KAD-DHT

The following metrics are exposed in this service:

- kad_inbound_total: Total inbound requests received
- kad_inbound_find_node: Total inbound FIND_NODE requests received
- kad_inbound_get_value: Total inbound GET_VALUE requests received
- kad_inbound_put_value: Total inbound PUT_VALUE requests received
- kad_inbound_get_providers: Total inbound GET_PROVIDERS requests received
- kad_inbound_add_provider: Total inbound ADD_PROVIDER requests received

To interact between the 2 nodes via kad-dht, we have 2 ways:

- `PUT_VALUE` in one node, and `GET_VALUE` in another
- `ADD_PROVIDER` in one node, and `GET_PROVIDERS` in another

#### PUT_VALUE/GET_VALUE

```bash
$ put /exp/fa kad-dht-value
Stored value: kad-dht-value with key: /exp/fa

# From another terminal
$ get /exp/fa
Retrieved value: kad-dht-value
```

#### ADD_PROVIDER/GET_PROVIDERS

```bash
$ advertize content-id
Advertised as provider for content: content-id

# From another terminal
$ get_provider content-id
Found 1 providers: []
```

### SWARM-CONNECTION-EVENTS

Other than the above 3 services, the incoming/outgoing connection cycle is also monitored via the 
+following metrics: + +- swarm_incoming_conn: Incoming connection received by libp2p-swarm +- swarm_incoming_conn_error: Incoming connection failure in libp2p-swarm +- swarm_dial_attempt: Dial attempts made by libp2p-swarm +- swarm_dial_attempt_error: Outgoing connection failure in libp2p-swarm diff --git a/examples/metrics/__init__.py b/examples/metrics/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/metrics/coordinator.py b/examples/metrics/coordinator.py new file mode 100644 index 000000000..48ac5c4e4 --- /dev/null +++ b/examples/metrics/coordinator.py @@ -0,0 +1,193 @@ +import multiaddr +import trio + +from libp2p import new_host +from libp2p.custom_types import TProtocol +from libp2p.host.ping import ( + ID as PING_ID, + PingService, + handle_ping, +) +from libp2p.kad_dht.kad_dht import DHTMode, KadDHT +from libp2p.peer.id import ID +from libp2p.peer.peerinfo import info_from_p2p_addr +from libp2p.pubsub.gossipsub import GossipSub +from libp2p.pubsub.pubsub import Pubsub +from libp2p.records.validator import Validator + +GOSSIPSUB_PROTOCOL_ID = TProtocol("/meshsub/1.0.0") +COMMANDS = """ +Available commands: +- connect - Connect to another peer +- ping - Ping to another peer + +GOSSIPSUB +- join - Subscribe to a topic +- leave - Unsubscribe to a topic +- publish - Publish a message + +KAD-DHT +- put - Execute PUT_VALUE in DHT +- get - Execute GET_VALUE in DHT +- advertize - Execute ADD_PROVIDER in DHT +- get_provider - Execute GET_PROVIDERS in DHT + +- local - List local multiaddr +- help - List the existing commands +- exit - Shut down +""" + + +class ExampleValidator(Validator): + def validate(self, key: str, value: bytes) -> None: + if not value: + raise ValueError("Value cannot be empty") + + def select(self, key: str, values: list[bytes]) -> int: + return 0 + + +class Node: + def __init__(self, listen_addrs: list[multiaddr.Multiaddr], dht_role: str): + # Create a libp2p-host + self.host = 
new_host(listen_addrs=listen_addrs, enable_metrics=True) + + # PING + self.host.set_stream_handler(PING_ID, handle_ping) + self.ping_service = PingService(self.host) + + # Pubsub/Gossipsub + self.gossipsub = GossipSub( + protocols=[GOSSIPSUB_PROTOCOL_ID], + degree=3, # Number of peers to maintain in mesh + degree_low=2, # Lower bound for mesh peers + degree_high=4, # Upper bound for mesh peers + direct_peers=None, # Direct peers + time_to_live=60, # TTL for message cache in seconds + gossip_window=2, # Smaller window for faster gossip + gossip_history=5, # Keep more history + heartbeat_initial_delay=2.0, # Start heartbeats sooner + heartbeat_interval=5, # More frequent heartbeats for testing + ) + self.pubsub = Pubsub(self.host, self.gossipsub) + + # KAD-DHT + if dht_role == "server": + dht_mode = DHTMode.SERVER + else: + dht_mode = DHTMode.CLIENT + self.dht = KadDHT(self.host, dht_mode) + self.dht.register_validator("exp", ExampleValidator()) + + # CLI input send/receive channels + self.input_send_channel, self.input_receive_channel = trio.open_memory_channel( + 100 + ) + + self.termination_event = trio.Event() + + async def receive_loop(self, subsription): + print("Starting receive loop") + while not self.termination_event.is_set(): + try: + message = await subsription.get() + + from_peer_id = ID(message.from_id).to_base58() + if from_peer_id == self.host.get_id().pretty(): + continue + + print(f"From: {ID(message.from_id).to_base58()}") + print(f"Received: {message.data.decode('utf-8')}") + except Exception: + print("Error in receive loop") + await trio.sleep(1) + + async def command_executor(self, nursery): + print("Starting command executor loop...") + + async with self.input_receive_channel: + async for parts in self.input_receive_channel: + try: + if not parts: + continue + cmd = parts[0].lower() + + if cmd == "connect" and len(parts) > 1: + maddr = multiaddr.Multiaddr(parts[1]) + info = info_from_p2p_addr(maddr) + + await self.host.connect(info) + 
print(f"Connected to {info.peer_id}") + + if cmd == "ping" and len(parts) > 1: + maddr = multiaddr.Multiaddr(parts[1]) + info = info_from_p2p_addr(maddr) + + await self.host.connect(info) + await self.ping_service.ping(info.peer_id, int(parts[2])) + + if cmd == "join" and len(parts) > 1: + subscription = await self.pubsub.subscribe(parts[1]) + nursery.start_soon(self.receive_loop, subscription) + print(f"Subscribed to {parts[1]}") + + if cmd == "leave" and len(parts) > 1: + await self.pubsub.unsubscribe(parts[1]) + print(f"Unsubscribed to {parts[1]}") + + if cmd == "publish" and len(parts) > 2: + await self.pubsub.publish(parts[1], parts[2].encode()) + print(f"Published: {parts[2]}") + + if cmd == "put" and len(parts) > 2: + key = parts[1] + value = parts[2].encode() + + await self.dht.put_value(key, value) + print(f"Stored value: {value.decode()} with key: {key}") + + if cmd == "get" and len(parts) > 1: + key = parts[1] + + retrieved_value = await self.dht.get_value(key) + if retrieved_value: + print(f"Retrieved value: {retrieved_value.decode()}") + else: + print("Failed to retrieve") + + if cmd == "advertize" and len(parts) > 1: + content_id = parts[1] + + success = await self.dht.provide(content_id) + if success: + print(f"Advertised as provider for content: {content_id}") + else: + print("Failed to advertise as provider") + + if cmd == "get_provider" and len(parts) > 1: + content_id = parts[1] + + providers = await self.dht.find_providers(content_id) + if providers: + print( + f"Found {len(providers)} providers: " + f"{[p.peer_id for p in providers]}" + ) + else: + print("No providers found") + + if cmd == "local": + maddr = self.host.get_addrs()[0] + print(maddr) + + if cmd == "help": + print(COMMANDS) + + if cmd == "exit": + print("Exiting...") + self.termination_event.set() + nursery.cancel_scope.cancel() # Stops all tasks + raise KeyboardInterrupt + + except Exception as e: + print(f"Error executing command {parts}: {e}") diff --git 
a/examples/metrics/docker-compose.yml b/examples/metrics/docker-compose.yml new file mode 100644 index 000000000..0df0d8970 --- /dev/null +++ b/examples/metrics/docker-compose.yml @@ -0,0 +1,18 @@ +services: + prometheus: + image: prom/prometheus:latest + container_name: prometheus + volumes: + - ../../libp2p/metrics/prometheus.yml:/etc/prometheus/prometheus.yml:ro + ports: + - "${PROMETHEUS_PORT}:9090" + extra_hosts: + - "host.docker.internal:host-gateway" + + grafana: + image: grafana/grafana:latest + container_name: grafana + ports: + - "${GRAFANA_PORT}:3000" + depends_on: + - prometheus diff --git a/examples/metrics/runner.py b/examples/metrics/runner.py new file mode 100644 index 000000000..9a782565f --- /dev/null +++ b/examples/metrics/runner.py @@ -0,0 +1,72 @@ +from prompt_toolkit import PromptSession +import trio + +from examples.metrics.coordinator import COMMANDS, Node +from libp2p.metrics.metrics import Metrics +from libp2p.tools.anyio_service.context import background_trio_service +from libp2p.utils.address_validation import get_available_interfaces + + +async def main() -> None: + promt_session = PromptSession() + + # Create a libp2p-node instance + listen_addrs = get_available_interfaces(0) + node = Node( + listen_addrs=listen_addrs, + dht_role="server", + ) + + async with ( + node.host.run(listen_addrs=listen_addrs), + trio.open_nursery() as nursery, + ): + nursery.start_soon(node.host.get_peerstore().start_cleanup_task, 60) + print(f"Host multiaddr: {node.host.get_addrs()[0]}") + + async with background_trio_service(node.pubsub): + async with background_trio_service(node.gossipsub): + async with background_trio_service(node.dht): + await trio.sleep(1) + await node.pubsub.wait_until_ready() + print("Gossipsub and Pubsub services started !!") + print(f"DHT service started with {node.dht.mode} mode") + + # METRICS + metrics = Metrics() + metrics_recv_channel = node.host.get_metrics_recv_channel() + + nursery.start_soon( + 
metrics.start_prometheus_server, metrics_recv_channel + ) + nursery.start_soon(node.command_executor, nursery) + await trio.sleep(1) + + print("Entering intractive mode, type commands below.") + print(COMMANDS) + + while not node.termination_event.is_set(): + try: + _ = await trio.to_thread.run_sync(input) + user_input = await trio.to_thread.run_sync( + lambda: promt_session.prompt("Command> ") + ) + cmds = user_input.strip().split(" ", 2) + await node.input_send_channel.send(cmds) + + except Exception as e: + print(f"Error in the interactive shell: {e}") + await trio.sleep(1) + + print("Shutdown complete, Goodbye!") + + +def cli() -> None: + try: + trio.run(main) + except KeyboardInterrupt: + print("Session terminated by user") + + +if __name__ == "__main__": + cli() diff --git a/libp2p/__init__.py b/libp2p/__init__.py index 90e57d6f2..ee7beb754 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -5,6 +5,8 @@ import logging from pathlib import Path import ssl + +import trio from libp2p.transport.quic.utils import is_quic_multiaddr from typing import Any from cryptography.hazmat.primitives.asymmetric import ed25519 @@ -293,7 +295,8 @@ def new_swarm( tls_client_config: ssl.SSLContext | None = None, tls_server_config: ssl.SSLContext | None = None, resource_manager: ResourceManager | None = None, - psk: str | None = None + psk: str | None = None, + metric_send_channel: trio.MemorySendChannel[Any] | None = None ) -> INetworkService: logger.debug(f"new_swarm: enable_quic={enable_quic}, listen_addrs={listen_addrs}") """ @@ -438,7 +441,8 @@ def new_swarm( transport, retry_config=retry_config, connection_config=connection_config, - psk=psk + psk=psk, + metric_send_channel=metric_send_channel ) # Set resource manager if provided @@ -468,6 +472,7 @@ def new_host( enable_mDNS: bool = False, enable_upnp: bool = False, enable_autotls: bool = False, + enable_metrics: bool = False, bootstrap: list[str] | None = None, negotiate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT, 
enable_quic: bool = False, @@ -513,6 +518,11 @@ def new_host( if not enable_quic and quic_transport_opt is not None: logger.warning(f"QUIC config provided but QUIC not enabled, ignoring QUIC config") + # Metric emit/consume endpoints + metric_send_channel, metric_recv_channel = None, None + if enable_metrics: + metric_send_channel, metric_recv_channel = trio.open_memory_channel(100) + # Enable automatic protection by default: if no resource manager is supplied, # create a default instance so connections/streams are guarded out of the box. if resource_manager is None: @@ -545,7 +555,8 @@ def new_host( tls_client_config=tls_client_config, tls_server_config=tls_server_config, resource_manager=resource_manager, - psk=psk + psk=psk, + metric_send_channel=metric_send_channel ) if disc_opt is not None: @@ -568,6 +579,7 @@ def new_host( enable_upnp=enable_upnp, negotiate_timeout=negotiate_timeout, resource_manager=resource_manager, + metric_recv_channel=metric_recv_channel, bootstrap_allow_ipv6=bootstrap_allow_ipv6, bootstrap_dns_timeout=bootstrap_dns_timeout, bootstrap_dns_max_retries=bootstrap_dns_max_retries, diff --git a/libp2p/abc.py b/libp2p/abc.py index 446918620..507eec189 100644 --- a/libp2p/abc.py +++ b/libp2p/abc.py @@ -336,6 +336,7 @@ class INetStream(ReadWriteCloser): """ muxed_conn: IMuxedConn + metric_send_channel: trio.MemorySendChannel[Any] | None @abstractmethod def get_protocol(self) -> TProtocol | None: @@ -2099,6 +2100,12 @@ def remove_stream_handler(self, protocol_id: TProtocol) -> None: """ + @abstractmethod + def get_metrics_recv_channel(self) -> trio.MemoryReceiveChannel[Any] | None: + """ + Returns the recving end of the channel, used for metric events + """ + @abstractmethod async def initiate_autotls_procedure(self, public_ip: str | None = None) -> None: """ diff --git a/libp2p/host/basic_host.py b/libp2p/host/basic_host.py index 2b5adca27..94397a6e3 100644 --- a/libp2p/host/basic_host.py +++ b/libp2p/host/basic_host.py @@ -181,12 +181,11 @@ def 
__init__( network: INetworkService, enable_mDNS: bool = False, enable_upnp: bool = False, - enable_autotls: bool = False, bootstrap: list[str] | None = None, default_protocols: OrderedDict[TProtocol, StreamHandlerFn] | None = None, negotiate_timeout: int = DEFAULT_NEGOTIATE_TIMEOUT, resource_manager: ResourceManager | None = None, - psk: str | None = None, + metric_recv_channel: trio.MemoryReceiveChannel[Any] | None = None, *, bootstrap_allow_ipv6: bool = False, bootstrap_dns_timeout: float = 10.0, @@ -253,7 +252,6 @@ def __init__( dns_resolution_timeout=bootstrap_dns_timeout, dns_max_retries=bootstrap_dns_max_retries, ) - self.psk = psk # Address announcement configuration self._announce_addrs = ( @@ -281,6 +279,9 @@ def __init__( self._identified_peers: set[ID] = set() self._network.register_notifee(_IdentifyNotifee(self)) + # Metrics + self.metric_recv_channel = metric_recv_channel + def get_id(self) -> ID: """ :return: peer_id of host @@ -515,6 +516,12 @@ def _preferred_protocol( ) return None + def get_metrics_recv_channel(self) -> trio.MemoryReceiveChannel[Any] | None: + """ + Returns the recving end of the channel, used for metric events + """ + return self.metric_recv_channel + async def initiate_autotls_procedure(self, public_ip: str | None = None) -> None: """ Run the AutoTLS certificate provisioning flow for this host. diff --git a/libp2p/host/ping.py b/libp2p/host/ping.py index 2c95d5473..e9a506348 100644 --- a/libp2p/host/ping.py +++ b/libp2p/host/ping.py @@ -25,6 +25,19 @@ logger = logging.getLogger(__name__) +class PingEvent: + peer_id: PeerID + rtts: list[int] | None + failure_error: Exception | None + + def __init__( + self, peer_id: PeerID, rtts: list[int] | None, failure_error: Exception | None + ): + self.peer_id = peer_id + self.rtts = rtts + self.failure_error = failure_error + + async def _handle_ping(stream: INetStream, peer_id: PeerID) -> bool: """ Return a boolean indicating if we expect more pings from the peer at ``peer_id``. 
async def _ping(stream: INetStream) -> int:
    """
    Send one ping payload over ``stream`` and await the echoed pong.

    :param stream: an open net stream to the remote peer.
    :return: round-trip time in microseconds (the old docstring said "ms",
        but the computation below multiplies by 10**6).
    :raises ValueError: if the pong payload does not echo the ping payload.
    """
    ping_bytes = secrets.token_bytes(PING_LENGTH)

    start = time.time()
    await stream.write(ping_bytes)
    pong_bytes = await stream.read(PING_LENGTH)
    end = time.time()

    rtt = int((end - start) * (10**6))  # in microseconds

    if ping_bytes != pong_bytes:
        logger.debug("invalid pong response")
        # A bare `raise` here has no active exception and would itself fail
        # with RuntimeError("No active exception to re-raise"); raise an
        # explicit, catchable error instead.
        raise ValueError("invalid pong response")

    return rtt
bootstrap_dns_timeout=bootstrap_dns_timeout, diff --git a/libp2p/kad_dht/kad_dht.py b/libp2p/kad_dht/kad_dht.py index 01aa23afc..dc059b1ff 100644 --- a/libp2p/kad_dht/kad_dht.py +++ b/libp2p/kad_dht/kad_dht.py @@ -104,6 +104,17 @@ def is_valid_timestamp(ts: float) -> bool: return True +class KadDhtEvent: + peer_id: str + + inbound: bool = False + find_node: bool = False + get_value: bool = False + put_value: bool = False + get_providers: bool = False + add_provider: bool = False + + class KadDHT(Service): """ Kademlia DHT implementation for libp2p. @@ -473,6 +484,10 @@ async def handle_stream(self, stream: INetStream) -> None: f"Received DHT message from {peer_id}, type: {message.type}" ) + event = KadDhtEvent() + event.peer_id = peer_id.pretty() + event.inbound = True + # Handle FIND_NODE message if message.type == Message.MessageType.FIND_NODE: # Get target key directly from protobuf @@ -492,6 +507,9 @@ async def handle_stream(self, stream: INetStream) -> None: await stream.close() return + # Metrics Event + event.find_node = True + # Build response message with protobuf response = Message() response.type = Message.MessageType.FIND_NODE @@ -553,6 +571,9 @@ async def handle_stream(self, stream: INetStream) -> None: await stream.close() return + # Metrics Event + event.add_provider = True + # Extract provider information for provider_proto in message.providerPeers: try: @@ -621,6 +642,9 @@ async def handle_stream(self, stream: INetStream) -> None: await stream.close() return + # Metrics event + event.get_providers = True + # Find providers for the key providers = self.provider_store.get_providers(key) logger.debug( @@ -715,6 +739,9 @@ async def handle_stream(self, stream: INetStream) -> None: await stream.close() return + # Metrics Event + event.get_value = True + value_record = self.value_store.get(key) if value_record: logger.debug(f"Found value for key {key.hex()}") @@ -807,6 +834,8 @@ async def handle_stream(self, stream: INetStream) -> None: await 
stream.close() return + event.put_value = True + try: if not (key and value): raise ValueError( @@ -848,6 +877,10 @@ async def handle_stream(self, stream: INetStream) -> None: except Exception as proto_err: logger.warning(f"Failed to parse protobuf message: {proto_err}") + # Send KAD-DHT event to Metrics + if stream.metric_send_channel is not None: + await stream.metric_send_channel.send(event) + await stream.close() except Exception as e: logger.error(f"Error handling DHT stream: {e}") diff --git a/libp2p/metrics/_init__.py b/libp2p/metrics/_init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/libp2p/metrics/gossipsub.py b/libp2p/metrics/gossipsub.py new file mode 100644 index 000000000..b2dd4226e --- /dev/null +++ b/libp2p/metrics/gossipsub.py @@ -0,0 +1,58 @@ +from prometheus_client import Counter, Histogram + +from libp2p.pubsub.pubsub import GossipsubEvent + + +class GossipsubMetrics: + publish: Counter + subopts: Counter + control: Counter + + received: Counter + msg_size: Histogram + + def __init__(self) -> None: + self.received = Counter( + "gossipsub_received_total", + "Messages successfully received", + labelnames=["peer_id"], + ) + + self.publish = Counter( + "gossipsub_publish_total", + "Messages to be published", + labelnames=["peer_id"], + ) + + self.subopts = Counter( + "gossipsub_subopts_total", + "Messages notifying peer subscriptions", + labelnames=["peer_id"], + ) + + self.control = Counter( + "gossipsub_control_total", + "Received control messages", + labelnames=["peer_id"], + ) + + self.msg_size = Histogram( + "gossipsub_message_bytes", + "Message size in bytes", + buckets=[64, 128, 256, 512, 1024, 2048, 4096], + ) + + def record(self, event: GossipsubEvent) -> None: + self.received.labels(peer_id=event.peer_id).inc() + + if event.publish: + self.publish.labels(peer_id=event.peer_id).inc() + + if event.subopts: + self.subopts.labels(peer_id=event.peer_id).inc() + + if event.control: + 
class KadDhtMetrics:
    """Prometheus counters tracking inbound Kademlia DHT requests by type."""

    inbound: Counter
    find_node: Counter
    get_value: Counter
    put_value: Counter
    get_providers: Counter
    add_provider: Counter

    def __init__(self) -> None:
        # Every counter shares the same single "peer_id" label.
        def make_counter(name: str, description: str) -> Counter:
            return Counter(name, description, labelnames=["peer_id"])

        self.inbound = make_counter(
            "kad_inbound_total",
            "Total inbound requests received",
        )
        self.find_node = make_counter(
            "kad_inbound_find_node",
            "Total inbound FIND_NODE requests received",
        )
        self.get_value = make_counter(
            "kad_inbound_get_value",
            "Total inbound GET_VALUE requests received",
        )
        self.put_value = make_counter(
            "kad_inbound_put_value",
            "Total inbound PUT_VALUE requests received",
        )
        self.get_providers = make_counter(
            "kad_inbound_get_providers",
            "Total inbound GET_PROVIDERS requests received",
        )
        self.add_provider = make_counter(
            "kad_inbound_add_provider",
            "Total inbound ADD_PROVIDER requests received",
        )

    def record(self, event: KadDhtEvent) -> None:
        """Increment every counter whose corresponding flag is set on ``event``."""
        flag_counter_pairs = (
            (event.inbound, self.inbound),
            (event.find_node, self.find_node),
            (event.get_value, self.get_value),
            (event.put_value, self.put_value),
            (event.get_providers, self.get_providers),
            (event.add_provider, self.add_provider),
        )
        for flagged, counter in flag_counter_pairs:
            if flagged:
                counter.labels(peer_id=event.peer_id).inc()
100644 index 000000000..ccd4d27e1 --- /dev/null +++ b/libp2p/metrics/metrics.py @@ -0,0 +1,73 @@ +import socket +from typing import Any + +from prometheus_client import start_http_server +import trio + +from libp2p.host.ping import PingEvent +from libp2p.kad_dht.kad_dht import KadDhtEvent +from libp2p.metrics.gossipsub import GossipsubMetrics +from libp2p.metrics.kad_dht import KadDhtMetrics +from libp2p.metrics.ping import PingMetrics +from libp2p.metrics.swarm import SwarmEvent, SwarmMetrics +from libp2p.pubsub.pubsub import GossipsubEvent + + +def find_available_port(start_port: int = 8000, host: str = "127.0.0.1") -> int: + port = start_port + + while True: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + try: + sock.bind((host, port)) + return port + except OSError: + port += 1 + + raise RuntimeError("Unreachable") + + +class Metrics: + ping: PingMetrics + gossipsub: GossipsubMetrics + kad_dht: KadDhtMetrics + swarm: SwarmMetrics + + def __init__(self) -> None: + self.ping = PingMetrics() + self.gossipsub = GossipsubMetrics() + self.kad_dht = KadDhtMetrics() + self.swarm = SwarmMetrics() + + async def start_prometheus_server( + self, + metric_recv_channel: trio.MemoryReceiveChannel[Any], + ) -> None: + metrics = find_available_port(8000) + prometheus = find_available_port(9000) + grafana = find_available_port(7000) + + start_http_server(metrics) + + print(f"\nPrometheus metrics visible at: http://localhost:{metrics}") + + print( + "\nTo start prometheus and grafana dashboards, from another terminal: \n" + f"PROMETHEUS_PORT={prometheus} GRAFANA_PORT={grafana} docker compose up\n" + "\nAfter this:\n" + f"Prometheus dashboard will be visible at: http://localhost:{prometheus}\n" + f"Grafana dashboard will be visible at: http://localhost:{grafana}\n" + ) + + while True: + event = await metric_recv_channel.receive() + + match event: + case PingEvent(): + self.ping.record(event) + case GossipsubEvent(): + self.gossipsub.record(event) + case 
KadDhtEvent(): + self.kad_dht.record(event) + case SwarmEvent(): + self.swarm.record(event) diff --git a/libp2p/metrics/ping.py b/libp2p/metrics/ping.py new file mode 100644 index 000000000..81dbfb0aa --- /dev/null +++ b/libp2p/metrics/ping.py @@ -0,0 +1,37 @@ +from prometheus_client import Counter, Histogram + +from libp2p.host.ping import PingEvent + + +class PingMetrics: + rtt: Histogram + failures: Counter + + def __init__(self) -> None: + rtt = Histogram( + "ping", + "round-trip time sending a 'ping' and receiving a 'pong'", + buckets=[400, 500, 600, 700, 800], + ) + + failures = Counter( + "ping_failure", + "Failure while sending a ping or receiving a ping", + labelnames=["reason", "peer_id"], + ) + + self.rtt = rtt + self.failures = failures + + def record(self, event: PingEvent) -> None: + match event: + case PingEvent(peer_id=_, rtts=list() as rtts, failure_error=None): + print(rtts) + for rtt_us in rtts: + self.rtt.observe(rtt_us) + + case PingEvent(peer_id=_, rtts=None, failure_error=err): + self.failures.labels(reason=type(err).__name__).inc() + + case _: + raise ValueError("Invalid PingEvent state") diff --git a/libp2p/metrics/prometheus.yml b/libp2p/metrics/prometheus.yml new file mode 100644 index 000000000..524c74dfc --- /dev/null +++ b/libp2p/metrics/prometheus.yml @@ -0,0 +1,8 @@ +global: + scrape_interval: 5s + +scrape_configs: + - job_name: "libp2p-python" + static_configs: + - targets: + - "host.docker.internal:8000" diff --git a/libp2p/metrics/swarm.py b/libp2p/metrics/swarm.py new file mode 100644 index 000000000..88e1b1573 --- /dev/null +++ b/libp2p/metrics/swarm.py @@ -0,0 +1,60 @@ +from prometheus_client import Counter + + +class SwarmEvent: + peer_id: str | None = None + + conn_incoming: bool = False + conn_incoming_error: bool = False + dial_attempt: bool = False + dial_attempt_error: bool = False + + +class SwarmMetrics: + """ + Prometheus metrics for libp2p swarm events. + Mirrors the Rust libp2p metrics implementation. 
+ """ + + conn_incoming: Counter + conn_incoming_error: Counter + dial_attempt: Counter + dial_attempt_error: Counter + + def __init__(self) -> None: + self.conn_incoming = Counter( + "swarm_incoming_conn", + "Incoming connection received by libp2p-swarm", + labelnames=["peer_id"], + ) + + self.conn_incoming_error = Counter( + "swarm_incoming_conn_error", + "Incoming connection failure in libp2p-swarm", + labelnames=["peer_id"], + ) + + self.dial_attempt = Counter( + "swarm_dial_attempt", + "Dial attempts made by libp2p-swarm", + labelnames=["peer_id"], + ) + + self.dial_attempt_error = Counter( + "swarm_dial_attempt_error", + "Outgoing connection failure in libp2p-swarm", + labelnames=["peer_id"], + ) + + def record(self, event: SwarmEvent) -> None: + if event.conn_incoming: + self.conn_incoming.labels(peer_id=event.peer_id).inc() + + if event.conn_incoming_error: + self.conn_incoming_error.labels(peer_id=event.peer_id).inc() + + if event.dial_attempt: + self.dial_attempt.labels(peer_id=event.peer_id).inc() + + if event.dial_attempt_error: + self.dial_attempt_error.labels(peer_id=event.peer_id).inc() diff --git a/libp2p/network/connection/swarm_connection.py b/libp2p/network/connection/swarm_connection.py index 1bc1b154b..d23b0a7e9 100644 --- a/libp2p/network/connection/swarm_connection.py +++ b/libp2p/network/connection/swarm_connection.py @@ -42,6 +42,7 @@ class SwarmConn(INetConn): _direction: Direction _actual_transport_addresses: list[Multiaddr] | None _connection_type: ConnectionType + _metric_send_channel: trio.MemorySendChannel[Any] | None = None def __init__( self, @@ -268,8 +269,7 @@ async def _handle_muxed_stream(self, muxed_stream: IMuxedStream) -> None: await self.swarm.notify_closed_stream(net_stream) async def _add_stream(self, muxed_stream: IMuxedStream) -> NetStream: - # - net_stream = NetStream(muxed_stream, self) + net_stream = NetStream(muxed_stream, self, self._metric_send_channel) # Set Stream state to OPEN if the event has already started. 
# This is to ensure that the new streams created after connection has started # are immediately set to OPEN state. diff --git a/libp2p/network/stream/net_stream.py b/libp2p/network/stream/net_stream.py index 3366dad54..433ac831c 100644 --- a/libp2p/network/stream/net_stream.py +++ b/libp2p/network/stream/net_stream.py @@ -5,6 +5,7 @@ import logging from typing import ( TYPE_CHECKING, + Any, ) import trio @@ -122,9 +123,13 @@ class NetStream(INetStream): muxed_stream: IMuxedStream protocol_id: TProtocol | None + metric_send_channel: trio.MemorySendChannel[Any] | None = None def __init__( - self, muxed_stream: IMuxedStream, swarm_conn: "SwarmConn | None" + self, + muxed_stream: IMuxedStream, + swarm_conn: "SwarmConn | None", + metric_send_channel: trio.MemorySendChannel[Any] | None, ) -> None: self.muxed_stream = muxed_stream self.muxed_conn = muxed_stream.muxed_conn @@ -141,6 +146,9 @@ def __init__( # Thread safety for state operations (following AkMo3's approach) self._state_lock = trio.Lock() + # Metrics emit endpoint + self.metric_send_channel = metric_send_channel + def get_protocol(self) -> TProtocol | None: """ :return: protocol id that stream runs on diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 4544497b7..45acee413 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -7,6 +7,7 @@ import random from typing import TYPE_CHECKING, Any, cast +from libp2p.metrics.swarm import SwarmEvent from libp2p.rcmgr import Direction if TYPE_CHECKING: @@ -125,6 +126,7 @@ def __init__( retry_config: RetryConfig | None = None, connection_config: ConnectionConfig | QUICTransportConfig | None = None, psk: str | None = None, + metric_send_channel: trio.MemorySendChannel[Any] | None = None, ): self.self_id = peer_id self.peerstore = peerstore @@ -153,6 +155,9 @@ def __init__( self._resource_manager = None self._stream_semaphore: trio.Semaphore | None = None + # Metrics + self.metric_send_channel = metric_send_channel + # Initialize connection 
management components self._init_connection_management() @@ -486,6 +491,14 @@ async def dial_peer(self, peer_id: ID) -> list[INetConn]: :raises SwarmException: raised when an error occurs :return: list of muxed connections """ + # Emit metric-event for dial-attempt + event = SwarmEvent() + event.peer_id = peer_id.pretty() + event.dial_attempt = True + + if self.metric_send_channel is not None: + await self.metric_send_channel.send(event) + # Check if we already have connections existing_connections = self.get_connections(peer_id) if existing_connections: @@ -542,6 +555,15 @@ async def dial_peer(self, peer_id: ID) -> list[INetConn]: if not connections: # Tried all addresses, raising exception. + + # Emit metric-event for dial_attempt failure + event = SwarmEvent() + event.peer_id = peer_id.pretty() + event.dial_attempt_error = True + + if self.metric_send_channel is not None: + await self.metric_send_channel.send(event) + raise SwarmDialAllFailedError( f"unable to connect to {peer_id}, no addresses established a " "successful connection (with exceptions)", @@ -826,6 +848,8 @@ async def upgrade_outbound_raw_conn( pass swarm_conn = await self.add_conn(muxed_conn, direction="outbound") + # swarm_conn._metric_send_channel = self.metric_send_channel + logger.debug("successfully dialed peer %s", peer_id) return swarm_conn @@ -1142,14 +1166,30 @@ async def conn_handler( remote_maddr = self._build_remote_multiaddr(read_write_closer) logger.debug(f"[conn_handler] Built remote_maddr: {remote_maddr}") + # Emit a metric-event that we received an inbound connection + inbound_notification = SwarmEvent() + inbound_notification.conn_incoming = True + if self.metric_send_channel is not None: + await self.metric_send_channel.send(inbound_notification) + + # Metric event for inbound connection failure + failure_event = SwarmEvent() + if remote_maddr is not None: if not await self.connection_gate.is_allowed(remote_maddr): logger.debug( "Inbound connection from %s denied by connection 
gate", remote_maddr, ) + # INbound error try: await read_write_closer.close() + + # Emit event for incoming conn failure + failure_event.conn_incoming_error = True + if self.metric_send_channel is not None: + await self.metric_send_channel.send(failure_event) + except Exception: pass return @@ -1166,8 +1206,15 @@ async def conn_handler( # NOTE: This is a intentional barrier to prevent from the # handler exiting and closing the connection. await self.manager.wait_finished() + except Exception: await read_write_closer.close() + + # Emit event for incoming conn failure + failure_event.conn_incoming_error = True + if self.metric_send_channel is not None: + await self.metric_send_channel.send(failure_event) + return # For non-QUIC connections, wrap in try/except to ensure cleanup @@ -1188,6 +1235,12 @@ async def conn_handler( # If raw_conn wasn't created, # close the underlying connection await read_write_closer.close() + + # Emit event for incoming conn failure + failure_event.conn_incoming_error = True + if self.metric_send_channel is not None: + await self.metric_send_channel.send(failure_event) + except Exception: pass # Re-raise to let the listener handle it appropriately @@ -1519,6 +1572,7 @@ async def add_conn( self, direction=direction, ) + swarm_conn._metric_send_channel = self.metric_send_channel # Set actual transport addresses and connection type from the muxed connection. 
# This captures the real transport info (IP/port, direct vs relayed) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index c4fd230e9..d8281245d 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -301,6 +301,16 @@ def clear_expired(self) -> None: MAX_CONCURRENT_VALIDATORS = 10 +class GossipsubEvent: + peer_id: str + + publish: bool = False + subopts: bool = False + control: bool = False + + message_size: int | None = None + + class Pubsub(Service, IPubsub): host: IHost @@ -499,8 +509,13 @@ async def continuously_read_stream(self, stream: INetStream) -> None: ) continue + event = GossipsubEvent() + event.peer_id = peer_id.pretty() + event.message_size = len(incoming) + if rpc_incoming.publish: # deal with RPC.publish + event.publish = True for msg in rpc_incoming.publish: if not self._is_subscribed_to_msg(msg): continue @@ -517,6 +532,7 @@ async def continuously_read_stream(self, stream: INetStream) -> None: # peers because a given node only needs its peers # to know that it is subscribed to the topic (doesn't # need everyone to know) + event.subopts = True for message in rpc_incoming.subscriptions: logger.debug( "received `subscriptions` message %s from peer %s", @@ -529,6 +545,7 @@ async def continuously_read_stream(self, stream: INetStream) -> None: # This is necessary because `control` is an optional field in pb2. 
# Ref: https://developers.google.com/protocol-buffers/docs/reference/python-generated#singular-fields-proto2 # noqa: E501 if rpc_incoming.HasField("control"): + event.control = True # Pass rpc to router so router could perform custom logic logger.debug( "received `control` message %s from peer %s", @@ -536,6 +553,10 @@ async def continuously_read_stream(self, stream: INetStream) -> None: peer_id, ) await self.router.handle_rpc(rpc_incoming, peer_id) + + if stream.metric_send_channel is not None: + await stream.metric_send_channel.send(event) + except StreamEOF: logger.debug( f"Stream closed for peer {peer_id}, exiting read loop cleanly." diff --git a/libp2p/utils/paths.py b/libp2p/utils/paths.py index 45b5a9cfe..2ab5c8133 100644 --- a/libp2p/utils/paths.py +++ b/libp2p/utils/paths.py @@ -29,6 +29,8 @@ AUTOTLS_CERT_PATH = Path("libp2p-forge/peer1/autotls-cert.pem") AUTOTLS_KEY_PATH = Path("libp2p-forge/peer1/autotls-key.pem") +METRICS_CONFIG_PATH = Path("libp2p-metrics/.config") + def get_temp_dir() -> Path: """ diff --git a/newsfragments/1199.feature.rst b/newsfragments/1199.feature.rst new file mode 100644 index 000000000..69a6a7757 --- /dev/null +++ b/newsfragments/1199.feature.rst @@ -0,0 +1 @@ +Added the metrics module to monitor internal service activities via Prometheus/Grafana dashboards. 
diff --git a/pyproject.toml b/pyproject.toml index 75af48a7e..c7a401411 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,6 +42,7 @@ dependencies = [ "py-cid>=0.5.0", "pynacl>=1.3.0", "rpcudp>=3.0.0", + "prometheus-client>=0.24.1", "trio-typing>=0.0.4", "trio-websocket>=0.11.0", "trio>=0.26.0", @@ -77,6 +78,7 @@ circuit-relay-demo = "examples.circuit_relay.relay_example:main" tls-demo = "examples.tls.example_tls_server:main" tls-client-demo = "examples.tls.example_tls_client:main" path-handling = "examples.path_handling.path_handling_demo:main" +metrics-demo = "examples.metrics.runner:cli" oso-health-report = "libp2p.observability.oso.cli:main" [dependency-groups] diff --git a/tests/core/network/test_net_stream_concurrency.py b/tests/core/network/test_net_stream_concurrency.py index dceae8b99..5ec09904f 100644 --- a/tests/core/network/test_net_stream_concurrency.py +++ b/tests/core/network/test_net_stream_concurrency.py @@ -5,11 +5,13 @@ access to stream state and prevent race conditions. 
""" +from typing import cast from unittest.mock import Mock import pytest import trio +from libp2p.abc import IMuxedStream from libp2p.network.stream.net_stream import NetStream, StreamState @@ -59,7 +61,11 @@ def reset(self): def mock_stream(): """Create a mock NetStream for testing.""" muxed_stream = MockMuxedStream() - stream = NetStream(muxed_stream=muxed_stream, swarm_conn=Mock()) # type: ignore[arg-type] + stream = NetStream( + muxed_stream=cast(IMuxedStream, muxed_stream), + swarm_conn=Mock(), + metric_send_channel=None, + ) # type: ignore[arg-type] return stream, muxed_stream diff --git a/tests/core/network/test_net_stream_error_state.py b/tests/core/network/test_net_stream_error_state.py index 398cdd2ec..8faf8b7a4 100644 --- a/tests/core/network/test_net_stream_error_state.py +++ b/tests/core/network/test_net_stream_error_state.py @@ -70,7 +70,7 @@ async def __aenter__(self) -> "IMuxedStream": def mock_stream(): """Create a mock stream for testing.""" muxed_stream = MockMuxedStream() - return NetStream(muxed_stream, None) + return NetStream(muxed_stream, None, None) @pytest.mark.trio @@ -190,7 +190,7 @@ async def test_is_operational_with_open_state(mock_stream): async def test_error_state_lifecycle(): """Test complete ERROR state lifecycle.""" muxed_stream = MockMuxedStream() - stream = NetStream(muxed_stream, None) + stream = NetStream(muxed_stream, None, None) # Start in INIT state assert await stream.state == StreamState.INIT diff --git a/tests/core/network/test_net_stream_state_transitions.py b/tests/core/network/test_net_stream_state_transitions.py index b43da2962..3aa1170c6 100644 --- a/tests/core/network/test_net_stream_state_transitions.py +++ b/tests/core/network/test_net_stream_state_transitions.py @@ -42,7 +42,7 @@ async def __aenter__(self) -> "IMuxedStream": def mock_stream(): """Create a mock stream for testing.""" muxed_stream = MockMuxedStream() - return NetStream(muxed_stream, None) + return NetStream(muxed_stream, None, None) 
@pytest.mark.trio diff --git a/tests/core/network/test_stream_semaphore.py b/tests/core/network/test_stream_semaphore.py index 1dde5ee62..8eb657dfe 100644 --- a/tests/core/network/test_stream_semaphore.py +++ b/tests/core/network/test_stream_semaphore.py @@ -44,7 +44,7 @@ def _mock_net_stream(swarm_conn: Mock | None = None) -> NetStream: muxed_stream.close = AsyncMock() muxed_stream.reset = AsyncMock() - ns = NetStream(muxed_stream, swarm_conn) + ns = NetStream(muxed_stream, swarm_conn, None) return ns