Skip to content

Commit

Permalink
also monitor viewers on pushed broadcasts
Browse files Browse the repository at this point in the history
  • Loading branch information
ornicar committed Jun 20, 2024
1 parent 92d00e5 commit 0cea482
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 10 deletions.
5 changes: 2 additions & 3 deletions modules/relay/src/main/Env.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ final class Env(

lazy val listing: RelayListing = wire[RelayListing]

lazy val stats = wire[RelayStatsApi]

lazy val api: RelayApi = wire[RelayApi]

lazy val tourStream: RelayTourStream = wire[RelayTourStream]
Expand Down Expand Up @@ -90,9 +92,6 @@ final class Env(

private lazy val delay = wire[RelayDelay]

// must instanciate eagerly to start the scheduler
val stats = wire[RelayStatsApi]

import SettingStore.CredentialsOption.given
val proxyCredentials = settingStore[Option[Credentials]](
"relayProxyCredentials",
Expand Down
5 changes: 3 additions & 2 deletions modules/relay/src/main/RelayPush.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ import akka.actor.*
import akka.pattern.after
import chess.format.pgn.{ Parser, PgnStr, San, Std, Tags }
import chess.{ ErrorStr, Game, Replay, Square }

import scala.concurrent.duration.*

import scalalib.actor.AsyncActorSequencers

import lila.study.{ MultiPgn, StudyPgnImport }

final class RelayPush(
sync: RelaySync,
api: RelayApi,
stats: RelayStatsApi,
irc: lila.core.irc.IrcApi
)(using ActorSystem, Executor, Scheduler):

Expand Down Expand Up @@ -50,6 +50,7 @@ final class RelayPush(
.flatMap: event =>
if !rt.round.hasStarted && !rt.tour.official && event.hasMoves then
irc.broadcastStart(rt.round.id, rt.fullName)
stats.setActive(rt.round.id)
api
.update(rt.round): r1 =>
val r2 = r1.withSync(_.addLog(event))
Expand Down
12 changes: 7 additions & 5 deletions modules/relay/src/main/RelayStatsApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ final class RelayStatsApi(roundRepo: RelayRoundRepo, colls: RelayColls)(using sc
.toList
yield RoundStats(round, stats)

def setActive(id: RelayRoundId) = activeRounds.put(id)

// keep monitoring rounds for 30m after they stopped syncing
private val activeRounds = ExpireSetMemo[RelayRoundId](30 minutes)

private def record(): Funit = for
crowds <- fetchRoundCrowds
nowMinutes = nowSeconds / 60
Expand Down Expand Up @@ -75,9 +80,6 @@ final class RelayStatsApi(roundRepo: RelayRoundRepo, colls: RelayColls)(using sc
_ <- elements.nonEmpty.so(update.many(elements).void)
yield ()

// keep monitoring rounds for 30m after they stopped syncing
private val syncTail = ExpireSetMemo[RelayRoundId](30 minutes)

private def fetchRoundCrowds: Fu[List[(RelayRoundId, Crowd)]] =
val max = 500
colls.round
Expand All @@ -87,7 +89,7 @@ final class RelayStatsApi(roundRepo: RelayRoundRepo, colls: RelayColls)(using sc
$doc(
$or(
$doc("sync.until" -> $exists(true)),
$inIds(syncTail.keys)
$inIds(activeRounds.keys)
),
"crowd".$gt(0)
)
Expand All @@ -100,5 +102,5 @@ final class RelayStatsApi(roundRepo: RelayRoundRepo, colls: RelayColls)(using sc
doc <- docs
id <- doc.getAsOpt[RelayRoundId]("_id")
crowd <- doc.getAsOpt[Crowd]("crowd")
_ = if doc.contains("syncing") then syncTail.put(id)
_ = if doc.contains("syncing") then activeRounds.put(id)
yield (id, crowd)

0 comments on commit 0cea482

Please sign in to comment.