Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream should not be closed automatically #2993

Closed
dalechyn opened this issue Jun 18, 2024 · 7 comments
Closed

Stream should not be closed automatically #2993

dalechyn opened this issue Jun 18, 2024 · 7 comments
Labels

Comments

@dalechyn
Copy link

What version of Hono are you using?

4.4.6

What runtime/platform is your app running on?

Bun

What steps can reproduce the bug?

Using either stream or streamSSE is not gonna work unless you're using an infinite loop that prevents callback from stopping the execution.

See stream.ts:

;(async () => {
try {
await cb(stream)
} catch (e) {
if (e instanceof Error && onError) {
await onError(e, stream)
} else {
console.error(e)
}
} finally {
stream.close()
}
})()

And see streamSSE.ts:

const run = async (
stream: SSEStreamingApi,
cb: (stream: SSEStreamingApi) => Promise<void>,
onError?: (e: Error, stream: SSEStreamingApi) => Promise<void>
): Promise<void> => {
try {
await cb(stream)
} catch (e) {
if (e instanceof Error && onError) {
await onError(e, stream)
await stream.writeSSE({
event: 'error',
data: e.message,
})
} else {
console.error(e)
}
} finally {
stream.close()
}
}

What they both have in common is finally clause that closes the stream automatically.

I believe that this is counter-intuitive and really opinionated. Stream initiator should decide himself when to close the stream.

For the reason why I believe this is counter-intuitive: I'm using EventEmitter on my backend and having a frontend that consumes SSE over EventSource. It's a pretty trivial approach – backend receives an event and proxies to SSE consumers if there are any.
I discovered that this is however not possible due to the implementation of those helpers.

My very first naive suggestion is to not have streams closed automatically. However, this most likely won't be backwards compatible and would require a major bump.

A solution to this that I see is to have the streaming helpers be capable of handling Async Generators.

Helpers could check if a function has Symbol.iterator and next properties defined (see here), and if one is, treat it differently and naturally close the stream if the async generator is finished.

Here's a small draft of how it could look like:

const run = async ( 
   stream: SSEStreamingApi, 
   cb: (stream: SSEStreamingApi) => Promise<void>, 
   onError?: (e: Error, stream: SSEStreamingApi) => Promise<void> 
 ): Promise<void> => { 
   try {
     if (typeof cb[Symbol.iterator] === 'function') {
       for await (const _value of cb(stream)) {}
     } else await cb(stream)
   } catch (e) { 
     if (e instanceof Error && onError) { 
       await onError(e, stream) 
  
       await stream.writeSSE({ 
         event: 'error', 
         data: e.message, 
       }) 
     } else { 
       console.error(e) 
     } 
   } finally { 
     stream.close() 
   } 
 } 

For now, I'm forced to patch the package to not close the stream.

What is the expected behavior?

No response

What do you see instead?

No response

Additional information

#2050

@dalechyn dalechyn added the bug label Jun 18, 2024
@yusukebe
Copy link
Member

Hi @dalechyn

@sor4chi @watany-dev Can you take a look?

@usualoma
Copy link
Member

Hi @dalechyn!

Does the approach of using an asynchronous generator in stream() not address your issue?

app.get('/stream', (c) => {
  return stream(c, async (stream) => {
    stream.onAbort(() => {
      console.log('Aborted!')
    })

    for await (const data of generator()) {
      await stream.writeln(data)
    }
  })
})

@dalechyn
Copy link
Author

Hi @dalechyn!

Does the approach of using an asynchronous generator in stream() not address your issue?

app.get('/stream', (c) => {
  return stream(c, async (stream) => {
    stream.onAbort(() => {
      console.log('Aborted!')
    })

    for await (const data of generator()) {
      await stream.writeln(data)
    }
  })
})

Hi @usualoma! Thanks for investigating this issue.

I unfortunately have to throw off async-generator idea as an improvement over the current API.
Your suggestion sounds correct though – If I'm able to leverage generators in the local scope, I should be fine as stream would never get closed unless a generator returns the final value.

However, use of async generators when it comes to processing an EventEmitter-like emitted events is quote ambiguous – meaning that the end implementation seems more like a hack.

Consider my example where there is an EventEmitter emitting events on the backend side. Once a FE starts to listen for SSE events, those EventEmitter's events are emitted via SSE too.

With generator approach – I'd need to wrap event listeners to Promises that would have to be yielded and destructed after Promise resolve and reconstructed back on the next iteration of generator. This is a workaround, and sounds as an not effective one. Here is what StackOverflow says about it – https://stackoverflow.com/questions/51045136/how-can-i-use-a-event-emitter-as-an-async-generator.

That said, I still believe that's counterintuitive.

My current workaround without the patch is to simply return a new Promise that never resolves, so that await hangs indefinitely.

@usualoma
Copy link
Member

@dalechyn
Thanks for the reply!
Can you give me an example of "a code snippet of the application you want to write after applying the patch"?

@usualoma
Copy link
Member

If the app uses EventEmitter and data is sent, received and closed on an event basis, I would typically write the following.

app.get('/stream', (c) => {
  return stream(c, async (stream) => {
    emitter.on('data', (data) => {
      stream.writeln(data)
    })
    return new Promise((resolve) => {
      stream.onAbort(resolve)
      emitter.on('close', resolve)
    })
  })
})

Apparently there is a problem with onAbort not being called in bun.

@dalechyn
Copy link
Author

If the app uses EventEmitter and data is sent, received and closed on an event basis, I would typically write the following.

app.get('/stream', (c) => {
  return stream(c, async (stream) => {
    emitter.on('data', (data) => {
      stream.writeln(data)
    })
    return new Promise((resolve) => {
      stream.onAbort(resolve)
      emitter.on('close', resolve)
    })
  })
})

Apparently there is a problem with onAbort not being called in bun.

Yes, this sounds as a viable solution for now!
Also thanks for noticing this issue with onAbort not being called in bun.
Are you aware of the upstream issue for this?

@dalechyn dalechyn closed this as not planned Won't fix, can't repro, duplicate, stale Jun 25, 2024
@usualoma
Copy link
Member

In Bun, there is a behaviour where ReadableStream cancel is not called when the client disconnects, and it is not clear whether this is a bug or intended, but hono will fix this at #3042

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

3 participants