Skip to content

Commit

Permalink
[hotfix] [tests] Remove unfruitful MigrationTestUtil class
Browse files Browse the repository at this point in the history
That utility class had a single helper method, restoreFromSnapshot,
which accepts the target snapshot's Flink version. This was useful
before, because the way to restore snapshots was a bit different for
Flink <= 1.1 and newer versions.

Since we now no longer support compatibility for 1.1 versions and
below, this helper method is simply forwarding the restore operation
to the test harness.

This commit refactors this have equivalent behaviour directly in the
AbstractStreamOperatorTestHarness class.
  • Loading branch information
tzulitai committed Jan 17, 2019
1 parent 322e479 commit b30adb0
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
import org.apache.flink.streaming.util.migration.MigrationTestUtil;
import org.apache.flink.streaming.util.migration.MigrationVersion;
import org.apache.flink.util.OperatingSystem;

Expand Down Expand Up @@ -167,11 +166,9 @@ public void testRestore() throws Exception {
new StreamSink<>(sink), 10, 1, 0);
testHarness.setup();

MigrationTestUtil.restoreFromSnapshot(
testHarness,
testHarness.initializeState(
OperatorSnapshotUtil.getResourceFilename(
"bucketing-sink-migration-test-flink" + testMigrateVersion + "-snapshot"),
testMigrateVersion);
"bucketing-sink-migration-test-flink" + testMigrateVersion + "-snapshot"));

testHarness.open();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
import org.apache.flink.streaming.util.migration.MigrationTestUtil;
import org.apache.flink.streaming.util.migration.MigrationVersion;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.SerializedValue;
Expand Down Expand Up @@ -207,11 +206,9 @@ public void testRestoreFromEmptyStateNoPartitions() throws Exception {
testHarness.setup();

// restore state from binary snapshot file
MigrationTestUtil.restoreFromSnapshot(
testHarness,
testHarness.initializeState(
OperatorSnapshotUtil.getResourceFilename(
"kafka-consumer-migration-test-flink" + testMigrateVersion + "-empty-state-snapshot"),
testMigrateVersion);
"kafka-consumer-migration-test-flink" + testMigrateVersion + "-empty-state-snapshot"));

testHarness.open();

Expand Down Expand Up @@ -248,11 +245,9 @@ public void testRestoreFromEmptyStateWithPartitions() throws Exception {
testHarness.setup();

// restore state from binary snapshot file
MigrationTestUtil.restoreFromSnapshot(
testHarness,
testHarness.initializeState(
OperatorSnapshotUtil.getResourceFilename(
"kafka-consumer-migration-test-flink" + testMigrateVersion + "-empty-state-snapshot"),
testMigrateVersion);
"kafka-consumer-migration-test-flink" + testMigrateVersion + "-empty-state-snapshot"));

testHarness.open();

Expand Down Expand Up @@ -302,11 +297,9 @@ public void testRestore() throws Exception {
testHarness.setup();

// restore state from binary snapshot file
MigrationTestUtil.restoreFromSnapshot(
testHarness,
testHarness.initializeState(
OperatorSnapshotUtil.getResourceFilename(
"kafka-consumer-migration-test-flink" + testMigrateVersion + "-snapshot"),
testMigrateVersion);
"kafka-consumer-migration-test-flink" + testMigrateVersion + "-snapshot"));

testHarness.open();

Expand Down Expand Up @@ -349,11 +342,9 @@ public void testRestoreFailsWithNonEmptyPreFlink13StatesIfDiscoveryEnabled() thr

// restore state from binary snapshot file; should fail since discovery is enabled
try {
MigrationTestUtil.restoreFromSnapshot(
testHarness,
testHarness.initializeState(
OperatorSnapshotUtil.getResourceFilename(
"kafka-consumer-migration-test-flink" + testMigrateVersion + "-snapshot"),
testMigrateVersion);
"kafka-consumer-migration-test-flink" + testMigrateVersion + "-snapshot"));

