
finagle-http: allow streaming of fixed length messages #740

Closed
wants to merge 9 commits into develop from feature/unchunked_streaming

Conversation

koiuo
Contributor

@koiuo koiuo commented Oct 21, 2018

Problem

Server.withStreaming(true) only enables streaming for requests with
chunked encoding. Requests without chunked encoding are still buffered
up to the maximum allowed request size. This is not desirable for some use cases.

#538

Solution

Introduce a new stack parameter, FixedLengthStreamedAfter, which determines
the maximum content length for the FixedLengthMessageAggregator in the netty
server pipeline.

The default value for the new parameter is 5.megabytes, which forces
streaming of messages whose body exceeds 5 MiB (a change from the former
behavior).
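
For illustration, a rough sketch of the param shape and of how the server pipeline might read it (the 5.megabytes default mirrors the value above; the helper dechunkThreshold and the exact wiring are assumptions for this sketch, not necessarily the merged code):

  import com.twitter.conversions.storage._
  import com.twitter.finagle.Stack
  import com.twitter.util.StorageUnit

  case class FixedLengthStreamedAfter(size: StorageUnit)
  object FixedLengthStreamedAfter {
    implicit val fixedLengthStreamedAfter: Stack.Param[FixedLengthStreamedAfter] =
      Stack.Param(FixedLengthStreamedAfter(5.megabytes))
  }

  // Inside the netty server pipeline setup the param is read from Stack.Params
  // and handed to FixedLengthMessageAggregator as its maximum content length.
  def dechunkThreshold(params: Stack.Params): StorageUnit =
    params[FixedLengthStreamedAfter].size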

Result

Server-side:

val server = Http.server
  .withStreaming(true)
  .withMaxRequestSize(50.megabytes)

will stream messages with Transfer-Encoding: chunked as well as messages
without that header whose Content-Length is larger than 5242880.
To revert to the previous behavior, fixedLengthStreamedAfter must be set to
the value of maxRequestSize:

val server = Http.server
  .withStreaming(true, fixedLengthStreamedAfter = 50.megabytes)
  .withMaxRequestSize(50.megabytes)

@koiuo koiuo force-pushed the feature/unchunked_streaming branch from dee3e4a to ebfdc52 on October 22, 2018 18:45
koiuo added a commit to koiuo/linkerd that referenced this pull request Oct 22, 2018
@koiuo koiuo force-pushed the feature/unchunked_streaming branch from ebfdc52 to 94eca45 on October 22, 2018 21:04
@luciferous
Collaborator

luciferous commented Oct 23, 2018

This looks good to me.

I'm not sure how I feel about the name, but I can't think of something better right now. Also, I wonder if it's reasonable to invert this API, i.e., when withStreaming(true) we ALWAYS stream the message body, and add a parameter where if the message is under this size then we buffer it. What do you think?

// Always streams message bodies, but if we know the
// Content-Length is under 128KB, then we fully buffer.
Http.withStreaming(true, 128.kilobytes)

@koiuo
Contributor Author

koiuo commented Oct 23, 2018

@luciferous
thanks for the quick feedback.

I agree that the API is confusing. My main motivation was backward compatibility, and I'm eager to redo this if compatibility is not an issue.

If developers always decide between using content or reader based on the value returned by isChunked, then it is safe to invert the API. I'd vote for doing this.

Regardless of whether we invert or not, I like your idea of an optional parameter.
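
(For context, the content-vs-reader dispatch mentioned above looks roughly like this against finagle-http's public API; the service itself is purely illustrative:)

  import com.twitter.finagle.Service
  import com.twitter.finagle.http.{Request, Response}
  import com.twitter.io.{Buf, Reader}
  import com.twitter.util.Future

  val echo = Service.mk[Request, Response] { req =>
    if (req.isChunked) {
      // Streaming request: the body must be consumed through req.reader.
      Reader.readAll(req.reader).map { body: Buf =>
        val rep = Response()
        rep.content = body
        rep
      }
    } else {
      // Buffered request: the whole body is already available as req.content.
      val rep = Response()
      rep.content = req.content
      Future.value(rep)
    }
  }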

@@ -146,7 +147,7 @@ package object http {
       // specific HttpServerExpectContinueHandler above.
       pipeline.addLast(
         "fixedLenAggregator",
-        new FixedLengthMessageAggregator(maxRequestSize, handleExpectContinue = false)
+        new FixedLengthMessageAggregator(maxRequestSize, dechunkThreshold, handleExpectContinue = false)
Contributor

What about the client side? Here is how it looks today:

// 8 KB is the size of the maxChunkSize parameter used in netty3,
// which is where it stops attempting to aggregate messages that lack
// a 'Transfer-Encoding: chunked' header.
fn("fixedLenAggregator", new FixedLengthMessageAggregator(8.kilobytes))

Contributor

Thinking about this more, it seems we probably shouldn't use the maxRequestSize here at all and should just introduce a separate parameter for it instead.

I like @luciferous' suggestion. And I would go with minChunkSize as a parameter name.

Http.server.withStreaming(true, minChunkSize = 128.kb)

Contributor Author

I had no intention to implement the client side, but I'm open to doing this if you think we need it. I don't know, however, what the behavior is right now, or what it should be. I would need to investigate.

minChunkSize is a bit confusing for me.
In general, it is confusing that finagle's chunk is different from an HTTP chunk. It took me some time to realize that Request.isChunked has, strictly speaking, nothing to do with the Transfer-Encoding header.
If you believe that for experienced finagle users minChunkSize would clearly mean finagle's chunk, then let's go with this name. Otherwise I, as a non-experienced finagle user, would prefer to see the parameter named something like streamingThreshold.

Collaborator

Let's go with minChunkSize for now; if we think of something better during the review we can always change it.

Contributor

@vkostyukov vkostyukov Oct 23, 2018

I think we should change the client-side too as, otherwise, Http.client.withStreaming(true, 128.kb) would mean the minChunkSize is silently ignored.

Contributor Author

@koiuo koiuo Oct 23, 2018

@vkostyukov, buffering indeed may happen, but not in FixedLengthMessageAggregator.
The aggregator's buffering is all-or-nothing: we either buffer the whole message into a single large buffer up to maxContentLength, or we bypass the aggregator and emit chunks of data as they arrive from the upstream handlers in the pipeline (and in that case minChunkSize does not have any effect).

So, for example, if we have a sufficiently large minChunkSize (say, 128 KiB) that is used as the maxContentLength for FixedLengthMessageAggregator, the consumer of Request.reader will actually never read a buffer of size 128*1024. On the contrary, 128 KiB will likely be the maximum theoretical value the consumer can read.
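
(A small sketch of that consumption pattern; readAllChunks is a hypothetical helper, and the point is that each chunk's size is whatever the pipeline emitted rather than the configured threshold:)

  import com.twitter.io.{Buf, Reader}
  import com.twitter.util.Future

  // Drain a reader, observing whatever chunk sizes the upstream netty handlers emit.
  def readAllChunks(reader: Reader, acc: Buf = Buf.Empty): Future[Buf] =
    reader.read(Int.MaxValue).flatMap {
      case Some(chunk) =>
        // chunk.length is bounded by what the pipeline produced, not by minChunkSize.
        readAllChunks(reader, acc.concat(chunk))
      case None =>
        Future.value(acc)
    }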

Contributor

Okay, I'm convinced. Sorry for the confusion; I think I misunderstood what FixedLengthMessageAggregator does. bufferSize seems reasonable but, still, could be interpreted as "buffer my chunks pls".

Perhaps something as verbose as aggregateIfLessThan = 128.kb.

Collaborator

I don't have any better ideas. aggregateIfLessThan works for me.

Contributor Author

@luciferous
small clarification

bufferSize = 128.kilobytes: buffer up to 128.kilobytes; Reader.read will ALWAYS return buf where buf.length <= 128.kilobytes

if

  • bufferSize is very small, for example bufferSize == 100.bytes
  • the request has Transfer-Encoding: chunked or the request's Content-Length is > 100

then Request.reader.read(128.kilobytes.inBytes) will likely read a buf with buf.length > 100 (the value will be determined by the upstream handlers in the netty pipeline)

if

  • bufferSize is very small, for example bufferSize == 100.bytes
  • the request does NOT have Transfer-Encoding: chunked and the request's Content-Length is <= 100

then Request.reader.read(128.kilobytes.inBytes) is guaranteed to read a buf with buf.length <= 100

So, as you see, there are no guarantees at all.

This discussion makes me think that maybe we need a couple of tests to demonstrate this behavior, so that it is easier to reason about the handler.
In the meantime I will make an attempt to improve the scaladoc on the aggregator.
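
(A hedged sketch of what such a demonstration could look like, using an illustrative 100-byte threshold and the withStreaming(enabled, fixedLengthStreamedAfter) shape this thread converges on; the service and address are made up for the example:)

  import com.twitter.conversions.storage._
  import com.twitter.finagle.{Http, Service}
  import com.twitter.finagle.http.{Request, Response}
  import com.twitter.util.{Await, Future}

  // A server that reports whether a request arrived buffered or streamed.
  val probe = Service.mk[Request, Response] { req =>
    val rep = Response()
    // A fixed-length request with Content-Length <= 100 bytes arrives with
    // isChunked == false and its body already buffered in req.content; a larger
    // or Transfer-Encoding: chunked request arrives with isChunked == true.
    rep.contentString = s"isChunked=${req.isChunked} buffered=${req.content.length}"
    Future.value(rep)
  }

  val server = Http.server
    .withStreaming(enabled = true, fixedLengthStreamedAfter = 100.bytes)
    .serve(":8080", probe)

  Await.ready(server)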

Collaborator

OK, I'm not sure I follow, but hopefully some test/docs will clarify. I'll think about it more too.

@@ -12,9 +12,18 @@ import io.netty.handler.codec.http._
  */
 private[http] class FixedLengthMessageAggregator(
     maxContentLength: StorageUnit,
-    handleExpectContinue: Boolean = true
+    dechunkThreshold: StorageUnit,
Contributor

Maybe keep using a single StorageUnit parameter and set it to the min of the two at the call-site?

Contributor Author

@koiuo koiuo Oct 23, 2018

I agree. I'm reworking this code right now and came to the same conclusion.

Initially I wasn't sure if we could set maxContentLength to 0 and still retain all the features.
Now that I've looked more carefully at the code, it seems that the only case when we really aggregate something in buf is when there's an HttpMessage with a following HttpContent.

All other cases only make sure that the request, where it makes sense, has isChunked set to false and its content available as a string or buf.

I even wonder if it makes sense to eventually split FixedLengthMessageAggregator so it has a clearer purpose: one class should handle actual aggregation, and the other one should handle all the quirks, like noContentResponse and messages without Content-Length and Transfer-Encoding headers.

@luciferous
Collaborator

I think let's go ahead with the Http.*.withStreaming(true, 128.kilobytes) API and note the behavioral change in CHANGELOG.rst.

@vkostyukov's suggestion of minChunkSize sounds good.

@koiuo koiuo force-pushed the feature/unchunked_streaming branch from 94eca45 to 8946f66 on October 23, 2018 18:28
@koiuo
Contributor Author

koiuo commented Oct 24, 2018

I addressed the review notes.

I need some time to figure out why tests fail if the buffer size for the aggregator is 0.

I thought it should work, but apparently I missed something. My guess atm is that even when we deduce a message length of zero, netty can still send some content which we attempt to buffer (and since the buffer length is 0, the aggregator returns a 413 error).

Or maybe some tests just have to be adjusted.

@codecov-io

codecov-io commented Oct 24, 2018

Codecov Report

Merging #740 into develop will increase coverage by 0.64%.
The diff coverage is 100%.

Impacted file tree graph

@@             Coverage Diff             @@
##           develop     #740      +/-   ##
===========================================
+ Coverage    70.43%   71.07%   +0.64%     
===========================================
  Files          745      679      -66     
  Lines        23724    20710    -3014     
  Branches      1736     1540     -196     
===========================================
- Hits         16709    14719    -1990     
+ Misses        7015     5991    -1024
Impacted Files Coverage Δ
...y4/http/handler/FixedLengthMessageAggregator.scala 95.83% <ø> (ø) ⬆️
...cala/com/twitter/finagle/netty4/http/package.scala 95.31% <100%> (+0.15%) ⬆️
...http/src/main/scala/com/twitter/finagle/Http.scala 82.45% <100%> (+0.2%) ⬆️
.../scala/com/twitter/finagle/http/param/params.scala 100% <100%> (ø) ⬆️
...twitter/finagle/mux/lease/exp/RequestSnooper.scala 0% <0%> (-100%) ⬇️
...er/finagle/mux/stats/MuxCancelledCategorizer.scala 0% <0%> (-100%) ⬇️
...witter/finagle/netty4/decoder/DecoderHandler.scala 0% <0%> (-100%) ⬇️
...hreading/EventLoopGroupExecutionDelayTracker.scala 0% <0%> (-100%) ⬇️
...ter/finagle/mux/lease/exp/GenerationalRandom.scala 0% <0%> (-100%) ⬇️
...om/twitter/finagle/netty4/encoder/BufEncoder.scala 0% <0%> (-100%) ⬇️
... and 158 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update f306467...6db98df.

@koiuo
Contributor Author

koiuo commented Oct 24, 2018

I fixed the failing test. The issue was not in the code, but in the test itself. It assumed that the incoming request is not chunked, hence its reader already has all the content, and hence it is safe to read it synchronously.

Do we want to get back to the parameter name discussion and try to come up with a better name? Or do you think it's good as is? If yes, do you want me to squash all the commits into a single one before you merge?

@koiuo koiuo force-pushed the feature/unchunked_streaming branch from a197ecf to 3f65913 on October 24, 2018 18:14
   */
-  def withStreaming(enabled: Boolean): Server =
-    configured(http.param.Streaming(enabled))
+  def withStreaming(enabled: Boolean, aggregateIfLessThen: StorageUnit = zero): Server =
Contributor

Let's make it a separate method. Java doesn't understand default arguments.
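
(That is, roughly an overload pair like the following instead of a default argument; the wiring is illustrative and uses the FixedLengthStreamedAfter stack param this PR eventually introduces:)

  // Existing method stays as-is, callable from Java:
  def withStreaming(enabled: Boolean): Server =
    configured(http.param.Streaming(enabled))

  // New overload instead of a default argument, so Java callers can use it too:
  def withStreaming(enabled: Boolean, fixedLengthStreamedAfter: StorageUnit): Server =
    configured(http.param.Streaming(enabled))
      .configured(http.param.FixedLengthStreamedAfter(fixedLengthStreamedAfter))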

Collaborator

👍

Also, what do you think of using AggregateIfLessThan.aggregateIfLessThan.default instead of zero?

Contributor Author

@koiuo koiuo Oct 24, 2018

I'll take this opportunity to offer once again a different name for the property. I promise it's the last time I'm raising this :)

withAggregateIfLessThan
con: does not suggest in any way that it is related to streaming
con: while it is very verbose and explicit, it is almost too explicit, delusionally explicit.
It explicitly says what the framework will do, and even mentions a condition, but not all conditions are mentioned. That's why it is delusional.

A few alternatives

  • withStreamingThreshold(64.kilobytes)
    pro: suggests that it is related to streaming
    pro: threshold is a very general term; I'd decide to read the docs to learn how the behavior changes once the threshold is reached and wouldn't be fooled by an overly explicit name
  • withStreamingBuffer(64.kilobytes)
    pro: suggests that it is related to streaming
    pro: buffer is a very generic term, so I'd probably decide to read the docs and wouldn't be fooled by an overly explicit name
    con: even more delusional than withAggregateIfLessThan
  • withContentLengthStreamedAfter(64.kilobytes)
    pro: suggests that it is related to streaming (if you typed Str in an IDE you'd likely see this among the suggestions)
    pro: suggests that it may be related to Content-Length (which is the truth, we only aggregate if there's a content-length header). IMO it is the least delusional name.
    con: not less verbose than withAggregateIfLessThan, but, imo, much more readable
  • withFixedLengthStreamedAfter(64.kilobytes)
    pro: related to streaming
    pro: suggests that it is related to fixed length messages
    con: the developer must know what a fixed length message is
    con: verbose
  • withStreamingAfter(contentLength: StorageUnit)
    pro: related to streaming
    pro: related to content length
    con: contentLength is only present in the method definition and only in the original sources (won't work on decompiled code)

Contributor

@vkostyukov vkostyukov Oct 25, 2018

I like all of those names but I'm still in the "we should group this new config with withStreaming" camp. As the new value only takes effect when streaming is enabled, we might want to keep them together (one configuration entry and perhaps one stack param).

Actually, I think fixedLengthStreamedAfter is pretty cool and could make a good pair with enabled:

withStreaming(enabled = true, fixedLengthStreamedAfter = 64.kb)

Contributor Author

Ugh, I misunderstood the point about a separate method.
Apparently you suggested an overloaded version of withStreaming...

Contributor

Oh yeah, sorry if I wasn't clear. Basically we're trying to ensure our APIs are accessible from Java which, unfortunately, means saying no to default arguments.

@koiuo
Contributor Author

koiuo commented Oct 24, 2018

I think a few e2e tests that cover only the new mixed behavior are needed. I only tested this manually. I'll add some shortly.

case class AggregateIfLessThan(size: StorageUnit)
object AggregateIfLessThan {
  implicit val aggregateIfLessThan: Stack.Param[AggregateIfLessThan] =
    Stack.Param(AggregateIfLessThan(StorageUnit.zero))
Collaborator

Thinking more about this, 8KB may be a reasonable default here. What do people think?

Contributor

Yes, I like 8kb.


Contributor

@vkostyukov vkostyukov left a comment

Looks really good @edio! Again, appreciate your patience with this!

   */
  def withStreaming(enabled: Boolean): Server =
    configured(http.param.Streaming(enabled))

  /**
   * Streaming allows applications to work with HTTP messages that have large
Contributor

Love these docs! Ty!

case class FixedLengthStreamedAfter(size: StorageUnit)
object FixedLengthStreamedAfter {
  implicit val fixedLengthStreamedAfter: Stack.Param[FixedLengthStreamedAfter] =
    Stack.Param(FixedLengthStreamedAfter(StorageUnit.zero))
Contributor

@luciferous talked about using 8k for a default value here. I liked that at first but maybe we're better off preserving the current "default" behavior and using 5.mb as a value?

Collaborator

Sounds good.

Contributor

@edio This thread.

Collaborator

@luciferous luciferous left a comment

Thanks again for this change @edio


@koiuo
Contributor Author

koiuo commented Oct 25, 2018

A quick update: I've been busy with other stuff today. Still working on those tests.

@koiuo koiuo force-pushed the feature/unchunked_streaming branch 3 times, most recently from f817dcf to 7ac232c on October 25, 2018 21:55
Problem

Server.withStreaming(true) only enables streaming for requests with
chunked encoding. Requests without Transfer-Encoding are still buffered
up to the max allowed request size. This is not desirable for some use cases.

twitter#538

Solution

Introduce a new stack parameter MinChunkSize which determines the maximum
content length for the FixedLengthMessageAggregator in the netty server
pipeline.

The default value for the new parameter is 0, which forces streaming of all
messages (changes the former behavior).

Result

Server-side:

  val server = Http.server
    .withStreaming(true)

will stream messages both with and without Transfer-Encoding. To revert
to the previous behavior, minChunkSize must be set to a non-zero value:

  val server = Http.server
    .withStreaming(true, minChunkSize = 5.megabytes)
Introduce overloaded version of withStreaming for Java compatibility
Update scaladocs
@koiuo koiuo force-pushed the feature/unchunked_streaming branch from 7ac232c to 13f1b7a on October 25, 2018 21:58
@koiuo koiuo force-pushed the feature/unchunked_streaming branch from 13f1b7a to 6db98df on October 25, 2018 22:10
@koiuo koiuo changed the title finagle-http: introduce withDechunkThreshold finagle-http: allow streaming of fixed length messages Oct 25, 2018
@koiuo
Contributor Author

koiuo commented Oct 25, 2018

I added a couple of tests to cover the cases we discussed. Sorry for all those force pushes.

@vkostyukov, GitHub says there's still a change requested by you, but I looked through all the threads and it seems that everything is covered. Did I miss something?

Let me know if you want me to squash the changes.

@vkostyukov
Contributor

I added a couple of tests to cover the cases we discussed. Sorry for all those force pushes.

@vkostyukov, GitHub says there's still a change requested by you, but I looked through all the threads and it seems that everything is covered. Did I miss something?

Let me know if you want me to squash the changes.

Again, thanks a ton for following through on this! You don't need to squash - we'll do that on our end.

One thing I believe we should change before proceeding is to set the default value to 5.mb (the current default) instead of 0.

Contributor

@vkostyukov vkostyukov left a comment

I like this a lot!

Our oncall will pull this for internal testing shortly (unless they have any additional feedback).

Contributor

@bryce-anderson bryce-anderson left a comment

I second @vkostyukov, thanks for the better docs!

finagle-http/src/main/scala/com/twitter/finagle/Http.scala (outdated, resolved)
@vkostyukov
Contributor

This was merged internally and made it back to GitHub: a3094f3

Thank you again @edio!

@vkostyukov vkostyukov closed this Oct 27, 2018
@koiuo
Contributor Author

koiuo commented Oct 27, 2018

Thanks for the thorough reviews, folks! It was a pleasure to work on this with you.

@vkostyukov, I assumed that the commit message would be set from the PR description on squash. The first commit message in the PR became a bit inaccurate after all the fixes were applied:

Default value for the new parameter is 0 which forces streaming of all messages (changes former behavior).

IDK if that's too late to fix. In any case, that's nothing critical I guess.

@vkostyukov
Contributor

IDK if that's too late to fix. In any case, that's nothing critical I guess.

Don't worry about that. It's not a big deal.

dadjeibaah pushed a commit to linkerd/linkerd that referenced this pull request Dec 17, 2018
When Linkerd receives a significantly large HTTP request with a set content-length, the underlying Finagle router service buffers the entire message into memory instead of streaming the HTTP message in smaller chunks. This can cause problems when a client is sending very large file uploads without using `Transfer-Encoding: chunked`.

This PR pulls in a new Finagle param [(#740)](twitter/finagle#740) that forces a Finagle server to receive large HTTP messages in smaller chunks when the HTTP message's content-length exceeds a certain threshold. In addition, Linkerd exposes this Finagle param to allow for configurability. I was able to test this out in a docker environment where I sent a significantly large file (140MB) to Linkerd. Previously, Linkerd would buffer the entire message into memory, which was easily visible in the JVM memory usage. After the change, the same request was issued again and we no longer observed the spike in memory.

fixes #2188

Signed-off-by: Dennis Adjei-Baah <[email protected]>