Skip to content

Commit

Permalink
Merge branch 'release_2_2'
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephan Zuercher committed May 11, 2012
2 parents 7e00fd6 + f967a41 commit ff3bfdd
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 10 deletions.
28 changes: 18 additions & 10 deletions src/main/scala/net/lag/kestrel/KestrelHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -152,18 +152,26 @@ abstract class KestrelHandler(
// will do a continuous fetch on a queue until time runs out or read buffer is full.
final def monitorUntil(key: String, timeLimit: Option[Time], maxItems: Int, opening: Boolean)(f: (Option[QItem], Option[Long]) => Unit) {
log.debug("monitor -> q=%s t=%s max=%d open=%s", key, timeLimit, maxItems, opening)
if (maxItems == 0 || (timeLimit.isDefined && timeLimit.get <= Time.now) || countPendingReads(key) >= maxOpenReads) {
f(None, None)
} else {
queues.remove(key, timeLimit, opening, false).onSuccess {
case None =>
f(None, None)
case x @ Some(item) =>
val xidContext = if (opening) addPendingRead(key, item.xid) else None
f(x, xidContext)
monitorUntil(key, timeLimit, maxItems - 1, opening)(f)
Stats.incr("cmd_monitor")

def monitorLoop(maxItems: Int) {
log.debug("monitor loop -> q=%s t=%s max=%d open=%s", key, timeLimit, maxItems, opening)
if (maxItems == 0 || (timeLimit.isDefined && timeLimit.get <= Time.now) || countPendingReads(key) >= maxOpenReads) {
f(None, None)
} else {
Stats.incr("cmd_monitor_get")
queues.remove(key, timeLimit, opening, false).onSuccess {
case None =>
f(None, None)
case x @ Some(item) =>
val xidContext = if (opening) addPendingRead(key, item.xid) else None
f(x, xidContext)
monitorLoop(maxItems - 1)
}
}
}

monitorLoop(maxItems)
}

def getItem(key: String, timeout: Option[Time], opening: Boolean, peeking: Boolean): Future[Option[QItem]] = {
Expand Down
60 changes: 60 additions & 0 deletions src/test/scala/net/lag/kestrel/KestrelHandlerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging

Stats.getCounter("cmd_get")() mustEqual 0
Stats.getCounter("cmd_set")() mustEqual 0
Stats.getCounter("cmd_monitor")() mustEqual 0
Stats.getCounter("cmd_monitor_get")() mustEqual 0
Stats.getCounter("get_hits")() mustEqual 0
Stats.getCounter("get_misses")() mustEqual 0

Expand All @@ -117,6 +119,64 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
Stats.getCounter("cmd_get")() mustEqual 2
Stats.getCounter("get_hits")() mustEqual 1
Stats.getCounter("get_misses")() mustEqual 1
Stats.getCounter("cmd_monitor")() mustEqual 0
Stats.getCounter("cmd_monitor_get")() mustEqual 0
}
}

"track monitor stats" in {
withTempFolder {
Stats.clearAll()
queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
val handler = new FakeKestrelHandler(queues, 10)

handler.setItem("test", 0, None, "one".getBytes)
handler.setItem("test", 0, None, "two".getBytes)
handler.setItem("test", 0, None, "three".getBytes)
Stats.getCounter("cmd_set")() mustEqual 3
Stats.getCounter("cmd_get")() mustEqual 0
Stats.getCounter("cmd_monitor")() mustEqual 0
Stats.getCounter("cmd_monitor_get")() mustEqual 0

val items = new mutable.ListBuffer[Option[QItem]]()
def addItem(item: Option[QItem], xid: Option[Long]) { items.append(item) }

handler.monitorUntil("test", Some(1.hour.fromNow), 2, false)(addItem)
items.size mustEqual 3
items(0) must beString("one")
items(1) must beString("two")
items(2) mustEqual None
Stats.getCounter("cmd_set")() mustEqual 3
Stats.getCounter("cmd_get")() mustEqual 0
Stats.getCounter("cmd_monitor")() mustEqual 1
Stats.getCounter("cmd_monitor_get")() mustEqual 2
Stats.getCounter("get_hits")() mustEqual 2
Stats.getCounter("get_misses")() mustEqual 0

items.clear()
handler.monitorUntil("test", Some(1.second.fromNow), 2, false)(addItem)
timer.timeout()
items.size mustEqual 2
items(0) must beString("three")
items(1) mustEqual None
Stats.getCounter("cmd_set")() mustEqual 3
Stats.getCounter("cmd_get")() mustEqual 0
Stats.getCounter("cmd_monitor")() mustEqual 2
Stats.getCounter("cmd_monitor_get")() mustEqual 4
Stats.getCounter("get_hits")() mustEqual 3
Stats.getCounter("get_misses")() mustEqual 1

items.clear()
handler.monitorUntil("test", Some(1.second.fromNow), 2, false)(addItem)
timer.timeout()
items.size mustEqual 1
items(0) mustEqual None
Stats.getCounter("cmd_set")() mustEqual 3
Stats.getCounter("cmd_get")() mustEqual 0
Stats.getCounter("cmd_monitor")() mustEqual 3
Stats.getCounter("cmd_monitor_get")() mustEqual 5
Stats.getCounter("get_hits")() mustEqual 3
Stats.getCounter("get_misses")() mustEqual 2
}
}

Expand Down

0 comments on commit ff3bfdd

Please sign in to comment.