fail("Restore from savepoints from version before Flink 1.3.x should have failed if discovery is enabled.");
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ private void testRescaling(
testHarnesses[i] = createTestHarness(consumers[i], initialParallelism, i);

// initializeState() is always called, null signals that we didn't restore
testHarnesses[i].initializeState(null);
testHarnesses[i].initializeEmptyState();
testHarnesses[i].open();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
import org.apache.flink.streaming.util.migration.MigrationTestUtil;
import org.apache.flink.streaming.util.migration.MigrationVersion;

import com.amazonaws.services.kinesis.model.SequenceNumberRange;
Expand Down Expand Up @@ -148,9 +147,9 @@ public void testRestoreWithEmptyState() throws Exception {
new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);

testHarness.setup();
MigrationTestUtil.restoreFromSnapshot(
testHarness,
"src/test/resources/kinesis-consumer-migration-test-flink" + testMigrateVersion + "-empty-snapshot", testMigrateVersion);
testHarness.initializeState(
OperatorSnapshotUtil.getResourceFilename(
"kinesis-consumer-migration-test-flink" + testMigrateVersion + "-empty-snapshot"));
testHarness.open();

consumerFunction.run(new TestSourceContext<>());
Expand Down Expand Up @@ -204,9 +203,9 @@ public void testRestore() throws Exception {
new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);

testHarness.setup();
MigrationTestUtil.restoreFromSnapshot(
testHarness,
"src/test/resources/kinesis-consumer-migration-test-flink" + testMigrateVersion + "-snapshot", testMigrateVersion);
testHarness.initializeState(
OperatorSnapshotUtil.getResourceFilename(
"kinesis-consumer-migration-test-flink" + testMigrateVersion + "-snapshot"));
testHarness.open();

consumerFunction.run(new TestSourceContext<>());
Expand Down Expand Up @@ -285,9 +284,9 @@ public void testRestoreWithReshardedStream() throws Exception {
new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);

testHarness.setup();
MigrationTestUtil.restoreFromSnapshot(
testHarness,
"src/test/resources/kinesis-consumer-migration-test-flink" + testMigrateVersion + "-snapshot", testMigrateVersion);
testHarness.initializeState(
OperatorSnapshotUtil.getResourceFilename(
"kinesis-consumer-migration-test-flink" + testMigrateVersion + "-snapshot"));
testHarness.open();

consumerFunction.run(new TestSourceContext<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ protected KinesisDataFetcher<String> createFetcher(
testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
testHarness.getExecutionConfig().setAutoWatermarkInterval(autoWatermarkInterval);

testHarness.initializeState(null);
testHarness.initializeEmptyState();
testHarness.open();

ConcurrentLinkedQueue<Watermark> watermarks = new ConcurrentLinkedQueue<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
import org.apache.flink.streaming.util.migration.MigrationTestUtil;
import org.apache.flink.streaming.util.migration.MigrationVersion;
import org.apache.flink.util.OperatingSystem;

Expand Down Expand Up @@ -176,11 +175,9 @@ public void testReaderRestore() throws Exception {

testHarness.setup();

MigrationTestUtil.restoreFromSnapshot(
testHarness,
testHarness.initializeState(
OperatorSnapshotUtil.getResourceFilename(
"reader-migration-test-flink" + testMigrateVersion + "-snapshot"),
testMigrateVersion);
"reader-migration-test-flink" + testMigrateVersion + "-snapshot"));

testHarness.open();

Expand Down Expand Up @@ -307,11 +304,9 @@ public void testMonitoringSourceRestore() throws Exception {

testHarness.setup();

MigrationTestUtil.restoreFromSnapshot(
testHarness,
testHarness.initializeState(
OperatorSnapshotUtil.getResourceFilename(
"monitoring-function-migration-test-" + expectedModTime + "-flink" + testMigrateVersion + "-snapshot"),
testMigrateVersion);
"monitoring-function-migration-test-" + expectedModTime + "-flink" + testMigrateVersion + "-snapshot"));

testHarness.open();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
import org.apache.flink.streaming.util.migration.MigrationTestUtil;
import org.apache.flink.streaming.util.migration.MigrationVersion;

import org.junit.Ignore;
Expand Down Expand Up @@ -158,10 +157,8 @@ public Integer getKey(Event value) throws Exception {
try {
harness.setup();

MigrationTestUtil.restoreFromSnapshot(
harness,
OperatorSnapshotUtil.getResourceFilename("cep-migration-after-branching-flink" + migrateVersion + "-snapshot"),
migrateVersion);
harness.initializeState(
OperatorSnapshotUtil.getResourceFilename("cep-migration-after-branching-flink" + migrateVersion + "-snapshot"));

harness.open();

Expand Down Expand Up @@ -320,10 +317,9 @@ public Integer getKey(Event value) throws Exception {
try {
harness.setup();

MigrationTestUtil.restoreFromSnapshot(
harness,
OperatorSnapshotUtil.getResourceFilename("cep-migration-starting-new-pattern-flink" + migrateVersion + "-snapshot"),
migrateVersion);
harness.initializeState(
OperatorSnapshotUtil.getResourceFilename(
"cep-migration-starting-new-pattern-flink" + migrateVersion + "-snapshot"));

harness.open();

Expand Down Expand Up @@ -486,10 +482,9 @@ public Integer getKey(Event value) throws Exception {
try {
harness.setup();

MigrationTestUtil.restoreFromSnapshot(
harness,
OperatorSnapshotUtil.getResourceFilename("cep-migration-single-pattern-afterwards-flink" + migrateVersion + "-snapshot"),
migrateVersion);
harness.initializeState(
OperatorSnapshotUtil.getResourceFilename(
"cep-migration-single-pattern-afterwards-flink" + migrateVersion + "-snapshot"));

harness.open();

Expand Down Expand Up @@ -579,10 +574,9 @@ public Integer getKey(Event value) throws Exception {
try {
harness.setup();

MigrationTestUtil.restoreFromSnapshot(
harness,
OperatorSnapshotUtil.getResourceFilename("cep-migration-conditions-flink" + migrateVersion + "-snapshot"),
migrateVersion);
harness.initializeState(
OperatorSnapshotUtil.getResourceFilename(
"cep-migration-conditions-flink" + migrateVersion + "-snapshot"));

harness.open();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.migration.MigrationTestUtil;
import org.apache.flink.streaming.util.migration.MigrationVersion;
import org.apache.flink.util.Collector;

Expand Down Expand Up @@ -184,11 +183,9 @@ public void testRestoreSessionWindowsWithCountTrigger() throws Exception {

testHarness.setup();

MigrationTestUtil.restoreFromSnapshot(
testHarness,
testHarness.initializeState(
OperatorSnapshotUtil.getResourceFilename(
"win-op-migration-test-session-with-stateful-trigger-flink" + testMigrateVersion + "-snapshot"),
testMigrateVersion);
"win-op-migration-test-session-with-stateful-trigger-flink" + testMigrateVersion + "-snapshot"));

testHarness.open();

Expand Down Expand Up @@ -278,11 +275,9 @@ public void testRestoreSessionWindowsWithCountTriggerInMintCondition() throws Ex

testHarness.setup();

MigrationTestUtil.restoreFromSnapshot(
testHarness,
testHarness.initializeState(
OperatorSnapshotUtil.getResourceFilename(
"win-op-migration-test-session-with-stateful-trigger-mint-flink" + testMigrateVersion + "-snapshot"),
testMigrateVersion);
"win-op-migration-test-session-with-stateful-trigger-mint-flink" + testMigrateVersion + "-snapshot"));

testHarness.open();

Expand Down Expand Up @@ -402,11 +397,9 @@ public void testRestoreReducingEventTimeWindows() throws Exception {

testHarness.setup();

MigrationTestUtil.restoreFromSnapshot(
testHarness,
testHarness.initializeState(
OperatorSnapshotUtil.getResourceFilename(
"win-op-migration-test-reduce-event-time-flink" + testMigrateVersion + "-snapshot"),
testMigrateVersion);
"win-op-migration-test-reduce-event-time-flink" + testMigrateVersion + "-snapshot"));

testHarness.open();

Expand Down Expand Up @@ -513,11 +506,9 @@ public void testRestoreApplyEventTimeWindows() throws Exception {

testHarness.setup();

MigrationTestUtil.restoreFromSnapshot(
testHarness,
testHarness.initializeState(
OperatorSnapshotUtil.getResourceFilename(
"win-op-migration-test-apply-event-time-flink" + testMigrateVersion + "-snapshot"),
testMigrateVersion);
"win-op-migration-test-apply-event-time-flink" + testMigrateVersion + "-snapshot"));

testHarness.open();

Expand Down Expand Up @@ -620,11 +611,9 @@ public void testRestoreReducingProcessingTimeWindows() throws Exception {

testHarness.setup();

MigrationTestUtil.restoreFromSnapshot(
testHarness,
testHarness.initializeState(
OperatorSnapshotUtil.getResourceFilename(
"win-op-migration-test-reduce-processing-time-flink" + testMigrateVersion + "-snapshot"),
testMigrateVersion);
"win-op-migration-test-reduce-processing-time-flink" + testMigrateVersion + "-snapshot"));

testHarness.open();

Expand Down Expand Up @@ -719,11 +708,9 @@ public void testRestoreApplyProcessingTimeWindows() throws Exception {

testHarness.setup();

MigrationTestUtil.restoreFromSnapshot(
testHarness,
testHarness.initializeState(
OperatorSnapshotUtil.getResourceFilename(
"win-op-migration-test-apply-processing-time-flink" + testMigrateVersion + "-snapshot"),
testMigrateVersion);
"win-op-migration-test-apply-processing-time-flink" + testMigrateVersion + "-snapshot"));

testHarness.open();

Expand Down Expand Up @@ -837,11 +824,9 @@ public void testRestoreKryoSerializedKeysWindows() throws Exception {

testHarness.setup();

MigrationTestUtil.restoreFromSnapshot(
testHarness,
testHarness.initializeState(
OperatorSnapshotUtil.getResourceFilename(
"win-op-migration-test-kryo-serialized-key-flink" + testMigrateVersion + "-snapshot"),
testMigrateVersion);
"win-op-migration-test-kryo-serialized-key-flink" + testMigrateVersion + "-snapshot"));

testHarness.open();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,14 @@ public void initializeState(OperatorSubtaskState operatorStateHandles) throws Ex
initializeState(operatorStateHandles, null);
}

public void initializeState(String operatorStateSnapshotPath) throws Exception {
initializeState(OperatorSnapshotUtil.readStateHandle(operatorStateSnapshotPath));
}

public void initializeEmptyState() throws Exception {
initializeState((OperatorSubtaskState) null);
}

/**
* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState()}.
* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)}
Expand Down Expand Up @@ -485,7 +493,7 @@ public static OperatorSubtaskState repackageState(OperatorSubtaskState... handle
*/
public void open() throws Exception {
if (!initializeCalled) {
initializeState(null);
initializeEmptyState();
}
operator.open();
}
Expand Down
Loading

0 comments on commit b30adb0

Please sign in to comment.