-
-
Notifications
You must be signed in to change notification settings - Fork 42
Open
Description
server
// Builds the pipeline initializer for the server side.
// The heartbeat value is read from the URL once, and a single NettyHttp3ProtocolSelectorHandler
// is created up front and captured by the returned consumer.
private static Consumer<ChannelPipeline> configServerPipeline(URL url) {
int heartbeat = UrlUtils.getHeartbeat(url);
NettyHttp3ProtocolSelectorHandler selectorHandler =
new NettyHttp3ProtocolSelectorHandler(url, ScopeModelUtil.getFrameworkModel(url.getScopeModel()));
return pipeline -> {
// The ChannelInitializer below configures every QuicStreamChannel it is asked to initialize:
// write queue -> flush consolidation -> HTTP/3 frame codec -> protocol selector.
pipeline.addLast(new Http3ServerConnectionHandler(new ChannelInitializer<QuicStreamChannel>() {
@Override
protected void initChannel(QuicStreamChannel ch) {
ch.pipeline().addLast(new HttpWriteQueueHandler()).addLast(new FlushConsolidationHandler(64, true))
.addLast(NettyHttp3FrameCodec.INSTANCE).addLast(selectorHandler);
}
// A new TripleHttp3PingPongHandler is passed as the second constructor argument.
// NOTE(review): presumably this is the inbound control stream handler slot of
// Http3ServerConnectionHandler - confirm against the HTTP/3 codec version in use.
}, new TripleHttp3PingPongHandler(heartbeat), null, null, true));
pipeline.addLast(new Http3TripleServerConnectionHandler());
};
// NOTE(review): the closing brace of this method is not shown in this excerpt.
client
// Builds the pipeline initializer for the client side.
// One TripleHttp3PingPongHandler is created per call and captured by the returned consumer.
private static Consumer<ChannelPipeline> configClientPipeline(URL url) {
int heartbeat = UrlUtils.getHeartbeat(url);
TripleHttp3PingPongHandler tripleHttp3PingPongHandler = new TripleHttp3PingPongHandler(heartbeat);
return pipeline -> {
// The ping/pong handler is handed to Http3ClientConnectionHandler as its first argument ...
pipeline.addLast(
new Http3ClientConnectionHandler(tripleHttp3PingPongHandler, null, null, null, true));
pipeline.addLast(Http3ClientFrameCodec.INSTANCE);
// The heartbeat value is used as the first IdleStateHandler timeout, in milliseconds.
pipeline.addLast(new IdleStateHandler(heartbeat, 0, 0, TimeUnit.MILLISECONDS));
// ... and the very same instance is also appended to this pipeline.
// NOTE(review): the handler keeps per-connection state and is not visibly marked sharable;
// verify that registering one instance in two places is intended.
pipeline.addLast(tripleHttp3PingPongHandler);
};
// NOTE(review): the closing brace of this method is not shown in this excerpt.
The server side and the client side share the same channel handler:
/**
 * Ping/pong keep-alive handler for Triple over HTTP/3.
 *
 * <p>Pings are carried in {@link Http3UnknownFrame}s of type {@link #PING_PONG_TYPE}. When such a
 * frame is read, a reply is scheduled after {@code pingAckTimeout} milliseconds. Once
 * {@link #close(ChannelHandlerContext, ChannelPromise)} has been called, the next ping received
 * starts a {@link GracefulShutdown} instead of scheduling a reply.
 *
 * <p>The handler holds per-connection state ({@code alive}, {@code gracefulShutdown} and the
 * inherited {@code pingAckTimeoutFuture}).
 */
public class TripleHttp3PingPongHandler extends TriplePingPongHandler {

    private static final ErrorTypeAwareLogger log =
            LoggerFactory.getErrorTypeAwareLogger(TripleHttp3PingPongHandler.class);

    /** Frame type of the unknown HTTP/3 frames used to carry ping/pong messages. */
    private static final int PING_PONG_TYPE = 0x45;

    /** Cleared when a close is requested or a ping write fails. */
    private final AtomicBoolean alive = new AtomicBoolean(true);

    /** Created at most once, on the first ping handled after {@code alive} was cleared. */
    private GracefulShutdown gracefulShutdown;

    /**
     * Creates the handler.
     *
     * @param pingAckTimeout delay in milliseconds, forwarded to the parent handler and used as the
     *                       delay before each scheduled ping is written
     */
    public TripleHttp3PingPongHandler(long pingAckTimeout) {
        // The argument used to be ignored in favour of a hard-coded 10000.
        super(pingAckTimeout);
    }

    /**
     * Schedules the first ping on the local control stream, if one is available for this channel.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        QuicStreamChannel controlStream = Http3.getLocalControlStream(ctx.channel());
        if (controlStream != null) {
            sendPingFrame(ctx, controlStream);
        }
    }

    /**
     * Reacts to ping frames and GOAWAY frames, then hands every message to the parent handler.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof Http3UnknownFrame) {
            Http3UnknownFrame unknownFrame = (Http3UnknownFrame) msg;
            if (unknownFrame.type() == PING_PONG_TYPE) {
                // Answer the peer's ping on the channel the frame arrived on.
                sendPingFrame(ctx);
            }
        }
        if (msg instanceof Http3GoAwayFrame) {
            // Only propagated once a close has been requested locally.
            if (!alive.get()) {
                ctx.fireUserEventTriggered(new DefaultHttp2GoAwayFrame(((Http3GoAwayFrame) msg).id()));
            }
        }
        super.channelRead(ctx, msg);
    }

    /** Delegates to the parent handler unchanged. */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);
    }

    /** Cancels the most recently scheduled ping task once the channel is inactive. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        if (pingAckTimeoutFuture != null) {
            pingAckTimeoutFuture.cancel(true);
        }
        pingAckTimeoutFuture = null;
    }

    /**
     * Marks the connection as no longer alive; shutdown itself starts on the next ping handled.
     *
     * <p>NOTE(review): the close request is not forwarded and {@code promise} is never completed
     * here, so callers waiting on the promise will not be notified by this handler. Confirm that
     * this deferral is intended.
     */
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        alive.set(false);
    }

    /** Schedules a ping that is written to the channel owning {@code ctx}. */
    private void sendPingFrame(ChannelHandlerContext ctx) {
        sendPingFrame(ctx, ctx.channel());
    }

    /**
     * Schedules a ping on {@code controlStream} while alive; otherwise starts graceful shutdown once.
     */
    private void sendPingFrame(ChannelHandlerContext ctx, Channel controlStream) {
        if (alive.get()) {
            pingAckTimeoutFuture = ctx.executor().schedule(
                    new HealthCheckChannelTask(ctx, controlStream, alive),
                    pingAckTimeout,
                    TimeUnit.MILLISECONDS);
        } else if (gracefulShutdown == null) {
            gracefulShutdown = new GracefulShutdown(ctx, "app_requested", ctx.voidPromise());
            gracefulShutdown.gracefulHttp3Shutdown();
        }
    }

    /**
     * Writes one ping frame to the control stream and closes the context if the write fails.
     */
    private static class HealthCheckChannelTask implements Runnable {

        private final ChannelHandlerContext ctx;
        private final AtomicBoolean alive;
        private final Channel controlStream;

        public HealthCheckChannelTask(ChannelHandlerContext ctx, Channel controlStream, AtomicBoolean alive) {
            this.ctx = ctx;
            this.alive = alive;
            this.controlStream = controlStream;
        }

        @Override
        public void run() {
            if (controlStream == null) {
                return;
            }
            DefaultHttp2PingFrame pingFrame = new DefaultHttp2PingFrame(0);
            Http2Flags flags = pingFrame.ack() ? new Http2Flags().ack(true) : new Http2Flags();
            ByteBuf buf = ctx.alloc().buffer(FRAME_HEADER_LENGTH + PING_FRAME_PAYLOAD_LENGTH);
            // Tracks whether ownership of buf has passed to the write, so that it is released
            // exactly once if building the frame fails.
            boolean handedOff = false;
            try {
                // Payload layout: 3-byte length, frame type, flags, 4-byte zero, 8-byte ping content.
                buf.writeMedium(PING_FRAME_PAYLOAD_LENGTH);
                buf.writeByte(PING);
                buf.writeByte(flags.value());
                buf.writeInt(0);
                buf.writeLong(pingFrame.content());
                Http3UnknownFrame frame = new DefaultHttp3UnknownFrame(PING_PONG_TYPE, buf);
                handedOff = true;
                controlStream.writeAndFlush(frame).addListener(future -> {
                    if (!future.isSuccess()) {
                        alive.compareAndSet(true, false);
                        ctx.close();
                    }
                    log.info("ping-pong");
                });
            } catch (Exception e) {
                if (!handedOff) {
                    // The buffer never reached the pipeline; release it here to avoid a leak.
                    buf.release();
                }
                log.error("Failed to send ping frame", e);
            }
        }
    }
}
When the client's handler#channelActive is triggered, it obtains the local control stream channel and sends a heartbeat message. The server's channelRead receives the message and responds with a heartbeat; according to the logs, the server sends this response successfully. However, the client's handler is never triggered: I set breakpoints in the channelRead methods of ByteToMessageDecoder and Http3FrameCodec, as well as in the fireChannelRead method of DefaultChannelPipeline, and none of them are hit. As a result, after the client sends the first heartbeat, it never sends a subsequent one. How can I resolve this issue?
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels