Skip to content

Commit

Permalink
new search clock selectors are working
Browse files Browse the repository at this point in the history
  • Loading branch information
ornicar committed Mar 14, 2016
1 parent d8b7d88 commit ea39172
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 48 deletions.
8 changes: 4 additions & 4 deletions modules/gameSearch/src/main/DataForm.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ private[gameSearch] final class DataForm {
"durationMin" -> optional(numberIn(Query.durations)),
"durationMax" -> optional(numberIn(Query.durations)),
"clock" -> mapping(
"clockInitMin" -> optional(numberIn(Query.clockInits)),
"clockInitMax" -> optional(numberIn(Query.clockInits)),
"clockIncMin" -> optional(numberIn(Query.clockIncs)),
"clockIncMax" -> optional(numberIn(Query.clockIncs))
"initMin" -> optional(numberIn(Query.clockInits)),
"initMax" -> optional(numberIn(Query.clockInits)),
"incMin" -> optional(numberIn(Query.clockIncs)),
"incMax" -> optional(numberIn(Query.clockIncs))
)(SearchClock.apply)(SearchClock.unapply),
"dateMin" -> DataForm.dateField,
"dateMax" -> DataForm.dateField,
Expand Down
4 changes: 2 additions & 2 deletions modules/gameSearch/src/main/Env.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ final class Env(
import akka.pattern.ask
private implicit def timeout = makeTimeout minutes 60
def process = {
case "game" :: "search" :: "reset" :: Nil => api.reset(none) inject "done"
case "game" :: "search" :: "reset" :: nb :: Nil => api.reset(parseIntOption(nb)) inject "done"
case "game" :: "search" :: "index" :: "all" :: Nil => api.indexAll inject "done"
case "game" :: "search" :: "index" :: since :: Nil => api.indexSince(since) inject "done"
}
}
}
Expand Down
108 changes: 66 additions & 42 deletions modules/gameSearch/src/main/GameSearchApi.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package lila.gameSearch

import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import play.api.libs.iteratee._
import play.api.libs.json._
import scala.util.{ Try, Success, Failure }

import lila.common.PimpedJson._
import lila.game.actorApi._
import lila.game.{ Game, GameRepo }
import lila.search._

import org.joda.time.DateTime
import play.api.libs.json._

final class GameSearchApi(client: ESClient) extends SearchReadApi[Game, Query] {

private var writeable = true
Expand Down Expand Up @@ -57,50 +60,71 @@ final class GameSearchApi(client: ESClient) extends SearchReadApi[Game, Query] {
Fields.source -> game.source.map(_.id)
).noNull

def reset(max: Option[Int]): Funit = client match {
case c: ESClientHttp =>
loginfo(s"Index to ${c.index.name}")
val resetStartAt = DateTime.now
val resetStartSeconds = nowSeconds
writeable = false
Thread sleep 3000
for {
_ <- c.putMapping
_ <- doIndex(c, Json.obj(), max)
_ = loginfo("[game search] Complete indexation in %ss".format(nowSeconds - resetStartSeconds))
_ <- doIndex(c, lila.game.Query createdSince resetStartAt.minusHours(3), max)
_ = loginfo("[game search] Complete indexation in %ss".format(nowSeconds - resetStartSeconds))
} yield {
writeable = true
}
case _ => funit
def indexAll: Funit = {
writeable = false
Thread sleep 3000
client match {
case c: ESClientHttp =>
loginfo(s"Drop ${c.index.name}")
writeable = false
Thread sleep 3000
c.putMapping >> indexSince("2011-01-01")
case _ => funit
}
}

private def doIndex(client: ESClientHttp, selector: JsObject, max: Option[Int]): Funit = {
import lila.db.api._
import lila.game.tube.gameTube
def indexSince(sinceStr: String): Funit =
parseDate(sinceStr).fold(fufail[Unit](s"Invalid date $sinceStr")) { since =>
client match {
case c: ESClientHttp =>
loginfo(s"Index to ${c.index.name} since $since")
writeable = false
Thread sleep 3000
doIndex(c, since) >>- {
loginfo("[game search] Completed indexation.")
Thread sleep 3000
writeable = true
}
case _ => funit
}
}

private val datePattern = "yyyy-MM-dd"
private val dateFormatter = DateTimeFormat forPattern datePattern
private val dateTimeFormatter = DateTimeFormat forPattern s"$datePattern HH:mm"

private def parseDate(str: String): Option[DateTime] =
Try(dateFormatter parseDateTime str).toOption

private def doIndex(client: ESClientHttp, since: DateTime): Funit = {
import lila.game.BSONHandlers._
import lila.db.BSON.BSONJodaDateTimeHandler
import reactivemongo.bson._
var nb = 0
var nbSkipped = 0
var started = nowMillis
for {
size <- $count(selector)
batchSize = 1000
limit = max | Int.MaxValue
_ <- $enumerate.bulk[Option[Game]]($query(selector), batchSize, limit) { gameOptions =>
val games = gameOptions.flatten filter storable
val nbGames = games.size
(GameRepo filterAnalysed games.map(_.id).toSeq flatMap { analysedIds =>
val batchSize = 1000
// val maxGames = Int.MaxValue
val maxGames = 10 * 1000 * 1000

lila.game.tube.gameTube.coll.find(BSONDocument(
"ca" -> BSONDocument("$gt" -> since)
)).sort(BSONDocument("ca" -> 1))
.cursor[Game]()
.enumerate(maxGames, stopOnError = true) &>
Enumeratee.grouped(Iteratee takeUpTo batchSize) |>>>
Enumeratee.mapM[Seq[Game]].apply[(Seq[Game], Set[String])] { games =>
GameRepo filterAnalysed games.map(_.id) map games.->
} &>
Iteratee.foldM[(Seq[Game], Set[String]), Long](nowMillis) {
case (millis, (games, analysedIds)) =>
client.storeBulk(games map { g =>
Id(g.id) -> toDoc(g, analysedIds(g.id))
}).logFailure("game bulk")
}) >>- {
nb = nb + nbGames
nbSkipped = nbSkipped + gameOptions.size - nbGames
val perS = (batchSize * 1000) / math.max(1, (nowMillis - started))
started = nowMillis
loginfo("[game search] Indexed %d of %d, skipped %d, at %d/s".format(nb + nbSkipped, size, nbSkipped, perS))
}
}
} yield ()
}) inject {
val date = games.headOption.map(_.createdAt) ?? dateTimeFormatter.print
val gameMs = (nowMillis - millis) / batchSize.toDouble
logger.info(s"$date $nb ${(1000 / gameMs).toInt} games/s")
nowMillis
}
} void
}
}

0 comments on commit ea39172

Please sign in to comment.