Skip to content

Commit

Permalink
[split] WIP: Materialize Weights in NameServer
Browse files Browse the repository at this point in the history
  • Loading branch information
Stu Hood committed Apr 11, 2012
1 parent d18a6bc commit 9108cb8
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ trait GizzardServer {
private def asReplicatingNode[T](ts: Seq[T]): shards.RoutingNode[T] = {
new shards.ReplicatingShard(
new shards.ShardInfo("", "", ""),
0,
shards.Weight.Default,
ts map { shards.LeafRoutingNode(_) }
)
}
Expand Down
32 changes: 25 additions & 7 deletions src/main/scala/com/twitter/gizzard/nameserver/NameServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import java.util.concurrent.{ConcurrentMap, ConcurrentHashMap}
import scala.collection.mutable
import com.twitter.logging.Logger
import com.twitter.gizzard.shards._
import com.twitter.gizzard.thrift.HostWeightInfo


class NonExistentShard(message: String) extends ShardException(message: String)
Expand All @@ -13,9 +14,10 @@ class NameserverUninitialized extends ShardException("Please call reload() befor


class RoutingState(
instantiateNode: (ShardInfo, Int, Seq[RoutingNode[Any]]) => RoutingNode[Any],
instantiateNode: (ShardInfo, Weight, Seq[RoutingNode[Any]]) => RoutingNode[Any],
infos: Iterable[ShardInfo],
links: Iterable[LinkInfo],
hostWeights: Iterable[HostWeightInfo],
forwardings: Iterable[Forwarding]
) {

Expand All @@ -26,9 +28,13 @@ class RoutingState(
m
}

private def constructRoutingNode(root: ShardId, weight: Weight): RoutingNode[Any] = {
val hostWeightMap = hostWeights.map{ hw => hw.hostname -> hw }.toMap

private def constructRoutingNode(root: ShardId, rawWeight: Int): RoutingNode[Any] = {
val weight = Weight(rawWeight, hostWeightMap.get(root.hostname))
infoMap.get(root) map { rootInfo =>
val children = linkMap.getOrElse(root, Nil) map { case (id, wt) => constructRoutingNode(id, wt) }
val children =
linkMap.getOrElse(root, Nil).map { case (id, wt) => constructRoutingNode(id, wt) }
instantiateNode(rootInfo, weight, children)
} getOrElse {
throw new InvalidShard("Expected shard '"+ root +"' to exist, but it wasn't found.")
Expand Down Expand Up @@ -90,19 +96,22 @@ class NameServer(val shard: RoutingNode[ShardManagerSource], val mappingFunction

val infos = mutable.ArrayBuffer[ShardInfo]()
val links = mutable.ArrayBuffer[LinkInfo]()
val hostWeights = mutable.ArrayBuffer[HostWeightInfo]()
val forwardings = mutable.ArrayBuffer[Forwarding]()
val (states, updatedSeq) = shardManager.currentState()

states foreach { state =>
infos ++= state.shards
links ++= state.links
hostWeights ++= state.hostWeights
forwardings ++= state.forwardings
}

val routes = new RoutingState(
shardRepository.instantiateNode,
infos,
links,
hostWeights,
forwardings
)

Expand All @@ -112,9 +121,17 @@ class NameServer(val shard: RoutingNode[ShardManagerSource], val mappingFunction
log.info("Loading name server configuration is done.")
}

private def constructRoutingNode(shardId: ShardId, weight: Weight) : RoutingNode[Any] = {
/**
* Constructs a RoutingNode via recursive trips to the ShardManager database.
* TODO(hyungoo): is this equivalent to findShardById[Any]?
*/
private def fetchRoutingNode(shardId: ShardId, rawWeight: Int): RoutingNode[Any] = {
val shardInfo = shardManager.getShard(shardId)
val children = shardManager.listDownwardLinks(shardId).map { link => constructRoutingNode(link.downId, link.weight) }
val weight = Weight(rawWeight, shardManager.getHostWeight(shardId.hostname))
val children =
shardManager.listDownwardLinks(shardId).map { link =>
fetchRoutingNode(link.downId, link.weight)
}
shardRepository.instantiateNode(shardInfo, weight, children)
}

Expand All @@ -136,7 +153,7 @@ class NameServer(val shard: RoutingNode[ShardManagerSource], val mappingFunction

deletedForwardingsByTableId.get(tableId).getOrElse(Nil) foreach { f => newTreeMap.remove(f.baseId) }
updatedForwardingsByTableId.get(tableId).getOrElse(Nil) foreach { f =>
newTreeMap.put(f.baseId, constructRoutingNode(f.shardId))
newTreeMap.put(f.baseId, fetchRoutingNode(f.shardId, 1))
}

forwardingTree.put(tableId, newTreeMap)
Expand All @@ -150,8 +167,9 @@ class NameServer(val shard: RoutingNode[ShardManagerSource], val mappingFunction
// XXX: removing this causes CopyJobSpec to fail.
// This method now always falls back to the db.
@throws(classOf[NonExistentShard])
def findShardById[T](id: ShardId, weight: Int): RoutingNode[T] = {
def findShardById[T](id: ShardId, rawWeight: Int): RoutingNode[T] = {
val shardInfo = shardManager.getShard(id)
val weight = Weight(rawWeight, shardManager.getHostWeight(id.hostname))
val downwardLinks = shardManager.listDownwardLinks(id)
val children = downwardLinks.map(l => findShardById[T](l.downId, l.weight)).toList

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,44 @@ import scala.collection.mutable.ListBuffer
import scala.collection.JavaConversions._
import com.twitter.gizzard.shards.{ShardId, ShardInfo, LinkInfo}
import com.twitter.gizzard.thrift.{NameServerState => ThriftNameServerState}
import com.twitter.gizzard.thrift.HostWeightInfo
import com.twitter.gizzard.thrift.conversions.ShardInfo._
import com.twitter.gizzard.thrift.conversions.LinkInfo._
import com.twitter.gizzard.thrift.conversions.Forwarding._
import com.twitter.gizzard.thrift.conversions.Sequences._
import com.twitter.gizzard.util.TreeUtils


case class NameServerState(shards: List[ShardInfo], links: List[LinkInfo], forwardings: List[Forwarding], tableId: Int) {
case class NameServerState(
shards: Seq[ShardInfo],
links: Seq[LinkInfo],
hostWeights: Seq[HostWeightInfo],
forwardings: Seq[Forwarding],
tableId: Int
) {
def toThrift = {
val thriftForwardings = forwardings.map(_.toThrift)
val thriftLinks = links.map(_.toThrift)
val thriftShards = shards.map(_.toThrift)
new ThriftNameServerState(thriftShards, thriftLinks, thriftForwardings, tableId)
new ThriftNameServerState(thriftShards, thriftLinks, thriftForwardings, tableId, hostWeights)
}
}

object NameServerState {
import TreeUtils._

def extractTable(tableId: Int)
(forwardingsByTable: Int => Set[Forwarding])
(linksByUpId: ShardId => Set[LinkInfo])
(shardsById: ShardId => ShardInfo) = {

def extractTable(
tableId: Int,
forwardingsByTable: Int => Set[Forwarding],
hostWeights: Seq[HostWeightInfo],
linksByUpId: ShardId => Set[LinkInfo],
shardsById: ShardId => ShardInfo
) = {
val weights = hostWeights
val forwardings = forwardingsByTable(tableId)
val links = descendantLinks(forwardings.map(_.shardId))(linksByUpId)
val shards = (forwardings.map(_.shardId) ++ links.map(_.downId)).map(shardsById)

NameServerState(shards.toList, links.toList, forwardings.toList, tableId)
NameServerState(shards.toSeq, links.toSeq, weights, forwardings.toSeq, tableId)
}
}
16 changes: 13 additions & 3 deletions src/main/scala/com/twitter/gizzard/nameserver/ShardManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class ShardManager(shard: RoutingNode[ShardManagerSource], repo: ShardRepository
def shardsForHostname(hostname: String) = shard.read.any(_.shardsForHostname(hostname))
def listShards() = shard.read.any(_.listShards())
def getBusyShards() = shard.read.any(_.getBusyShards())
def getHostWeight(hostname: String): Option[thrift.HostWeightInfo] = shard.read.any(_.getHostWeight(hostname))

def addLink(upId: ShardId, downId: ShardId, weight: Int) { shard.write.foreach(_.addLink(upId, downId, weight)) }
def removeLink(upId: ShardId, downId: ShardId) { shard.write.foreach(_.removeLink(upId, downId)) }
Expand Down Expand Up @@ -59,12 +60,19 @@ trait ShardManagerSource {
import TreeUtils._

lazy val shardsById = listShards().map(s => s.id -> s).toMap
lazy val hostWeights = listHostWeights()
lazy val linksByUpId = mapOfSets(listLinks())(_.upId)
lazy val forwardingsByTable = mapOfSets(getForwardingsForTableIds(tableIds))(_.tableId)

def extractor(id: Int) = NameServerState.extractTable(id)(forwardingsByTable)(linksByUpId)(shardsById)

tableIds.map(extractor)
tableIds.map { id =>
NameServerState.extractTable(
id,
forwardingsByTable,
hostWeights,
linksByUpId,
shardsById
)
}
}

@throws(classOf[ShardException]) def batchExecute(commands : Seq[TransformOperation])
Expand All @@ -77,6 +85,8 @@ trait ShardManagerSource {
@throws(classOf[ShardException]) def shardsForHostname(hostname: String): Seq[ShardInfo]
@throws(classOf[ShardException]) def listShards(): Seq[ShardInfo]
@throws(classOf[ShardException]) def getBusyShards(): Seq[ShardInfo]
@throws(classOf[ShardException]) def getHostWeight(hostname: String): Option[thrift.HostWeightInfo]
@throws(classOf[ShardException]) def listHostWeights(): Seq[thrift.HostWeightInfo]


@throws(classOf[ShardException]) def addLink(upId: ShardId, downId: ShardId, weight: Int)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ object LeafRoutingNode {
}

def apply[T](shard: T): LeafRoutingNode[T] = {
apply(shard, shard, new ShardInfo("", "", ""), Weight(1, 1, 1))
apply(shard, shard, new ShardInfo("", "", ""), Weight.Default)
}

// XXX: remove when we move to shard replica sets rather than trees.
object NullNode extends LeafRoutingNode[Null](
new LeafRoutingNode.WrapperShardFactory(null, null),
new ShardInfo("NullShardPlaceholder", "null", "null"),
1
Weight.Default
)
}

Expand Down
13 changes: 13 additions & 0 deletions src/main/scala/com/twitter/gizzard/shards/Weight.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,21 @@
package com.twitter.gizzard.shards

import com.twitter.gizzard.thrift.HostWeightInfo

/**
* @param raw The base value from which write and read weights were computed.
* @param write Write preference.
* @param read Read preference.
*/
case class Weight(raw: Int, write: Int, read: Int)

object Weight {
val Default = Weight(1, 1, 1)

/** Materialize a Weight from a HostWeight. */
def apply(raw: Int, hostWeight: Option[HostWeightInfo]): Weight = {
val weightRead = raw * hostWeight.map(_.weight_read).getOrElse(1)
val weightWrite = raw * hostWeight.map(_.weight_write).getOrElse(1)
Weight(raw, weightWrite, weightRead)
}
}
7 changes: 7 additions & 0 deletions src/main/thrift/Manager.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ struct LinkInfo {
3: i32 weight
}

struct HostWeightInfo {
1: string hostname
2: i32 weight_write
3: i32 weight_read
}

struct Forwarding {
1: i32 table_id
2: i64 base_id
Expand All @@ -37,6 +43,7 @@ struct NameServerState {
2: list<LinkInfo> links
3: list<Forwarding> forwardings
4: i32 table_id
5: list<HostWeightInfo> hostWeights
}


Expand Down

0 comments on commit 9108cb8

Please sign in to comment.