-
Notifications
You must be signed in to change notification settings - Fork 504
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add additional logging to track mesh clients interacting with Namerd #2277
Changes from all commits
ec9aec1
ca8d07d
f4946bc
c60346b
bee6dbb
44cf2f1
6082418
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
package com.twitter.finagle.buoyant.h2 | ||
|
||
import com.twitter.finagle | ||
import com.twitter.finagle._ | ||
import com.twitter.util.Future | ||
|
||
object H2HeaderInjector { | ||
def module(headerMap: Map[String, String]): Stackable[ServiceFactory[Request, Response]] = { | ||
new finagle.Stack.Module0[ServiceFactory[Request, Response]] { | ||
override def make(next: ServiceFactory[Request, Response]): ServiceFactory[Request, Response] = | ||
new HeaderInjector(headerMap).andThen(next) | ||
|
||
override def role: Stack.Role = Stack.Role("H2HeaderInjector") | ||
override def description: String = "Add arbitrary headers to H2 requests" | ||
} | ||
} | ||
} | ||
|
||
class HeaderInjector(injectHeaders: Map[String, String]) extends SimpleFilter[Request, Response] { | ||
override def apply(request: Request, service: Service[Request, Response]): Future[Response] = { | ||
val req = request.dup() | ||
injectHeaders.foreach { kvPair => | ||
req.headers.add(kvPair._1, kvPair._2); () | ||
} | ||
service(req) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,13 @@ | ||
package io.buoyant.grpc.runtime | ||
|
||
import com.twitter.finagle.{Failure, Service => FinagleService} | ||
import com.twitter.finagle.{Service => FinagleService} | ||
import com.twitter.finagle.buoyant.h2 | ||
import com.twitter.logging.Logger | ||
import com.twitter.util.{Future, Return, Throw, Try} | ||
|
||
object ServerDispatcher { | ||
|
||
val log = Logger.get(this.getClass.getName) | ||
trait Service { | ||
def name: String | ||
def rpcs: Seq[Rpc] | ||
|
@@ -16,7 +18,6 @@ object ServerDispatcher { | |
} | ||
|
||
object Rpc { | ||
|
||
class UnaryToUnary[Req, Rsp]( | ||
val name: String, | ||
serve: Req => Future[Rsp], | ||
|
@@ -38,11 +39,13 @@ object ServerDispatcher { | |
rspCodec: Codec[Rsp] | ||
) extends ServerDispatcher.Rpc { | ||
|
||
override def apply(req: h2.Request): Future[h2.Response] = | ||
acceptUnary(reqCodec, req).map(streamResponse) | ||
override def apply(req: h2.Request): Future[h2.Response] = { | ||
val headersMap = req.headers.toSeq | ||
acceptUnary(reqCodec, req).map(req => streamResponse(req, headersMap)) | ||
} | ||
|
||
private[this] val streamResponse: Req => h2.Response = | ||
req => respondStreaming(rspCodec, serve(req)) | ||
private[this] val streamResponse: (Req, Seq[(String, String)]) => h2.Response = | ||
(req, reqMeta) => respondStreaming(rspCodec, serve(req), reqMeta) | ||
} | ||
|
||
class StreamToUnary[Req, Rsp]( | ||
|
@@ -68,7 +71,8 @@ object ServerDispatcher { | |
|
||
override def apply(req: h2.Request): Future[h2.Response] = { | ||
val reqs = acceptStreaming(reqCodec, req) | ||
val rsp = respondStreaming(rspCodec, serve(reqs)) | ||
val headersMap = req.headers.toSeq | ||
val rsp = respondStreaming(rspCodec, serve(reqs), headersMap) | ||
Future.value(rsp) | ||
} | ||
} | ||
|
@@ -99,11 +103,12 @@ object ServerDispatcher { | |
h2.Response(h2.Status.Ok, frames) | ||
} | ||
|
||
private[this] def respondStreaming[Rsp](codec: Codec[Rsp], msgs: Stream[Rsp]): h2.Response = { | ||
private[this] def respondStreaming[Rsp](codec: Codec[Rsp], msgs: Stream[Rsp], reqMeta: Seq[(String, String)]): h2.Response = { | ||
val frames = h2.Stream() | ||
def loop(): Future[Unit] = | ||
msgs.recv().transform { | ||
case Return(Stream.Releasable(s, release)) => | ||
log.trace(s"Streaming response metadata: ${reqMeta.mkString(":")}") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This says response metadata but it's printing the request headers, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the intent here? Each time there's a response message, the request headers get printed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's correct since the request headers may contain information useful for the particular request. |
||
val buf = codec.encodeGrpcMessage(s) | ||
val data = h2.Frame.Data(buf, eos = false, release) | ||
frames.write(data).before(loop()) | ||
|
@@ -113,6 +118,7 @@ object ServerDispatcher { | |
case s: GrpcStatus => s | ||
case e => GrpcStatus.Internal(e.getMessage) | ||
} | ||
log.trace(s"Streaming response metadata: ${reqMeta.mkString(":")}") | ||
frames.write(status.toTrailers) | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ package io.buoyant.interpreter | |
import com.fasterxml.jackson.annotation.JsonIgnore | ||
import com.twitter.conversions.DurationOps._ | ||
import com.twitter.finagle._ | ||
import com.twitter.finagle.buoyant.h2.H2HeaderInjector | ||
import com.twitter.finagle.buoyant.{H2, TlsClientConfig} | ||
import com.twitter.finagle.liveness.FailureDetector | ||
import com.twitter.finagle.liveness.FailureDetector.ThresholdConfig | ||
|
@@ -12,6 +13,7 @@ import com.twitter.finagle.util.DefaultTimer | |
import com.twitter.logging.Logger | ||
import io.buoyant.interpreter.mesh.Client | ||
import io.buoyant.namer.{InterpreterConfig, InterpreterInitializer} | ||
import java.util.UUID.randomUUID | ||
import scala.util.control.NoStackTrace | ||
|
||
/** | ||
|
@@ -67,30 +69,31 @@ case class MeshInterpreterConfig( | |
failureThreshold: Option[FailureThresholdConfig] | ||
) extends InterpreterConfig { | ||
import MeshInterpreterConfig._ | ||
|
||
/** | ||
* Construct a namer. | ||
*/ | ||
@JsonIgnore | ||
def newInterpreter(params: Stack.Params): NameInterpreter = { | ||
val MeshClientHeader = "l5d-ctx-mesh-client" | ||
val name = dst match { | ||
case None => throw new IllegalArgumentException("`dst` is a required field") with NoStackTrace | ||
case Some(dst) => Name.Path(dst) | ||
} | ||
val label = MeshInterpreterInitializer.configId | ||
|
||
val clientId = randomUUID().toString | ||
val Retry(baseRetry, maxRetry) = retry.getOrElse(defaultRetry) | ||
val backoffs = Backoff.exponentialJittered(baseRetry.seconds, maxRetry.seconds) | ||
val tlsParams = tls.map(_.params).getOrElse(Stack.Params.empty) | ||
val failureThresholdParams = failureThreshold.map(_.params).getOrElse(Stack.Params.empty) | ||
val headerIdentity = Map(MeshClientHeader -> clientId) | ||
|
||
val client = H2.client | ||
val client = H2.client.withStack(H2HeaderInjector.module(headerIdentity) +: _) | ||
.withParams(H2.client.params ++ tlsParams ++ failureThresholdParams ++ params) | ||
.newService(name, label) | ||
|
||
root.getOrElse(DefaultRoot) match { | ||
case [email protected](_) => | ||
Client(r, client, backoffs, DefaultTimer) | ||
Client(clientId, r, client, backoffs, DefaultTimer) | ||
|
||
case r => | ||
val msg = s"io.l5d.mesh: `root` may only contain a single path element (for now): ${r.show}" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this necessary? aren't headers mutable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I remember from my last run-in with H2 request mangling that I had to
dup()
this for some reason, alas, I do not remember. I think we don't need this.