Skip to content

Commit

Permalink
Introduces option to enable dtab compression for Consul
Browse files Browse the repository at this point in the history
Signed-off-by: Zahari Dichev <[email protected]>
  • Loading branch information
zaharidichev committed Jul 27, 2019
1 parent d87f193 commit 32e5d89
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 17 deletions.
5 changes: 4 additions & 1 deletion consul/src/main/scala/io/buoyant/consul/v1/BaseApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import com.twitter.finagle._
import com.twitter.io.{Buf, Reader}
import com.twitter.util._
import io.buoyant.consul.log
import scala.collection.immutable.Map
import scala.util.control.NonFatal

// a thunked version of the api call such that we can peek at the request before making the call
Expand Down Expand Up @@ -117,4 +118,6 @@ object Headers {
val Index = "X-Consul-Index"
}

case class Indexed[T](value: T, index: Option[String])
case class Indexed[T](value: T, index: Option[String]) {
def mapValue[R](f: T => R): Indexed[R] = copy(value = f(value))
}
18 changes: 12 additions & 6 deletions consul/src/main/scala/io/buoyant/consul/v1/KvApi.scala
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
package io.buoyant.consul.v1

import java.util.Base64

import com.twitter.conversions.DurationOps._
import com.twitter.finagle.http
import com.twitter.finagle.service.Backoff
import com.twitter.finagle.stats.{DefaultStatsReceiver, StatsReceiver}
import com.twitter.util._

object KvApi {
def apply(c: Client, backoff: Stream[Duration]): KvApi = new KvApi(c, s"/$versionString", backoff)
def apply(c: Client, backoff: Stream[Duration], enableValueCompression: Boolean = false): KvApi = new KvApi(c, s"/$versionString", backoff, enableValueCompression)
}

class KvApi(
val client: Client,
val uriPrefix: String,
val backoffs: Stream[Duration],
val enableValueCompression: Boolean,
val stats: StatsReceiver = DefaultStatsReceiver
) extends BaseApi with Closable {
val kvPrefix = s"$uriPrefix/kv"
Expand Down Expand Up @@ -64,6 +62,7 @@ class KvApi(
"dc" -> datacenter
),
call = req => executeRaw(req, retry)
.map(_.mapValue(v => if (enableValueCompression) GZIPStringEncoder.decodeString(v) else v))
)

