Skip to content

Commit

Permalink
flush response for sse
Browse files Browse the repository at this point in the history
  • Loading branch information
suchen-sci committed Feb 21, 2024
1 parent d1fabe2 commit 6b4618a
Showing 1 changed file with 49 additions and 1 deletion.
50 changes: 49 additions & 1 deletion pkg/object/httpserver/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"bytes"
"fmt"
"io"
"mime"
"net/http"
"reflect"
"regexp"
Expand Down Expand Up @@ -246,11 +247,58 @@ func (mi *muxInstance) sendResponse(ctx *context.Context, stdw http.ResponseWrit
header[k] = v
}
stdw.WriteHeader(resp.StatusCode())
respBodySize, _ := io.Copy(stdw, resp.GetPayload())

var writer io.Writer
if responseNeedFlush(resp) {
writer = NewResponseFlushWriter(stdw)
} else {
writer = stdw
}
respBodySize, _ := io.Copy(writer, resp.GetPayload())

return resp.StatusCode(), uint64(respBodySize) + uint64(resp.MetaSize()), header
}

// ResponseFlushWriter is a wrapper of http.ResponseWriter, which flushes the
// response immediately if the response needs to be flushed.
type ResponseFlushWriter struct {
w http.ResponseWriter
flush func()
}

// Write writes the data to the connection as part of an HTTP reply.
func (w *ResponseFlushWriter) Write(p []byte) (int, error) {
n, err := w.w.Write(p)
w.flush()
return n, err
}

// NewResponseFlushWriter creates a ResponseFlushWriter.
func NewResponseFlushWriter(w http.ResponseWriter) *ResponseFlushWriter {
if flusher, ok := w.(http.Flusher); ok {
return &ResponseFlushWriter{
w: w,
flush: flusher.Flush,
}
}
return &ResponseFlushWriter{
w: w,
flush: func() {},
}
}

func responseNeedFlush(resp *httpprot.Response) bool {
resCTHeader := resp.Std().Header.Get("Content-Type")
resCT, _, err := mime.ParseMediaType(resCTHeader)

// For Server-Sent Events responses, flush immediately.
// The MIME type is defined in https://www.w3.org/TR/eventsource/#text-event-stream
if err == nil && resCT == "text/event-stream" {
return true
}
return false
}

func (mi *muxInstance) serveHTTP(stdw http.ResponseWriter, stdr *http.Request) {
// Replace the body of the original request with a ByteCountReader, so
// that we can calculate the actual request size.
Expand Down

0 comments on commit 6b4618a

Please sign in to comment.