diff --git a/Makefile b/Makefile index 3386d9c2666..7b31656cc51 100644 --- a/Makefile +++ b/Makefile @@ -563,7 +563,7 @@ nouveau-test: nouveau-test-gradle nouveau-test-elixir .PHONY: nouveau-test-gradle nouveau-test-gradle: couch nouveau ifeq ($(with_nouveau), true) - @cd nouveau && ./gradlew test --info + @cd nouveau && ./gradlew test --info --rerun endif .PHONY: nouveau-test-elixir diff --git a/Makefile.win b/Makefile.win index 219f9bd91ed..183498d90a8 100644 --- a/Makefile.win +++ b/Makefile.win @@ -526,7 +526,7 @@ nouveau-test: nouveau-test-gradle nouveau-test-elixir .PHONY: nouveau-test-gradle nouveau-test-gradle: couch nouveau ifeq ($(with_nouveau), true) - @cd nouveau && .\gradlew test --info + @cd nouveau && .\gradlew test --info --rerun endif .PHONY: nouveau-test-elixir diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/SearchRequest.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/SearchRequest.java index 2fc9e1f6968..155e8efb4bb 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/SearchRequest.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/SearchRequest.java @@ -21,6 +21,7 @@ import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.Positive; +import jakarta.validation.constraints.PositiveOrZero; import java.util.List; import java.util.Locale; import java.util.Map; @@ -32,6 +33,12 @@ public class SearchRequest { @NotNull private String query; + @PositiveOrZero + private long minUpdateSeq; + + @PositiveOrZero + private long minPurgeSeq; + private Locale locale; private String partition; @@ -64,6 +71,24 @@ public String getQuery() { return query; } + public void setMinUpdateSeq(final long minUpdateSeq) { + this.minUpdateSeq = minUpdateSeq; + } + + @JsonProperty + public long getMinUpdateSeq() { + return minUpdateSeq; + } + + public void setMinPurgeSeq(final long minPurgeSeq) { + this.minPurgeSeq = minPurgeSeq; + } + + @JsonProperty + public long getMinPurgeSeq() { + return minPurgeSeq; + } + public void setLocale(final Locale locale) { this.locale = locale; } @@ -154,7 +179,8 @@ public PrimitiveWrapper[] getAfter() { @Override public String toString() { - return "SearchRequest [query=" + query + ", locale=" + locale + ", sort=" + sort + ", limit=" + limit - + ", after=" + after + ", counts=" + counts + ", ranges=" + ranges + "]"; + return "SearchRequest [query=" + query + ", min_update_seq=" + minUpdateSeq + ", min_purge_seq=" + minPurgeSeq + + ", locale=" + locale + ", sort=" + sort + ", limit=" + limit + ", after=" + after + ", counts=" + + counts + ", ranges=" + ranges + "]"; } } diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java index 4e27acbb564..b278fe6a42e 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java @@ -87,6 +87,7 @@ public final synchronized void delete(final String docId, final DocumentDeleteRe protected abstract void doDelete(final String docId, final DocumentDeleteRequest request) throws IOException; public final SearchResults search(final SearchRequest request) throws IOException { + assertMinSeqs(request.getMinUpdateSeq(), request.getMinPurgeSeq()); return doSearch(request); } @@ -182,4 +183,13 @@ protected final void incrementPurgeSeq(final long matchSeq, final long purgeSeq) assertPurgeSeqProgress(matchSeq, purgeSeq); this.purgeSeq = purgeSeq; } + + protected final void assertMinSeqs(final long minUpdateSeq, final long minPurgeSeq) throws StaleIndexException { + if (this.updateSeq < minUpdateSeq) { + throw new StaleIndexException(false, minUpdateSeq, this.updateSeq); + } + if (this.purgeSeq < minPurgeSeq) { + throw new StaleIndexException(true, minPurgeSeq, this.purgeSeq); + } + } } diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java index f6bfaccc378..f434d5169fb 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java @@ -26,6 +26,7 @@ import com.github.benmanes.caffeine.cache.Scheduler; import com.github.benmanes.caffeine.cache.Weigher; import io.dropwizard.lifecycle.Managed; +import jakarta.validation.constraints.PositiveOrZero; import jakarta.ws.rs.WebApplicationException; import jakarta.ws.rs.core.Response.Status; import java.io.File; @@ -68,10 +69,13 @@ public interface IndexFunction { private static final Logger LOGGER = LoggerFactory.getLogger(IndexManager.class); + @PositiveOrZero private int maxIndexesOpen; + @PositiveOrZero private int commitIntervalSeconds; + @PositiveOrZero private int idleSeconds; private Path rootDir; diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/StaleIndexException.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/StaleIndexException.java new file mode 100644 index 00000000000..605802894b1 --- /dev/null +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/StaleIndexException.java @@ -0,0 +1,28 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.apache.couchdb.nouveau.core; + +import jakarta.ws.rs.WebApplicationException; +import jakarta.ws.rs.core.Response.Status; + +public final class StaleIndexException extends WebApplicationException { + + public StaleIndexException(final boolean purge, final long minSeq, final long actualSeq) { + super( + String.format( + "index is stale (%s seq needs to be at least %d but is %d)", + purge ? "purge" : "index", minSeq, actualSeq), + Status.CONFLICT); + } +} diff --git a/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java b/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java index 61947dc7d84..d282de5681f 100644 --- a/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java +++ b/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java @@ -30,9 +30,11 @@ public class IndexHealthCheckTest { @Test public void testIndexHealthCheck(@TempDir final Path tempDir) throws Exception { var manager = new IndexManager(); - manager.setCommitIntervalSeconds(1); - manager.setObjectMapper(new ObjectMapper()); + manager.setCommitIntervalSeconds(30); + manager.setIdleSeconds(60); + manager.setMaxIndexesOpen(1); manager.setMetricRegistry(new MetricRegistry()); + manager.setObjectMapper(new ObjectMapper()); manager.setRootDir(tempDir); manager.setScheduler(Scheduler.systemScheduler()); manager.setSearcherFactory(new SearcherFactory()); diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl index 57de204f2d9..b01ef0f322b 100644 --- a/src/nouveau/src/nouveau_api.erl +++ b/src/nouveau/src/nouveau_api.erl @@ -174,6 +174,9 @@ search(#index{} = Index, QueryArgs) -> case Resp of {ok, "200", _, RespBody} -> {ok, jiffy:decode(RespBody, [return_maps])}; + {ok, "409", _, _} -> + %% Index was not current enough. + {error, stale_index}; {ok, StatusCode, _, RespBody} -> {error, jaxrs_error(StatusCode, RespBody)}; {error, Reason} -> @@ -186,6 +189,7 @@ set_update_seq(ConnPid, #index{} = Index, MatchSeq, UpdateSeq) -> update_seq => UpdateSeq }, set_seq(ConnPid, Index, ReqBody). + set_purge_seq(ConnPid, #index{} = Index, MatchSeq, PurgeSeq) -> ReqBody = #{ match_purge_seq => MatchSeq, diff --git a/src/nouveau/src/nouveau_index_updater.erl b/src/nouveau/src/nouveau_index_updater.erl index 1d11e98b4d5..efed245db48 100644 --- a/src/nouveau/src/nouveau_index_updater.erl +++ b/src/nouveau/src/nouveau_index_updater.erl @@ -18,7 +18,7 @@ -include("nouveau.hrl"). %% public api --export([outdated/1]). +-export([outdated/1, get_db_info/1]). %% callbacks -export([update/1]). diff --git a/src/nouveau/src/nouveau_rpc.erl b/src/nouveau/src/nouveau_rpc.erl index 02f9ce90b10..85adf5b39e3 100644 --- a/src/nouveau/src/nouveau_rpc.erl +++ b/src/nouveau/src/nouveau_rpc.erl @@ -24,10 +24,19 @@ -include("nouveau.hrl"). -import(nouveau_util, [index_path/1]). -search(DbName, #index{} = Index0, QueryArgs) -> +search(DbName, #index{} = Index0, QueryArgs0) -> %% Incorporate the shard name into the record. Index1 = Index0#index{dbname = DbName}, - Update = maps:get(update, QueryArgs, true), + + %% get minimum seqs for search + {MinUpdateSeq, MinPurgeSeq} = nouveau_index_updater:get_db_info(Index1), + + %% Incorporate min seqs into the query args. + QueryArgs1 = QueryArgs0#{ + min_update_seq => MinUpdateSeq, + min_purge_seq => MinPurgeSeq + }, + Update = maps:get(update, QueryArgs1, true), %% check if index is up to date case Update andalso nouveau_index_updater:outdated(Index1) of @@ -45,7 +54,13 @@ search(DbName, #index{} = Index0, QueryArgs) -> end, %% Run the search - rexi:reply(nouveau_api:search(Index1, QueryArgs)). + case nouveau_api:search(Index1, QueryArgs1) of + {error, stale_index} -> + %% try again. + search(DbName, Index0, QueryArgs0); + Else -> + rexi:reply(Else) + end. info(DbName, #index{} = Index0) -> %% Incorporate the shard name into the record.