Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
- Add `engine_getBlobsV3` method [#9582](https://github.com/hyperledger/besu/pull/9582)
- Verify plugins on start [#9601](https://github.com/hyperledger/besu/pull/9601)
- Add EIP-7778 to Amsterdam [#9664](https://github.com/hyperledger/besu/pull/9664)
- Add byte-level metrics for P2P message exchange [#9666](https://github.com/hyperledger/besu/pull/9666)

### Bug fixes
- Fix promotion to prioritized layer for gas price fee markets [#9635](https://github.com/hyperledger/besu/pull/9635)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class NetworkRunner implements AutoCloseable {
private final Map<String, SubProtocol> subProtocols;
private final List<ProtocolManager> protocolManagers;
private final LabelledMetric<Counter> inboundMessageCounter;
private final LabelledMetric<Counter> inboundBytesCounter;
private final BiFunction<Peer, Boolean, Boolean> ethPeersShouldConnect;

private NetworkRunner(
Expand All @@ -61,14 +62,22 @@ private NetworkRunner(
this.protocolManagers = protocolManagers;
this.subProtocols = subProtocols;
this.ethPeersShouldConnect = ethPeersShouldConnect;
inboundMessageCounter =
this.inboundMessageCounter =
metricsSystem.createLabelledCounter(
BesuMetricCategory.NETWORK,
"p2p_messages_inbound",
"Count of each P2P message received inbound.",
"protocol",
"name",
"code");
this.inboundBytesCounter =
metricsSystem.createLabelledCounter(
BesuMetricCategory.NETWORK,
"p2p_bytes_inbound",
"Count of bytes received inbound.",
"protocol",
"name",
"code");
}

public P2PNetwork getNetwork() {
Expand Down Expand Up @@ -147,6 +156,12 @@ private void setupHandlers() {
protocol.messageName(cap.getVersion(), code),
Integer.toString(code))
.inc();
inboundBytesCounter
.labels(
cap.toString(),
protocol.messageName(cap.getVersion(), code),
Integer.toString(code))
.inc(message.getData().getSize());
protocolManager.processMessage(cap, message);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public abstract class AbstractPeerConnection implements PeerConnection {
private final AtomicBoolean terminatedImmediately = new AtomicBoolean(false);
protected final PeerConnectionEventDispatcher connectionEventDispatcher;
private final LabelledMetric<Counter> outboundMessagesCounter;
private final LabelledMetric<Counter> outboundBytesCounter;
private final long initiatedAt;
private final boolean inboundInitiated;
private boolean statusSent;
Expand All @@ -68,6 +69,7 @@ protected AbstractPeerConnection(
final CapabilityMultiplexer multiplexer,
final PeerConnectionEventDispatcher connectionEventDispatcher,
final LabelledMetric<Counter> outboundMessagesCounter,
final LabelledMetric<Counter> outboundBytesCounter,
final boolean inboundInitiated) {
this.peer = peer;
this.peerInfo = peerInfo;
Expand All @@ -82,6 +84,7 @@ protected AbstractPeerConnection(
}
this.connectionEventDispatcher = connectionEventDispatcher;
this.outboundMessagesCounter = outboundMessagesCounter;
this.outboundBytesCounter = outboundBytesCounter;
this.inboundInitiated = inboundInitiated;
this.initiatedAt = System.currentTimeMillis();

Expand Down Expand Up @@ -118,13 +121,26 @@ private void doSend(final Capability capability, final MessageData message) {
subProtocol.messageName(capability.getVersion(), message.getCode()),
Integer.toString(message.getCode()))
.inc();
outboundBytesCounter
.labels(
capability.toString(),
subProtocol.messageName(capability.getVersion(), message.getCode()),
Integer.toString(message.getCode()))
.inc(message.getSize());

} else {
outboundMessagesCounter
.labels(
"Wire",
WireMessageCodes.messageName(message.getCode()),
Integer.toString(message.getCode()))
.inc();
outboundBytesCounter
.labels(
"Wire",
WireMessageCodes.messageName(message.getCode()),
Integer.toString(message.getCode()))
.inc(message.getSize());
}

LOG.atTrace()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ final class DeFramer extends ByteToMessageDecoder {
private final PeerLookup peerLookup;
private boolean hellosExchanged;
private final LabelledMetric<Counter> outboundMessagesCounter;
private final LabelledMetric<Counter> outboundBytesCounter;

DeFramer(
final Framer framer,
Expand Down Expand Up @@ -101,6 +102,14 @@ final class DeFramer extends ByteToMessageDecoder {
"protocol",
"name",
"code");
this.outboundBytesCounter =
metricsSystem.createLabelledCounter(
BesuMetricCategory.NETWORK,
"p2p_bytes_outbound",
"Count of bytes sent outbound.",
"protocol",
"name",
"code");
}

@Override
Expand Down Expand Up @@ -165,6 +174,7 @@ protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final L
capabilityMultiplexer,
connectionEventDispatcher,
outboundMessagesCounter,
outboundBytesCounter,
inboundInitiated);

// Check peer is who we expected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public NettyPeerConnection(
final CapabilityMultiplexer multiplexer,
final PeerConnectionEventDispatcher connectionEventDispatcher,
final LabelledMetric<Counter> outboundMessagesCounter,
final LabelledMetric<Counter> outboundBytesCounter,
final boolean inboundInitiated) {
super(
peer,
Expand All @@ -54,6 +55,7 @@ public NettyPeerConnection(
multiplexer,
connectionEventDispatcher,
outboundMessagesCounter,
outboundBytesCounter,
inboundInitiated);

this.ctx = ctx;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.p2p.network;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.rlpx.MessageCallback;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.DefaultMessage;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Message;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;

import java.util.List;
import java.util.function.BiFunction;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

public class NetworkRunnerTest {

private NetworkRunner networkRunner;
private P2PNetwork network;
private LabelledMetric<Counter> inboundMessageCounter;
private LabelledMetric<Counter> inboundBytesCounter;
private Counter messageCounter;
private Counter bytesCounter;
private SubProtocol subProtocol;
private ProtocolManager protocolManager;

@BeforeEach
@SuppressWarnings("unchecked")
public void setUp() {
network = mock(P2PNetwork.class);
subProtocol = mock(SubProtocol.class);
protocolManager = mock(ProtocolManager.class);

inboundMessageCounter = mock(LabelledMetric.class);
inboundBytesCounter = mock(LabelledMetric.class);
messageCounter = mock(Counter.class);
bytesCounter = mock(Counter.class);

MetricsSystem metricsSystem = mock(MetricsSystem.class);
when(metricsSystem.createLabelledCounter(
any(), eq("p2p_messages_inbound"), any(), any(), any(), any()))
.thenReturn(inboundMessageCounter);
when(metricsSystem.createLabelledCounter(
any(), eq("p2p_bytes_inbound"), any(), any(), any(), any()))
.thenReturn(inboundBytesCounter);

when(inboundMessageCounter.labels(anyString(), anyString(), anyString()))
.thenReturn(messageCounter);
when(inboundBytesCounter.labels(anyString(), anyString(), anyString()))
.thenReturn(bytesCounter);

// Setup subProtocol to return "eth" as its name so it can be looked up
when(subProtocol.getName()).thenReturn("eth");

// Setup network mocks to allow start() to complete
when(network.isListening()).thenReturn(true);

BiFunction<Peer, Boolean, Boolean> ethPeersShouldConnect = (peer, incoming) -> true;

NetworkRunner.NetworkBuilder networkBuilder = caps -> network;

networkRunner =
NetworkRunner.builder()
.protocolManagers(List.of(protocolManager))
.subProtocols(subProtocol)
.network(networkBuilder)
.metricsSystem(metricsSystem)
.ethPeersShouldConnect(ethPeersShouldConnect)
.build();
}

@Test
public void shouldIncrementInboundBytesCounterWhenProcessingMessage() {
// Setup
Capability capability = Capability.create("eth", 68);
int messageCode = 1;
int messageSize = 1024;

when(subProtocol.getName()).thenReturn("eth");
when(subProtocol.isValidMessageCode(anyInt(), eq(messageCode))).thenReturn(true);
when(subProtocol.messageName(anyInt(), eq(messageCode))).thenReturn("Status");
when(protocolManager.getSupportedCapabilities()).thenReturn(List.of(capability));

// Start network runner to register handlers
networkRunner.start();

// Capture the message handler that was registered
ArgumentCaptor<MessageCallback> handlerCaptor = ArgumentCaptor.forClass(MessageCallback.class);
verify(network).subscribe(eq(capability), handlerCaptor.capture());

MessageCallback handler = handlerCaptor.getValue();

// Create test message
PeerConnection peerConnection = mock(PeerConnection.class);
MessageData messageData = mock(MessageData.class);
when(messageData.getSize()).thenReturn(messageSize);
when(messageData.getCode()).thenReturn(messageCode);

Message message = new DefaultMessage(peerConnection, messageData);

// Process message through the handler
handler.onMessage(capability, message);

// Verify bytes counter was incremented with correct labels and size
verify(inboundBytesCounter).labels(eq(capability.toString()), eq("Status"), eq("1"));
verify(bytesCounter).inc(messageSize);
}

@Test
public void shouldIncrementInboundBytesCounterForMultipleMessages() {
// Setup
Capability capability = Capability.create("eth", 68);
int messageCode1 = 1;
int messageCode2 = 2;
int messageSize1 = 512;
int messageSize2 = 2048;

when(subProtocol.getName()).thenReturn("eth");
when(subProtocol.isValidMessageCode(anyInt(), anyInt())).thenReturn(true);
when(subProtocol.messageName(anyInt(), eq(messageCode1))).thenReturn("Status");
when(subProtocol.messageName(anyInt(), eq(messageCode2))).thenReturn("GetBlockHeaders");
when(protocolManager.getSupportedCapabilities()).thenReturn(List.of(capability));

// Start network runner to register handlers
networkRunner.start();

// Capture the message handler
ArgumentCaptor<MessageCallback> handlerCaptor = ArgumentCaptor.forClass(MessageCallback.class);
verify(network).subscribe(eq(capability), handlerCaptor.capture());

MessageCallback handler = handlerCaptor.getValue();

// Create first test message
PeerConnection peerConnection = mock(PeerConnection.class);
MessageData messageData1 = mock(MessageData.class);
when(messageData1.getSize()).thenReturn(messageSize1);
when(messageData1.getCode()).thenReturn(messageCode1);
Message message1 = new DefaultMessage(peerConnection, messageData1);

// Create second test message
MessageData messageData2 = mock(MessageData.class);
when(messageData2.getSize()).thenReturn(messageSize2);
when(messageData2.getCode()).thenReturn(messageCode2);
Message message2 = new DefaultMessage(peerConnection, messageData2);

// Process both messages
handler.onMessage(capability, message1);
handler.onMessage(capability, message2);

// Verify bytes counter was incremented for both messages
verify(bytesCounter).inc(messageSize1);
verify(bytesCounter).inc(messageSize2);
verify(bytesCounter, times(2)).inc(anyLong());
}

@Test
public void shouldUseCorrectLabelsForInboundBytesCounter() {
// Setup
Capability capability = Capability.create("eth", 68);
int messageCode = 5;
int messageSize = 256;
String messageName = "NewBlock";

when(subProtocol.getName()).thenReturn("eth");
when(subProtocol.isValidMessageCode(anyInt(), eq(messageCode))).thenReturn(true);
when(subProtocol.messageName(anyInt(), eq(messageCode))).thenReturn(messageName);
when(protocolManager.getSupportedCapabilities()).thenReturn(List.of(capability));

// Start network runner to register handlers
networkRunner.start();

// Capture the message handler
ArgumentCaptor<MessageCallback> handlerCaptor = ArgumentCaptor.forClass(MessageCallback.class);
verify(network).subscribe(eq(capability), handlerCaptor.capture());

MessageCallback handler = handlerCaptor.getValue();

// Create test message
PeerConnection peerConnection = mock(PeerConnection.class);
MessageData messageData = mock(MessageData.class);
when(messageData.getSize()).thenReturn(messageSize);
when(messageData.getCode()).thenReturn(messageCode);
Message message = new DefaultMessage(peerConnection, messageData);

// Process message
handler.onMessage(capability, message);

// Verify correct labels were used (protocol, message name, code)
verify(inboundBytesCounter)
.labels(eq(capability.toString()), eq(messageName), eq(String.valueOf(messageCode)));
verify(bytesCounter).inc(messageSize);
}
}
Loading