/**
Expand All @@ -89,7 +88,9 @@ class KvApi(
"dc" -> datacenter,
"recurse" -> recurse.map(_.toString)
),
call = req => executeJson[Seq[Key]](req, retry)
call = req =>
executeJson[Seq[Key]](req, retry)
.map(indexed => if (enableValueCompression) indexed.mapValue(_.map(_.decompress)) else indexed)
)

/**
Expand All @@ -115,7 +116,11 @@ class KvApi(
"dc" -> datacenter
),
call = req => {
req.setContentString(value)
req.setContentString(
if (enableValueCompression)
GZIPStringEncoder.encode(value.getBytes)
else value
)
executeJson[Boolean](req, retry).map(_.value)
}
)
Expand Down Expand Up @@ -156,4 +161,5 @@ case class Key(
Value: Option[String]
) {
lazy val decoded: Option[String] = Value.map { raw => new String(Base64.getDecoder.decode(raw)) }
def decompress: Key = copy(Value = Value.map(GZIPStringEncoder.decodeString))
}
35 changes: 35 additions & 0 deletions consul/src/test/scala/io/buoyant/consul/v1/KvApiTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import org.scalatest.FunSuite
class KvApiTest extends FunSuite with Awaits with Exceptions {
val listBuf = Buf.Utf8("""["foo/bar/", "foo/baz/"]""")
val getBuf = Buf.Utf8("""foobar""")
val getBufGzip = Buf.Utf8("""H4sIAAAAAAAAAEvLz09KLAIAlR/2ngYAAAA=""")
val multiGetBuf = Buf.Utf8("""[{"LockIndex":0,"Key":"sample","Flags":0,"Value":"Zm9vYmFy","CreateIndex":10,"ModifyIndex":12}]""")
val multiGetBufGzip = Buf.Utf8("""[{"LockIndex":0,"Key":"sample","Flags":0,"Value":"H4sIAAAAAAAA/4vKtSyLzHWrBAAIZ7JnCAAAAA==","CreateIndex":10,"ModifyIndex":12}]""")
val putOkBuf = Buf.Utf8("""true""")
val putFailBuf = Buf.Utf8("""false""")
val deleteOkBuf = Buf.Utf8("""true""")
Expand Down Expand Up @@ -78,6 +80,14 @@ class KvApiTest extends FunSuite with Awaits with Exceptions {
assert(result.value == "foobar")
}

test("get returns an indexed value (Gzip)") {
val service = stubService(getBufGzip)

val result = await(KvApi(service, constBackoff, enableValueCompression = true).get("/some/path/to/key")())
assert(result.index == Some("4"))
assert(result.value == "foobar")
}

test("get uses raw values") {
val service = stubService(putFailBuf)

Expand Down Expand Up @@ -126,6 +136,16 @@ class KvApiTest extends FunSuite with Awaits with Exceptions {
assert(result.value.head.decoded == Some("foobar"))
}

test("multiGet returns an indexed seq of values (Gzip)") {
val service = stubService(multiGetBufGzip)

val result = await(KvApi(service, constBackoff, enableValueCompression = true).multiGet("/sample")())
assert(result.index == Some("4"))
assert(result.value.size == 1)
assert(result.value.head.decoded == Some("foobar"))
}


test("multiGet by default is non-recurse") {
val service = stubService(multiGetBuf)

Expand Down Expand Up @@ -186,6 +206,21 @@ class KvApiTest extends FunSuite with Awaits with Exceptions {
assert(!result)
}


test("put compresses value when enableValueCompression: true") {
val service = Service.mk[Request, Response] { req =>
assert(req.contentString == "H4sIAAAAAAAAAEvLz09KLAIAlR/2ngYAAAA=")
val rsp = Response()
rsp.content(Buf.Utf8("""true"""))
rsp.setContentTypeJson()
rsp.headerMap.set("X-Consul-Index", "4")
Future.value(rsp)
}

val result = await(KvApi(service, constBackoff, enableValueCompression = true).put("/path/to/key", "foobar")())
assert(result)
}

test("put cas flag") {
val service = stubService(putFailBuf)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ case class ConsulDtabInterpreterConfig(
writeConsistencyMode: Option[ConsistencyMode] = None,
failFast: Option[Boolean] = None,
backoff: Option[BackoffConfig] = None,
tls: Option[TlsClientConfig] = None
tls: Option[TlsClientConfig] = None,
enableValueCompression: Option[Boolean] = None
) extends InterpreterConfig {

import ConsulDtabInterpreterConfig._
Expand All @@ -54,6 +55,7 @@ case class ConsulDtabInterpreterConfig(
val servicePort = port.getOrElse(DefaultPort).port
val backoffs = backoff.map(_.mk).getOrElse(DefaultBackoff)
val tlsParams = tls.map(_.params).getOrElse(Stack.Params.empty)
val enableValCompression = enableValueCompression.getOrElse(false)
val service = Http.client
.interceptInterrupts
.failFast(failFast)
Expand All @@ -63,7 +65,7 @@ case class ConsulDtabInterpreterConfig(
.withParams(tlsParams)
.newService(s"/$$/inet/$serviceHost/$servicePort")

KvApi(service, backoffs)
KvApi(service, backoffs, enableValCompression)
}

@JsonIgnore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class ConsulInterpreterTest extends FunSuite with Inside {

val config = parse(yaml)
inside(config) {
case ConsulDtabInterpreterConfig(host, port, _, namespace, _, _, _, _, _, _, _) =>
case ConsulDtabInterpreterConfig(host, port, _, namespace, _, _, _, _, _, _, _, _) =>
assert(host.get == "consul-node")
assert(port.get == Port(9999))
assert(namespace.get == "internal")
Expand Down
1 change: 1 addition & 0 deletions linkerd/docs/interpreter.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,4 @@ writeConsistencyMode | `default` | Select between [Consul API consistency modes]
failFast | `false` | If `false`, disable fail fast and failure accrual for Consul client. Keep it `false` when using a local agent but change it to `true` when talking directly to an HA Consul API.
backoff | exponential backoff from 1ms to 1min | Object that determines which backoff algorithm should be used. See [retry backoff](https://linkerd.io/config/head/linkerd#retry-backoff-parameters)
tls | no tls | Use TLS during connection with Consul. see [Consul Encryption](https://www.consul.io/docs/agent/encryption.html) and [Namer TLS](#namer-tls).
enableValueCompression | `false` | Enables the use of Gzip compression for values stored in Consul. Allows for larger dtabs.
1 change: 1 addition & 0 deletions namerd/docs/storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ writeConsistencyMode | `default` | Select between [Consul API consistency modes]
failFast | `false` | If `false`, disable fail fast and failure accrual for Consul client. Keep it `false` when using a local agent but change it to `true` when talking directly to an HA Consul API.
backoff | exponential backoff from 1ms to 1min | Object that determines which backoff algorithm should be used. See [retry backoff](https://linkerd.io/config/head/linkerd#retry-backoff-parameters)
tls | no tls | Use TLS during connection with Consul. see [Consul Encryption](https://www.consul.io/docs/agent/encryption.html) and [Namer TLS](#namer-tls).
enableValueCompression | `false` | Enables the use of Gzip compression for values stored in Consul. Allows for larger dtabs.

### Namer TLS

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ case class ConsulConfig(
writeConsistencyMode: Option[ConsistencyMode] = None,
failFast: Option[Boolean] = None,
backoff: Option[BackoffConfig] = None,
tls: Option[TlsClientConfig] = None
tls: Option[TlsClientConfig] = None,
enableValueCompression: Option[Boolean] = None
) extends DtabStoreConfig {
import ConsulConfig._

Expand All @@ -36,6 +37,7 @@ case class ConsulConfig(
val servicePort = port.getOrElse(DefaultPort).port
val backoffs = backoff.map(_.mk).getOrElse(DefaultBackoff)
val tlsParams = tls.map(_.params).getOrElse(Stack.Params.empty)
val enableValCompression = enableValueCompression.getOrElse(false)

val service = Http.client
.interceptInterrupts
Expand All @@ -46,7 +48,7 @@ case class ConsulConfig(
.withParams(tlsParams)
.newService(s"/$$/inet/$serviceHost/$servicePort")
new ConsulDtabStore(
KvApi(service, backoffs),
KvApi(service, backoffs, enableValCompression),
root,
datacenter = datacenter,
readConsistency = readConsistencyMode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class ConsulConfigTest extends FunSuite with OptionValues {
|datacenter: us-east-42
|readConsistencyMode: stale
|writeConsistencyMode: consistent
|enableValueCompression: true
|tls:
| disableValidation: false
| commonName: consul.io
Expand All @@ -41,6 +42,7 @@ class ConsulConfigTest extends FunSuite with OptionValues {
assert(consul.datacenter == Some("us-east-42"))
assert(consul.readConsistencyMode == Some(ConsistencyMode.Stale))
assert(consul.writeConsistencyMode == Some(ConsistencyMode.Consistent))
assert(consul.enableValueCompression == Some(true))
val clientAuth = ClientAuth("/certificates/cert.pem", None, "/certificates/key.pem")
val tlsConfig = TlsClientConfig(None, Some(false), Some("consul.io"), None, Some("/certificates/cacert-bundle.pem"), Some(clientAuth))
assert(consul.tls == Some(tlsConfig))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ import io.buoyant.router.http.ForwardClientCertFilter.Enabled
import io.buoyant.router.http.MaxCallDepthFilter.MaxCallDepthExceeded
import scala.util.control.NoStackTrace

class MaxCallDepthFilter[Req, H: HeadersLike, Rep](maxCalls: Int, headerKey: String)
(implicit requestLike: RequestLike[Req, H]) extends SimpleFilter[Req, Rep] {
class MaxCallDepthFilter[Req, H: HeadersLike, Rep](maxCalls: Int, headerKey: String)(implicit requestLike: RequestLike[Req, H]) extends SimpleFilter[Req, Rep] {

def numCalls(viaValue: String) = viaValue.split(",").length

Expand All @@ -27,7 +26,7 @@ object MaxCallDepthFilter {

final case class MaxCallDepthExceeded(calls: Int)
extends Exception(s"Maximum number of calls ($calls) has been exceeded. Please check for proxy loops.")
with NoStackTrace
with NoStackTrace

final case class Param(value: Int) extends AnyVal {
def mk(): (Param, Stack.Param[Param]) = (this, Param.param)
Expand All @@ -37,8 +36,7 @@ object MaxCallDepthFilter {
implicit val param = Stack.Param(Param(1000))
}

def module[Req, H: HeadersLike, Rep](headerKey: String)
(implicit requestLike: RequestLike[Req, H]): Stackable[ServiceFactory[Req, Rep]] =
def module[Req, H: HeadersLike, Rep](headerKey: String)(implicit requestLike: RequestLike[Req, H]): Stackable[ServiceFactory[Req, Rep]] =
new Stack.Module1[Param, ServiceFactory[Req, Rep]] {
val role = Stack.Role("MaxCallDepthFilter")
val description = "Limits the number of hops by looking at the Via header of a request"
Expand Down

0 comments on commit 32e5d89

Please sign in to comment.