Skip to content

Commit

Permalink
Scrolling issues, fix releasing search context eagerly, they should n…
Browse files Browse the repository at this point in the history
…ot be released when scrolling, closes elastic#136.
  • Loading branch information
kimchy committed Nov 29, 2010
1 parent 6d9576c commit 84f97e9
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -315,11 +315,14 @@ protected void releaseIrrelevantSearchContexts(Map<SearchShardTarget, QuerySearc
if (docIdsToLoad == null) {
return;
}
for (Map.Entry<SearchShardTarget, QuerySearchResultProvider> entry : queryResults.entrySet()) {
if (!docIdsToLoad.containsKey(entry.getKey())) {
DiscoveryNode node = nodes.get(entry.getKey().nodeId());
if (node != null) { // should not happen (==null) but safeguard anyhow
searchService.sendFreeContext(node, entry.getValue().id());
// we only release search context that we did not fetch from if we are not scrolling
if (request.scroll() == null) {
for (Map.Entry<SearchShardTarget, QuerySearchResultProvider> entry : queryResults.entrySet()) {
if (!docIdsToLoad.containsKey(entry.getKey())) {
DiscoveryNode node = nodes.get(entry.getKey().nodeId());
if (node != null) { // should not happen (==null) but safeguard anyhow
searchService.sendFreeContext(node, entry.getValue().id());
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ public void processingScope(String scope) {
}

public void processedScope() {
// clean the current scope (we processed it, also handles scrolling since we don't want to
// do it again)
if (scopeCollectors != null) {
scopeCollectors.remove(processingScope);
}
this.processingScope = Scopes.NA;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ protected Client getClient() {
// index something into test_index, will match on both templates
client.prepareIndex("test_index", "type1", "1").setSource("field1", "value1", "field2", "value 2").setRefresh(true).execute().actionGet();

client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();

SearchResponse searchResponse = client.prepareSearch("test_index")
.setQuery(termQuery("field1", "value1"))
.addField("field1").addField("field2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.test.integration.search.scroll;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -109,4 +110,78 @@ protected Client getClient() {
assertThat(((Number) hit.sortValues()[0]).longValue(), equalTo(counter++));
}
}

@Test public void testSimpleScrollQueryThenFetchSmallSizeUnevenDistribution() throws Exception {
try {
client.admin().indices().prepareDelete("test").execute().actionGet();
} catch (Exception e) {
// ignore
}
client.admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 3)).execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();

client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();

for (int i = 0; i < 100; i++) {
String routing = "0";
if (i > 90) {
routing = "1";
} else if (i > 60) {
routing = "2";
}
client.prepareIndex("test", "type1", Integer.toString(i)).setSource("field", i).setRouting(routing).execute().actionGet();
}

client.admin().indices().prepareRefresh().execute().actionGet();

SearchResponse searchResponse = client.prepareSearch()
.setSearchType(SearchType.QUERY_THEN_FETCH)
.setQuery(matchAllQuery())
.setSize(3)
.setScroll(TimeValue.timeValueMinutes(2))
.addSort("field", SortOrder.ASC)
.execute().actionGet();

long counter = 0;

assertThat(searchResponse.hits().getTotalHits(), equalTo(100l));
assertThat(searchResponse.hits().hits().length, equalTo(3));
for (SearchHit hit : searchResponse.hits()) {
assertThat(((Number) hit.sortValues()[0]).longValue(), equalTo(counter++));
}

for (int i = 0; i < 32; i++) {
searchResponse = client.prepareSearchScroll(searchResponse.scrollId())
.setScroll(TimeValue.timeValueMinutes(2))
.execute().actionGet();

assertThat(searchResponse.hits().getTotalHits(), equalTo(100l));
assertThat(searchResponse.hits().hits().length, equalTo(3));
for (SearchHit hit : searchResponse.hits()) {
assertThat(((Number) hit.sortValues()[0]).longValue(), equalTo(counter++));
}
}

// and now, the last one is one
searchResponse = client.prepareSearchScroll(searchResponse.scrollId())
.setScroll(TimeValue.timeValueMinutes(2))
.execute().actionGet();

assertThat(searchResponse.hits().getTotalHits(), equalTo(100l));
assertThat(searchResponse.hits().hits().length, equalTo(1));
for (SearchHit hit : searchResponse.hits()) {
assertThat(((Number) hit.sortValues()[0]).longValue(), equalTo(counter++));
}

// a the last is zero
searchResponse = client.prepareSearchScroll(searchResponse.scrollId())
.setScroll(TimeValue.timeValueMinutes(2))
.execute().actionGet();

assertThat(searchResponse.hits().getTotalHits(), equalTo(100l));
assertThat(searchResponse.hits().hits().length, equalTo(0));
for (SearchHit hit : searchResponse.hits()) {
assertThat(((Number) hit.sortValues()[0]).longValue(), equalTo(counter++));
}
}
}

0 comments on commit 84f97e9

Please sign in to comment.