From ec9aec1328760018b29d902d1873d7127c3f4037 Mon Sep 17 00:00:00 2001 From: Dennis Adjei-Baah Date: Wed, 22 May 2019 13:43:05 -0700 Subject: [PATCH 1/6] add trace logging for Linkerd's mesh client Signed-off-by: Dennis Adjei-Baah --- .../io/buoyant/interpreter/mesh/Client.scala | 26 ++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/interpreter/mesh/src/main/scala/io/buoyant/interpreter/mesh/Client.scala b/interpreter/mesh/src/main/scala/io/buoyant/interpreter/mesh/Client.scala index 8296aea66c..b59e5e8bac 100644 --- a/interpreter/mesh/src/main/scala/io/buoyant/interpreter/mesh/Client.scala +++ b/interpreter/mesh/src/main/scala/io/buoyant/interpreter/mesh/Client.scala @@ -1,6 +1,6 @@ package io.buoyant.interpreter.mesh -import com.twitter.finagle._ +import com.twitter.finagle.{Addr, _} import com.twitter.finagle.buoyant.h2 import com.twitter.finagle.buoyant.h2.Reset import com.twitter.finagle.http.{MediaType, Request, Response} @@ -14,6 +14,7 @@ import io.buoyant.namer.{DelegateTree, Delegator, InstrumentedActivity, Instrume import io.linkerd.mesh import io.linkerd.mesh.Converters._ import java.net.{InetAddress, InetSocketAddress} +import com.twitter.logging.Logger import scala.util.control.{NoStackTrace, NonFatal} object Client { @@ -54,6 +55,8 @@ object Client { timer: Timer ) extends NameInterpreter with Delegator with Admin.WithHandlers { + val log = Logger(this.getClass.getName) + private case class InstrumentedBind( act: InstrumentedActivity[NameTree[Name.Bound]], stream: StreamState[mesh.BindReq, mesh.BoundTreeRsp] @@ -91,6 +94,7 @@ object Client { val streamState = new StreamState[mesh.BindReq, mesh.BoundTreeRsp] val open = () => { val req = mkBindReq(root, path, dtab) + log.trace(s"Mesh bind request: Root: ${root.show}, Path: ${path.show}, Dtab: ${dtab.show}") streamState.recordApiCall(req) interpreter.streamBoundTree(req) } @@ -112,10 +116,17 @@ object Client { val streamState = new StreamState[mesh.DtabReq, mesh.DtabRsp] val open = () => { val req = mkDtabReq(root) + log.trace(s"Mesh dtab request: Path: ${root.show}") streamState.recordApiCall(req) delegator.streamDtab(req) } - streamActivity(open, decodeDtab, backoffs, timer, streamState).underlying + val dtabVar = streamActivity(open, decodeDtab, backoffs, timer, streamState).underlying + dtabVar.states.respond { + case Activity.Pending => log.trace(s"Dtab stream initialized for root: ${root.show}") + case Activity.Failed(cause) => log.trace(s"Dtab stream failed for root ${root.show}: ${cause.getMessage}") + case Activity.Ok(d) => log.trace(s"Dtab stream for root ${root.show}: ${d.show}") + } + dtabVar } } } @@ -125,6 +136,7 @@ object Client { tree: NameTree[Name.Path] ): Future[DelegateTree[Name.Bound]] = { val req = mkDelegateTreeReq(root, dtab, tree) + log.trace(s"Mesh delegate request: Root: ${root.show} Dtab: ${dtab.show}") delegator.getDelegateTree(req).flatMap { delegateTree => decodeDelegateTree(delegateTree) match { case Some(decoded) => Future.value(decoded) @@ -143,12 +155,20 @@ object Client { val streamState = new StreamState[mesh.ReplicasReq, mesh.Replicas] val open = () => { val req = mkReplicasReq(id) + log.trace(s"Mesh replicas request: client name: ${id.show}") streamState.recordApiCall(req) resolver.streamReplicas(req) } val resolution = streamVar(Addr.Pending, open, replicasToAddr, backoffs, timer, streamState) resolveCache += (id -> InstrumentedResolve(resolution, streamState)) - resolution.underlying + val underlyingVar = resolution.underlying + underlyingVar.changes.respond { + case Addr.Pending => log.trace(s"Pending addresses for client: ${id.show}") + case Addr.Failed(cause) => log.trace(s"Failed to receive address set for client ${id.show}: ${cause.getMessage}") + case Addr.Neg => log.trace(s"empty address set for client ${id.show}") + case Addr.Bound(addrs, _) => log.trace(s"Received addresses for client ${id.show}: ${addrs.mkString(",")}") + } + underlyingVar } } } From ca8d07d4ff08c7bfca3b0642ce3397563c151bc0 Mon Sep 17 00:00:00 2001 From: Dennis Adjei-Baah Date: Thu, 23 May 2019 14:01:49 -0700 Subject: [PATCH 2/6] fix issue in H2 NettyWriter to actually log frame info Signed-off-by: Dennis Adjei-Baah --- .../com/twitter/finagle/buoyant/h2/netty4/Netty4H2Writer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/finagle/h2/src/main/scala/com/twitter/finagle/buoyant/h2/netty4/Netty4H2Writer.scala b/finagle/h2/src/main/scala/com/twitter/finagle/buoyant/h2/netty4/Netty4H2Writer.scala index 08e27a6fcf..824d47291c 100644 --- a/finagle/h2/src/main/scala/com/twitter/finagle/buoyant/h2/netty4/Netty4H2Writer.scala +++ b/finagle/h2/src/main/scala/com/twitter/finagle/buoyant/h2/netty4/Netty4H2Writer.scala @@ -90,7 +90,7 @@ private[netty4] object Netty4H2Writer { override protected[this] def write(frame: Http2Frame): Future[Unit] = { log.trace("[L:%s R:%s] write: %s", trans.context.localAddress, trans.context.remoteAddress, frame.name) val f = trans.write(frame).rescue(wrapWriteException) - f.respond(v => log.trace("[L:%s R:%s] wrote: %s: %s", trans.context.localAddress, trans.context.remoteAddress, frame.name, v)) + f.respond(_ => log.trace("[L:%s R:%s] wrote: %s: %s", trans.context.localAddress, trans.context.remoteAddress, frame.name, frame)) f } From f4946bcea8d136177cedf4bac663cd399a192adb Mon Sep 17 00:00:00 2001 From: Dennis Adjei-Baah Date: Thu, 23 May 2019 14:36:50 -0700 Subject: [PATCH 3/6] add client id to mesh interpreter in Linkerd Signed-off-by: Dennis Adjei-Baah --- .../MeshInterpreterInitializer.scala | 14 ++++++--- .../io/buoyant/interpreter/mesh/Client.scala | 29 ++++++++++--------- .../buoyant/interpreter/mesh/ClientTest.scala | 2 +- 3 files changed, 27 insertions(+), 18 deletions(-) diff --git a/interpreter/mesh/src/main/scala/io/buoyant/interpreter/MeshInterpreterInitializer.scala b/interpreter/mesh/src/main/scala/io/buoyant/interpreter/MeshInterpreterInitializer.scala index 0575c8a14a..84fa93e725 100644 --- a/interpreter/mesh/src/main/scala/io/buoyant/interpreter/MeshInterpreterInitializer.scala +++ b/interpreter/mesh/src/main/scala/io/buoyant/interpreter/MeshInterpreterInitializer.scala @@ -1,8 +1,12 @@ package io.buoyant.interpreter +import java.util.UUID.randomUUID + import com.fasterxml.jackson.annotation.JsonIgnore import com.twitter.conversions.DurationOps._ +import com.twitter.finagle.Stack.Module import com.twitter.finagle._ +import com.twitter.finagle.buoyant.h2.H2HeaderInjector import com.twitter.finagle.buoyant.{H2, TlsClientConfig} import com.twitter.finagle.liveness.FailureDetector import com.twitter.finagle.liveness.FailureDetector.ThresholdConfig @@ -12,6 +16,7 @@ import com.twitter.finagle.util.DefaultTimer import com.twitter.logging.Logger import io.buoyant.interpreter.mesh.Client import io.buoyant.namer.{InterpreterConfig, InterpreterInitializer} + import scala.util.control.NoStackTrace /** @@ -67,30 +72,31 @@ case class MeshInterpreterConfig( failureThreshold: Option[FailureThresholdConfig] ) extends InterpreterConfig { import MeshInterpreterConfig._ - /** * Construct a namer. */ @JsonIgnore def newInterpreter(params: Stack.Params): NameInterpreter = { + val MeshClientHeader = "l5d-ctx-mesh-client" val name = dst match { case None => throw new IllegalArgumentException("`dst` is a required field") with NoStackTrace case Some(dst) => Name.Path(dst) } val label = MeshInterpreterInitializer.configId - + val clientId = randomUUID().toString val Retry(baseRetry, maxRetry) = retry.getOrElse(defaultRetry) val backoffs = Backoff.exponentialJittered(baseRetry.seconds, maxRetry.seconds) val tlsParams = tls.map(_.params).getOrElse(Stack.Params.empty) val failureThresholdParams = failureThreshold.map(_.params).getOrElse(Stack.Params.empty) + val headerIdentity = Map(MeshClientHeader -> clientId) - val client = H2.client + val client = H2.client.withStack(H2HeaderInjector.module(headerIdentity) +: _) .withParams(H2.client.params ++ tlsParams ++ failureThresholdParams ++ params) .newService(name, label) root.getOrElse(DefaultRoot) match { case r@Path.Utf8(_) => - Client(r, client, backoffs, DefaultTimer) + Client(clientId, r, client, backoffs, DefaultTimer) case r => val msg = s"io.l5d.mesh: `root` may only contain a single path element (for now): ${r.show}" diff --git a/interpreter/mesh/src/main/scala/io/buoyant/interpreter/mesh/Client.scala b/interpreter/mesh/src/main/scala/io/buoyant/interpreter/mesh/Client.scala index b59e5e8bac..5090b87264 100644 --- a/interpreter/mesh/src/main/scala/io/buoyant/interpreter/mesh/Client.scala +++ b/interpreter/mesh/src/main/scala/io/buoyant/interpreter/mesh/Client.scala @@ -20,6 +20,7 @@ import scala.util.control.{NoStackTrace, NonFatal} object Client { def apply( + clientId: String, root: Path, service: Service[h2.Request, h2.Response], backoffs: scala.Stream[Duration], @@ -28,10 +29,11 @@ object Client { val interpreter = new mesh.Interpreter.Client(service) val resolver = new mesh.Resolver.Client(service) val delegator = new mesh.Delegator.Client(service) - new Impl(root, interpreter, resolver, delegator, backoffs, timer) + new Impl(clientId, root, interpreter, resolver, delegator, backoffs, timer) } def apply( + clientId: String, root: Path, interpreter: mesh.Interpreter, resolver: mesh.Resolver, @@ -39,7 +41,7 @@ object Client { backoffs: scala.Stream[Duration], timer: Timer ): NameInterpreter with Delegator = - new Impl(root, interpreter, resolver, delegator, backoffs, timer) + new Impl(clientId, root, interpreter, resolver, delegator, backoffs, timer) private[this] val _releaseNop = () => Future.Unit private[this] val _rescueUnit: PartialFunction[Throwable, Future[Unit]] = { @@ -47,6 +49,7 @@ object Client { } private[this] class Impl( + clientId: String, root: Path, interpreter: mesh.Interpreter, resolver: mesh.Resolver, @@ -94,7 +97,7 @@ object Client { val streamState = new StreamState[mesh.BindReq, mesh.BoundTreeRsp] val open = () => { val req = mkBindReq(root, path, dtab) - log.trace(s"Mesh bind request: Root: ${root.show}, Path: ${path.show}, Dtab: ${dtab.show}") + log.trace(s"Mesh client [$clientId] mesh bind request: Root: ${root.show}, Path: ${path.show}, Dtab: ${dtab.show}") streamState.recordApiCall(req) interpreter.streamBoundTree(req) } @@ -116,15 +119,15 @@ object Client { val streamState = new StreamState[mesh.DtabReq, mesh.DtabRsp] val open = () => { val req = mkDtabReq(root) - log.trace(s"Mesh dtab request: Path: ${root.show}") + log.trace(s"Mesh client [$clientId] mesh dtab request: Path: ${root.show}") streamState.recordApiCall(req) delegator.streamDtab(req) } val dtabVar = streamActivity(open, decodeDtab, backoffs, timer, streamState).underlying dtabVar.states.respond { - case Activity.Pending => log.trace(s"Dtab stream initialized for root: ${root.show}") - case Activity.Failed(cause) => log.trace(s"Dtab stream failed for root ${root.show}: ${cause.getMessage}") - case Activity.Ok(d) => log.trace(s"Dtab stream for root ${root.show}: ${d.show}") + case Activity.Pending => log.trace(s"Mesh client [$clientId] Dtab stream initialized for root: ${root.show}") + case Activity.Failed(cause) => log.trace(s"Mesh client [$clientId] Dtab stream failed for root ${root.show}: ${cause.getMessage}") + case Activity.Ok(d) => log.trace(s"Mesh client [$clientId] Dtab stream for root ${root.show}: ${d.show}") } dtabVar } @@ -136,7 +139,7 @@ object Client { tree: NameTree[Name.Path] ): Future[DelegateTree[Name.Bound]] = { val req = mkDelegateTreeReq(root, dtab, tree) - log.trace(s"Mesh delegate request: Root: ${root.show} Dtab: ${dtab.show}") + log.trace(s"Mesh client [$clientId] mesh delegate request: Root: ${root.show} Dtab: ${dtab.show}") delegator.getDelegateTree(req).flatMap { delegateTree => decodeDelegateTree(delegateTree) match { case Some(decoded) => Future.value(decoded) @@ -155,7 +158,7 @@ object Client { val streamState = new StreamState[mesh.ReplicasReq, mesh.Replicas] val open = () => { val req = mkReplicasReq(id) - log.trace(s"Mesh replicas request: client name: ${id.show}") + log.trace(s"Mesh client [$clientId] mesh replicas request: client name: ${id.show}") streamState.recordApiCall(req) resolver.streamReplicas(req) } @@ -163,10 +166,10 @@ object Client { resolveCache += (id -> InstrumentedResolve(resolution, streamState)) val underlyingVar = resolution.underlying underlyingVar.changes.respond { - case Addr.Pending => log.trace(s"Pending addresses for client: ${id.show}") - case Addr.Failed(cause) => log.trace(s"Failed to receive address set for client ${id.show}: ${cause.getMessage}") - case Addr.Neg => log.trace(s"empty address set for client ${id.show}") - case Addr.Bound(addrs, _) => log.trace(s"Received addresses for client ${id.show}: ${addrs.mkString(",")}") + case Addr.Pending => log.trace(s"Mesh client [$clientId] Pending addresses for client: ${id.show}") + case Addr.Failed(cause) => log.trace(s"Mesh client [$clientId] Failed to receive address set for client ${id.show}: ${cause.getMessage}") + case Addr.Neg => log.trace(s"Mesh client [$clientId] empty address set for client ${id.show}") + case Addr.Bound(addrs, _) => log.trace(s"Mesh client [$clientId] Received addresses for client ${id.show}: ${addrs.mkString(",")}") } underlyingVar } diff --git a/interpreter/mesh/src/test/scala/io/buoyant/interpreter/mesh/ClientTest.scala b/interpreter/mesh/src/test/scala/io/buoyant/interpreter/mesh/ClientTest.scala index d90c5e45b7..2fc061b6e1 100644 --- a/interpreter/mesh/src/test/scala/io/buoyant/interpreter/mesh/ClientTest.scala +++ b/interpreter/mesh/src/test/scala/io/buoyant/interpreter/mesh/ClientTest.scala @@ -38,7 +38,7 @@ class ClientTest extends FunSuite { addrRsps } } - Client(Path.Utf8("foons"), interp, resolv, unusedDelegator, scala.Stream.empty, DefaultTimer) + Client("", Path.Utf8("foons"), interp, resolv, unusedDelegator, scala.Stream.empty, DefaultTimer) } val act = client.bind(Dtab.read("/stuff => /mas"), Path.read("/some/name")) From c60346b3b17df8e6732e142d1f576efc8bfbb024 Mon Sep 17 00:00:00 2001 From: Dennis Adjei-Baah Date: Thu, 23 May 2019 14:39:00 -0700 Subject: [PATCH 4/6] log incoming namerd mesh requests Signed-off-by: Dennis Adjei-Baah --- .../grpc/runtime/ServerDispatcher.scala | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/grpc/runtime/src/main/scala/io/buoyant/grpc/runtime/ServerDispatcher.scala b/grpc/runtime/src/main/scala/io/buoyant/grpc/runtime/ServerDispatcher.scala index bbec2b1c6f..f4619e73c0 100644 --- a/grpc/runtime/src/main/scala/io/buoyant/grpc/runtime/ServerDispatcher.scala +++ b/grpc/runtime/src/main/scala/io/buoyant/grpc/runtime/ServerDispatcher.scala @@ -1,11 +1,13 @@ package io.buoyant.grpc.runtime -import com.twitter.finagle.{Failure, Service => FinagleService} +import com.twitter.finagle.{Service => FinagleService} import com.twitter.finagle.buoyant.h2 +import com.twitter.logging.Logger import com.twitter.util.{Future, Return, Throw, Try} object ServerDispatcher { + val log = Logger.get(this.getClass.getName) trait Service { def name: String def rpcs: Seq[Rpc] @@ -16,7 +18,6 @@ object ServerDispatcher { } object Rpc { - class UnaryToUnary[Req, Rsp]( val name: String, serve: Req => Future[Rsp], @@ -38,11 +39,13 @@ object ServerDispatcher { rspCodec: Codec[Rsp] ) extends ServerDispatcher.Rpc { - override def apply(req: h2.Request): Future[h2.Response] = - acceptUnary(reqCodec, req).map(streamResponse) + override def apply(req: h2.Request): Future[h2.Response] = { + val headersMap = req.headers.toSeq + acceptUnary(reqCodec, req).map(req => streamResponse(req, headersMap)) + } - private[this] val streamResponse: Req => h2.Response = - req => respondStreaming(rspCodec, serve(req)) + private[this] val streamResponse: (Req, Seq[(String, String)]) => h2.Response = + (req, reqMeta) => respondStreaming(rspCodec, serve(req), reqMeta) } class StreamToUnary[Req, Rsp]( @@ -68,7 +71,8 @@ object ServerDispatcher { override def apply(req: h2.Request): Future[h2.Response] = { val reqs = acceptStreaming(reqCodec, req) - val rsp = respondStreaming(rspCodec, serve(reqs)) + val headersMap = req.headers.toSeq + val rsp = respondStreaming(rspCodec, serve(reqs), headersMap) Future.value(rsp) } } @@ -99,11 +103,12 @@ object ServerDispatcher { h2.Response(h2.Status.Ok, frames) } - private[this] def respondStreaming[Rsp](codec: Codec[Rsp], msgs: Stream[Rsp]): h2.Response = { + private[this] def respondStreaming[Rsp](codec: Codec[Rsp], msgs: Stream[Rsp], reqMeta: Seq[(String, String)]): h2.Response = { val frames = h2.Stream() def loop(): Future[Unit] = msgs.recv().transform { case Return(Stream.Releasable(s, release)) => + log.trace(s"Streaming response metadata: ${reqMeta.mkString(":")}") val buf = codec.encodeGrpcMessage(s) val data = h2.Frame.Data(buf, eos = false, release) frames.write(data).before(loop()) @@ -113,6 +118,7 @@ object ServerDispatcher { case s: GrpcStatus => s case e => GrpcStatus.Internal(e.getMessage) } + log.trace(s"Streaming response metadata: ${reqMeta.mkString(":")}") frames.write(status.toTrailers) } From bee6dbb10cab867a30d05ace56f89bd9e8eb8e2b Mon Sep 17 00:00:00 2001 From: Dennis Adjei-Baah Date: Thu, 23 May 2019 14:42:12 -0700 Subject: [PATCH 5/6] add additional logging for k8s events Signed-off-by: Dennis Adjei-Baah --- k8s/src/main/scala/io/buoyant/k8s/Watchable.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/k8s/src/main/scala/io/buoyant/k8s/Watchable.scala b/k8s/src/main/scala/io/buoyant/k8s/Watchable.scala index 23b23dc858..6de419037e 100644 --- a/k8s/src/main/scala/io/buoyant/k8s/Watchable.scala +++ b/k8s/src/main/scala/io/buoyant/k8s/Watchable.scala @@ -185,7 +185,10 @@ private[k8s] abstract class Watchable[O <: KubeObject: TypeReference, W <: Watch case Some((event, ws)) => import Ordering.Implicits._ - watchState.foreach(_.recordStreamData(event)) + watchState.foreach { state => + state.recordStreamData(event) + log.trace("k8s event on '%s' received '%s", watchPath, event) + } // Register the update only if its resource version is larger than or equal to the largest version // seen so far. if (largestEvent.forall(_ <= event)) { From 44cf2f1de0eefc48c9938ae2d973a81c8ea21e46 Mon Sep 17 00:00:00 2001 From: Dennis Adjei-Baah Date: Thu, 23 May 2019 14:59:01 -0700 Subject: [PATCH 6/6] add H2HeaderInjector Signed-off-by: Dennis Adjei-Baah --- .../finagle/buoyant/h2/H2HeaderInjector.scala | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 finagle/h2/src/main/scala/com/twitter/finagle/buoyant/h2/H2HeaderInjector.scala diff --git a/finagle/h2/src/main/scala/com/twitter/finagle/buoyant/h2/H2HeaderInjector.scala b/finagle/h2/src/main/scala/com/twitter/finagle/buoyant/h2/H2HeaderInjector.scala new file mode 100644 index 0000000000..a7113c7c17 --- /dev/null +++ b/finagle/h2/src/main/scala/com/twitter/finagle/buoyant/h2/H2HeaderInjector.scala @@ -0,0 +1,27 @@ +package com.twitter.finagle.buoyant.h2 + +import com.twitter.finagle +import com.twitter.finagle._ +import com.twitter.util.Future + +object H2HeaderInjector { + def module(headerMap: Map[String, String]): Stackable[ServiceFactory[Request, Response]] = { + new finagle.Stack.Module0[ServiceFactory[Request, Response]] { + override def make(next: ServiceFactory[Request, Response]): ServiceFactory[Request, Response] = + new HeaderInjector(headerMap).andThen(next) + + override def role: Stack.Role = Stack.Role("H2HeaderInjector") + override def description: String = "Add arbitrary headers to H2 requests" + } + } +} + +class HeaderInjector(injectHeaders: Map[String, String]) extends SimpleFilter[Request, Response] { + override def apply(request: Request, service: Service[Request, Response]): Future[Response] = { + val req = request.dup() + injectHeaders.foreach { kvPair => + req.headers.add(kvPair._1, kvPair._2); () + } + service(req) + } +}