Skip to content

Commit

Permalink
[FLINK-6803] [tests] Add test for PojoSerializer state upgrade
Browse files Browse the repository at this point in the history
The added PojoSerializerUpgradeTest tests the state migration behaviour when the
underlying pojo type changes and one tries to recover from old state. Currently
not all tests could be activated, because there still some pending issues to be
fixed first. We should arm these tests once the issues have been fixed.
  • Loading branch information
tillrohrmann authored and tzulitai committed Jun 13, 2017
1 parent e35c575 commit 2d34af3
Show file tree
Hide file tree
Showing 11 changed files with 549 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;

import org.rocksdb.Checkpoint;
import org.rocksdb.ColumnFamilyDescriptor;
Expand Down Expand Up @@ -1506,7 +1507,7 @@ void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception
*/
@SuppressWarnings("rawtypes, unchecked")
protected <N, S> ColumnFamilyHandle getColumnFamily(
StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException {
StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException, StateMigrationException {

Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
kvStateInformation.get(descriptor.getName());
Expand Down Expand Up @@ -1557,7 +1558,7 @@ protected <N, S> ColumnFamilyHandle getColumnFamily(
return stateInfo.f0;
} else {
// TODO state migration currently isn't possible.
throw new RuntimeException("State migration currently isn't supported.");
throw new StateMigrationException("State migration isn't supported, yet.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
private ValueState<Integer> testState1;
private ValueState<String> testState2;

@Parameterized.Parameters
@Parameterized.Parameters(name = "Incremental checkpointing: {0}")
public static Collection<Boolean> parameters() {
return Arrays.asList(false, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.flink.annotation.Internal;

import javax.annotation.Nullable;

/**
* Utilities related to serializer compatibility.
*/
Expand All @@ -40,7 +42,7 @@ public class CompatibilityUtil {
* If yes, use that for state migration and simply return the result.
* 6. If all of above fails, state migration is required but could not be performed; throw exception.
*
* @param precedingSerializer the preceding serializer used to write the data
* @param precedingSerializer the preceding serializer used to write the data, null if none could be retrieved
* @param dummySerializerClassTag any class tags that identifies the preceding serializer as a dummy placeholder
* @param precedingSerializerConfigSnapshot configuration snapshot of the preceding serializer
* @param newSerializer the new serializer to ensure compatibility with
Expand All @@ -51,7 +53,7 @@ public class CompatibilityUtil {
*/
@SuppressWarnings("unchecked")
public static <T> CompatibilityResult<T> resolveCompatibilityResult(
TypeSerializer<?> precedingSerializer,
@Nullable TypeSerializer<?> precedingSerializer,
Class<?> dummySerializerClassTag,
TypeSerializerConfigSnapshot precedingSerializerConfigSnapshot,
TypeSerializer<T> newSerializer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static Field deserializeField(ObjectInputStream in) throws IOException, C
clazz = clazz.getSuperclass();
}
}
throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup."
throw new IOException("Class resolved at TaskManager is not compatible with class read during Plan setup."
+ " (" + fieldName + ")");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util;

/**
* Base class for state migration related exceptions
*/
public class StateMigrationException extends FlinkException {
private static final long serialVersionUID = 8268516412747670839L;

public StateMigrationException(String message) {
super(message);
}

public StateMigrationException(Throwable cause) {
super(cause);
}

public StateMigrationException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -399,7 +400,7 @@ private void restorePartitionedState(Collection<KeyedStateHandle> state) throws
.isRequiresMigration()) {

// TODO replace with state migration; note that key hash codes need to remain the same after migration
throw new IllegalStateException("The new key serializer is not compatible to read previous keys. " +
throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " +
"Aborting now since state migration is currently not available");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.flink.types.IntValue;
import org.apache.flink.util.FutureUtil;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.StateMigrationException;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Rule;
Expand Down Expand Up @@ -1804,53 +1805,44 @@ public void testKeyGroupSnapshotRestore() throws Exception {
}

@Test
public void testRestoreWithWrongKeySerializer() {
try {
CheckpointStreamFactory streamFactory = createStreamFactory();
public void testRestoreWithWrongKeySerializer() throws Exception {
CheckpointStreamFactory streamFactory = createStreamFactory();

// use an IntSerializer at first
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
// use an IntSerializer at first
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);

ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);
ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);

ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);

// write some state
backend.setCurrentKey(1);
state.update("1");
backend.setCurrentKey(2);
state.update("2");
// write some state
backend.setCurrentKey(1);
state.update("1");
backend.setCurrentKey(2);
state.update("2");

// draw a snapshot
KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
// draw a snapshot
KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));

backend.dispose();
backend.dispose();

// restore with the wrong key serializer
try {
restoreKeyedBackend(DoubleSerializer.INSTANCE, snapshot1);
// restore with the wrong key serializer
try {
restoreKeyedBackend(DoubleSerializer.INSTANCE, snapshot1);

fail("should recognize wrong key serializer");
} catch (RuntimeException e) {
if (!e.getMessage().contains("The new key serializer is not compatible")) {
fail("wrong exception " + e);
}
// expected
}
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
fail("should recognize wrong key serializer");
} catch (StateMigrationException ignored) {
// expected
}
}

@Test
@SuppressWarnings("unchecked")
public void testValueStateRestoreWithWrongSerializers() {
try {
CheckpointStreamFactory streamFactory = createStreamFactory();
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
public void testValueStateRestoreWithWrongSerializers() throws Exception {
CheckpointStreamFactory streamFactory = createStreamFactory();
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);

try {
ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);

ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
Expand Down Expand Up @@ -1880,29 +1872,21 @@ public void testValueStateRestoreWithWrongSerializers() {
state.value();

fail("should recognize wrong serializers");
} catch (RuntimeException e) {
if (!e.getMessage().contains("State migration currently isn't supported")) {
fail("wrong exception " + e);
}
} catch (StateMigrationException ignored) {
// expected
} catch (Exception e) {
fail("wrong exception " + e);
}
} finally {
backend.dispose();
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

@Test
@SuppressWarnings("unchecked")
public void testListStateRestoreWithWrongSerializers() {
try {
CheckpointStreamFactory streamFactory = createStreamFactory();
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
public void testListStateRestoreWithWrongSerializers() throws Exception {
CheckpointStreamFactory streamFactory = createStreamFactory();
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);

try {
ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
ListState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);

Expand Down Expand Up @@ -1931,29 +1915,21 @@ public void testListStateRestoreWithWrongSerializers() {
state.get();

fail("should recognize wrong serializers");
} catch (RuntimeException e) {
if (!e.getMessage().contains("State migration currently isn't supported")) {
fail("wrong exception " + e);
}
} catch (StateMigrationException ignored) {
// expected
} catch (Exception e) {
fail("wrong exception " + e);
}
} finally {
backend.dispose();
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

@Test
@SuppressWarnings("unchecked")
public void testReducingStateRestoreWithWrongSerializers() {
try {
CheckpointStreamFactory streamFactory = createStreamFactory();
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
public void testReducingStateRestoreWithWrongSerializers() throws Exception {
CheckpointStreamFactory streamFactory = createStreamFactory();
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);

try {
ReducingStateDescriptor<String> kvId = new ReducingStateDescriptor<>("id",
new AppendingReduce(),
StringSerializer.INSTANCE);
Expand Down Expand Up @@ -1984,29 +1960,21 @@ public void testReducingStateRestoreWithWrongSerializers() {
state.get();

fail("should recognize wrong serializers");
} catch (RuntimeException e) {
if (!e.getMessage().contains("State migration currently isn't supported")) {
fail("wrong exception " + e);
}
} catch (StateMigrationException ignored) {
// expected
} catch (Exception e) {
fail("wrong exception " + e);
}
} finally {
backend.dispose();
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

@Test
@SuppressWarnings("unchecked")
public void testMapStateRestoreWithWrongSerializers() {
try {
CheckpointStreamFactory streamFactory = createStreamFactory();
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
public void testMapStateRestoreWithWrongSerializers() throws Exception {
CheckpointStreamFactory streamFactory = createStreamFactory();
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);

try {
MapStateDescriptor<String, String> kvId = new MapStateDescriptor<>("id", StringSerializer.INSTANCE, StringSerializer.INSTANCE);
MapState<String, String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);

Expand All @@ -2025,7 +1993,7 @@ public void testMapStateRestoreWithWrongSerializers() {

@SuppressWarnings("unchecked")
TypeSerializer<String> fakeStringSerializer =
(TypeSerializer<String>) (TypeSerializer<?>) FloatSerializer.INSTANCE;
(TypeSerializer<String>) (TypeSerializer<?>) FloatSerializer.INSTANCE;

try {
kvId = new MapStateDescriptor<>("id", fakeStringSerializer, StringSerializer.INSTANCE);
Expand All @@ -2035,19 +2003,12 @@ public void testMapStateRestoreWithWrongSerializers() {
state.entries();

fail("should recognize wrong serializers");
} catch (RuntimeException e) {
if (!e.getMessage().contains("State migration currently isn't supported")) {
fail("wrong exception " + e);
}
} catch (StateMigrationException ignored) {
// expected
} catch (Exception e) {
fail("wrong exception " + e);
}
backend.dispose();
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
} finally {
backend.dispose();
}
}

Expand Down
1 change: 0 additions & 1 deletion flink-streaming-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ under the License.
<scope>test</scope>
<type>test-jar</type>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
Expand Down
8 changes: 8 additions & 0 deletions flink-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ under the License.
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-optimizer_${scala.binary.version}</artifactId>
Expand Down
Loading

0 comments on commit 2d34af3

Please sign in to comment.