finagle-http: allow streaming of fixed length messages #740
Conversation
Force-pushed dee3e4a to ebfdc52.

Requires custom finagle 18.5.0 twitter/finagle#740. PoC fix for linkerd#1691.

Force-pushed ebfdc52 to 94eca45.
This looks good to me. I'm not sure how I feel about the name, but I can't think of something better right now. Also, I wonder if it's reasonable to invert this API, i.e., when …

@luciferous I agree that the API is confusing. My main motivation was backward compatibility, and I'm eager to redo this if compatibility is not an issue. Regardless of whether we invert or not, I like your idea of an optional parameter.
@@ -146,7 +147,7 @@ package object http {
       // specific HttpServerExpectContinueHandler above.
       pipeline.addLast(
         "fixedLenAggregator",
-        new FixedLengthMessageAggregator(maxRequestSize, handleExpectContinue = false)
+        new FixedLengthMessageAggregator(maxRequestSize, dechunkThreshold, handleExpectContinue = false)
What about the client side? Here is how it looks today:
// 8 KB is the size of the maxChunkSize parameter used in netty3,
// which is where it stops attempting to aggregate messages that lack
// a 'Transfer-Encoding: chunked' header.
fn("fixedLenAggregator", new FixedLengthMessageAggregator(8.kilobytes))
Thinking about this more, it seems we probably shouldn't use the maxRequestSize here at all, and should just introduce a separate parameter for it instead.
I like @luciferous' suggestion, and I would go with minChunkSize as the parameter name.
Http.server.withStreaming(true, minChunkSize = 128.kb)
I had no intention to implement the client side, but I'm open to doing this if you think we need it. I do not know, however, what the behavior is right now, or what it should be. I would need to investigate.

minChunkSize is a bit confusing to me. In general, it is confusing that finagle's chunk is different from an HTTP chunk. It took me some time to realize that Request.isChunked has, strictly speaking, nothing to do with the Transfer-Encoding header.

If you believe that, for experienced finagle users, minChunkSize would clearly mean finagle's chunk, then let's use that name. Otherwise I, as a non-experienced finagle user, would prefer to see the parameter named something like streamingThreshold.
Let's go with minChunkSize for now; if we think of something better during the review, we can always change it.
I think we should change the client side too as, otherwise, Http.client.withStreaming(true, 128.kb) would mean the minChunkSize is silently ignored.
@vkostyukov, buffering indeed may happen, but not in FixedLengthMessageAggregator.

The aggregator's buffering is all-or-nothing: we either buffer the whole message into one large buffer, up to maxContentLength, or we bypass the aggregator and emit chunks of data as they arrive from the upstream handlers in the pipeline (and in that case minChunkSize has no effect).

So, for example, if we have a sufficiently large minChunkSize (say, 128 KiB) that is used as the maxContentLength for FixedLengthMessageAggregator, the consumer of Request.reader will actually never read a buffer of size 128 * 1024. On the contrary, 128 KiB will likely be the maximal theoretical value the consumer can read.
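The all-or-nothing decision described here can be sketched as a small model. The type and method names below are hypothetical, not the actual finagle handler code; the real handler inspects netty HttpMessage headers:

```scala
// Hypothetical model of the aggregator's all-or-nothing choice.
// Aggregate: buffer the entire body, up to maxContentLength.
// Stream: bypass aggregation and emit chunks as they arrive.
sealed trait Mode
case object Aggregate extends Mode
case object Stream extends Mode

def chooseMode(
  isTransferEncodingChunked: Boolean,
  contentLength: Option[Long],
  maxContentLength: Long
): Mode =
  contentLength match {
    // Fixed-length body small enough to buffer whole.
    case Some(len) if !isTransferEncodingChunked && len <= maxContentLength =>
      Aggregate
    // Chunked transfer, unknown length, or too large: stream.
    case _ =>
      Stream
  }
```

In this model, minChunkSize only ever selects between the two modes; it never causes partial buffering of a streamed message.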
Okay, I'm convinced. Sorry for the confusion; I think I misunderstood what FixedLengthMessageAggregator does. bufferSize seems reasonable but, still, could be interpreted as "buffer my chunks pls". Perhaps something as verbose as aggregateIfLessThan = 128.kb.
I don't have any better ideas. aggregateIfLessThan works for me.
@luciferous, a small clarification. bufferSize = 128.kilobytes does not mean "buffer up to 128.kilobytes; Reader.read will ALWAYS return buf where buf.length <= 128.kilobytes".

If
- bufferSize is very small, for example bufferSize == 100.bytes
- the request has Transfer-Encoding: chunked, or the request Content-Length is > 100

then Request.reader.read(128.kilobytes.inBytes) will likely read a buf with buf.length > 100 (the value will be determined by upstream handlers in the netty pipeline).

If
- bufferSize is very small, for example bufferSize == 100.bytes
- the request does NOT have Transfer-Encoding: chunked, and the request Content-Length is <= 100

then Request.reader.read(128.kilobytes.inBytes) is guaranteed to read a buf with buf.length <= 100.

So, as you see, there are no guarantees at all.

This discussion makes me think that maybe we need a couple of tests to demonstrate this behavior, so that it is easier to reason about the handler.

In the meantime I will make an attempt to improve the scaladoc on the aggregator.
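A sketch of the kind of test that could demonstrate this behavior, using a toy reader model rather than the real finagle Reader (all names here are hypothetical):

```scala
// Toy model: a reader backed by pre-chunked data, as the netty
// pipeline would hand it to the consumer.
final case class FakeReader(chunks: List[Array[Byte]]) {
  // read(n) returns the next pipeline chunk, which may exceed n
  // when the message bypassed the aggregator (streaming case).
  def read(n: Int): Option[Array[Byte]] = chunks.headOption
}

// Streaming case: bufferSize == 100 bytes but the request is chunked,
// so a single upstream chunk far larger than 100 bytes passes through.
val streamed = FakeReader(List(Array.fill(8192)(0: Byte)))
assert(streamed.read(100).exists(_.length > 100))

// Aggregated case: Content-Length <= 100 and no chunked encoding,
// so the whole body arrives as one buffer no larger than 100 bytes.
val aggregated = FakeReader(List(Array.fill(100)(0: Byte)))
assert(aggregated.read(128 * 1024).exists(_.length <= 100))
```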
OK, I'm not sure I follow, but hopefully some test/docs will clarify. I'll think about it more too.
@@ -12,9 +12,18 @@ import io.netty.handler.codec.http._
   */
  private[http] class FixedLengthMessageAggregator(
      maxContentLength: StorageUnit,
-     handleExpectContinue: Boolean = true
+     dechunkThreshold: StorageUnit,
Maybe keep using a single StorageUnit parameter and set it to the min of the two at the call site?
I agree. I'm reworking this code right now and came to the same conclusion.

Initially I wasn't sure if we could set maxContentLength to 0 and still retain all the features. Now I've looked more carefully at the code, and it seems that the only case where we really aggregate something into buf is when there's an HttpMessage with a following HttpContent.

All other cases only make sure that the request, when it makes sense, has isChunked set to false and the content available as a string or buf.

I even wonder if it makes sense to eventually split FixedLengthMessageAggregator so it has a clearer purpose: one class should handle the actual aggregation, and the other should handle all the quirks, like noContentResponse and messages without content-length and transfer-encoding headers.
I think let's go ahead with @vkostyukov's suggestion of …
Force-pushed 94eca45 to 8946f66.
I fixed the review notes. I need some time to figure out why tests fail if the buffer size for the aggregator is 0. I thought it should work, but apparently I missed something. My guess atm is that even when we deduce a message length to be zero, netty can still send some content, which we attempt to buffer (and since the buffer length is 0, the aggregator returns error 413). Or maybe some tests just have to be adjusted.
Codecov Report
@@ Coverage Diff @@
## develop #740 +/- ##
===========================================
+ Coverage 70.43% 71.07% +0.64%
===========================================
Files 745 679 -66
Lines 23724 20710 -3014
Branches 1736 1540 -196
===========================================
- Hits 16709 14719 -1990
+ Misses 7015 5991 -1024
Continue to review full report at Codecov.
I fixed the failing test. The issue was not in the code, but in the test itself. It assumed that the incoming request is not chunked, hence its reader already has all the content, hence it is safe to read it synchronously. Do we want to get back to the parameter-name discussion and try to come up with a better name? Or do you think it's good as is? If yes, do you want me to squash all the commits into a single one before you merge?
Force-pushed a197ecf to 3f65913.
    */
   def withStreaming(enabled: Boolean): Server =
     configured(http.param.Streaming(enabled))
+  def withStreaming(enabled: Boolean, aggregateIfLessThen: StorageUnit = zero): Server =
Let's make it a separate method. Java doesn't understand default arguments.
👍
Also, what do you think of using AggregateIfLessThan.aggregateIfLessThan.default instead of zero?
I'll take this opportunity to offer once again a different name for the property. I promise it's the last time I'm raising this :)

withAggregateIfLessThan
- con: does not suggest in any way that it is related to streaming
- con: while it is very verbose and explicit, it is almost too explicit, delusionally explicit. It explicitly says what the framework will do, and even mentions some condition, but not all conditions are mentioned. That's why it is delusional.

A few alternatives:

withStreamingThreshold(64.kilobytes)
- pro: suggests that it is related to streaming
- pro: threshold is a very general term; I'd decide to read the docs to learn how the behavior changes once the threshold is reached, and wouldn't be fooled by an overly explicit name

withStreamingBuffer(64.kilobytes)
- pro: suggests that it is related to streaming
- pro: buffer is a very generic term, so I'd probably decide to read the docs and wouldn't be fooled by an overly explicit name
- con: even more delusional than withAggregateIfLessThan

withContentLengthStreamedAfter(64.kilobytes)
- pro: suggests that it is related to streaming (if you typed Str in the IDE you'd likely see this among the suggestions)
- pro: suggests that it may be related to Content-Length (which is the truth: we only aggregate if there's a content-length header). IMO it is the least delusional name.
- con: no less verbose than withAggregateIfLessThan, but, IMO, much more readable

withFixedLengthStreamedAfter(64.kilobytes)
- pro: related to streaming
- pro: suggests that it is related to fixed-length messages
- con: a developer must know what a fixed-length message is
- con: verbose

withStreamingAfter(contentLength: StorageUnit)
- pro: related to streaming
- pro: related to content length
- con: contentLength is only present in the method definition and only in the original sources (won't work on decompiled code)
I like all of those names, but I'm still in the "we should group this new config with withStreaming" camp. As the new value only takes effect when streaming is enabled, we might want to keep them together (one configuration entry and perhaps one stack param).
Actually, I think fixedLengthStreamedAfter is pretty cool and could make a good pair for enabled:
withStreaming(enabled = true, fixedLengthStreamedAfter = 64.kb)
Ugh, I misunderstood about separating a method. Apparently you suggested an overloaded version of withStreaming...
Oh yeah, sorry if I wasn't clear. Basically, we're trying to ensure our APIs are accessible from Java, which, unfortunately, means saying no to default arguments.
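A minimal sketch of the overload approach, with stand-in types instead of the real finagle classes (Server, StorageUnit, and Streaming here are simplified models, not the actual API):

```scala
// Stand-ins for finagle types (illustrative only).
final case class StorageUnit(bytes: Long)
final case class Streaming(enabled: Boolean, threshold: StorageUnit)

final case class Server(params: Map[String, Any] = Map.empty) {
  // Existing single-argument form stays as-is.
  def withStreaming(enabled: Boolean): Server =
    withStreaming(enabled, StorageUnit(0))

  // New overload instead of a default argument: Java callers
  // cannot use Scala default parameters, but overloads work fine.
  def withStreaming(enabled: Boolean, fixedLengthStreamedAfter: StorageUnit): Server =
    copy(params = params + ("streaming" -> Streaming(enabled, fixedLengthStreamedAfter)))
}
```

From Java, both `withStreaming(true)` and `withStreaming(true, size)` then resolve as plain overloads.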
I think a few e2e tests that exercise only the new mixed behavior are needed. I only tested this manually. I'll add some shortly.
case class AggregateIfLessThan(size: StorageUnit)
object AggregateIfLessThan {
  implicit val aggregateIfLessThan: Stack.Param[AggregateIfLessThan] =
    Stack.Param(AggregateIfLessThan(StorageUnit.zero))
Thinking more about this, 8KB may be a reasonable default here. What do people think?
Yes, I like 8kb.
Looks really good @edio! Again, appreciate your patience with this!
   */
  def withStreaming(enabled: Boolean): Server =
    configured(http.param.Streaming(enabled))

  /**
   * Streaming allows applications to work with HTTP messages that have large
Love these docs! Ty!
case class FixedLengthStreamedAfter(size: StorageUnit)
object FixedLengthStreamedAfter {
  implicit val fixedLengthStreamedAfter: Stack.Param[FixedLengthStreamedAfter] =
    Stack.Param(FixedLengthStreamedAfter(StorageUnit.zero))
@luciferous talked about using 8k for a default value here. I liked that at first but maybe we're better off preserving the current "default" behavior and using 5.mb as a value?
Sounds good.
@edio This thread.
Thanks again for this change @edio
A quick update: I've been busy with other stuff today. Still working on those tests.
Force-pushed f817dcf to 7ac232c.
Problem

Server.withStreaming(true) only enables streaming for requests with chunked encoding. Requests without Transfer-Encoding are still buffered up to the max allowed request size. This is not desirable for some use cases (twitter#538).

Solution

Introduce a new stack parameter, MinChunkSize, which determines the maximum content length for the FixedLengthMessageAggregator in the netty server pipeline. The default value for the new parameter is 0, which forces streaming of all messages (changes the former behavior).

Result

Server-side:

val server = Http.server
  .withStreaming(true)

will stream messages both with and without Transfer-Encoding. To revert to the previous behavior, minChunkSize must be set to a non-zero value:

val server = Http.server
  .withStreaming(true, minChunkSize = 5.megabytes)
Introduce an overloaded version of withStreaming for Java compatibility. Update scaladocs.
Force-pushed 7ac232c to 13f1b7a.

Force-pushed 13f1b7a to 6db98df.
I added a couple of tests to cover the cases we discussed. Sorry for all those force pushes. @vkostyukov, GitHub says there's still a change requested by you, but I looked through all the threads and it seems that everything is covered. Did I miss something? Let me know if you want me to squash the changes.
Again, thanks a ton for following through with this! You don't need to squash; we'll do that on our end. One thing I believe we should change before proceeding is to set the default value to 5.mb (the current default) instead of 0.
I like this a lot!
Our oncall will pull this for internal testing shortly (unless they have any additional feedback).
I second @vkostyukov, thanks for the better docs!
Co-Authored-By: edio <[email protected]>
Thanks for the thorough reviews, folks! It was a pleasure to work on this with you. @vkostyukov, I assumed that the commit message would be set from the PR description on squash. The first commit message in the PR became a bit inaccurate after all the fixes were applied. IDK if it's too late to fix that. In any case, that's nothing critical, I guess.
Don't worry about that. It's not a big deal.
When Linkerd receives a significantly large HTTP request with a set content-length, the underlying Finagle router service buffers the entire message into memory instead of streaming the HTTP message in smaller chunks. This can cause problems when a client is sending very large file uploads without using `Transfer-Encoding: Chunked`.

This PR pulls in a new Finagle param [(#740)](twitter/finagle#740) that forces a Finagle server to receive large HTTP messages in smaller chunks when the HTTP message's content-length exceeds a certain threshold. In addition, Linkerd exposes this Finagle param to allow for configurability.

I was able to test this out in a docker environment where I sent a significantly large file (140MB) to Linkerd. Previously, Linkerd would buffer the entire message into memory, which was easily visible in the JVM memory usage. After the change, the same request was issued again and we no longer observed the spike in memory.

fixes #2188

Signed-off-by: Dennis Adjei-Baah <[email protected]>
Problem

Server.withStreaming(true) only enables streaming for requests with chunked encoding. Requests without chunked encoding are still buffered up to the max allowed request size. This is not desirable for some use cases (#538).

Solution

Introduce a new stack parameter, FixedLengthStreamedAfter, which determines the maximum content length for the FixedLengthMessageAggregator in the netty server pipeline. The default value for the new parameter is 5.megabytes, which forces streaming of messages with bodies exceeding 5 MiB (changes the former behavior).

Result

Server-side:

will stream messages with Transfer-Encoding: chunked, and messages without the header if Content-Length is larger than 5242880.

To revert to the previous behavior, fixedLengthStreamedAfter must be set to the value of maxRequestSize.
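Sketched with simplified stand-in types (not the real com.twitter.finagle classes; StorageUnit and Http below are illustrative models), the revert-to-old-behavior configuration might look like:

```scala
// Illustrative stand-ins for finagle's fluent server config.
final case class StorageUnit(bytes: Long)

final case class Http(
  streaming: Boolean = false,
  // 5 MiB mirrors the default discussed above.
  fixedLengthStreamedAfter: StorageUnit = StorageUnit(5L * 1024 * 1024)
) {
  def withStreaming(enabled: Boolean, fixedLengthStreamedAfter: StorageUnit): Http =
    copy(streaming = enabled, fixedLengthStreamedAfter = fixedLengthStreamedAfter)
}

// Reverting to the old behavior: fixed-length bodies up to the
// configured maxRequestSize stay fully buffered; chunked requests
// still stream.
val maxRequestSize = StorageUnit(5L * 1024 * 1024)
val server = Http().withStreaming(enabled = true, fixedLengthStreamedAfter = maxRequestSize)
```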