Skip to content

Commit

Permalink
change lila.search API to allow background re-indexing
Browse files Browse the repository at this point in the history
  • Loading branch information
ornicar committed Aug 27, 2015
1 parent 5f2b298 commit ee53675
Show file tree
Hide file tree
Showing 15 changed files with 79 additions and 70 deletions.
4 changes: 2 additions & 2 deletions app/controllers/Search.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ object Search extends LilaController {
implicit def req = ctx.body
searchForm.bindFromRequest.fold(
failure => Ok(html.search.index(failure)).fuccess,
data => env.nonEmptyQuery(data) ?? { query =>
data => data.nonEmptyQuery ?? { query =>
env.paginator(query, page) map (_.some)
} map { pager =>
Ok(html.search.index(searchForm fill data, pager))
Expand All @@ -34,7 +34,7 @@ object Search extends LilaController {
implicit def req = ctx.body
searchForm.bindFromRequest.fold(
failure => Ok(html.search.index(failure)).fuccess,
data => env.nonEmptyQuery(data) ?? { query =>
data => data.nonEmptyQuery ?? { query =>
env.paginator.ids(query, 5000) map { ids =>
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
Expand Down
6 changes: 2 additions & 4 deletions modules/forumSearch/src/main/Env.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@ final class Env(
postApi = postApi
)), name = IndexerName)

def apply(text: String, page: Int, staff: Boolean, troll: Boolean) = {
val query = Query(s"$IndexName/$TypeName", text, staff, troll)
paginatorBuilder(query, page)
}
def apply(text: String, page: Int, staff: Boolean, troll: Boolean) =
paginatorBuilder(Query(text, staff, troll), page)

def cli = new lila.common.Cli {
import akka.pattern.ask
Expand Down
12 changes: 8 additions & 4 deletions modules/forumSearch/src/main/Indexer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import akka.pattern.pipe
import com.sksamuel.elastic4s
import elastic4s.SimpleAnalyzer
import elastic4s.ElasticClient
import com.sksamuel.elastic4s.IndexType
import elastic4s.ElasticDsl._
import elastic4s.mappings.FieldType._

Expand All @@ -20,10 +21,13 @@ private[forumSearch] final class Indexer(

private val indexType = s"$indexName/$typeName"

def readIndexType = IndexType(indexName, typeName)
def writeIndexType = readIndexType

def receive = {

case Search(definition) => client execute definition pipeTo sender
case Count(definition) => client execute definition pipeTo sender
case Search(definition) => client execute definition(readIndexType) pipeTo sender
case Count(definition) => client execute definition(readIndexType) pipeTo sender

case InsertPost(post) => postApi liteView post foreach {
_ foreach { view =>
Expand All @@ -32,11 +36,11 @@ private[forumSearch] final class Indexer(
}

case RemovePost(id) => client execute {
delete id id from indexType
delete id id from writeIndexType
}

case RemoveTopic(id) => client execute {
delete from indexType where s"${Fields.topicId}:$id"
delete from writeIndexType where s"${Fields.topicId}:$id"
}

case Reset =>
Expand Down
10 changes: 4 additions & 6 deletions modules/forumSearch/src/main/Query.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,16 @@ import org.elasticsearch.search.sort.SortOrder
import lila.search.ElasticSearch

private[forumSearch] final class Query private (
indexType: String,
terms: List[String],
staff: Boolean,
troll: Boolean) extends lila.search.Query {

def searchDef(from: Int = 0, size: Int = 10) =
def searchDef(from: Int = 0, size: Int = 10) = indexType =>
search in indexType query makeQuery sort (
field sort Fields.date order SortOrder.DESC
) start from size size

def countDef = count from indexType query makeQuery
def countDef = indexType => count from indexType query makeQuery

private def queryTerms = terms filterNot (_ startsWith "user:")
private def userSearch = terms find (_ startsWith "user:") map { _ drop 5 }
Expand Down Expand Up @@ -49,7 +48,6 @@ object Query {

private val searchableFields = List(Fields.body, Fields.topic, Fields.author)

def apply(indexType: String, text: String, staff: Boolean, troll: Boolean): Query = new Query(
indexType, ElasticSearch decomposeTextQuery text, staff, troll
)
def apply(text: String, staff: Boolean, troll: Boolean): Query =
new Query(ElasticSearch decomposeTextQuery text, staff, troll)
}
7 changes: 3 additions & 4 deletions modules/gameSearch/src/main/DataForm.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ private[gameSearch] case class SearchData(
analysed: Option[Int] = None,
sort: SearchSort = SearchSort()) {

def query(indexType: String) = Query(
indexType = indexType,
def query = Query(
user1 = players.cleanA,
user2 = players.cleanB,
winner = players.cleanWinner,
Expand All @@ -87,8 +86,8 @@ private[gameSearch] case class SearchData(
blackUser = players.cleanBlack,
sorting = Sorting(sort.field, sort.order))

def nonEmptyQuery(indexType: String) = {
val q = query(indexType)
def nonEmptyQuery = {
val q = query
q.nonEmpty option q
}

Expand Down
9 changes: 3 additions & 6 deletions modules/gameSearch/src/main/Env.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ final class Env(

private val indexer: ActorRef = system.actorOf(Props(new Indexer(
client = client,
indexName = IndexName,
initialIndexName = IndexName,
typeName = TypeName
)), name = IndexerName)

Expand All @@ -32,17 +32,14 @@ final class Env(

lazy val userGameSearch = new UserGameSearch(
forms = forms,
paginator = paginator,
indexType = s"$IndexName/$TypeName")

def nonEmptyQuery(data: SearchData) = data nonEmptyQuery s"$IndexName/$TypeName"
paginator = paginator)

def cli = new lila.common.Cli {
import akka.pattern.ask
private implicit def timeout = makeTimeout minutes 60
def process = {
case "game" :: "search" :: "reset" :: Nil =>
(indexer ? lila.search.actorApi.Reset) inject "Game search index rebuilt"
(indexer ? lila.search.actorApi.Reset) inject "Game search index rebuilding"
}
}
}
Expand Down
49 changes: 31 additions & 18 deletions modules/gameSearch/src/main/Indexer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package lila.gameSearch

import akka.actor._
import akka.pattern.pipe
import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.ElasticDsl.{ RichFuture => _, _ }
import com.sksamuel.elastic4s.mappings.FieldType._
import com.sksamuel.elastic4s.{ ElasticClient, IndexType }

import lila.game.actorApi.{ InsertGame, FinishGame }
import lila.game.GameRepo
Expand All @@ -13,31 +13,40 @@ import lila.search.ElasticSearch

private[gameSearch] final class Indexer(
client: ElasticClient,
indexName: String,
initialIndexName: String,
typeName: String) extends Actor {

context.system.lilaBus.subscribe(self, 'finishGame)

var readIndex = initialIndexName
var writeIndex = initialIndexName

def readIndexType = IndexType(readIndex, typeName)

private case object FinalizeReset
var resetInProgress = false

def receive = {

case Search(definition) => client execute definition pipeTo sender
case Count(definition) => client execute definition pipeTo sender
case Search(definition) => client execute definition(readIndexType) pipeTo sender
case Count(definition) => client execute definition(readIndexType) pipeTo sender

case FinishGame(game, _, _) => self ! InsertGame(game)

case InsertGame(game) => if (storable(game)) {
GameRepo isAnalysed game.id foreach { analysed =>
client execute store(indexName, game, analysed)
client execute store(writeIndex, game, analysed)
}
}

case Reset =>
val replyTo = sender
val tempIndexName = "lila_" + ornicar.scalalib.Random.nextString(4)
ElasticSearch.createType(client, tempIndexName, typeName)
if (resetInProgress) sys error "[game search] already resetting!"
resetInProgress = true
writeIndex = initialIndexName + "_" + ornicar.scalalib.Random.nextString(6)
ElasticSearch.createType(client, writeIndex, typeName)
import Fields._
client.execute {
put mapping tempIndexName / typeName as Seq(
put mapping writeIndex / typeName as Seq(
status typed ShortType,
turns typed ShortType,
rated typed BooleanType,
Expand All @@ -55,6 +64,7 @@ private[gameSearch] final class Indexer(
blackUser typed StringType
).map(_ index "not_analyzed")
}.await
sender ! (())
import scala.concurrent.duration._
import play.api.libs.json.Json
import lila.db.api._
Expand All @@ -71,7 +81,7 @@ private[gameSearch] final class Indexer(
(GameRepo filterAnalysed games.map(_.id).toSeq flatMap { analysedIds =>
client execute {
bulk {
games.map { g => store(tempIndexName, g, analysedIds(g.id)) }: _*
games.map { g => store(writeIndex, g, analysedIds(g.id)) }: _*
}
}
}).void >>- {
Expand All @@ -82,15 +92,18 @@ private[gameSearch] final class Indexer(
loginfo("[game search] Indexed %d of %d, skipped %d, at %d/s".format(nb, size, nbSkipped, perS))
}
} >>- {
loginfo("[game search] Deleting previous index")
client.execute { deleteIndex(indexName) }.await
loginfo("[game search] Creating new index alias")
client.execute {
add alias indexName on tempIndexName
}.await
loginfo("[game search] All set!")
replyTo ! (())
self ! FinalizeReset
}

case FinalizeReset =>
loginfo(s"[game search] Deleting previous index $initialIndexName")
client.execute { deleteIndex(initialIndexName) }.await
loginfo(s"[game search] Creating new index alias $initialIndexName -> $writeIndex")
client.execute { add alias initialIndexName on writeIndex }.await
loginfo("[game search] All set!")
readIndex = initialIndexName
writeIndex = initialIndexName
resetInProgress = false
}

private def storable(game: lila.game.Game) =
Expand Down
5 changes: 2 additions & 3 deletions modules/gameSearch/src/main/Query.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import lila.rating.RatingRange
import lila.search.{ ElasticSearch, Range }

case class Query(
indexType: String,
user1: Option[String] = None,
user2: Option[String] = None,
winner: Option[String] = None,
Expand Down Expand Up @@ -49,10 +48,10 @@ case class Query(
date.nonEmpty ||
duration.nonEmpty

def searchDef(from: Int = 0, size: Int = 10) =
def searchDef(from: Int = 0, size: Int = 10) = indexType =>
search in indexType query makeQuery sort sorting.definition start from size size

def countDef = count from indexType query makeQuery
def countDef = indexType => count from indexType query makeQuery

private lazy val makeQuery = filteredQuery query matchall filter {
List(
Expand Down
5 changes: 2 additions & 3 deletions modules/gameSearch/src/main/UserGameSearch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import play.api.mvc.Request

final class UserGameSearch(
forms: DataForm,
paginator: lila.search.PaginatorBuilder[Game],
indexType: String) {
paginator: lila.search.PaginatorBuilder[Game]) {

def apply(user: lila.user.User, page: Int)(implicit req: Request[_]): Fu[Paginator[Game]] =
paginator(
Expand All @@ -16,7 +15,7 @@ final class UserGameSearch(
data => data.copy(
players = data.players.copy(a = user.id.some)
)
) query indexType,
).query,
page = page)

def requestForm(implicit req: Request[_]) = forms.search.bindFromRequest
Expand Down
4 changes: 2 additions & 2 deletions modules/search/src/main/Query.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.sksamuel.elastic4s.{ CountDefinition, SearchDefinition }

trait Query {

def searchDef(from: Int = 0, size: Int = 10): SearchDefinition
def searchDef(from: Int = 0, size: Int = 10): FreeSearchDefinition

def countDef: CountDefinition
def countDef: FreeCountDefinition
}
5 changes: 2 additions & 3 deletions modules/search/src/main/actorApi.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package lila.search
package actorApi

import com.sksamuel.elastic4s.{ CountDefinition, SearchDefinition }
import org.elasticsearch.action.search.{ SearchResponse => ESSR }

case object Reset

case class Search(definition: SearchDefinition)
case class Search(definition: FreeSearchDefinition)
case class SearchResponse(res: ESSR)

case class Count(definition: CountDefinition)
case class Count(definition: FreeCountDefinition)
case class CountResponse(res: Int)
8 changes: 7 additions & 1 deletion modules/search/src/main/package.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
package lila

package object search extends PackageObject with WithPlay
package object search extends PackageObject with WithPlay {

import com.sksamuel.elastic4s.{ IndexType, SearchDefinition, CountDefinition }

type FreeSearchDefinition = IndexType => SearchDefinition
type FreeCountDefinition = IndexType => CountDefinition
}
5 changes: 1 addition & 4 deletions modules/teamSearch/src/main/Env.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ final class Env(
typeName = TypeName
)), name = IndexerName)

def apply(text: String, page: Int) = {
val query = Query(s"$IndexName/$TypeName", text)
paginatorBuilder(query, page)
}
def apply(text: String, page: Int) = paginatorBuilder(Query(text), page)

def cli = new lila.common.Cli {
import akka.pattern.ask
Expand Down
11 changes: 7 additions & 4 deletions modules/teamSearch/src/main/Indexer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package lila.teamSearch

import akka.actor._
import akka.pattern.pipe
import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.IndexType
import com.sksamuel.elastic4s.mappings.FieldType._
import com.sksamuel.elastic4s.ElasticClient

import lila.search.actorApi._
import lila.team.actorApi._
Expand All @@ -17,10 +18,12 @@ private[teamSearch] final class Indexer(

private val indexType = s"$indexName/$typeName"

def readIndexType = IndexType(indexName, typeName)

def receive = {

case Search(definition) => client execute definition pipeTo sender
case Count(definition) => client execute definition pipeTo sender
case Search(definition) => client execute definition(readIndexType) pipeTo sender
case Count(definition) => client execute definition(readIndexType) pipeTo sender

case InsertTeam(team) => client execute store(team)

Expand All @@ -32,7 +35,7 @@ private[teamSearch] final class Indexer(
lila.search.ElasticSearch.createType(client, indexName, typeName)
try {
client execute {
put mapping indexName/typeName as Seq(
put mapping indexName / typeName as Seq(
Fields.name typed StringType boost 3,
Fields.description typed StringType boost 2,
Fields.location typed StringType,
Expand Down
Loading

0 comments on commit ee53675

Please sign in to comment.