Skip to content

Commit

Permalink
SERVER-57952 Re-add DonorStateEnum::kPreparingToBlockWrites.
Browse files Browse the repository at this point in the history
(cherry picked from commit bd1a5b7)
  • Loading branch information
visemet committed Jun 24, 2021
1 parent 3e91153 commit 6b14a42
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ void processReshardingFieldsForDonorCollection(OperationContext* opCtx,
ReshardingDonorDocument>(
opCtx, reshardingFields.getReshardingUUID())) {
donorStateMachine->get()->onReshardingFieldsChanges(opCtx, reshardingFields);

const auto coordinatorState = reshardingFields.getState();
if (coordinatorState == CoordinatorStateEnum::kBlockingWrites) {
(*donorStateMachine)->awaitCriticalSectionAcquired().wait(opCtx);
} else if (coordinatorState == CoordinatorStateEnum::kCommitting) {
(*donorStateMachine)->awaitCriticalSectionAcquired().wait(opCtx);
(*donorStateMachine)->awaitCriticalSectionPromoted().wait(opCtx);
}

return;
}

Expand Down
27 changes: 15 additions & 12 deletions src/mongo/db/s/resharding/resharding_donor_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::_runUntilBlockin
executor, abortToken);
})
.then([this, executor, abortToken] {
return _awaitAllRecipientsDoneApplying(executor, abortToken);
return _awaitAllRecipientsDoneApplyingThenTransitionToPreparingToBlockWrites(
executor, abortToken);
})
.then([this] { _writeTransactionOplogEntryThenTransitionToBlockingWrites(); });
})
Expand Down Expand Up @@ -482,14 +483,14 @@ void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges(
ensureFulfilledPromise(lk, _coordinatorHasDecisionPersisted);
}
}
}

if (coordinatorState >= CoordinatorStateEnum::kBlockingWrites) {
_critSecWasAcquired.getFuture().wait(opCtx);
}
SharedSemiFuture<void> ReshardingDonorService::DonorStateMachine::awaitCriticalSectionAcquired() {
return _critSecWasAcquired.getFuture();
}

if (coordinatorState >= CoordinatorStateEnum::kCommitting) {
_critSecWasPromoted.getFuture().wait(opCtx);
}
SharedSemiFuture<void> ReshardingDonorService::DonorStateMachine::awaitCriticalSectionPromoted() {
return _critSecWasPromoted.getFuture();
}

