Skip to content

Commit

Permalink
fix shutdown bugs and add DeadlineWaitQueueSpec
Browse files Browse the repository at this point in the history
Kestrel now evicts connections waiting on PersistentQueue operations
(akak waiters) to allow finagle to drain connections.

Provides explicit start/shutdown for journal packer thread to
prevent stray shutdown log messages.

RB_ID=89058
  • Loading branch information
Stephan Zuercher committed Oct 1, 2012
1 parent 59e3d33 commit defea76
Show file tree
Hide file tree
Showing 9 changed files with 402 additions and 90 deletions.
19 changes: 15 additions & 4 deletions src/main/scala/net/lag/kestrel/DeadlineWaitQueue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ import com.twitter.util.{Time, Timer, TimerTask}
* exactly one of the functions will be called, never both.
*/
final class DeadlineWaitQueue(timer: Timer) {
case class Waiter(var timerTask: TimerTask, awaken: () => Unit)
case class Waiter(var timerTask: TimerTask, awaken: () => Unit, timeout: () => Unit)
private val queue = JavaConversions.asScalaSet(new LinkedHashSet[Waiter])

def add(deadline: Time, awaken: () => Unit, onTimeout: () => Unit) = {
val waiter = Waiter(null, awaken)
def add(deadline: Time, awaken: () => Unit, timeout: () => Unit) = {
val waiter = Waiter(null, awaken, timeout)
val timerTask = timer.schedule(deadline) {
if (synchronized { queue.remove(waiter) }) onTimeout()
if (synchronized { queue.remove(waiter) }) waiter.timeout()
}
waiter.timerTask = timerTask
synchronized { queue.add(waiter) }
Expand Down Expand Up @@ -69,6 +69,17 @@ final class DeadlineWaitQueue(timer: Timer) {
}
}

def evictAll() {
synchronized {
val rv = queue.toArray
queue.clear()
rv
}.foreach { waiter =>
waiter.timerTask.cancel()
waiter.timeout()
}
}

def size() = {
synchronized { queue.size }
}
Expand Down
78 changes: 63 additions & 15 deletions src/main/scala/net/lag/kestrel/Journal.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import com.twitter.util.{Future, Duration, Time}
case class BrokenItemException(lastValidPosition: Long, cause: Throwable) extends IOException(cause)

case class Checkpoint(filename: String, reservedItems: Seq[QItem])

case class PackRequest(journal: Journal, checkpoint: Checkpoint, openItems: Iterable[QItem],
pentUpDeletes: Int, queueState: Iterable[QItem])

Expand Down Expand Up @@ -184,7 +185,7 @@ class Journal(queuePath: File, queueName: String, syncScheduler: ScheduledExecut
val negs = removesSinceReadBehind - newlyClosedItems.size // newly closed are already accounted for.

outstandingPackRequests.incrementAndGet()
packerQueue.add(PackRequest(this, checkpoint, newlyOpenItems, negs, queueState))
packer.add(PackRequest(this, checkpoint, newlyOpenItems, negs, queueState))
}

def close() {
Expand Down Expand Up @@ -479,7 +480,7 @@ class Journal(queuePath: File, queueName: String, syncScheduler: ScheduledExecut
}
}

private def pack(state: PackRequest) {
private[kestrel] def pack(state: PackRequest) {
val oldFilenames =
Journal.journalsBefore(queuePath, queueName, state.checkpoint.filename) ++
List(state.checkpoint.filename)
Expand All @@ -501,6 +502,65 @@ class Journal(queuePath: File, queueName: String, syncScheduler: ScheduledExecut
}
}

class JournalPackerTask {
val logger = Logger.get(getClass)
val queue = new LinkedBlockingQueue[Option[PackRequest]]()

var thread: Thread = null

def start() {
synchronized {
if (thread == null) {
thread = new Thread("journal-packer") {
override def run() {
var running = true
while (running) {
val requestOpt = queue.take()
requestOpt match {
case Some(request) => pack(request)
case None => running = false
}
}
logger.info("journal-packer exited.")
}
}
thread.setDaemon(true)
thread.setName("journal-packer")
thread.start()
} else {
logger.error("journal-packer already started.")
}
}
}

def shutdown() {
synchronized {
if (thread != null) {
logger.info("journal-packer exiting.")
queue.add(None)
thread.join(5000L)
thread = null
} else {
logger.error("journal-packer not running.")
}
}
}

def add(request: PackRequest) {
queue.add(Some(request))
}

private def pack(request: PackRequest) {
try {
request.journal.pack(request)
request.journal.outstandingPackRequests.decrementAndGet()
} catch {
case e: Throwable =>
logger.error(e, "Uncaught exception in journal-packer: %s", e)
}
}
}

object Journal {
def getQueueNamesFromFolder(path: File): Set[String] = {
path.listFiles().filter { file =>
Expand Down Expand Up @@ -579,17 +639,5 @@ object Journal {
journalsForQueue(path, queueName).dropWhile { _ != filename }.drop(1).headOption
}

val packerQueue = new LinkedBlockingQueue[PackRequest]()
val packer = BackgroundProcess.spawnDaemon("journal-packer") {
while (true) {
val request = packerQueue.take()
try {
request.journal.pack(request)
request.journal.outstandingPackRequests.decrementAndGet()
} catch {
case e: Throwable =>
Logger.get(getClass).error(e, "Uncaught exception in packer: %s", e)
}
}
}
val packer = new JournalPackerTask
}
71 changes: 50 additions & 21 deletions src/main/scala/net/lag/kestrel/Kestrel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali
private val log = Logger.get(getClass.getName)

var queueCollection: QueueCollection = null
var timer: NettyTimer = null
var timer: Timer = null
var journalSyncScheduler: ScheduledExecutorService = null
var memcacheService: Option[Server] = None
var textService: Option[Server] = None
Expand Down Expand Up @@ -81,13 +81,14 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali
connectionBacklog.foreach { backlog => builder = builder.backlog(backlog) }
clientTimeout.foreach { timeout => builder = builder.readTimeout(timeout) }
// calling build() is equivalent to calling start() in finagle.
builder.build(factory)
val server = builder.build(factory)
log.info("%s server started on %s", name, address)
server
}

def startThriftServer(
name: String,
port: Int,
fTimer: Timer
port: Int
): Server = {
val address = new InetSocketAddress(listenAddress, port)
var builder = ServerBuilder()
Expand All @@ -98,10 +99,12 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali
connectionBacklog.foreach { backlog => builder = builder.backlog(backlog) }
clientTimeout.foreach { timeout => builder = builder.readTimeout(timeout) }
// calling build() is equivalent to calling start() in finagle.
builder.build(connection => {
val handler = new ThriftHandler(connection, queueCollection, maxOpenTransactions, fTimer, Some(serverStatus))
val server = builder.build(connection => {
val handler = new ThriftHandler(connection, queueCollection, maxOpenTransactions, timer, Some(serverStatus))
new ThriftFinagledService(handler, new TBinaryProtocol.Factory())
})
log.info("%s server started on %s", name, address)
server
}

private def bytesRead(n: Int) {
Expand All @@ -123,7 +126,8 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali
Stats.setLabel("version", Kestrel.runtime.jarVersion)

// this means no timeout will be at better granularity than 100 ms.
timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS)
val nettyTimer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS)
timer = new TimerFromNettyTimer(nettyTimer)

journalSyncScheduler =
new ScheduledThreadPoolExecutor(
Expand All @@ -134,10 +138,10 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali
log.warning("Rejected journal fsync")
}
})
Journal.packer.start()

val finagleTimer = new TimerFromNettyTimer(timer)
try {
queueCollection = new QueueCollection(queuePath, finagleTimer, journalSyncScheduler,
queueCollection = new QueueCollection(queuePath, timer, journalSyncScheduler,
defaultQueueConfig, builders, aliases)
queueCollection.loadQueues()
} catch {
Expand All @@ -152,10 +156,10 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali

serverStatus =
zkConfig.map { cfg =>
new ZooKeeperServerStatus(cfg, statusFile, finagleTimer, defaultStatus,
new ZooKeeperServerStatus(cfg, statusFile, timer, defaultStatus,
statusChangeGracePeriod)
} getOrElse {
new ServerStatus(statusFile, finagleTimer, defaultStatus, statusChangeGracePeriod)
new ServerStatus(statusFile, timer, defaultStatus, statusChangeGracePeriod)
}
serverStatus.start()

Expand All @@ -179,7 +183,7 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali
}

thriftService = thriftListenPort.map { port =>
startThriftServer("kestrel-thrift", port, finagleTimer)
startThriftServer("kestrel-thrift", port)
}

// optionally, start a periodic timer to clean out expired items.
Expand Down Expand Up @@ -218,23 +222,48 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali
def shutdown() {
log.info("Shutting down!")

// finagle 1.11.1 has a bug where close() may never complete.
memcacheService.foreach { _.close(1.second) }
textService.foreach { _.close(1.second) }
thriftService.foreach { _.close(1.second) }

expirationBackgroundProcess.foreach { _.shutdown() }
// finagle cannot drain connections if they have long wait operations
// pending, so start a thread to periodically evict any waiters. Once
// close is invoked on the endpoint, finagle will close their connections
// at the completion of the evicted wait request.
val kicker = new PeriodicBackgroundProcess("evict-waiters", 1.second) {
def periodic() {
queueCollection.evictWaiters()
}
}
kicker.start()

if (queueCollection ne null) {
queueCollection.shutdown()
queueCollection = null
try {
memcacheService.foreach { svc =>
svc.close(30.seconds)
log.info("kestrel-memcache server stopped")
}
textService.foreach { svc =>
svc.close(30.seconds)
log.info("kestrel-text server stopped")
}
thriftService.foreach { svc =>
svc.close(30.seconds)
log.info("kestrel-thrift server stopped")
}
} finally {
kicker.shutdown()
}

if (timer ne null) {
timer.stop()
timer = null
}

expirationBackgroundProcess.foreach { _.shutdown() }

Journal.packer.shutdown()

if (queueCollection ne null) {
queueCollection.shutdown()
queueCollection = null
}

if (journalSyncScheduler ne null) {
journalSyncScheduler.shutdown()
journalSyncScheduler.awaitTermination(5, TimeUnit.SECONDS)
Expand Down
6 changes: 6 additions & 0 deletions src/main/scala/net/lag/kestrel/PersistentQueue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,12 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
}
}

def evictWaiters() {
synchronized {
waiters.evictAll()
}
}

/**
* Return a transactionally-removed item to the queue. This is a rolled-
* back transaction.
Expand Down
15 changes: 15 additions & 0 deletions src/main/scala/net/lag/kestrel/QueueCollection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,21 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
}
}

/**
* Force any requests in a waiting opertion (e.g., remove) to react as
* if canceled. This method is useful for forcing connections to drain
* as part of shutting down.
*/
def evictWaiters(): Unit = synchronized {
if (shuttingDown) {
return
}

for ((name, q) <- queues) {
q.evictWaiters()
}
}

/**
* Shutdown this queue collection. Any future queue requests will fail.
*/
Expand Down
Loading

0 comments on commit defea76

Please sign in to comment.