Skip to content

Commit

Permalink
rewrite all sequencers with Ducts
Browse files Browse the repository at this point in the history
  • Loading branch information
ornicar committed Aug 24, 2018
1 parent 4da4297 commit b6db073
Show file tree
Hide file tree
Showing 16 changed files with 107 additions and 183 deletions.
2 changes: 1 addition & 1 deletion modules/fishnet/src/main/FishnetApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ final class FishnetApi(
.logIfSlow(100, logger)(_ => s"acquire ${client.skill}")
.result
.recover {
case e: FutureSequencer.Timeout =>
case e: lila.hub.Duct.Timeout =>
lila.mon.fishnet.acquire.timeout(client.skill.key)()
logger.warn(s"[${client.skill}] Fishnet.acquire ${e.getMessage}")
none
Expand Down
35 changes: 35 additions & 0 deletions modules/hub/src/main/Duct.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package lila.hub

import scala.collection.immutable.Queue
import scala.concurrent.duration._
import scala.concurrent.Promise
import scala.concurrent.stm._

import lila.base.LilaException
Expand Down Expand Up @@ -57,4 +58,38 @@ object Duct {
lila.log("Duct").warn(s"unhandled msg: $msg")
funit
}

case class Timeout(duration: FiniteDuration) extends lila.base.LilaException {
val message = s"FutureSequencer timed out after $duration"
}

/* Convenience functions to build upon Ducts */
object extra {

case class LazyFu[A](f: () => Fu[A]) {

def apply(timeout: Option[FiniteDuration])(implicit system: akka.actor.ActorSystem) =
timeout.foldLeft(f()) { (fu, dur) =>
fu.withTimeout(
duration = dur,
error = Timeout(dur)
)
}
}

def lazyFu = new Duct {
val process: Duct.ReceiveAsync = { case LazyFu(f) => f() }
}
def lazyFu(timeout: FiniteDuration)(implicit system: akka.actor.ActorSystem) = new Duct {
val process: Duct.ReceiveAsync = { case lf: LazyFu[_] => lf(timeout.some) }
}

case class LazyPromise[A](f: LazyFu[A], promise: Promise[A])

def lazyPromise(timeout: Option[FiniteDuration])(implicit system: akka.actor.ActorSystem) = new Duct {
val process: Duct.ReceiveAsync = {
case LazyPromise(lf, promise) => promise.completeWith { lf(timeout)(system) }.future
}
}
}
}
30 changes: 5 additions & 25 deletions modules/hub/src/main/FutureSequencer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,21 @@ package lila.hub
import scala.concurrent.duration._
import scala.concurrent.Promise

import Duct.extra._

final class FutureSequencer(
system: akka.actor.ActorSystem,
executionTimeout: Option[FiniteDuration] = None,
logger: lila.log.Logger
) {

import FutureSequencer._

def apply[A](op: => Fu[A]): Fu[A] = {
def apply[A](fu: => Fu[A]): Fu[A] = {
val promise = Promise[A]()
duct ! Op(() => op, promise)
duct ! LazyPromise(LazyFu(() => fu), promise)
promise.future
}

def queueSize = duct.queueSize

private[this] val duct = new Duct {
val process: Duct.ReceiveAsync = {
case Op(f, promise) => promise.completeWith {
executionTimeout.foldLeft(f()) { (fu, timeout) =>
fu.withTimeout(
duration = timeout,
error = Timeout(timeout)
)(system)
}
}.future
}
}
}

object FutureSequencer {

private case class Op[A](f: () => Fu[A], promise: Promise[A])

case class Timeout(duration: FiniteDuration) extends lila.base.LilaException {
val message = s"FutureSequencer timed out after $duration"
}
private[this] val duct = lazyPromise(executionTimeout)(system)
}
72 changes: 0 additions & 72 deletions modules/hub/src/main/Sequencer.scala

This file was deleted.

9 changes: 5 additions & 4 deletions modules/insight/src/main/Env.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ final class Env(

private lazy val indexer = new Indexer(
storage = storage,
sequencer = system.actorOf(Props(
classOf[lila.hub.Sequencer],
None, None, logger
))
sequencer = new lila.hub.FutureSequencer(
system = system,
executionTimeout = None,
logger = logger
)
)

private lazy val userCacheApi = new UserCacheApi(coll = db(CollectionUserCache))
Expand Down
20 changes: 8 additions & 12 deletions modules/insight/src/main/Indexer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,23 @@ package lila.insight
import akka.actor.ActorRef
import org.joda.time.DateTime
import play.api.libs.iteratee._
import reactivemongo.bson._
import reactivemongo.api.ReadPreference
import reactivemongo.bson._

import lila.db.dsl._
import lila.db.dsl._
import lila.game.BSONHandlers.gameBSONHandler
import lila.game.{ Game, GameRepo, Query }
import lila.hub.Sequencer
import lila.hub.FutureSequencer
import lila.user.User

private final class Indexer(storage: Storage, sequencer: ActorRef) {
private final class Indexer(storage: Storage, sequencer: FutureSequencer) {

def all(user: User): Funit = {
val p = scala.concurrent.Promise[Unit]()
sequencer ! Sequencer.work(compute(user), p.some)
p.future
def all(user: User): Funit = sequencer {
storage.fetchLast(user.id) flatMap {
case None => fromScratch(user)
case Some(e) => computeFrom(user, e.date plusSeconds 1, e.number + 1)
}
}

def update(game: Game, userId: String, previous: Entry): Funit =
Expand All @@ -27,11 +28,6 @@ private final class Indexer(storage: Storage, sequencer: ActorRef) {
case _ => funit
}

private def compute(user: User): Funit = storage.fetchLast(user.id) flatMap {
case None => fromScratch(user)
case Some(e) => computeFrom(user, e.date plusSeconds 1, e.number + 1)
}

private def fromScratch(user: User): Funit =
fetchFirstGame(user) flatMap {
_.?? { g => computeFrom(user, g.createdAt, 1) }
Expand Down
9 changes: 5 additions & 4 deletions modules/perfStat/src/main/Env.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ final class Env(

lazy val indexer = new PerfStatIndexer(
storage = storage,
sequencer = system.actorOf(Props(
classOf[lila.hub.Sequencer],
None, None, lila.log("perfStat")
))
sequencer = new lila.hub.FutureSequencer(
system = system,
executionTimeout = None,
logger = lila.log("perfStat")
)
)

lazy val jsonView = new JsonView(lightUser)
Expand Down
16 changes: 5 additions & 11 deletions modules/perfStat/src/main/PerfStatIndexer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,13 @@ package lila.perfStat
import akka.actor.ActorRef

import lila.game.{ Game, GameRepo, Pov, Query }
import lila.hub.Sequencer
import lila.hub.FutureSequencer
import lila.rating.PerfType
import lila.user.User

final class PerfStatIndexer(storage: PerfStatStorage, sequencer: ActorRef) {
final class PerfStatIndexer(storage: PerfStatStorage, sequencer: FutureSequencer) {

def userPerf(user: User, perfType: PerfType): Funit = {
val p = scala.concurrent.Promise[Unit]()
sequencer ! Sequencer.work(compute(user, perfType), p.some)
p.future
}

private def compute(user: User, perfType: PerfType): Funit = {
def userPerf(user: User, perfType: PerfType): Funit = sequencer {
GameRepo.sortedCursor(
Query.user(user.id) ++
Query.finished ++
Expand All @@ -26,8 +20,8 @@ final class PerfStatIndexer(storage: PerfStatStorage, sequencer: ActorRef) {
case (perfStat, game) if game.perfType.contains(perfType) =>
Pov.ofUserId(game, user.id).fold(perfStat)(perfStat.agg)
case (perfStat, _) => perfStat
}
} flatMap storage.insert recover lila.db.recoverDuplicateKey(_ => ())
} flatMap storage.insert recover lila.db.recoverDuplicateKey(_ => ())
}

def addGame(game: Game): Funit = game.players.flatMap { player =>
player.userId.map { userId =>
Expand Down
12 changes: 7 additions & 5 deletions modules/pool/src/main/Env.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package lila.pool

import scala.concurrent.duration._

import akka.actor._
import lila.hub.FutureSequencer

final class Env(
lobbyActor: ActorSelection,
lobbyActor: akka.actor.ActorSelection,
playbanApi: lila.playban.PlaybanApi,
system: akka.actor.ActorSystem,
onStart: String => Unit
Expand All @@ -24,9 +24,11 @@ final class Env(
private lazy val gameStarter = new GameStarter(
bus = system.lilaBus,
onStart = onStart,
sequencer = system.actorOf(Props(
classOf[lila.hub.Sequencer], none, 10.seconds.some, logger
), name = "pool-sequencer")
sequencer = new FutureSequencer(
system = system,
executionTimeout = 5.seconds.some,
logger = logger
)
)
}

Expand Down
15 changes: 5 additions & 10 deletions modules/pool/src/main/GameStarter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,24 @@ import akka.actor._
import scala.concurrent.Promise

import lila.game.{ Game, Player, GameRepo }
import lila.hub.Sequencer
import lila.hub.FutureSequencer
import lila.rating.Perf
import lila.user.{ User, UserRepo }

private final class GameStarter(
bus: lila.common.Bus,
onStart: Game.ID => Unit,
sequencer: ActorRef
sequencer: FutureSequencer
) {

def apply(pool: PoolConfig, couples: Vector[MatchMaking.Couple]): Funit = {
val promise = Promise[Unit]()
sequencer ! Sequencer.work(all(pool, couples), promise.some)
promise.future
}

private def all(pool: PoolConfig, couples: Vector[MatchMaking.Couple]): Funit =
couples.nonEmpty ?? {
def apply(pool: PoolConfig, couples: Vector[MatchMaking.Couple]): Funit = couples.nonEmpty ?? {
sequencer {
val userIds = couples.flatMap(_.userIds)
UserRepo.perfOf(userIds, pool.perfType) flatMap { perfs =>
couples.map(one(pool, perfs)).sequenceFu.void
}
}
}

private def one(pool: PoolConfig, perfs: Map[User.ID, Perf])(couple: MatchMaking.Couple): Funit = {
import couple._
Expand Down
11 changes: 6 additions & 5 deletions modules/simul/src/main/Env.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import com.typesafe.config.Config
import scala.concurrent.duration._

import lila.hub.actorApi.map.Ask
import lila.hub.{ ActorMap, Sequencer }
import lila.socket.Socket.{ GetVersion, SocketVersion }
import lila.hub.{ Duct, DuctMap }
import lila.socket.History
import lila.socket.Socket.{ GetVersion, SocketVersion }
import makeTimeout.short

final class Env(
Expand Down Expand Up @@ -111,9 +111,10 @@ final class Env(

private[simul] val simulColl = db(CollectionSimul)

private val sequencerMap = system.actorOf(Props(ActorMap { id =>
new Sequencer(SequencerTimeout.some, logger = logger)
}))
private val sequencerMap = new DuctMap(
mkDuct = _ => Duct.extra.lazyFu,
accessTimeout = SequencerTimeout
)

private lazy val simulCleaner = new SimulCleaner(repo, api, socketHub)

Expand Down
Loading

0 comments on commit b6db073

Please sign in to comment.