Skip to content

Commit

Permalink
[hotfix] Change ChangelogStateBackendTestUtils#testMaterializedRestor…
Browse files Browse the repository at this point in the history
…e to single thread
  • Loading branch information
curcur committed Dec 9, 2021
1 parent c8938b4 commit 792d922
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -625,12 +625,6 @@ public Optional<MaterializationRunnable> initMaterialization() throws Exception
}
}

/** Not thread safe. */
@VisibleForTesting
boolean hasNonMaterializedChanges() {
return getLastAppendedTo().compareTo(changelogSnapshotState.lastMaterializedTo()) > 0;
}

// TODO: Remove after fix FLINK-24436
// FsStateChangelogWriter#lastAppendedSequenceNumber returns different seq number
// the first time called vs called after the first time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.state.changelog;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.core.fs.FileSystemSafetyNet;
import org.apache.flink.runtime.state.KeyedStateHandle;
Expand Down Expand Up @@ -117,6 +118,7 @@ public synchronized void close() {
}
}

@VisibleForTesting
public void triggerMaterialization() {
mailboxExecutor.execute(
() -> {
Expand Down Expand Up @@ -238,12 +240,13 @@ private void discardFailedUploads(

// task thread and asyncOperationsThreadPool can access this method
private synchronized void scheduleNextMaterialization() {
LOG.info(
"Task {} schedules the next materialization in {} seconds",
subtaskName,
periodicMaterializeDelay / 1000);
if (started && !periodicExecutor.isShutdown()) {

LOG.info(
"Task {} schedules the next materialization in {} seconds",
subtaskName,
periodicMaterializeDelay / 1000);

if (!periodicExecutor.isShutdown()) {
periodicExecutor.schedule(
this::triggerMaterialization, periodicMaterializeDelay, TimeUnit.MILLISECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.state.changelog;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
Expand All @@ -33,6 +32,7 @@
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
Expand All @@ -50,6 +50,7 @@
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.concurrent.Executors;

import javax.annotation.Nullable;

Expand All @@ -59,7 +60,6 @@
import java.util.concurrent.CompletableFuture;

import static org.apache.flink.runtime.state.StateBackendTestBase.runSnapshot;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.junit.Assert.assertEquals;

/** Test Utilities for Changelog StateBackend. */
Expand Down Expand Up @@ -144,16 +144,14 @@ public static void testMaterializedRestore(
CompletableFuture<Void> asyncComplete = new CompletableFuture<>();
PeriodicMaterializationManager periodicMaterializationManager =
new PeriodicMaterializationManager(
checkNotNull(env.getMainMailboxExecutor()),
checkNotNull(env.getAsyncOperationsThreadPool()),
new SyncMailboxExecutor(),
Executors.newDirectExecutorService(),
env.getTaskInfo().getTaskNameWithSubtasks(),
(message, exception) -> asyncComplete.completeExceptionally(exception),
keyedBackend,
10,
1);

periodicMaterializationManager.start();

try {
ValueState<StateBackendTestBase.TestPojo> state =
keyedBackend.getPartitionedState(
Expand All @@ -165,14 +163,26 @@ public static void testMaterializedRestore(
keyedBackend.setCurrentKey(2);
state.update(new StateBackendTestBase.TestPojo("u2", 2));

awaitMaterialization(keyedBackend, env.getMainMailboxExecutor());
// In this test, every materialization is triggered explicitly by calling
// triggerMaterialization.
// Automatic/periodic triggering is disabled by NOT starting the
// periodicMaterializationManager
periodicMaterializationManager.triggerMaterialization();

keyedBackend.setCurrentKey(2);
state.update(new StateBackendTestBase.TestPojo("u2", 22));

keyedBackend.setCurrentKey(3);
state.update(new StateBackendTestBase.TestPojo("u3", 3));

periodicMaterializationManager.triggerMaterialization();

keyedBackend.setCurrentKey(4);
state.update(new StateBackendTestBase.TestPojo("u4", 4));

keyedBackend.setCurrentKey(2);
state.update(new StateBackendTestBase.TestPojo("u2", 222));

KeyedStateHandle snapshot =
runSnapshot(
keyedBackend.snapshot(
Expand All @@ -182,7 +192,6 @@ public static void testMaterializedRestore(
CheckpointOptions.forCheckpointWithDefaultLocation()),
sharedStateRegistry);

periodicMaterializationManager.close();
IOUtils.closeQuietly(keyedBackend);
keyedBackend.dispose();

Expand All @@ -205,29 +214,19 @@ public static void testMaterializedRestore(
VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);

keyedBackend.setCurrentKey(1);
assertEquals(state.value(), new StateBackendTestBase.TestPojo("u1", 1));
assertEquals(new StateBackendTestBase.TestPojo("u1", 1), state.value());

keyedBackend.setCurrentKey(2);
assertEquals(state.value(), new StateBackendTestBase.TestPojo("u2", 22));
assertEquals(new StateBackendTestBase.TestPojo("u2", 222), state.value());

keyedBackend.setCurrentKey(3);
assertEquals(state.value(), new StateBackendTestBase.TestPojo("u3", 3));
assertEquals(new StateBackendTestBase.TestPojo("u3", 3), state.value());
} finally {
IOUtils.closeQuietly(keyedBackend);
keyedBackend.dispose();
}
}

private static void awaitMaterialization(
ChangelogKeyedStateBackend<Integer> keyedStateBackend, MailboxExecutor mailboxExecutor)
throws Exception {
while (mailboxExecutor
.submit(keyedStateBackend::hasNonMaterializedChanges, "for test")
.get()) {
Thread.sleep(10);
}
}

static class DummyCheckpointingStorageAccess implements CheckpointStorageAccess {

DummyCheckpointingStorageAccess() {}
Expand Down

0 comments on commit 792d922

Please sign in to comment.