void ReshardingDonorService::DonorStateMachine::
Expand Down Expand Up @@ -578,20 +579,22 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::
});
}

ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::_awaitAllRecipientsDoneApplying(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const CancellationToken& abortToken) {
ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::
_awaitAllRecipientsDoneApplyingThenTransitionToPreparingToBlockWrites(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const CancellationToken& abortToken) {
if (_donorCtx.getState() > DonorStateEnum::kDonatingOplogEntries) {
return ExecutorFuture<void>(**executor, Status::OK());
}

return future_util::withCancellation(_allRecipientsDoneApplying.getFuture(), abortToken)
.thenRunOn(**executor);
.thenRunOn(**executor)
.then([this] { _transitionState(DonorStateEnum::kPreparingToBlockWrites); });
}

void ReshardingDonorService::DonorStateMachine::
_writeTransactionOplogEntryThenTransitionToBlockingWrites() {
if (_donorCtx.getState() > DonorStateEnum::kDonatingOplogEntries) {
if (_donorCtx.getState() > DonorStateEnum::kPreparingToBlockWrites) {
stdx::lock_guard<Latch> lk(_mutex);
ensureFulfilledPromise(lk, _critSecWasAcquired);
return;
Expand Down
6 changes: 5 additions & 1 deletion src/mongo/db/s/resharding/resharding_donor_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ class ReshardingDonorService::DonorStateMachine final
void onReshardingFieldsChanges(OperationContext* opCtx,
const TypeCollectionReshardingFields& reshardingFields);

SharedSemiFuture<void> awaitCriticalSectionAcquired();

SharedSemiFuture<void> awaitCriticalSectionPromoted();

SharedSemiFuture<void> awaitFinalOplogEntriesWritten();

/**
Expand Down Expand Up @@ -154,7 +158,7 @@ class ReshardingDonorService::DonorStateMachine final
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const CancellationToken& abortToken);

ExecutorFuture<void> _awaitAllRecipientsDoneApplying(
ExecutorFuture<void> _awaitAllRecipientsDoneApplyingThenTransitionToPreparingToBlockWrites(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const CancellationToken& abortToken);

Expand Down
46 changes: 43 additions & 3 deletions src/mongo/db/s/resharding/resharding_donor_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "mongo/platform/basic.h"

#include <boost/optional/optional_io.hpp>
#include <utility>

#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/dbdirectclient.h"
Expand Down Expand Up @@ -176,13 +177,21 @@ class ReshardingDonorServiceTest : public repl::PrimaryOnlyServiceMongoDTest {
void notifyToStartBlockingWrites(OperationContext* opCtx,
DonorStateMachine& donor,
const ReshardingDonorDocument& donorDoc) {
notifyToStartBlockingWritesNoWait(opCtx, donor, donorDoc);
ASSERT_OK(donor.awaitCriticalSectionAcquired().waitNoThrow(opCtx));
}

void notifyToStartBlockingWritesNoWait(OperationContext* opCtx,
DonorStateMachine& donor,
const ReshardingDonorDocument& donorDoc) {
_onReshardingFieldsChanges(opCtx, donor, donorDoc, CoordinatorStateEnum::kBlockingWrites);
}

void notifyReshardingCommitting(OperationContext* opCtx,
DonorStateMachine& donor,
const ReshardingDonorDocument& donorDoc) {
_onReshardingFieldsChanges(opCtx, donor, donorDoc, CoordinatorStateEnum::kCommitting);
ASSERT_OK(donor.awaitCriticalSectionPromoted().waitNoThrow(opCtx));
}

void checkStateDocumentRemoved(OperationContext* opCtx) {
Expand Down Expand Up @@ -318,9 +327,18 @@ TEST_F(ReshardingDonorServiceTest, WritesFinalReshardOpOplogEntriesWhileWritesBl
TEST_F(ReshardingDonorServiceTest, StepDownStepUpEachTransition) {
const std::vector<DonorStateEnum> donorStates{DonorStateEnum::kDonatingInitialData,
DonorStateEnum::kDonatingOplogEntries,
DonorStateEnum::kPreparingToBlockWrites,
DonorStateEnum::kBlockingWrites,
DonorStateEnum::kDone};

const std::vector<std::pair<DonorStateEnum, bool>> donorStateTransitions{
{DonorStateEnum::kDonatingInitialData, false},
{DonorStateEnum::kDonatingOplogEntries, false},
{DonorStateEnum::kPreparingToBlockWrites, false},
{DonorStateEnum::kBlockingWrites, false},
{DonorStateEnum::kBlockingWrites, true},
{DonorStateEnum::kDone, true}};

for (bool isAlsoRecipient : {false, true}) {
LOGV2(5641801,
"Running case",
Expand All @@ -335,7 +353,10 @@ TEST_F(ReshardingDonorServiceTest, StepDownStepUpEachTransition) {
auto opCtx = makeOperationContext();

auto prevState = DonorStateEnum::kUnused;
for (const auto state : donorStates) {
for (const auto& [state, critSecHeld] : donorStateTransitions) {
// The kBlockingWrite state is interrupted twice so we don't unset the guard until after
// the second time.
bool shouldUnsetPrevState = !(state == DonorStateEnum::kBlockingWrites && critSecHeld);
auto donor = [&] {
if (prevState == DonorStateEnum::kUnused) {
createSourceCollection(opCtx.get(), doc);
Expand All @@ -351,7 +372,9 @@ TEST_F(ReshardingDonorServiceTest, StepDownStepUpEachTransition) {

// Allow the transition to prevState to succeed on this primary-only service
// instance.
stateTransitionsGuard.unset(prevState);
if (shouldUnsetPrevState) {
stateTransitionsGuard.unset(prevState);
}
return *maybeDonor;
}
}();
Expand All @@ -363,8 +386,25 @@ TEST_F(ReshardingDonorServiceTest, StepDownStepUpEachTransition) {
notifyRecipientsDoneCloning(opCtx.get(), *donor, doc);
break;
}
case DonorStateEnum::kPreparingToBlockWrites: {
notifyToStartBlockingWritesNoWait(opCtx.get(), *donor, doc);
break;
}
case DonorStateEnum::kBlockingWrites: {
notifyToStartBlockingWrites(opCtx.get(), *donor, doc);
// A shard version refresh cannot be triggered once the critical section has
// been acquired. We intentionally test the DonorStateEnum::kBlockingWrites
// transition being triggered two different ways:
//
// - The first transition would wait for the RecoverRefreshThread to
// notify the donor about the CoordinatorStateEnum::kBlockingWrites state.
//
// - The second transition would rely on the donor having already written down
// DonorStateEnum::kPreparingToBlockWrites as a result of the
// RecoverRefreshThread having already been notified the donor about the
// CoordinatorStateEnum::kBlockingWrites state before.
if (!critSecHeld) {
notifyToStartBlockingWrites(opCtx.get(), *donor, doc);
}
break;
}
case DonorStateEnum::kDone: {
Expand Down
2 changes: 1 addition & 1 deletion src/mongo/db/s/resharding/resharding_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ void ReshardingMetrics::onWriteDuringCriticalSection(int64_t writes) noexcept {
return;

invariant(checkState(*_currentOp->donorState,
{DonorStateEnum::kDonatingOplogEntries,
{DonorStateEnum::kPreparingToBlockWrites,
DonorStateEnum::kBlockingWrites,
DonorStateEnum::kError}));

Expand Down
4 changes: 2 additions & 2 deletions src/mongo/db/s/resharding/resharding_metrics_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ TEST_F(ReshardingMetricsTest, TestDonorAndRecipientMetrics) {

// Update metrics for donor
const auto kWritesDuringCriticalSection = 7;
getMetrics()->setDonorState(DonorStateEnum::kDonatingOplogEntries);
getMetrics()->setDonorState(DonorStateEnum::kPreparingToBlockWrites);
getMetrics()->enterCriticalSection(getGlobalServiceContext()->getFastClockSource()->now());
getMetrics()->onWriteDuringCriticalSection(kWritesDuringCriticalSection);
advanceTime(Seconds(elapsedTime));
Expand Down Expand Up @@ -436,7 +436,7 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTime) {
}

TEST_F(ReshardingMetricsTest, CurrentOpReportForDonor) {
const auto kDonorState = DonorStateEnum::kDonatingOplogEntries;
const auto kDonorState = DonorStateEnum::kPreparingToBlockWrites;
startOperation(ReshardingMetrics::Role::kDonor);
advanceTime(Seconds(2));
getMetrics()->setDonorState(kDonorState);
Expand Down
2 changes: 2 additions & 0 deletions src/mongo/db/s/resharding/resharding_service_test_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,13 @@ template <class StateEnum>
void StateTransitionController<StateEnum>::_notifyNewStateAndWaitUntilUnpaused(
OperationContext* opCtx, StateEnum newState) {
stdx::unique_lock lk(_mutex);
auto guard = makeGuard([this, prevState = _state] { _state = prevState; });
_state = newState;
_waitUntilUnpausedCond.notify_all();
opCtx->waitForConditionOrInterrupt(_pauseDuringTransitionCond, lk, [this, newState] {
return _pauseDuringTransition.count(newState) == 0;
});
guard.dismiss();
}

template <class StateEnum>
Expand Down
1 change: 1 addition & 0 deletions src/mongo/s/resharding/common_types.idl
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ enums:
kPreparingToDonate: "preparing-to-donate"
kDonatingInitialData: "donating-initial-data"
kDonatingOplogEntries: "donating-oplog-entries"
kPreparingToBlockWrites: "preparing-to-block-writes"
kError: "error"
kBlockingWrites: "blocking-writes"
kDone: "done"
Expand Down

0 comments on commit 6b14a42

Please sign in to comment.