Skip to content

Commit

Permalink
background indexing of all indices
Browse files Browse the repository at this point in the history
  • Loading branch information
ornicar committed Sep 3, 2015
1 parent 85f7ca2 commit e2f34a0
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 52 deletions.
18 changes: 11 additions & 7 deletions modules/forumSearch/src/main/ForumSearchApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,17 @@ final class ForumSearchApi(
Fields.troll -> view.post.troll,
Fields.date -> view.post.createdAt.getDate)

def reset = client.putMapping >> {
import lila.db.api._
import lila.forum.tube.postTube
$enumerate.bulk[Option[Post]]($query[Post](Json.obj()), 500) { postOptions =>
(postApi liteViews postOptions.flatten) flatMap { views =>
client.storeBulk(views map (v => Id(v.post.id) -> toDoc(v)))
}
def reset = client match {
case c: ESClientHttp => c.createTempIndex flatMap { temp =>
loginfo(s"Index to ${temp.tempIndex.name}")
import lila.db.api._
import lila.forum.tube.postTube
$enumerate.bulk[Option[Post]]($query[Post](Json.obj()), 500) { postOptions =>
(postApi liteViews postOptions.flatten) flatMap { views =>
temp.storeBulk(views map (v => Id(v.post.id) -> toDoc(v)))
}
} >> temp.aliasBackToMain
}
case _ => funit
}
}
57 changes: 31 additions & 26 deletions modules/gameSearch/src/main/GameSearchApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,32 +49,37 @@ final class GameSearchApi(client: ESClient) extends SearchReadApi[Game, Query] {
Fields.blackUser -> game.blackPlayer.userId
).noNull

def reset(max: Option[Int]) = client.putMapping >> {
import lila.db.api._
import lila.game.tube.gameTube
var nb = 0
var nbSkipped = 0
var started = nowMillis
for {
size <- $count($select.all)
batchSize = 2000
limit = max | Int.MaxValue
_ <- $enumerate.bulk[Option[Game]]($query.all, batchSize, limit) { gameOptions =>
val games = gameOptions.flatten filter storable
val nbGames = games.size
(GameRepo filterAnalysed games.map(_.id).toSeq flatMap { analysedIds =>
client.storeBulk(games map { g =>
Id(g.id) -> toDoc(g, analysedIds(g.id))
}).logFailure("game bulk")
// funit // async!
}) >>- {
nb = nb + nbGames
nbSkipped = nbSkipped + gameOptions.size - nbGames
val perS = (batchSize * 1000) / math.max(1, (nowMillis - started))
started = nowMillis
loginfo("[game search] Indexed %d of %d, skipped %d, at %d/s".format(nb + nbSkipped, size, nbSkipped, perS))
def reset(max: Option[Int]) = client match {
case c: ESClientHttp => c.createTempIndex flatMap { temp =>
loginfo(s"Index to ${temp.tempIndex.name}")
import lila.db.api._
import lila.game.tube.gameTube
var nb = 0
var nbSkipped = 0
var started = nowMillis
for {
size <- $count($select.all)
batchSize = 2000
limit = max | Int.MaxValue
_ <- $enumerate.bulk[Option[Game]]($query.all, batchSize, limit) { gameOptions =>
val games = gameOptions.flatten filter storable
val nbGames = games.size
(GameRepo filterAnalysed games.map(_.id).toSeq flatMap { analysedIds =>
temp.storeBulk(games map { g =>
Id(g.id) -> toDoc(g, analysedIds(g.id))
}).logFailure("game bulk")
// funit // async!
}) >>- {
nb = nb + nbGames
nbSkipped = nbSkipped + gameOptions.size - nbGames
val perS = (batchSize * 1000) / math.max(1, (nowMillis - started))
started = nowMillis
loginfo("[game search] Indexed %d of %d, skipped %d, at %d/s".format(nb + nbSkipped, size, nbSkipped, perS))
}
}
}
} yield ()
_ <- temp.aliasBackToMain
} yield ()
}
case _ => funit
}
}
46 changes: 32 additions & 14 deletions modules/search/src/main/ESClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,21 @@ sealed trait ESClient {

def store(id: Id, doc: JsObject): Funit

def storeBulk(docs: Seq[(Id, JsObject)]): Funit

def deleteById(id: Id): Funit

def deleteByQuery(query: StringQuery): Funit

def putMapping: Funit
}

final class ESClientHttp(endpoint: String, index: Index, writeable: Boolean) extends ESClient {
final class ESClientHttp(
endpoint: String,
val index: Index,
writeable: Boolean) extends ESClient {
import play.api.libs.ws.WS
import play.api.Play.current

def store(id: Id, doc: JsObject) = writeable ??
HTTP(s"store/${index.name}/${id.value}", doc)

def storeBulk(docs: Seq[(Id, JsObject)]) = writeable ??
HTTP(s"store/bulk/${index.name}", JsObject(docs map {
case (Id(id), doc) => id -> JsString(Json.stringify(doc))
}))

def search[Q: Writes](query: Q, from: From, size: Size) =
HTTP(s"search/${index.name}/${from.value}/${size.value}", query, SearchResponse.apply)

Expand All @@ -43,19 +37,43 @@ final class ESClientHttp(endpoint: String, index: Index, writeable: Boolean) ext
def deleteByQuery(query: lila.search.StringQuery) = writeable ??
HTTP(s"delete/query/${index.name}/${query.value}", Json.obj())

def putMapping = writeable ??
HTTP(s"mapping/${index.name}", Json.obj())
def createTempIndex = {
val tempIndex = Index(s"${index.name}_temp_${ornicar.scalalib.Random.nextString(4)}")
val tempClient = new ESClientHttpTemp(
index,
new ESClientHttp(endpoint, tempIndex, writeable))
tempClient.putMapping inject tempClient
}

private def HTTP[D: Writes, R](url: String, data: D, read: String => R): Fu[R] =
private[search] def HTTP[D: Writes, R](url: String, data: D, read: String => R): Fu[R] =
WS.url(s"$endpoint/$url").post(Json toJson data) flatMap {
case res if res.status == 200 => fuccess(read(res.body))
case res => fufail(s"$url ${res.status}")
}
private def HTTP(url: String, data: JsObject): Funit = HTTP(url, data, _ => ())
private[search] def HTTP(url: String, data: JsObject): Funit = HTTP(url, data, _ => ())

private val logger = play.api.Logger("ESClientHttp")
}

final class ESClientHttpTemp(
mainIndex: Index,
client: ESClientHttp) {

def putMapping =
client.HTTP(s"mapping/${tempIndex.name}/${mainIndex.name}", Json.obj())

def tempIndex = client.index

def storeBulk(docs: Seq[(Id, JsObject)]) =
client.HTTP(s"store/bulk/${tempIndex.name}/${mainIndex.name}", JsObject(docs map {
case (Id(id), doc) => id -> JsString(Json.stringify(doc))
}))

def aliasBackToMain =
client.HTTP(s"alias/${tempIndex.name}/${mainIndex.name}", Json.obj())

}

final class ESClientStub extends ESClient {
def search[Q: Writes](query: Q, from: From, size: Size) = fuccess(SearchResponse(Nil))
def count[Q: Writes](query: Q) = fuccess(CountResponse(0))
Expand Down
14 changes: 9 additions & 5 deletions modules/teamSearch/src/main/TeamSearchApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@ final class TeamSearchApi(
Fields.location -> team.location,
Fields.nbMembers -> team.nbMembers)

def reset = client.putMapping >> {
import lila.db.api._
import lila.team.tube.teamTube
$enumerate.bulk[Option[Team]]($query[Team](Json.obj("enabled" -> true)), 300) { teamOptions =>
client.storeBulk(teamOptions.flatten map (t => Id(t.id) -> toDoc(t)))
def reset = client match {
case c: ESClientHttp => c.createTempIndex flatMap { temp =>
loginfo(s"Index to ${temp.tempIndex.name}")
import lila.db.api._
import lila.team.tube.teamTube
$enumerate.bulk[Option[Team]]($query[Team](Json.obj("enabled" -> true)), 300) { teamOptions =>
temp.storeBulk(teamOptions.flatten map (t => Id(t.id) -> toDoc(t)))
} >> temp.aliasBackToMain
}
case _ => funit
}
}

0 comments on commit e2f34a0

Please sign in to comment.