diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java index 9d4d95996170a..841c0b4e2e851 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java @@ -625,12 +625,6 @@ public Optional initMaterialization() throws Exception } } - /** Not thread safe. */ - @VisibleForTesting - boolean hasNonMaterializedChanges() { - return getLastAppendedTo().compareTo(changelogSnapshotState.lastMaterializedTo()) > 0; - } - // TODO: Remove after fix FLINK-24436 // FsStateChangelogWriter#lastAppendedSequenceNumber returns different seq number // the first time called vs called after the first time. diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java index 3279cc2e0343f..1cbdfd25317ad 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java @@ -17,6 +17,7 @@ package org.apache.flink.state.changelog; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.core.fs.FileSystemSafetyNet; import org.apache.flink.runtime.state.KeyedStateHandle; @@ -117,6 +118,7 @@ public synchronized void close() { } } + @VisibleForTesting public void triggerMaterialization() { mailboxExecutor.execute( () -> { @@ -238,12 +240,13 @@ private void discardFailedUploads( // task thread and asyncOperationsThreadPool can access this method private synchronized void scheduleNextMaterialization() { - LOG.info( - "Task {} schedules the next materialization in {} seconds", - subtaskName, - periodicMaterializeDelay / 1000); + if (started && !periodicExecutor.isShutdown()) { + + LOG.info( + "Task {} schedules the next materialization in {} seconds", + subtaskName, + periodicMaterializeDelay / 1000); - if (!periodicExecutor.isShutdown()) { periodicExecutor.schedule( this::triggerMaterialization, periodicMaterializeDelay, TimeUnit.MILLISECONDS); } diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java index 481fceef7b8b1..46bf563cc13f1 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java @@ -19,7 +19,6 @@ package org.apache.flink.state.changelog; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -33,6 +32,7 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.state.CheckpointStorageAccess; import org.apache.flink.runtime.state.CheckpointStorageLocation; @@ -50,6 +50,7 @@ import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.IOUtils; +import org.apache.flink.util.concurrent.Executors; import javax.annotation.Nullable; @@ -59,7 +60,6 @@ import java.util.concurrent.CompletableFuture; import static org.apache.flink.runtime.state.StateBackendTestBase.runSnapshot; -import static org.apache.flink.util.Preconditions.checkNotNull; import static org.junit.Assert.assertEquals; /** Test Utilities for Changelog StateBackend. */ @@ -144,16 +144,14 @@ public static void testMaterializedRestore( CompletableFuture asyncComplete = new CompletableFuture<>(); PeriodicMaterializationManager periodicMaterializationManager = new PeriodicMaterializationManager( - checkNotNull(env.getMainMailboxExecutor()), - checkNotNull(env.getAsyncOperationsThreadPool()), + new SyncMailboxExecutor(), + Executors.newDirectExecutorService(), env.getTaskInfo().getTaskNameWithSubtasks(), (message, exception) -> asyncComplete.completeExceptionally(exception), keyedBackend, 10, 1); - periodicMaterializationManager.start(); - try { ValueState state = keyedBackend.getPartitionedState( @@ -165,7 +163,11 @@ public static void testMaterializedRestore( keyedBackend.setCurrentKey(2); state.update(new StateBackendTestBase.TestPojo("u2", 2)); - awaitMaterialization(keyedBackend, env.getMainMailboxExecutor()); + // In this test, every materialization is triggered explicitly by calling + // triggerMaterialization. + // Automatic/periodic triggering is disabled by NOT starting the + // periodicMaterializationManager + periodicMaterializationManager.triggerMaterialization(); keyedBackend.setCurrentKey(2); state.update(new StateBackendTestBase.TestPojo("u2", 22)); @@ -173,6 +175,14 @@ public static void testMaterializedRestore( keyedBackend.setCurrentKey(3); state.update(new StateBackendTestBase.TestPojo("u3", 3)); + periodicMaterializationManager.triggerMaterialization(); + + keyedBackend.setCurrentKey(4); + state.update(new StateBackendTestBase.TestPojo("u4", 4)); + + keyedBackend.setCurrentKey(2); + state.update(new StateBackendTestBase.TestPojo("u2", 222)); + KeyedStateHandle snapshot = runSnapshot( keyedBackend.snapshot( @@ -182,7 +192,6 @@ public static void testMaterializedRestore( CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry); - periodicMaterializationManager.close(); IOUtils.closeQuietly(keyedBackend); keyedBackend.dispose(); @@ -205,29 +214,19 @@ public static void testMaterializedRestore( VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); keyedBackend.setCurrentKey(1); - assertEquals(state.value(), new StateBackendTestBase.TestPojo("u1", 1)); + assertEquals(new StateBackendTestBase.TestPojo("u1", 1), state.value()); keyedBackend.setCurrentKey(2); - assertEquals(state.value(), new StateBackendTestBase.TestPojo("u2", 22)); + assertEquals(new StateBackendTestBase.TestPojo("u2", 222), state.value()); keyedBackend.setCurrentKey(3); - assertEquals(state.value(), new StateBackendTestBase.TestPojo("u3", 3)); + assertEquals(new StateBackendTestBase.TestPojo("u3", 3), state.value()); } finally { IOUtils.closeQuietly(keyedBackend); keyedBackend.dispose(); } } - private static void awaitMaterialization( - ChangelogKeyedStateBackend keyedStateBackend, MailboxExecutor mailboxExecutor) - throws Exception { - while (mailboxExecutor - .submit(keyedStateBackend::hasNonMaterializedChanges, "for test") - .get()) { - Thread.sleep(10); - } - } - static class DummyCheckpointingStorageAccess implements CheckpointStorageAccess { DummyCheckpointingStorageAccess() {}