Skip to content

client is unable to receive a response after sending a message using the control stream #320

@funky-eyes

Description

@funky-eyes

server

    private static Consumer<ChannelPipeline> configServerPipeline(URL url) {
        int heartbeat = UrlUtils.getHeartbeat(url);
        NettyHttp3ProtocolSelectorHandler selectorHandler =
                new NettyHttp3ProtocolSelectorHandler(url, ScopeModelUtil.getFrameworkModel(url.getScopeModel()));
        return pipeline -> {
            pipeline.addLast(new Http3ServerConnectionHandler(new ChannelInitializer<QuicStreamChannel>() {
                @Override
                protected void initChannel(QuicStreamChannel ch) {
                    ch.pipeline().addLast(new HttpWriteQueueHandler()).addLast(new FlushConsolidationHandler(64, true))
                        .addLast(NettyHttp3FrameCodec.INSTANCE).addLast(selectorHandler);
                }
            }, new TripleHttp3PingPongHandler(heartbeat), null, null, true));
            pipeline.addLast(new Http3TripleServerConnectionHandler());
        };

client

    private static Consumer<ChannelPipeline> configClientPipeline(URL url) {
        int heartbeat = UrlUtils.getHeartbeat(url);
        TripleHttp3PingPongHandler tripleHttp3PingPongHandler = new TripleHttp3PingPongHandler(heartbeat);
        return pipeline -> {
            pipeline.addLast(
                new Http3ClientConnectionHandler(tripleHttp3PingPongHandler, null, null, null, true));
            pipeline.addLast(Http3ClientFrameCodec.INSTANCE);
            pipeline.addLast(new IdleStateHandler(heartbeat, 0, 0, TimeUnit.MILLISECONDS));
            pipeline.addLast(tripleHttp3PingPongHandler);
        };

The server level shares the channehandle with the client side

public class TripleHttp3PingPongHandler extends TriplePingPongHandler {

    private static final ErrorTypeAwareLogger log = LoggerFactory.getErrorTypeAwareLogger(TripleHttp3PingPongHandler.class);

    private final AtomicBoolean alive = new AtomicBoolean(true);

    private static final int PING_PONG_TYPE = 0x45;

    private GracefulShutdown gracefulShutdown;

    public TripleHttp3PingPongHandler(long pingAckTimeout) {
        super(10000);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        QuicStreamChannel streamChannel = Http3.getLocalControlStream(ctx.channel());
        Optional.ofNullable(streamChannel).ifPresent(channel -> sendPingFrame(ctx, streamChannel));
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof Http3UnknownFrame) {
            Http3UnknownFrame http3UnknownFrame = (Http3UnknownFrame)msg;
            if (http3UnknownFrame.type() == PING_PONG_TYPE) {
                sendPingFrame(ctx);
            }
        }
        if (msg instanceof Http3GoAwayFrame) {
            if (!alive.get()) {
                ctx.fireUserEventTriggered(new DefaultHttp2GoAwayFrame(((Http3GoAwayFrame)msg).id()));
            }
        }
        super.channelRead(ctx, msg);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        Optional.ofNullable(pingAckTimeoutFuture).ifPresent(future -> future.cancel(true));
        pingAckTimeoutFuture = null;
    }

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        alive.set(false);
    }

    private void sendPingFrame(ChannelHandlerContext ctx) {
            sendPingFrame(ctx, ctx.channel());
    }

    private void sendPingFrame(ChannelHandlerContext ctx, Channel controlStream) {
        if (alive.get()) {
            pingAckTimeoutFuture = ctx.executor().schedule(new HealthCheckChannelTask(ctx, controlStream, alive),
                pingAckTimeout, TimeUnit.MILLISECONDS);
        } else if (gracefulShutdown == null) {
            gracefulShutdown = new GracefulShutdown(ctx, "app_requested", ctx.voidPromise());
            gracefulShutdown.gracefulHttp3Shutdown();
        }
    }

    private static class HealthCheckChannelTask implements Runnable {

        private final ChannelHandlerContext ctx;
        private final AtomicBoolean alive;
        private final Channel controlStream;
        public HealthCheckChannelTask(ChannelHandlerContext ctx,Channel controlStream, AtomicBoolean alive) {
            this.ctx = ctx;
            this.alive = alive;
            this.controlStream = controlStream;
        }

        @Override
        public void run() {
            Optional.ofNullable(controlStream).ifPresent(channel -> {
                DefaultHttp2PingFrame pingFrame = new DefaultHttp2PingFrame(0);
                Http2Flags flags = pingFrame.ack() ? new Http2Flags().ack(true) : new Http2Flags();
                ByteBuf buf = ctx.alloc().buffer(FRAME_HEADER_LENGTH + PING_FRAME_PAYLOAD_LENGTH);
                try {
                    buf.writeMedium(PING_FRAME_PAYLOAD_LENGTH);
                    buf.writeByte(PING);
                    buf.writeByte(flags.value());
                    buf.writeInt(0);
                    buf.writeLong(pingFrame.content());
                    Http3UnknownFrame frame = new DefaultHttp3UnknownFrame(PING_PONG_TYPE, buf);
                    channel.writeAndFlush(frame).addListener(future -> {
                        if (!future.isSuccess()) {
                            alive.compareAndSet(true, false);
                            ctx.close();
                        }
                        log.info("ping-pong");
                    });
                } catch (Exception e) {
                    log.error("Failed to send ping frame", e);
                }
            });
        }
    }

}

When the client's handle#channelActive is triggered, it obtains the controlStreamChannel to send a heartbeat message. After sending the heartbeat, the server's channelRead is able to receive the message and responds with a heartbeat. Based on the logs, the server successfully responds to the heartbeat. However, the client's handler is not triggered, and after debugging the ByteToMessageDecoder and Http3FrameCodec's channelRead methods, as well as the fireChannelRead method in DefaultChannelPipeline, none of the breakpoints are hit. As a result, after the client sends the first heartbeat, it is unable to send the subsequent heartbeat. How can I resolve this issue?

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions