diff --git a/finagle/h2/src/main/scala/com/twitter/finagle/buoyant/h2/H2HeaderInjector.scala b/finagle/h2/src/main/scala/com/twitter/finagle/buoyant/h2/H2HeaderInjector.scala new file mode 100644 index 0000000000..a7113c7c17 --- /dev/null +++ b/finagle/h2/src/main/scala/com/twitter/finagle/buoyant/h2/H2HeaderInjector.scala @@ -0,0 +1,27 @@ +package com.twitter.finagle.buoyant.h2 + +import com.twitter.finagle +import com.twitter.finagle._ +import com.twitter.util.Future + +object H2HeaderInjector { + def module(headerMap: Map[String, String]): Stackable[ServiceFactory[Request, Response]] = { + new finagle.Stack.Module0[ServiceFactory[Request, Response]] { + override def make(next: ServiceFactory[Request, Response]): ServiceFactory[Request, Response] = + new HeaderInjector(headerMap).andThen(next) + + override def role: Stack.Role = Stack.Role("H2HeaderInjector") + override def description: String = "Add arbitrary headers to H2 requests" + } + } +} + +class HeaderInjector(injectHeaders: Map[String, String]) extends SimpleFilter[Request, Response] { + override def apply(request: Request, service: Service[Request, Response]): Future[Response] = { + val req = request.dup() + injectHeaders.foreach { kvPair => + req.headers.add(kvPair._1, kvPair._2); () + } + service(req) + } +} diff --git a/finagle/h2/src/main/scala/com/twitter/finagle/buoyant/h2/netty4/Netty4H2Writer.scala b/finagle/h2/src/main/scala/com/twitter/finagle/buoyant/h2/netty4/Netty4H2Writer.scala index 08e27a6fcf..824d47291c 100644 --- a/finagle/h2/src/main/scala/com/twitter/finagle/buoyant/h2/netty4/Netty4H2Writer.scala +++ b/finagle/h2/src/main/scala/com/twitter/finagle/buoyant/h2/netty4/Netty4H2Writer.scala @@ -90,7 +90,7 @@ private[netty4] object Netty4H2Writer { override protected[this] def write(frame: Http2Frame): Future[Unit] = { log.trace("[L:%s R:%s] write: %s", trans.context.localAddress, trans.context.remoteAddress, frame.name) val f = trans.write(frame).rescue(wrapWriteException) - f.respond(v => log.trace("[L:%s R:%s] wrote: %s: %s", trans.context.localAddress, trans.context.remoteAddress, frame.name, v)) + f.respond(_ => log.trace("[L:%s R:%s] wrote: %s: %s", trans.context.localAddress, trans.context.remoteAddress, frame.name, frame)) f } diff --git a/grpc/runtime/src/main/scala/io/buoyant/grpc/runtime/ServerDispatcher.scala b/grpc/runtime/src/main/scala/io/buoyant/grpc/runtime/ServerDispatcher.scala index bbec2b1c6f..f4619e73c0 100644 --- a/grpc/runtime/src/main/scala/io/buoyant/grpc/runtime/ServerDispatcher.scala +++ b/grpc/runtime/src/main/scala/io/buoyant/grpc/runtime/ServerDispatcher.scala @@ -1,11 +1,13 @@ package io.buoyant.grpc.runtime -import com.twitter.finagle.{Failure, Service => FinagleService} +import com.twitter.finagle.{Service => FinagleService} import com.twitter.finagle.buoyant.h2 +import com.twitter.logging.Logger import com.twitter.util.{Future, Return, Throw, Try} object ServerDispatcher { + val log = Logger.get(this.getClass.getName) trait Service { def name: String def rpcs: Seq[Rpc] @@ -16,7 +18,6 @@ object ServerDispatcher { } object Rpc { - class UnaryToUnary[Req, Rsp]( val name: String, serve: Req => Future[Rsp], @@ -38,11 +39,13 @@ object ServerDispatcher { rspCodec: Codec[Rsp] ) extends ServerDispatcher.Rpc { - override def apply(req: h2.Request): Future[h2.Response] = - acceptUnary(reqCodec, req).map(streamResponse) + override def apply(req: h2.Request): Future[h2.Response] = { + val headersMap = req.headers.toSeq + acceptUnary(reqCodec, req).map(req => streamResponse(req, headersMap)) + } - private[this] val streamResponse: Req => h2.Response = - req => respondStreaming(rspCodec, serve(req)) + private[this] val streamResponse: (Req, Seq[(String, String)]) => h2.Response = + (req, reqMeta) => respondStreaming(rspCodec, serve(req), reqMeta) } class StreamToUnary[Req, Rsp]( @@ -68,7 +71,8 @@ object ServerDispatcher { override def apply(req: h2.Request): Future[h2.Response] = { val reqs = acceptStreaming(reqCodec, req) - val rsp = respondStreaming(rspCodec, serve(reqs)) + val headersMap = req.headers.toSeq + val rsp = respondStreaming(rspCodec, serve(reqs), headersMap) Future.value(rsp) } } @@ -99,11 +103,12 @@ object ServerDispatcher { h2.Response(h2.Status.Ok, frames) } - private[this] def respondStreaming[Rsp](codec: Codec[Rsp], msgs: Stream[Rsp]): h2.Response = { + private[this] def respondStreaming[Rsp](codec: Codec[Rsp], msgs: Stream[Rsp], reqMeta: Seq[(String, String)]): h2.Response = { val frames = h2.Stream() def loop(): Future[Unit] = msgs.recv().transform { case Return(Stream.Releasable(s, release)) => + log.trace(s"Streaming response metadata: ${reqMeta.mkString(":")}") val buf = codec.encodeGrpcMessage(s) val data = h2.Frame.Data(buf, eos = false, release) frames.write(data).before(loop()) @@ -113,6 +118,7 @@ object ServerDispatcher { case s: GrpcStatus => s case e => GrpcStatus.Internal(e.getMessage) } + log.trace(s"Streaming response metadata: ${reqMeta.mkString(":")}") frames.write(status.toTrailers) } diff --git a/interpreter/mesh/src/main/scala/io/buoyant/interpreter/MeshInterpreterInitializer.scala b/interpreter/mesh/src/main/scala/io/buoyant/interpreter/MeshInterpreterInitializer.scala index 0575c8a14a..d1c51e08aa 100644 --- a/interpreter/mesh/src/main/scala/io/buoyant/interpreter/MeshInterpreterInitializer.scala +++ b/interpreter/mesh/src/main/scala/io/buoyant/interpreter/MeshInterpreterInitializer.scala @@ -3,6 +3,7 @@ package io.buoyant.interpreter import com.fasterxml.jackson.annotation.JsonIgnore import com.twitter.conversions.DurationOps._ import com.twitter.finagle._ +import com.twitter.finagle.buoyant.h2.H2HeaderInjector import com.twitter.finagle.buoyant.{H2, TlsClientConfig} import com.twitter.finagle.liveness.FailureDetector import com.twitter.finagle.liveness.FailureDetector.ThresholdConfig @@ -12,6 +13,7 @@ import com.twitter.finagle.util.DefaultTimer import com.twitter.logging.Logger import io.buoyant.interpreter.mesh.Client import io.buoyant.namer.{InterpreterConfig, InterpreterInitializer} +import java.util.UUID.randomUUID import scala.util.control.NoStackTrace /** @@ -67,30 +69,31 @@ case class MeshInterpreterConfig( failureThreshold: Option[FailureThresholdConfig] ) extends InterpreterConfig { import MeshInterpreterConfig._ - /** * Construct a namer. */ @JsonIgnore def newInterpreter(params: Stack.Params): NameInterpreter = { + val MeshClientHeader = "l5d-ctx-mesh-client" val name = dst match { case None => throw new IllegalArgumentException("`dst` is a required field") with NoStackTrace case Some(dst) => Name.Path(dst) } val label = MeshInterpreterInitializer.configId - + val clientId = randomUUID().toString val Retry(baseRetry, maxRetry) = retry.getOrElse(defaultRetry) val backoffs = Backoff.exponentialJittered(baseRetry.seconds, maxRetry.seconds) val tlsParams = tls.map(_.params).getOrElse(Stack.Params.empty) val failureThresholdParams = failureThreshold.map(_.params).getOrElse(Stack.Params.empty) + val headerIdentity = Map(MeshClientHeader -> clientId) - val client = H2.client + val client = H2.client.withStack(H2HeaderInjector.module(headerIdentity) +: _) .withParams(H2.client.params ++ tlsParams ++ failureThresholdParams ++ params) .newService(name, label) root.getOrElse(DefaultRoot) match { case r@Path.Utf8(_) => - Client(r, client, backoffs, DefaultTimer) + Client(clientId, r, client, backoffs, DefaultTimer) case r => val msg = s"io.l5d.mesh: `root` may only contain a single path element (for now): ${r.show}" diff --git a/interpreter/mesh/src/main/scala/io/buoyant/interpreter/mesh/Client.scala b/interpreter/mesh/src/main/scala/io/buoyant/interpreter/mesh/Client.scala index 8296aea66c..8fe97691c8 100644 --- a/interpreter/mesh/src/main/scala/io/buoyant/interpreter/mesh/Client.scala +++ b/interpreter/mesh/src/main/scala/io/buoyant/interpreter/mesh/Client.scala @@ -1,6 +1,6 @@ package io.buoyant.interpreter.mesh -import com.twitter.finagle._ +import com.twitter.finagle.{Addr, _} import com.twitter.finagle.buoyant.h2 import com.twitter.finagle.buoyant.h2.Reset import com.twitter.finagle.http.{MediaType, Request, Response} @@ -14,11 +14,13 @@ import io.buoyant.namer.{DelegateTree, Delegator, InstrumentedActivity, Instrume import io.linkerd.mesh import io.linkerd.mesh.Converters._ import java.net.{InetAddress, InetSocketAddress} +import com.twitter.logging.Logger import scala.util.control.{NoStackTrace, NonFatal} object Client { def apply( + clientId: String, root: Path, service: Service[h2.Request, h2.Response], backoffs: scala.Stream[Duration], @@ -27,10 +29,11 @@ object Client { val interpreter = new mesh.Interpreter.Client(service) val resolver = new mesh.Resolver.Client(service) val delegator = new mesh.Delegator.Client(service) - new Impl(root, interpreter, resolver, delegator, backoffs, timer) + new Impl(clientId, root, interpreter, resolver, delegator, backoffs, timer) } def apply( + clientId: String, root: Path, interpreter: mesh.Interpreter, resolver: mesh.Resolver, @@ -38,7 +41,7 @@ object Client { backoffs: scala.Stream[Duration], timer: Timer ): NameInterpreter with Delegator = - new Impl(root, interpreter, resolver, delegator, backoffs, timer) + new Impl(clientId, root, interpreter, resolver, delegator, backoffs, timer) private[this] val _releaseNop = () => Future.Unit private[this] val _rescueUnit: PartialFunction[Throwable, Future[Unit]] = { @@ -46,6 +49,7 @@ object Client { } private[this] class Impl( + clientId: String, root: Path, interpreter: mesh.Interpreter, resolver: mesh.Resolver, @@ -54,6 +58,8 @@ object Client { timer: Timer ) extends NameInterpreter with Delegator with Admin.WithHandlers { + val log = Logger.get(this.getClass.getName) + private case class InstrumentedBind( act: InstrumentedActivity[NameTree[Name.Bound]], stream: StreamState[mesh.BindReq, mesh.BoundTreeRsp] @@ -89,12 +95,14 @@ object Client { case Some(bind) => bind.act.underlying case None => val streamState = new StreamState[mesh.BindReq, mesh.BoundTreeRsp] + val logWithClientId: String => Unit = log.trace(s"Mesh client %s BoundTreeRsp stream state: %s", clientId, _) val open = () => { val req = mkBindReq(root, path, dtab) + log.trace(s"Mesh client [$clientId] mesh bind request: Root: ${root.show}, Path: ${path.show}, Dtab: ${dtab.show}") streamState.recordApiCall(req) interpreter.streamBoundTree(req) } - val bind = streamActivity(open, decodeBoundTree, backoffs, timer, streamState) + val bind = streamActivity(open, decodeBoundTree, backoffs, timer, streamState, logWithClientId) bindCache += (key -> InstrumentedBind(bind, streamState)) bind.underlying } @@ -110,12 +118,14 @@ object Client { dtabCache.act.underlying } else { val streamState = new StreamState[mesh.DtabReq, mesh.DtabRsp] + val logWithClientId: String => Unit = log.trace(s"Mesh client %s DtabRsp stream state: %s", clientId, _) val open = () => { val req = mkDtabReq(root) + log.trace(s"Mesh client [$clientId] mesh dtab request: Path: ${root.show}") streamState.recordApiCall(req) delegator.streamDtab(req) } - streamActivity(open, decodeDtab, backoffs, timer, streamState).underlying + streamActivity(open, decodeDtab, backoffs, timer, streamState, logWithClientId).underlying } } } @@ -125,6 +135,7 @@ object Client { tree: NameTree[Name.Path] ): Future[DelegateTree[Name.Bound]] = { val req = mkDelegateTreeReq(root, dtab, tree) + log.trace(s"Mesh client [$clientId] delegate request: Root: ${root.show} Dtab: ${dtab.show}") delegator.getDelegateTree(req).flatMap { delegateTree => decodeDelegateTree(delegateTree) match { case Some(decoded) => Future.value(decoded) @@ -141,12 +152,14 @@ object Client { case Some(resolution) => resolution.act.underlying case None => val streamState = new StreamState[mesh.ReplicasReq, mesh.Replicas] + val logWithClientId: String => Unit = log.trace(s"Mesh client %s Replicas stream state: %s", clientId, _) val open = () => { val req = mkReplicasReq(id) + log.trace(s"Mesh client [$clientId] replicas request: client name: ${id.show}") streamState.recordApiCall(req) resolver.streamReplicas(req) } - val resolution = streamVar(Addr.Pending, open, replicasToAddr, backoffs, timer, streamState) + val resolution = streamVar(Addr.Pending, open, replicasToAddr, backoffs, timer, streamState, logWithClientId) resolveCache += (id -> InstrumentedResolve(resolution, streamState)) resolution.underlying } @@ -240,7 +253,8 @@ object Client { toT: Try[S] => Option[T], backoffs0: scala.Stream[Duration], timer: Timer, - streamState: StreamState[_, S] + streamState: StreamState[_, S], + logWithClientId: String => Unit ): InstrumentedVar[T] = InstrumentedVar[T](init) { state => implicit val timer0 = timer @@ -261,10 +275,12 @@ object Client { else rsps.recv().respond { rep => streamState.recordResponse(rep.map(_.value)) }.transform { - case Throw(_) if closed => + case Throw(e) if closed => + logWithClientId(e.getMessage) releasePrior().rescue(_rescueUnit) case Throw(NonFatal(e)) => + logWithClientId(e.getMessage) val releasePriorNoError = () => releasePrior().rescue(_rescueUnit) backoffs match { case wait #:: moreBackoffs => @@ -275,13 +291,16 @@ object Client { } case Throw(e) => // fatal + logWithClientId(e.getMessage) Future.exception(e) case Return(Stream.Releasable(s, release)) => toT(Return(s)) match { case None => + logWithClientId("None") release().before(loop(rsps, backoffs0, releasePrior)) case Some(t) => + logWithClientId(t.toString) state() = t releasePrior().before(loop(rsps, backoffs0, release)) } @@ -311,7 +330,8 @@ object Client { toT: S => Option[T], bos: scala.Stream[Duration], timer: Timer, - state: StreamState[_, S] + state: StreamState[_, S], + logWithClientId: String => Unit ): InstrumentedActivity[T] = { val toState: Try[S] => Option[Activity.State[T]] = { case Throw(e) => Some(Activity.Failed(e)) @@ -321,7 +341,7 @@ object Client { case Some(t) => Some(Activity.Ok(t)) } } - new InstrumentedActivity(streamVar(Activity.Pending, open, toState, bos, timer, state)) + new InstrumentedActivity(streamVar(Activity.Pending, open, toState, bos, timer, state, logWithClientId)) } private[this] val decodeDtab: mesh.DtabRsp => Option[Dtab] = { diff --git a/interpreter/mesh/src/test/scala/io/buoyant/interpreter/mesh/ClientTest.scala b/interpreter/mesh/src/test/scala/io/buoyant/interpreter/mesh/ClientTest.scala index d90c5e45b7..2fc061b6e1 100644 --- a/interpreter/mesh/src/test/scala/io/buoyant/interpreter/mesh/ClientTest.scala +++ b/interpreter/mesh/src/test/scala/io/buoyant/interpreter/mesh/ClientTest.scala @@ -38,7 +38,7 @@ class ClientTest extends FunSuite { addrRsps } } - Client(Path.Utf8("foons"), interp, resolv, unusedDelegator, scala.Stream.empty, DefaultTimer) + Client("", Path.Utf8("foons"), interp, resolv, unusedDelegator, scala.Stream.empty, DefaultTimer) } val act = client.bind(Dtab.read("/stuff => /mas"), Path.read("/some/name")) diff --git a/k8s/src/main/scala/io/buoyant/k8s/Watchable.scala b/k8s/src/main/scala/io/buoyant/k8s/Watchable.scala index 23b23dc858..6de419037e 100644 --- a/k8s/src/main/scala/io/buoyant/k8s/Watchable.scala +++ b/k8s/src/main/scala/io/buoyant/k8s/Watchable.scala @@ -185,7 +185,10 @@ private[k8s] abstract class Watchable[O <: KubeObject: TypeReference, W <: Watch case Some((event, ws)) => import Ordering.Implicits._ - watchState.foreach(_.recordStreamData(event)) + watchState.foreach { state => + state.recordStreamData(event) + log.trace("k8s event on '%s' received '%s", watchPath, event) + } // Register the update only if its resource version is larger than or equal to the largest version // seen so far. if (largestEvent.forall(_ <= event)) {