Skip to content

Commit

Permalink
Merge pull request #65 from wardle/collector-manager
Browse files Browse the repository at this point in the history
Use CollectorManager instead of now deprecated Collector API in search
  • Loading branch information
wardle committed Mar 26, 2024
2 parents c540381 + 1c14b10 commit e7242c8
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 6 deletions.
37 changes: 33 additions & 4 deletions src/com/eldrix/hermes/impl/lucene.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
(ns ^:no-doc com.eldrix.hermes.impl.lucene
(:require [clojure.core.async :as a])
(:import (java.util List ArrayList)
(org.apache.lucene.search CollectionTerminatedException IndexSearcher BooleanClause$Occur BooleanQuery$Builder Query
(org.apache.lucene.search CollectionTerminatedException CollectorManager IndexSearcher BooleanClause$Occur BooleanQuery$Builder Query
MatchAllDocsQuery BooleanQuery BooleanClause Collector LeafCollector Scorable ScoreMode)))

(set! *warn-on-reflection* true)
Expand All @@ -26,6 +26,16 @@
(.add coll (+ base-id doc-id))))))
(scoreMode [_] ScoreMode/COMPLETE_NO_SCORES))

;; A Lucene CollectorManager that can collect results in parallel and then
;; create a lazy concatenation of the results once search is complete
(deftype IntoSequenceCollectorManager []
CollectorManager
(newCollector [_]
(IntoArrayCollector. (ArrayList.)))
(reduce [_ collectors]
(mapcat #(.-coll ^IntoArrayCollector %) collectors)))

;; A Lucene Collector that puts search results onto a core.async channel
(deftype IntoChannelCollector [ch]
Collector
(getLeafCollector [_ ctx]
Expand All @@ -37,23 +47,42 @@
(throw (CollectionTerminatedException.))))))) ;; ... then prematurely terminate collection of the current leaf
(scoreMode [_] ScoreMode/COMPLETE_NO_SCORES))

(defn search-all
;; A Lucene CollectorManager that can collect results in parallel putting
;; results onto a channel, optionally closing when done.
(deftype IntoChannelCollectorManager [ch close?]
CollectorManager
(newCollector [_]
(IntoChannelCollector. ch))
(reduce [_ _]
(when close? (a/close! ch))))

(defn ^:deprecated search-all*
"Search a lucene index and return *all* results matching query specified.
Results are returned as a sequence of Lucene document ids."
[^IndexSearcher searcher ^Query q]
(let [coll (ArrayList.)]
(.search searcher q (IntoArrayCollector. coll))
(seq coll)))

(defn stream-all
(defn search-all
[^IndexSearcher searcher ^Query q]
(.search searcher q (IntoSequenceCollectorManager.)))

(defn ^:deprecated stream-all*
"Search a lucene index and return *all* results on the channel specified.
Results are returned as Lucene document ids."
([^IndexSearcher searcher ^Query q ch]
(stream-all searcher q ch true))
(stream-all* searcher q ch true))
([^IndexSearcher searcher ^Query q ch close?]
(.search searcher q (IntoChannelCollector. ch))
(when close? (a/close! ch))))

(defn stream-all
([^IndexSearcher searcher ^Query q ch]
(stream-all searcher q ch true))
([^IndexSearcher searcher ^Query q ch close?]
(.search searcher q (IntoChannelCollectorManager. ch close?))))

(defn- single-must-not-clause?
"Checks that a boolean query isn't simply a single 'must-not' clause.
Such a query will fail to return any results if used alone."
Expand Down
40 changes: 38 additions & 2 deletions test/com/eldrix/hermes/search_test.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
(ns com.eldrix.hermes.search-test
(:require [clojure.spec.gen.alpha :as gen]
(:require [clojure.core.async :as async]
[clojure.spec.gen.alpha :as gen]
[clojure.test :refer [deftest is]]
[com.eldrix.hermes.core :as hermes]
[com.eldrix.hermes.impl.lucene :as lucene]
[com.eldrix.hermes.impl.search :as search]))

(def example-results-1
Expand Down Expand Up @@ -37,4 +39,38 @@
(with-open [svc (hermes/open "snomed.db")]
(let [q (search/q-descendantOrSelfOf 24700007)]
(is (= (search/do-query-for-concept-ids (:searcher svc) q)
(into #{} (map :conceptId) (search/do-query-for-results (:searcher svc) q nil)))))))
(into #{} (map :conceptId) (search/do-query-for-results (:searcher svc) q nil)))))))

(defn ch->set
"Drain the clojure.core.async channel `ch` and return results as a set."
[ch]
(loop [results (transient #{})]
(if-let [result (async/<!! ch)]
(recur (conj! results result))
(persistent! results))))

(defn test-query [svc q]
(let [searcher (.-searcher svc)
ch1 (async/chan)
ch2 (async/chan)]
(async/thread (lucene/stream-all searcher q ch1))
(async/thread (lucene/stream-all* searcher q ch2))
(is (= (set (lucene/search-all searcher q))
(set (lucene/search-all* searcher q))
(ch->set ch1)
(ch->set ch2)) (str "Query returned different results" q))))

(deftest ^:live search-parallel
(with-open [svc (hermes/open "snomed.db")]
(let [concept-ids (take 5000 (shuffle (#'hermes/get-n-concept-ids svc 1000000)))]
(doseq [concept-id concept-ids]
(test-query svc (search/q-descendantOf concept-id))))))

(comment
(def svc (hermes/open "snomed.db"))
(def searcher (.-searcher svc))
(def q (search/q-descendantOf 25700007)) ;138875005
(require '[criterium.core :as crit])
(crit/bench (lucene/search-all searcher q))
(crit/bench (lucene/search-all* searcher q)))

0 comments on commit e7242c8

Please sign in to comment.