Skip to content

Commit

Permalink
Merge pull request #5081 from apache/nouveau-healthcheck-test-failure…
Browse files Browse the repository at this point in the history
…-fix

Ensure index is up to date when querying
  • Loading branch information
rnewson committed Jun 8, 2024
2 parents d9329f2 + de63bfe commit dbf8d02
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ nouveau-test: nouveau-test-gradle nouveau-test-elixir
.PHONY: nouveau-test-gradle
nouveau-test-gradle: couch nouveau
ifeq ($(with_nouveau), true)
@cd nouveau && ./gradlew test --info
@cd nouveau && ./gradlew test --info --rerun
endif

.PHONY: nouveau-test-elixir
Expand Down
2 changes: 1 addition & 1 deletion Makefile.win
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ nouveau-test: nouveau-test-gradle nouveau-test-elixir
.PHONY: nouveau-test-gradle
nouveau-test-gradle: couch nouveau
ifeq ($(with_nouveau), true)
@cd nouveau && .\gradlew test --info
@cd nouveau && .\gradlew test --info --rerun
endif

.PHONY: nouveau-test-elixir
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Positive;
import jakarta.validation.constraints.PositiveOrZero;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -32,6 +33,12 @@ public class SearchRequest {
@NotNull
private String query;

@PositiveOrZero
private long minUpdateSeq;

@PositiveOrZero
private long minPurgeSeq;

private Locale locale;

private String partition;
Expand Down Expand Up @@ -64,6 +71,24 @@ public String getQuery() {
return query;
}

public void setMinUpdateSeq(final long minUpdateSeq) {
this.minUpdateSeq = minUpdateSeq;
}

@JsonProperty
public long getMinUpdateSeq() {
return minUpdateSeq;
}

public void setMinPurgeSeq(final long minPurgeSeq) {
this.minPurgeSeq = minPurgeSeq;
}

@JsonProperty
public long getMinPurgeSeq() {
return minPurgeSeq;
}

public void setLocale(final Locale locale) {
this.locale = locale;
}
Expand Down Expand Up @@ -154,7 +179,8 @@ public PrimitiveWrapper<?>[] getAfter() {

@Override
public String toString() {
return "SearchRequest [query=" + query + ", locale=" + locale + ", sort=" + sort + ", limit=" + limit
+ ", after=" + after + ", counts=" + counts + ", ranges=" + ranges + "]";
return "SearchRequest [query=" + query + ", min_update_seq=" + minUpdateSeq + ", min_purge_seq=" + minPurgeSeq
+ ", locale=" + locale + ", sort=" + sort + ", limit=" + limit + ", after=" + after + ", counts="
+ counts + ", ranges=" + ranges + "]";
}
}
10 changes: 10 additions & 0 deletions nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public final synchronized void delete(final String docId, final DocumentDeleteRe
protected abstract void doDelete(final String docId, final DocumentDeleteRequest request) throws IOException;

public final SearchResults search(final SearchRequest request) throws IOException {
assertMinSeqs(request.getMinUpdateSeq(), request.getMinPurgeSeq());
return doSearch(request);
}

Expand Down Expand Up @@ -182,4 +183,13 @@ protected final void incrementPurgeSeq(final long matchSeq, final long purgeSeq)
assertPurgeSeqProgress(matchSeq, purgeSeq);
this.purgeSeq = purgeSeq;
}

protected final void assertMinSeqs(final long minUpdateSeq, final long minPurgeSeq) throws StaleIndexException {
if (this.updateSeq < minUpdateSeq) {
throw new StaleIndexException(false, minUpdateSeq, this.updateSeq);
}
if (this.purgeSeq < minPurgeSeq) {
throw new StaleIndexException(true, minPurgeSeq, this.purgeSeq);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.github.benmanes.caffeine.cache.Scheduler;
import com.github.benmanes.caffeine.cache.Weigher;
import io.dropwizard.lifecycle.Managed;
import jakarta.validation.constraints.PositiveOrZero;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.core.Response.Status;
import java.io.File;
Expand Down Expand Up @@ -68,10 +69,13 @@ public interface IndexFunction<V, R> {

private static final Logger LOGGER = LoggerFactory.getLogger(IndexManager.class);

@PositiveOrZero
private int maxIndexesOpen;

@PositiveOrZero
private int commitIntervalSeconds;

@PositiveOrZero
private int idleSeconds;

private Path rootDir;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.apache.couchdb.nouveau.core;

import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.core.Response.Status;

public final class StaleIndexException extends WebApplicationException {

public StaleIndexException(final boolean purge, final long minSeq, final long actualSeq) {
super(
String.format(
"index is stale (%s seq needs to be at least %d but is %d)",
purge ? "purge" : "index", minSeq, actualSeq),
Status.CONFLICT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ public class IndexHealthCheckTest {
@Test
public void testIndexHealthCheck(@TempDir final Path tempDir) throws Exception {
var manager = new IndexManager();
manager.setCommitIntervalSeconds(1);
manager.setObjectMapper(new ObjectMapper());
manager.setCommitIntervalSeconds(30);
manager.setIdleSeconds(60);
manager.setMaxIndexesOpen(1);
manager.setMetricRegistry(new MetricRegistry());
manager.setObjectMapper(new ObjectMapper());
manager.setRootDir(tempDir);
manager.setScheduler(Scheduler.systemScheduler());
manager.setSearcherFactory(new SearcherFactory());
Expand Down
4 changes: 4 additions & 0 deletions src/nouveau/src/nouveau_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ search(#index{} = Index, QueryArgs) ->
case Resp of
{ok, "200", _, RespBody} ->
{ok, jiffy:decode(RespBody, [return_maps])};
{ok, "409", _, _} ->
%% Index was not current enough.
{error, stale_index};
{ok, StatusCode, _, RespBody} ->
{error, jaxrs_error(StatusCode, RespBody)};
{error, Reason} ->
Expand All @@ -186,6 +189,7 @@ set_update_seq(ConnPid, #index{} = Index, MatchSeq, UpdateSeq) ->
update_seq => UpdateSeq
},
set_seq(ConnPid, Index, ReqBody).

set_purge_seq(ConnPid, #index{} = Index, MatchSeq, PurgeSeq) ->
ReqBody = #{
match_purge_seq => MatchSeq,
Expand Down
2 changes: 1 addition & 1 deletion src/nouveau/src/nouveau_index_updater.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
-include("nouveau.hrl").

%% public api
-export([outdated/1]).
-export([outdated/1, get_db_info/1]).

%% callbacks
-export([update/1]).
Expand Down
21 changes: 18 additions & 3 deletions src/nouveau/src/nouveau_rpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,19 @@
-include("nouveau.hrl").
-import(nouveau_util, [index_path/1]).

search(DbName, #index{} = Index0, QueryArgs) ->
search(DbName, #index{} = Index0, QueryArgs0) ->
%% Incorporate the shard name into the record.
Index1 = Index0#index{dbname = DbName},
Update = maps:get(update, QueryArgs, true),

%% get minimum seqs for search
{MinUpdateSeq, MinPurgeSeq} = nouveau_index_updater:get_db_info(Index1),

%% Incorporate min seqs into the query args.
QueryArgs1 = QueryArgs0#{
min_update_seq => MinUpdateSeq,
min_purge_seq => MinPurgeSeq
},
Update = maps:get(update, QueryArgs1, true),

%% check if index is up to date
case Update andalso nouveau_index_updater:outdated(Index1) of
Expand All @@ -45,7 +54,13 @@ search(DbName, #index{} = Index0, QueryArgs) ->
end,

%% Run the search
rexi:reply(nouveau_api:search(Index1, QueryArgs)).
case nouveau_api:search(Index1, QueryArgs1) of
{error, stale_index} ->
%% try again.
search(DbName, Index0, QueryArgs0);
Else ->
rexi:reply(Else)
end.

info(DbName, #index{} = Index0) ->
%% Incorporate the shard name into the record.
Expand Down

0 comments on commit dbf8d02

Please sign in to comment.