Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add additional logging to track mesh clients interacting with Namerd #2277

Closed
wants to merge 7 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Prev Previous commit
Next Next commit
log incoming namerd mesh requests
Signed-off-by: Dennis Adjei-Baah <[email protected]>
  • Loading branch information
Dennis Adjei-Baah committed May 23, 2019
commit c60346b3b17df8e6732e142d1f576efc8bfbb024
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package io.buoyant.grpc.runtime

import com.twitter.finagle.{Failure, Service => FinagleService}
import com.twitter.finagle.{Service => FinagleService}
import com.twitter.finagle.buoyant.h2
import com.twitter.logging.Logger
import com.twitter.util.{Future, Return, Throw, Try}

object ServerDispatcher {

val log = Logger.get(this.getClass.getName)
trait Service {
def name: String
def rpcs: Seq[Rpc]
Expand All @@ -16,7 +18,6 @@ object ServerDispatcher {
}

object Rpc {

class UnaryToUnary[Req, Rsp](
val name: String,
serve: Req => Future[Rsp],
Expand All @@ -38,11 +39,13 @@ object ServerDispatcher {
rspCodec: Codec[Rsp]
) extends ServerDispatcher.Rpc {

override def apply(req: h2.Request): Future[h2.Response] =
acceptUnary(reqCodec, req).map(streamResponse)
override def apply(req: h2.Request): Future[h2.Response] = {
val headersMap = req.headers.toSeq
acceptUnary(reqCodec, req).map(req => streamResponse(req, headersMap))
}

private[this] val streamResponse: Req => h2.Response =
req => respondStreaming(rspCodec, serve(req))
private[this] val streamResponse: (Req, Seq[(String, String)]) => h2.Response =
(req, reqMeta) => respondStreaming(rspCodec, serve(req), reqMeta)
}

class StreamToUnary[Req, Rsp](
Expand All @@ -68,7 +71,8 @@ object ServerDispatcher {

override def apply(req: h2.Request): Future[h2.Response] = {
val reqs = acceptStreaming(reqCodec, req)
val rsp = respondStreaming(rspCodec, serve(reqs))
val headersMap = req.headers.toSeq
val rsp = respondStreaming(rspCodec, serve(reqs), headersMap)
Future.value(rsp)
}
}
Expand Down Expand Up @@ -99,11 +103,12 @@ object ServerDispatcher {
h2.Response(h2.Status.Ok, frames)
}

private[this] def respondStreaming[Rsp](codec: Codec[Rsp], msgs: Stream[Rsp]): h2.Response = {
private[this] def respondStreaming[Rsp](codec: Codec[Rsp], msgs: Stream[Rsp], reqMeta: Seq[(String, String)]): h2.Response = {
val frames = h2.Stream()
def loop(): Future[Unit] =
msgs.recv().transform {
case Return(Stream.Releasable(s, release)) =>
log.trace(s"Streaming response metadata: ${reqMeta.mkString(":")}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This says response metadata but it's printing the request headers, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the intent here? Each time there's a response message, the request headers get printed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct since the request headers may contain information useful for the particular request.

val buf = codec.encodeGrpcMessage(s)
val data = h2.Frame.Data(buf, eos = false, release)
frames.write(data).before(loop())
Expand All @@ -113,6 +118,7 @@ object ServerDispatcher {
case s: GrpcStatus => s
case e => GrpcStatus.Internal(e.getMessage)
}
log.trace(s"Streaming response metadata: ${reqMeta.mkString(":")}")
frames.write(status.toTrailers)
}

Expand Down