Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

finagle-http: allow streaming of fixed length messages #740

Closed
wants to merge 9 commits into from
Prev Previous commit
Next Next commit
Add e2e tests for new server behavior
  • Loading branch information
koiuo committed Oct 25, 2018
commit 6db98df18b1bb2fd302f52b5e9c5f3fc1c70f9d5
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.twitter.finagle.http.Response;
import com.twitter.finagle.param.Label;
import com.twitter.util.Future;
import com.twitter.util.StorageUnit;

public final class HttpServerTest {

Expand Down Expand Up @@ -52,6 +53,7 @@ public static void main(String[] args) {
.withAdmissionControl().noDeadlines()
.withAdmissionControl().darkModeDeadlines()
.withCompressionLevel(2)
.withStreaming(true, StorageUnit.fromMegabytes(1))
.configured(new Label("test").mk())
.withDecompression(true)
.withHttp2();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package com.twitter.finagle.http

import com.twitter.finagle.{ListeningServer, Service}
import com.twitter.conversions.storage._
import com.twitter.finagle.stats.NullStatsReceiver
import com.twitter.io.Buf
import com.twitter.io.ReaderDiscardedException
import com.twitter.util.{Future, Promise, Return, Throw}
import com.twitter.finagle.{ListeningServer, Service}
import com.twitter.io.{Buf, Reader, ReaderDiscardedException}
import com.twitter.util.{Future, Promise, Return, StorageUnit, Throw}
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicBoolean
import org.scalatest.prop.TableDrivenPropertyChecks._

abstract class AbstractHttp1EndToEndTest extends AbstractEndToEndTest {

Expand Down Expand Up @@ -237,6 +238,77 @@ abstract class AbstractHttp1EndToEndTest extends AbstractEndToEndTest {
await(server.close())
}

test(s"streaming server does not stream sufficiently small fixed length messages") {
val fixedLengthStreamedAfter: StorageUnit = 4.bytes
Table(
("body", "expectChunked"),
("", false),
("hal", false),
("hall", false),
("hello", true)
).forEvery { (body, expectChunked) =>
val receivedRequestFuture: Promise[Request] = Promise()
// given
val svc = Service.mk[Request, Response] { req =>
receivedRequestFuture.setValue(req)
Future.value(Response(req))
}

val server = serverImpl()
.withStreaming(true, fixedLengthStreamedAfter)
.serve("localhost:*", svc)

val addr = server.boundAddress.asInstanceOf[InetSocketAddress]
val client = clientImpl().newService(s"${addr.getHostName}:${addr.getPort}", "client")
initClient(client)

val req = Request()
req.contentString = body
req.headerMap.put("Content-Length", body.length.toString)

// when
client(req)

// then
val receivedRequest = await(receivedRequestFuture)
assert(receivedRequest.isChunked == expectChunked)
val receivedBody = await(Reader.readAll(receivedRequest.reader))
assert(receivedBody == Buf.Utf8(body))

await(server.close())
await(client.close())
}
}

test(s"streaming server won't accept fixed length messages that exceed maxRequestSize") {
// given
val svc = Service.mk[Request, Response] { req =>
Future.value(Response(req))
}

val server = serverImpl()
.withStreaming(true, 1.gigabyte)
.withMaxRequestSize(4.bytes)
.serve("localhost:*", svc)

val addr = server.boundAddress.asInstanceOf[InetSocketAddress]
val client = clientImpl().newService(s"${addr.getHostName}:${addr.getPort}", "client")
initClient(client)

val req = Request()
req.contentString = "hello"
req.headerMap.put("Content-Length", "5")

// when
val resp = await(client(req))

// then
assert(resp.statusCode == 413)

await(server.close())
await(client.close())
}

run(multiplePipelines(implName, _))(nonStreamingConnect)
run(multiplePipelines(implName + "(streaming)", _))(streamingConnect)

Expand Down