Skip to content

Commit

Permalink
ensure schedules and subscriptions clear up on actor restart
Browse files Browse the repository at this point in the history
  • Loading branch information
ornicar committed Mar 26, 2016
1 parent 49c87e0 commit ce04b8e
Show file tree
Hide file tree
Showing 30 changed files with 117 additions and 99 deletions.
6 changes: 3 additions & 3 deletions modules/api/src/main/Env.scala
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ final class Env(

lazy val cli = new Cli(system.lilaBus, renderer)

system.actorOf(Props(new KamonPusher(
countUsers = () => userEnv.onlineUserIdMemo.count
)))
KamonPusher.start(system) {
new KamonPusher(countUsers = () => userEnv.onlineUserIdMemo.count)
}
}

object Env {
Expand Down
4 changes: 3 additions & 1 deletion modules/api/src/main/KamonPusher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ private final class KamonPusher(
import KamonPusher._

override def preStart() {
context.system.lilaBus.subscribe(self, 'nbMembers, 'nbRounds)
scheduleTick
}

Expand Down Expand Up @@ -43,6 +42,9 @@ private final class KamonPusher(
object KamonPusher {

private case object Tick

def start(system: ActorSystem)(instance: => Actor) =
system.lilaBus.subscribe(system.actorOf(Props(instance)), 'nbMembers, 'nbRounds)
}

import com.typesafe.config.Config
Expand Down
5 changes: 2 additions & 3 deletions modules/explorer/src/main/Env.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,11 @@ final class Env(
}
}

if (IndexFlow) system.actorOf(Props(new Actor {
context.system.lilaBus.subscribe(self, 'finishGame)
if (IndexFlow) system.lilaBus.subscribe(system.actorOf(Props(new Actor {
def receive = {
case lila.game.actorApi.FinishGame(game, _, _) => indexer(game)
}
}))
})), 'finishGame)
}

object Env {
Expand Down
5 changes: 2 additions & 3 deletions modules/gameSearch/src/main/Env.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,13 @@ final class Env(
forms = forms,
paginator = paginator)

system.actorOf(Props(new Actor {
system.lilaBus.subscribe(system.actorOf(Props(new Actor {
import lila.game.actorApi.{ InsertGame, FinishGame }
context.system.lilaBus.subscribe(self, 'finishGame)
def receive = {
case FinishGame(game, _, _) => self ! InsertGame(game)
case InsertGame(game) => api store game
}
}), name = ActorName)
}), name = ActorName), 'finishGame)

def cli = new lila.common.Cli {
import akka.pattern.ask
Expand Down
5 changes: 2 additions & 3 deletions modules/insight/src/main/Env.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,11 @@ final class Env(
pipeline = aggregationPipeline,
indexer = indexer)

system.actorOf(Props(new Actor {
system.lilaBus.subscribe(self, 'analysisReady)
system.lilaBus.subscribe(system.actorOf(Props(new Actor {
def receive = {
case lila.analyse.actorApi.AnalysisReady(game, _) => api updateGame game
}
}))
})), 'analysisReady)
}

object Env {
Expand Down
22 changes: 11 additions & 11 deletions modules/lobby/src/main/Env.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,16 @@ final class Env(
maxPerPage = SeekMaxPerPage,
maxPerUser = SeekMaxPerUser)

val lobby = system.actorOf(Props(new Lobby(
socket = socket,
seekApi = seekApi,
blocking = blocking,
playban = playban,
onStart = onStart,
val lobby = Lobby.start(system, ActorName,
broomPeriod = BroomPeriod,
resyncIdsPeriod = ResyncIdsPeriod
)), name = ActorName)
resyncIdsPeriod = ResyncIdsPeriod) {
new Lobby(
socket = socket,
seekApi = seekApi,
blocking = blocking,
playban = playban,
onStart = onStart)
}

lazy val socketHandler = new SocketHandler(
hub = hub,
Expand All @@ -64,13 +65,12 @@ final class Env(

private val abortListener = new AbortListener(seekApi = seekApi)

system.actorOf(Props(new Actor {
system.lilaBus.subscribe(self, 'abortGame)
system.lilaBus.subscribe(system.actorOf(Props(new Actor {
def receive = {
case lila.game.actorApi.AbortedBy(pov) if pov.game.isCorrespondence =>
abortListener recreateSeek pov
}
}))
})), 'abortGame)
}

object Env {
Expand Down
26 changes: 16 additions & 10 deletions modules/lobby/src/main/Lobby.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,7 @@ private[lobby] final class Lobby(
seekApi: SeekApi,
blocking: String => Fu[Set[String]],
playban: String => Fu[Option[lila.playban.TempBan]],
onStart: String => Unit,
broomPeriod: FiniteDuration,
resyncIdsPeriod: FiniteDuration) extends Actor {

val scheduler = context.system.scheduler

override def preStart {
scheduler.schedule(5 seconds, broomPeriod, self, lila.socket.actorApi.Broom)
scheduler.schedule(10 seconds, resyncIdsPeriod, self, actorApi.Resync)
}
onStart: String => Unit) extends Actor {

def receive = {

Expand Down Expand Up @@ -168,3 +159,18 @@ private[lobby] final class Lobby(
socket ! RemoveHook(hook.id)
}
}

private object Lobby {

def start(
system: ActorSystem,
name: String,
broomPeriod: FiniteDuration,
resyncIdsPeriod: FiniteDuration)(instance: => Actor) = {

val ref = system.actorOf(Props(instance), name = name)
system.scheduler.schedule(5 seconds, broomPeriod, ref, lila.socket.actorApi.Broom)
system.scheduler.schedule(10 seconds, resyncIdsPeriod, ref, actorApi.Resync)
ref
}
}
12 changes: 10 additions & 2 deletions modules/lobby/src/main/Socket.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,19 @@ private[lobby] final class Socket(

override val startsOnApplicationBoot = true

override def preStart {
super.preStart
override def preStart() {
super.preStart()
context.system.lilaBus.subscribe(self, 'changeFeaturedGame, 'streams, 'nbMembers, 'nbRounds)
}

override def postStop() {
super.postStop()
context.system.lilaBus.unsubscribe(self)
}

// override postRestart so we don't call preStart and schedule a new message
override def postRestart(reason: Throwable) = {}

def receiveSpecific = {

case PingVersion(uid, v) => Future {
Expand Down
7 changes: 2 additions & 5 deletions modules/mod/src/main/Env.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,7 @@ final class Env(
emailAddress = emailAddress)

// api actor
private val actorApi = system.actorOf(Props(new Actor {
override def preStart {
context.system.lilaBus.subscribe(self, 'finishGame, 'analysisReady)
}
system.lilaBus.subscribe(system.actorOf(Props(new Actor {
def receive = {
case lila.hub.actorApi.mod.MarkCheater(userId) => api autoAdjust userId
case lila.analyse.actorApi.AnalysisReady(game, analysis) =>
Expand All @@ -79,7 +76,7 @@ final class Env(
assessApi.onGameReady(game, whiteUser, blackUser)
}
}
}), name = ActorName)
}), name = ActorName), 'finishGame, 'analysisReady)
}

object Env {
Expand Down
5 changes: 2 additions & 3 deletions modules/perfStat/src/main/Env.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,11 @@ final class Env(
indexer.userPerf(user, perfType) >> storage.find(user.id, perfType)
} map (_ | PerfStat.init(user.id, perfType))

system.actorOf(Props(new Actor {
context.system.lilaBus.subscribe(self, 'finishGame)
system.lilaBus.subscribe(system.actorOf(Props(new Actor {
def receive = {
case lila.game.actorApi.FinishGame(game, _, _) => indexer addGame game
}
}))
})), 'finishGame)
}

object Env {
Expand Down
1 change: 1 addition & 0 deletions modules/push/src/main/ApplePush.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ private final class ApnsActor(certificate: InputStream, password: String) extend
}

override def postStop() {
super.postStop()
Option(manager).foreach(_.shutdown())
}

Expand Down
7 changes: 2 additions & 5 deletions modules/push/src/main/Env.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,15 @@ final class Env(
getLightUser,
roundSocketHub)

system.actorOf(Props(new Actor {
override def preStart() {
system.lilaBus.subscribe(self, 'finishGame, 'moveEvent, 'challenge)
}
system.lilaBus.subscribe(system.actorOf(Props(new Actor {
import akka.pattern.pipe
def receive = {
case lila.game.actorApi.FinishGame(game, _, _) => pushApi finish game
case move: lila.hub.actorApi.round.MoveEvent => pushApi move move
case lila.challenge.Event.Create(c) => pushApi challengeCreate c
case lila.challenge.Event.Accept(c, joinerId) => pushApi.challengeAccept(c, joinerId)
}
}))
})), 'finishGame, 'moveEvent, 'challenge)
}

object Env {
Expand Down
5 changes: 3 additions & 2 deletions modules/round/src/main/Env.scala
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ final class Env(

lazy val noteApi = new NoteApi(db(CollectionNote))

new MoveMonitor(system, moveTimeChannel)
MoveMonitor.start(system, moveTimeChannel)

scheduler.message(2.1 seconds)(roundMap -> actorApi.GetNbRounds)

Expand All @@ -192,7 +192,8 @@ final class Env(
uciMemo = uciMemo,
prefApi = prefApi)

lazy val tvBroadcast = system.actorOf(Props(classOf[TvBroadcast]))
val tvBroadcast = system.actorOf(Props(classOf[TvBroadcast]))
system.lilaBus.subscribe(tvBroadcast, 'moveEvent, 'changeFeaturedGame)

def checkOutoftime(game: lila.game.Game) {
if (game.playable && game.started && !game.isUnlimited)
Expand Down
26 changes: 13 additions & 13 deletions modules/round/src/main/MoveMonitor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@ import akka.actor._
import kamon._
import metric.SubscriptionsDispatcher.TickMetricSnapshot

private final class MoveMonitor(
system: ActorSystem,
channel: ActorRef) {
private object MoveMonitor {

Kamon.metrics.subscribe("trace", "round.move.trace", system.actorOf(Props(new Actor {
def receive = {
case tick: TickMetricSnapshot => tick.metrics.collectFirst {
case (entity, snapshot) if entity.category == "trace" => snapshot
} flatMap (_ histogram "elapsed-time") foreach { h =>
if (!h.isEmpty) channel ! lila.socket.Channel.Publish(
lila.socket.Socket.makeMessage("mlat", (h.sum / h.numberOfMeasurements / 1000000).toInt)
)
def start(system: ActorSystem, channel: ActorRef) =

Kamon.metrics.subscribe("trace", "round.move.trace", system.actorOf(Props(new Actor {
def receive = {
case tick: TickMetricSnapshot => tick.metrics.collectFirst {
case (entity, snapshot) if entity.category == "trace" => snapshot
} flatMap (_ histogram "elapsed-time") foreach { h =>
if (!h.isEmpty) channel ! lila.socket.Channel.Publish(
lila.socket.Socket.makeMessage("mlat", (h.sum / h.numberOfMeasurements / 1000000).toInt)
)
}
}
}
})))
})))
}
3 changes: 2 additions & 1 deletion modules/round/src/main/Round.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ private[round] final class Round(
}

override def postStop() {
context.system.lilaBus unsubscribe self
super.postStop()
context.system.lilaBus.unsubscribe(self)
}

implicit val proxy = new GameProxy(gameId)
Expand Down
6 changes: 0 additions & 6 deletions modules/round/src/main/TvBroadcast.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,6 @@ import play.api.libs.json._

private final class TvBroadcast extends Actor {

context.system.lilaBus.subscribe(self, 'moveEvent, 'changeFeaturedGame)

override def postStop() {
context.system.lilaBus.unsubscribe(self)
}

private val (enumerator, channel) = Concurrent.broadcast[JsValue]

private var featuredId = none[String]
Expand Down
7 changes: 2 additions & 5 deletions modules/simul/src/main/Env.scala
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,7 @@ final class Env(
flood = flood,
exists = repo.exists)

system.actorOf(Props(new Actor {
override def preStart() {
system.lilaBus.subscribe(self, 'finishGame, 'adjustCheater, 'moveEvent)
}
system.lilaBus.subscribe(system.actorOf(Props(new Actor {
import akka.pattern.pipe
def receive = {
case lila.game.actorApi.FinishGame(game, _, _) => api finishGame game
Expand All @@ -91,7 +88,7 @@ final class Env(
}
}
}
}), name = ActorName)
}), name = ActorName), 'finishGame, 'adjustCheater, 'moveEvent)

def isHosting(userId: String): Fu[Boolean] = api.currentHostIds map (_ contains userId)

Expand Down
7 changes: 2 additions & 5 deletions modules/slack/src/main/Env.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,14 @@ final class Env(
url = IncomingUrl,
defaultChannel = IncomingDefaultChannel)

system.actorOf(Props(new Actor {
override def preStart() {
system.lilaBus.subscribe(self, 'donation, 'deploy, 'slack)
}
system.lilaBus.subscribe(system.actorOf(Props(new Actor {
def receive = {
case d: DonationEvent => api donation d
case Deploy(RemindDeployPre, _) => api.deployPre
case Deploy(RemindDeployPost, _) => api.deployPost
case e: Event => api publishEvent e
}
}))
})), 'donation, 'deploy, 'slack)
}

object Env {
Expand Down
10 changes: 8 additions & 2 deletions modules/socket/src/main/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@ import play.api.libs.json.JsValue

final class Channel extends Actor {

context.system.lilaBus.subscribe(self, 'socketDoor)
override def preStart() {
context.system.lilaBus.subscribe(self, 'socketDoor)
}

override def postStop() {
super.postStop()
context.system.lilaBus.unsubscribe(self)
}

import Channel._

Expand All @@ -21,7 +28,6 @@ final class Channel extends Actor {
case SocketLeave(_, member) => members -= member

case Publish(msg) => members.foreach(_ push msg)

}
}

Expand Down
9 changes: 8 additions & 1 deletion modules/socket/src/main/MoveBroadcast.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@ import lila.hub.actorApi.round.MoveEvent

private final class MoveBroadcast extends Actor {

context.system.lilaBus.subscribe(self, 'moveEvent, 'socketDoor)
override def preStart() {
context.system.lilaBus.subscribe(self, 'moveEvent, 'socketDoor)
}

override def postStop() {
super.postStop()
context.system.lilaBus.unsubscribe(self)
}

type UID = String
type GameId = String
Expand Down
Loading

0 comments on commit ce04b8e

Please sign in to comment.