Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add additional logging to track mesh clients interacting with Namerd #2277

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.twitter.finagle.buoyant.h2

import com.twitter.finagle
import com.twitter.finagle._
import com.twitter.util.Future

object H2HeaderInjector {
def module(headerMap: Map[String, String]): Stackable[ServiceFactory[Request, Response]] = {
new finagle.Stack.Module0[ServiceFactory[Request, Response]] {
override def make(next: ServiceFactory[Request, Response]): ServiceFactory[Request, Response] =
new HeaderInjector(headerMap).andThen(next)

override def role: Stack.Role = Stack.Role("H2HeaderInjector")
override def description: String = "Add arbitrary headers to H2 requests"
}
}
}

class HeaderInjector(injectHeaders: Map[String, String]) extends SimpleFilter[Request, Response] {
override def apply(request: Request, service: Service[Request, Response]): Future[Response] = {
val req = request.dup()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this necessary? aren't headers mutable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember from my last run-in with H2 request mangling that I had to dup() this for some reason, alas, I do not remember. I think we don't need this.

injectHeaders.foreach { kvPair =>
req.headers.add(kvPair._1, kvPair._2); ()
}
service(req)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private[netty4] object Netty4H2Writer {
override protected[this] def write(frame: Http2Frame): Future[Unit] = {
log.trace("[L:%s R:%s] write: %s", trans.context.localAddress, trans.context.remoteAddress, frame.name)
val f = trans.write(frame).rescue(wrapWriteException)
f.respond(v => log.trace("[L:%s R:%s] wrote: %s: %s", trans.context.localAddress, trans.context.remoteAddress, frame.name, v))
f.respond(_ => log.trace("[L:%s R:%s] wrote: %s: %s", trans.context.localAddress, trans.context.remoteAddress, frame.name, frame))
f
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package io.buoyant.grpc.runtime

import com.twitter.finagle.{Failure, Service => FinagleService}
import com.twitter.finagle.{Service => FinagleService}
import com.twitter.finagle.buoyant.h2
import com.twitter.logging.Logger
import com.twitter.util.{Future, Return, Throw, Try}

object ServerDispatcher {

val log = Logger.get(this.getClass.getName)
trait Service {
def name: String
def rpcs: Seq[Rpc]
Expand All @@ -16,7 +18,6 @@ object ServerDispatcher {
}

object Rpc {

class UnaryToUnary[Req, Rsp](
val name: String,
serve: Req => Future[Rsp],
Expand All @@ -38,11 +39,13 @@ object ServerDispatcher {
rspCodec: Codec[Rsp]
) extends ServerDispatcher.Rpc {

override def apply(req: h2.Request): Future[h2.Response] =
acceptUnary(reqCodec, req).map(streamResponse)
override def apply(req: h2.Request): Future[h2.Response] = {
val headersMap = req.headers.toSeq
acceptUnary(reqCodec, req).map(req => streamResponse(req, headersMap))
}

private[this] val streamResponse: Req => h2.Response =
req => respondStreaming(rspCodec, serve(req))
private[this] val streamResponse: (Req, Seq[(String, String)]) => h2.Response =
(req, reqMeta) => respondStreaming(rspCodec, serve(req), reqMeta)
}

class StreamToUnary[Req, Rsp](
Expand All @@ -68,7 +71,8 @@ object ServerDispatcher {

override def apply(req: h2.Request): Future[h2.Response] = {
val reqs = acceptStreaming(reqCodec, req)
val rsp = respondStreaming(rspCodec, serve(reqs))
val headersMap = req.headers.toSeq
val rsp = respondStreaming(rspCodec, serve(reqs), headersMap)
Future.value(rsp)
}
}
Expand Down Expand Up @@ -99,11 +103,12 @@ object ServerDispatcher {
h2.Response(h2.Status.Ok, frames)
}

private[this] def respondStreaming[Rsp](codec: Codec[Rsp], msgs: Stream[Rsp]): h2.Response = {
private[this] def respondStreaming[Rsp](codec: Codec[Rsp], msgs: Stream[Rsp], reqMeta: Seq[(String, String)]): h2.Response = {
val frames = h2.Stream()
def loop(): Future[Unit] =
msgs.recv().transform {
case Return(Stream.Releasable(s, release)) =>
log.trace(s"Streaming response metadata: ${reqMeta.mkString(":")}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This says response metadata but it's printing the request headers, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the intent here? Each time there's a response message, the request headers get printed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct since the request headers may contain information useful for the particular request.

val buf = codec.encodeGrpcMessage(s)
val data = h2.Frame.Data(buf, eos = false, release)
frames.write(data).before(loop())
Expand All @@ -113,6 +118,7 @@ object ServerDispatcher {
case s: GrpcStatus => s
case e => GrpcStatus.Internal(e.getMessage)
}
log.trace(s"Streaming response metadata: ${reqMeta.mkString(":")}")
frames.write(status.toTrailers)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.buoyant.interpreter
import com.fasterxml.jackson.annotation.JsonIgnore
import com.twitter.conversions.DurationOps._
import com.twitter.finagle._
import com.twitter.finagle.buoyant.h2.H2HeaderInjector
import com.twitter.finagle.buoyant.{H2, TlsClientConfig}
import com.twitter.finagle.liveness.FailureDetector
import com.twitter.finagle.liveness.FailureDetector.ThresholdConfig
Expand All @@ -12,6 +13,7 @@ import com.twitter.finagle.util.DefaultTimer
import com.twitter.logging.Logger
import io.buoyant.interpreter.mesh.Client
import io.buoyant.namer.{InterpreterConfig, InterpreterInitializer}
import java.util.UUID.randomUUID
import scala.util.control.NoStackTrace

/**
Expand Down Expand Up @@ -67,30 +69,31 @@ case class MeshInterpreterConfig(
failureThreshold: Option[FailureThresholdConfig]
) extends InterpreterConfig {
import MeshInterpreterConfig._

/**
* Construct a namer.
*/
@JsonIgnore
def newInterpreter(params: Stack.Params): NameInterpreter = {
val MeshClientHeader = "l5d-ctx-mesh-client"
val name = dst match {
case None => throw new IllegalArgumentException("`dst` is a required field") with NoStackTrace
case Some(dst) => Name.Path(dst)
}
val label = MeshInterpreterInitializer.configId

val clientId = randomUUID().toString
val Retry(baseRetry, maxRetry) = retry.getOrElse(defaultRetry)
val backoffs = Backoff.exponentialJittered(baseRetry.seconds, maxRetry.seconds)
val tlsParams = tls.map(_.params).getOrElse(Stack.Params.empty)
val failureThresholdParams = failureThreshold.map(_.params).getOrElse(Stack.Params.empty)
val headerIdentity = Map(MeshClientHeader -> clientId)

val client = H2.client
val client = H2.client.withStack(H2HeaderInjector.module(headerIdentity) +: _)
.withParams(H2.client.params ++ tlsParams ++ failureThresholdParams ++ params)
.newService(name, label)

root.getOrElse(DefaultRoot) match {
case [email protected](_) =>
Client(r, client, backoffs, DefaultTimer)
Client(clientId, r, client, backoffs, DefaultTimer)

case r =>
val msg = s"io.l5d.mesh: `root` may only contain a single path element (for now): ${r.show}"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.buoyant.interpreter.mesh

import com.twitter.finagle._
import com.twitter.finagle.{Addr, _}
import com.twitter.finagle.buoyant.h2
import com.twitter.finagle.buoyant.h2.Reset
import com.twitter.finagle.http.{MediaType, Request, Response}
Expand All @@ -14,11 +14,13 @@ import io.buoyant.namer.{DelegateTree, Delegator, InstrumentedActivity, Instrume
import io.linkerd.mesh
import io.linkerd.mesh.Converters._
import java.net.{InetAddress, InetSocketAddress}
import com.twitter.logging.Logger
import scala.util.control.{NoStackTrace, NonFatal}

object Client {

def apply(
clientId: String,
root: Path,
service: Service[h2.Request, h2.Response],
backoffs: scala.Stream[Duration],
Expand All @@ -27,25 +29,27 @@ object Client {
val interpreter = new mesh.Interpreter.Client(service)
val resolver = new mesh.Resolver.Client(service)
val delegator = new mesh.Delegator.Client(service)
new Impl(root, interpreter, resolver, delegator, backoffs, timer)
new Impl(clientId, root, interpreter, resolver, delegator, backoffs, timer)
}

def apply(
clientId: String,
root: Path,
interpreter: mesh.Interpreter,
resolver: mesh.Resolver,
delegator: mesh.Delegator,
backoffs: scala.Stream[Duration],
timer: Timer
): NameInterpreter with Delegator =
new Impl(root, interpreter, resolver, delegator, backoffs, timer)
new Impl(clientId, root, interpreter, resolver, delegator, backoffs, timer)

private[this] val _releaseNop = () => Future.Unit
private[this] val _rescueUnit: PartialFunction[Throwable, Future[Unit]] = {
case NonFatal(_) => Future.Unit
}

private[this] class Impl(
clientId: String,
root: Path,
interpreter: mesh.Interpreter,
resolver: mesh.Resolver,
Expand All @@ -54,6 +58,8 @@ object Client {
timer: Timer
) extends NameInterpreter with Delegator with Admin.WithHandlers {

val log = Logger.get(this.getClass.getName)

private case class InstrumentedBind(
act: InstrumentedActivity[NameTree[Name.Bound]],
stream: StreamState[mesh.BindReq, mesh.BoundTreeRsp]
Expand Down Expand Up @@ -89,12 +95,14 @@ object Client {
case Some(bind) => bind.act.underlying
case None =>
val streamState = new StreamState[mesh.BindReq, mesh.BoundTreeRsp]
val logWithClientId: String => Unit = log.trace(s"Mesh client %s BoundTreeRsp stream state: %s", clientId, _)
val open = () => {
val req = mkBindReq(root, path, dtab)
log.trace(s"Mesh client [$clientId] mesh bind request: Root: ${root.show}, Path: ${path.show}, Dtab: ${dtab.show}")
streamState.recordApiCall(req)
interpreter.streamBoundTree(req)
}
val bind = streamActivity(open, decodeBoundTree, backoffs, timer, streamState)
val bind = streamActivity(open, decodeBoundTree, backoffs, timer, streamState, logWithClientId)
bindCache += (key -> InstrumentedBind(bind, streamState))
bind.underlying
}
Expand All @@ -110,12 +118,14 @@ object Client {
dtabCache.act.underlying
} else {
val streamState = new StreamState[mesh.DtabReq, mesh.DtabRsp]
val logWithClientId: String => Unit = log.trace(s"Mesh client %s DtabRsp stream state: %s", clientId, _)
val open = () => {
val req = mkDtabReq(root)
log.trace(s"Mesh client [$clientId] mesh dtab request: Path: ${root.show}")
streamState.recordApiCall(req)
delegator.streamDtab(req)
}
streamActivity(open, decodeDtab, backoffs, timer, streamState).underlying
streamActivity(open, decodeDtab, backoffs, timer, streamState, logWithClientId).underlying
}
}
}
Expand All @@ -125,6 +135,7 @@ object Client {
tree: NameTree[Name.Path]
): Future[DelegateTree[Name.Bound]] = {
val req = mkDelegateTreeReq(root, dtab, tree)
log.trace(s"Mesh client [$clientId] delegate request: Root: ${root.show} Dtab: ${dtab.show}")
delegator.getDelegateTree(req).flatMap { delegateTree =>
decodeDelegateTree(delegateTree) match {
case Some(decoded) => Future.value(decoded)
Expand All @@ -141,12 +152,14 @@ object Client {
case Some(resolution) => resolution.act.underlying
case None =>
val streamState = new StreamState[mesh.ReplicasReq, mesh.Replicas]
val logWithClientId: String => Unit = log.trace(s"Mesh client %s Replicas stream state: %s", clientId, _)
val open = () => {
val req = mkReplicasReq(id)
log.trace(s"Mesh client [$clientId] replicas request: client name: ${id.show}")
streamState.recordApiCall(req)
resolver.streamReplicas(req)
}
val resolution = streamVar(Addr.Pending, open, replicasToAddr, backoffs, timer, streamState)
val resolution = streamVar(Addr.Pending, open, replicasToAddr, backoffs, timer, streamState, logWithClientId)
resolveCache += (id -> InstrumentedResolve(resolution, streamState))
resolution.underlying
}
Expand Down Expand Up @@ -240,7 +253,8 @@ object Client {
toT: Try[S] => Option[T],
backoffs0: scala.Stream[Duration],
timer: Timer,
streamState: StreamState[_, S]
streamState: StreamState[_, S],
logWithClientId: String => Unit
): InstrumentedVar[T] = InstrumentedVar[T](init) { state =>
implicit val timer0 = timer

Expand All @@ -261,10 +275,12 @@ object Client {
else rsps.recv().respond { rep =>
streamState.recordResponse(rep.map(_.value))
}.transform {
case Throw(_) if closed =>
case Throw(e) if closed =>
logWithClientId(e.getMessage)
releasePrior().rescue(_rescueUnit)

case Throw(NonFatal(e)) =>
logWithClientId(e.getMessage)
val releasePriorNoError = () => releasePrior().rescue(_rescueUnit)
backoffs match {
case wait #:: moreBackoffs =>
Expand All @@ -275,13 +291,16 @@ object Client {
}

case Throw(e) => // fatal
logWithClientId(e.getMessage)
Future.exception(e)

case Return(Stream.Releasable(s, release)) =>
toT(Return(s)) match {
case None =>
logWithClientId("None")
release().before(loop(rsps, backoffs0, releasePrior))
case Some(t) =>
logWithClientId(t.toString)
state() = t
releasePrior().before(loop(rsps, backoffs0, release))
}
Expand Down Expand Up @@ -311,7 +330,8 @@ object Client {
toT: S => Option[T],
bos: scala.Stream[Duration],
timer: Timer,
state: StreamState[_, S]
state: StreamState[_, S],
logWithClientId: String => Unit
): InstrumentedActivity[T] = {
val toState: Try[S] => Option[Activity.State[T]] = {
case Throw(e) => Some(Activity.Failed(e))
Expand All @@ -321,7 +341,7 @@ object Client {
case Some(t) => Some(Activity.Ok(t))
}
}
new InstrumentedActivity(streamVar(Activity.Pending, open, toState, bos, timer, state))
new InstrumentedActivity(streamVar(Activity.Pending, open, toState, bos, timer, state, logWithClientId))
}

private[this] val decodeDtab: mesh.DtabRsp => Option[Dtab] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class ClientTest extends FunSuite {
addrRsps
}
}
Client(Path.Utf8("foons"), interp, resolv, unusedDelegator, scala.Stream.empty, DefaultTimer)
Client("", Path.Utf8("foons"), interp, resolv, unusedDelegator, scala.Stream.empty, DefaultTimer)
}

val act = client.bind(Dtab.read("/stuff => /mas"), Path.read("/some/name"))
Expand Down
5 changes: 4 additions & 1 deletion k8s/src/main/scala/io/buoyant/k8s/Watchable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,10 @@ private[k8s] abstract class Watchable[O <: KubeObject: TypeReference, W <: Watch

case Some((event, ws)) =>
import Ordering.Implicits._
watchState.foreach(_.recordStreamData(event))
watchState.foreach { state =>
state.recordStreamData(event)
log.trace("k8s event on '%s' received '%s", watchPath, event)
}
// Register the update only if its resource version is larger than or equal to the largest version
// seen so far.
if (largestEvent.forall(_ <= event)) {
Expand Down