-
Notifications
You must be signed in to change notification settings - Fork 791
/
AkkaHttpTestAsyncWebServer.scala
77 lines (71 loc) · 2.66 KB
/
AkkaHttpTestAsyncWebServer.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.model.HttpMethods.GET
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import groovy.lang.Closure
import io.opentelemetry.instrumentation.test.base.HttpServerTest
import io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint._
import scala.concurrent.{Await, ExecutionContextExecutor, Future}
/** Akka HTTP test server that answers requests through the asynchronous
  * `bindAndHandleAsync` API.
  *
  * The object owns a single [[akka.actor.ActorSystem]]. `stop()` terminates that
  * system, and since it is a `val` of this object it is not re-created by a later
  * `start()`.
  */
object AkkaHttpTestAsyncWebServer {
  implicit val system: ActorSystem = ActorSystem("my-system")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  // Execution context on which the `Future { ... }` in asyncHandler runs.
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  /** Request handler passed to `bindAndHandleAsync`.
    *
    * GET requests are resolved to a `ServerEndpoint` by path and answered inside
    * `HttpServerTest.controller`. Any other request gets the `NOT_FOUND` response
    * instead of escaping the handler as a `scala.MatchError`.
    */
  val asyncHandler: HttpRequest => Future[HttpResponse] = {
    case HttpRequest(GET, uri: Uri, _, _, _) =>
      Future {
        val endpoint =
          HttpServerTest.ServerEndpoint.forPath(uri.path.toString())
        HttpServerTest.controller(
          endpoint,
          new Closure[HttpResponse](()) {
            def doCall(): HttpResponse = {
              // Base response carrying the status configured for the endpoint.
              val resp = HttpResponse(status = endpoint.getStatus)
              endpoint match {
                case SUCCESS => resp.withEntity(endpoint.getBody)
                case INDEXED_CHILD =>
                  // Hand the query parameters to the endpoint so it can record
                  // them; the response body itself is empty.
                  INDEXED_CHILD.collectSpanAttributes(new UrlParameterProvider {
                    override def getParameter(name: String): String =
                      uri.query().get(name).orNull
                  })
                  resp.withEntity("")
                case QUERY_PARAM => resp.withEntity(uri.queryString().orNull)
                case REDIRECT =>
                  // The endpoint body holds the redirect target.
                  resp.withHeaders(headers.Location(endpoint.getBody))
                case ERROR => resp.withEntity(endpoint.getBody)
                case EXCEPTION => throw new Exception(endpoint.getBody)
                case _ =>
                  HttpResponse(status = NOT_FOUND.getStatus)
                    .withEntity(NOT_FOUND.getBody)
              }
            }
          }
        )
      }
    case _ =>
      // Non-GET requests: keep the function total rather than throwing MatchError.
      Future.successful(
        HttpResponse(status = NOT_FOUND.getStatus)
          .withEntity(NOT_FOUND.getBody)
      )
  }

  // Active binding, if the server is currently started. Guarded by `synchronized`.
  private var binding: Option[ServerBinding] = None

  /** Binds the server to `localhost:port`, blocking up to 10 seconds for the bind
    * to complete. Does nothing if the server is already bound.
    *
    * @param port TCP port to listen on
    */
  def start(port: Int): Unit = synchronized {
    if (binding.isEmpty) {
      import scala.concurrent.duration._
      binding = Some(
        Await.result(
          Http().bindAndHandleAsync(asyncHandler, "localhost", port),
          10.seconds
        )
      )
    }
  }

  /** Unbinds the server and terminates the actor system. Does nothing if the
    * server is not bound.
    *
    * Blocks up to 10 seconds for the unbind to complete so the port is released
    * before this method returns; the actor system is terminated and the binding
    * cleared even if the unbind fails or times out.
    */
  def stop(): Unit = synchronized {
    binding.foreach { active =>
      import scala.concurrent.duration._
      try {
        Await.result(active.unbind(), 10.seconds)
      } finally {
        system.terminate()
        binding = None
      }
    }
  }
}