Skip to content

Commit

Permalink
rewrite background bulk indexing of games
Browse files Browse the repository at this point in the history
  • Loading branch information
ornicar committed Sep 3, 2015
1 parent fe15294 commit a75039b
Showing 1 changed file with 34 additions and 30 deletions.
64 changes: 34 additions & 30 deletions modules/gameSearch/src/main/GameSearchApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,43 +53,47 @@ final class GameSearchApi(client: ESClient) extends SearchReadApi[Game, Query] {
Fields.blackUser -> game.blackPlayer.userId
).noNull

// NOTE(review): this span is a scraped unified diff with OLD and NEW lines
// interleaved and the +/- markers lost. Both the pre-commit signature (with
// `since`) and the post-commit signature appear below; the old inline bulk
// loop and the new two-pass `doIndex` calls coexist. As written this is NOT
// compilable Scala — reconstruct one side of the diff before reuse.
//
// Old signature: took an optional `since` DateTime to index incrementally.
def reset(max: Option[Int], since: Option[DateTime] = none): Funit = client match {
// New signature: drops `since`; the incremental pass is now done internally
// via a second `doIndex` call over games created while the full pass ran.
def reset(max: Option[Int]): Funit = client match {
// Only the HTTP client supports reindexing into a temp index then aliasing.
case c: ESClientHttp => c.createTempIndex flatMap { temp =>
loginfo(s"Index to ${temp.tempIndex.name}")
since.foreach { s => loginfo(s"Since $s") }
// Snapshot the start time so the catch-up pass can cover games created
// during the (long) full indexation. minusHours(3) below widens the window
// for safety — presumably to absorb clock skew / enumeration lag; TODO confirm.
val resetStartAt = DateTime.now
val selector = since.fold(Json.obj())(lila.game.Query.createdSince)
import lila.db.api._
import lila.game.tube.gameTube
// OLD version: progress counters mutated from inside the enumerator callback.
var nb = 0
var nbSkipped = 0
var started = nowMillis
val resetStartSeconds = nowSeconds
for {
size <- $count(selector)
batchSize = 2000
limit = max | Int.MaxValue
// OLD inline bulk loop — extracted into `doIndex` by this commit.
_ <- $enumerate.bulk[Option[Game]]($query(selector), batchSize, limit) { gameOptions =>
val games = gameOptions.flatten filter storable
val nbGames = games.size
// Look up which games have analysis so the doc can carry that flag.
(GameRepo filterAnalysed games.map(_.id).toSeq flatMap { analysedIds =>
temp.storeBulk(games map { g =>
Id(g.id) -> toDoc(g, analysedIds(g.id))
}).logFailure("game bulk")
}) >>- {
nb = nb + nbGames
nbSkipped = nbSkipped + gameOptions.size - nbGames
// Throughput estimate per batch; max(1, …) guards divide-by-zero.
val perS = (batchSize * 1000) / math.max(1, (nowMillis - started))
started = nowMillis
loginfo("[game search] Indexed %d of %d, skipped %d, at %d/s".format(nb + nbSkipped, size, nbSkipped, perS))
}
}
// NEW version: full pass over all games, then a catch-up pass over games
// created since (shortly before) the reset started.
_ <- doIndex(temp, Json.obj(), max)
_ = loginfo("[game search] Complete indexation in %ss".format(nowSeconds - resetStartSeconds))
_ <- doIndex(temp, lila.game.Query createdSince resetStartAt.minusHours(3), max)
_ = loginfo("[game search] Complete indexation in %ss".format(nowSeconds - resetStartSeconds))
// Atomically point the main alias at the freshly built temp index.
_ <- temp.aliasBackToMain
// OLD version's tail-recursive catch-up call, replaced by the second doIndex.
_ <- since match {
case None => reset(max, resetStartAt.minusHours(3).some)
case _ => funit
}
} yield ()
}
// Non-HTTP clients (e.g. in-memory/dev) cannot be reindexed: no-op.
case _ => funit
}

/** Streams every game matching `selector` from the database in bulk batches
  * and stores the storable ones as search documents into the temp index,
  * logging running progress and throughput.
  *
  * @param temp     the temporary ES index being built
  * @param selector mongo query selecting which games to enumerate
  * @param max      optional cap on the number of games to enumerate
  */
private def doIndex(temp: ESClientHttpTemp, selector: JsObject, max: Option[Int]): Funit = {
import lila.db.api._
import lila.game.tube.gameTube
// progress counters, mutated from inside the enumerator callback
var indexed = 0
var skipped = 0
var batchStartedAt = nowMillis
for {
total <- $count(selector)
batchSize = 1000
limit = max | Int.MaxValue
_ <- $enumerate.bulk[Option[Game]]($query(selector), batchSize, limit) { options =>
val storableGames = options.flatten filter storable
val storableCount = storableGames.size
// fetch which of these games have analysis, then store the batch
val store = GameRepo filterAnalysed storableGames.map(_.id).toSeq flatMap { analysedIds =>
val docs = storableGames map { game =>
Id(game.id) -> toDoc(game, analysedIds(game.id))
}
temp.storeBulk(docs).logFailure("game bulk")
}
store >>- {
indexed = indexed + storableCount
skipped = skipped + (options.size - storableCount)
// games/second over this batch; max(1, …) avoids division by zero
val perSecond = (batchSize * 1000) / math.max(1, nowMillis - batchStartedAt)
batchStartedAt = nowMillis
loginfo("[game search] Indexed %d of %d, skipped %d, at %d/s".format(indexed + skipped, total, skipped, perSecond))
}
}
} yield ()
}
}

0 comments on commit a75039b

Please sign in to comment.