Skip to content

fix(sendStream): handle stream abortion#1047

Open
JHolcman-T wants to merge 4 commits intoh3js:v1from
JHolcman-T:fix/v1-dangling-sse-streams
Open

fix(sendStream): handle stream abortion#1047
JHolcman-T wants to merge 4 commits intoh3js:v1from
JHolcman-T:fix/v1-dangling-sse-streams

Conversation

@JHolcman-T
Copy link
Copy Markdown

resolves #1045

Implements the fix proposed in #1045. Enables handling of aborted (endless) streams e.g. proxied SSE streams.

Note:
This is my first contribution, feel free to suggest improvements of any kind 😊

@pi0 pi0 added the v1 label May 5, 2025
@pi0
Copy link
Copy Markdown
Member

pi0 commented May 5, 2025

Thank you for PR, looks like a nice fix.

Would you be able to add one unit test?

@codecov
Copy link
Copy Markdown

codecov bot commented May 5, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.

📢 Thoughts on this report? Let us know!

@JHolcman-T
Copy link
Copy Markdown
Author

Thank you for PR, looks like a nice fix.

Would you be able to add one unit test?

Sure, where would you expect this unittest to be located - I mean in which *.test.ts file?

@JHolcman-T
Copy link
Copy Markdown
Author

Hi @pi0

I came up with this test case. The only thing is I am not sure where to put it. It seems to me that no test file really fits :/ maybe utils.test.ts ?

    it("can abort endless stream request", async () => {
      let connectionClosed = false;
      let streamCancelled = false;

      app.use(
        "/",
        eventHandler(async (event) => {
          event.node.res.setHeader(
            "Content-Type",
            "text/event-stream; charset=utf-8",
          );
          event.node.res.setHeader("Cache-Control", "no-cache");
          event.node.res.setHeader("Connection", "keep-alive");
          event.node.res.setHeader("X-Accel-Buffering", "no");

          // Detect when client disconnects
          event.node.res.on("close", () => {
            connectionClosed = true;
          });

          // clean up the stream
          let intervalId;
          const encoder = new TextEncoder();

          const stream = new ReadableStream({
            start(controller) {
              intervalId = setInterval(() => {
                controller.enqueue(encoder.encode("data: ping...\n\n"));
              }, 10);
            },
            // this is called when the stream has been cancelled trouhgh the AbortController signal in the sendStream function
            // commenting out the part of abort signal in the sendStream function => this will not be called after the request is cancelled
            cancel() {
              streamCancelled = true;
              // clean up the stream
              clearInterval(intervalId);
            },
          });

          return sendStream(event, stream);
        }),
      );

      const res = await fetch(url + "/", {
        method: "GET",
        headers: {
          Accept: "text/event-stream",
        },
      });

      if (!res.body) {
        throw new Error("No response body");
      } else {
        let messagesCounter = 0;
        const abort = new AbortController();
        res.body
          .pipeTo(
            new WritableStream({
              write(chunk) {
                messagesCounter += 1;
                if (messagesCounter > 5) {
                  // abort reading of the stream after few chunks
                  // simulate client disconnect
                  abort.abort();
                }
              },
            }),
            { signal: abort.signal },
          )
          // hanlde the error that is thrown when the stream is aborted
          .catch((err) => {});
      }
      await new Promise((resolve) => {
        setTimeout(() => {
          resolve(true);
        }, 500);
      });
      expect(res.status).toEqual(200);
      expect(connectionClosed !== false).toBe(true);
      expect(streamCancelled !== false).toBe(true);
    });

@pi0 pi0 changed the title fix(sendStream): handle aborted (endless) streams correctly fix(sendStream): handle stream abortion Jun 3, 2025
@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Mar 14, 2026

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 8ab92db5-d86c-46bf-8a89-8db9529ab9c7

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
📝 Coding Plan
  • Generate coding plan for human review comments

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants