Skip to content

Commit

Permalink
[FLINK-10789] [cep] LockableTypeSerializerSnapshot should be a TypeSe…
Browse files Browse the repository at this point in the history
…rializerSnapshot

This also adds a migration test for the LockableTypeSerializer.
  • Loading branch information
tzulitai committed Nov 11, 2018
1 parent 091cff3 commit 80db939
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 77 deletions.
8 changes: 8 additions & 0 deletions flink-libraries/flink-cep/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ under the License.

<!-- test dependencies -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

Expand Down Expand Up @@ -187,40 +186,29 @@ public boolean canEqual(Object obj) {
}

@Override
public TypeSerializerConfigSnapshot<Lockable<E>> snapshotConfiguration() {
return new LockableSerializerConfigSnapshot<>(elementSerializer);
public TypeSerializerSnapshot<Lockable<E>> snapshotConfiguration() {
return new LockableTypeSerializerSnapshot<>(elementSerializer);
}

/**
* This cannot be removed until {@link TypeSerializerConfigSnapshot} is no longer supported.
*/
@Override
public CompatibilityResult<Lockable<E>> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
if (configSnapshot instanceof LockableSerializerConfigSnapshot) {
@SuppressWarnings("unchecked")
LockableSerializerConfigSnapshot<E> snapshot = (LockableSerializerConfigSnapshot<E>) configSnapshot;

Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> nestedSerializerAndConfig =
snapshot.getSingleNestedSerializerAndConfig();

CompatibilityResult<E> inputCompatibilityResult = CompatibilityUtil.resolveCompatibilityResult(
nestedSerializerAndConfig.f0,
UnloadableDummyTypeSerializer.class,
nestedSerializerAndConfig.f1,
elementSerializer);

return (inputCompatibilityResult.isRequiresMigration())
? CompatibilityResult.requiresMigration()
: CompatibilityResult.compatible();
} else {
// backwards compatibility path
CompatibilityResult<E> inputCompatibilityResult = CompatibilityUtil.resolveCompatibilityResult(
configSnapshot.restoreSerializer(),
UnloadableDummyTypeSerializer.class,
configSnapshot,
elementSerializer);

return (inputCompatibilityResult.isRequiresMigration())
? CompatibilityResult.requiresMigration()
: CompatibilityResult.compatible();
}
// backwards compatibility path
CompatibilityResult<E> inputCompatibilityResult = CompatibilityUtil.resolveCompatibilityResult(
configSnapshot.restoreSerializer(),
UnloadableDummyTypeSerializer.class,
configSnapshot,
elementSerializer);

return (inputCompatibilityResult.isRequiresMigration())
? CompatibilityResult.requiresMigration()
: CompatibilityResult.compatible();
}

TypeSerializer<E> getElementSerializer() {
return elementSerializer;
}

@VisibleForTesting
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cep.nfa.sharedbuffer;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.Preconditions;

import java.io.IOException;

import static org.apache.flink.util.Preconditions.checkState;

/**
* A {@link TypeSerializerSnapshot} for the {@link Lockable.LockableTypeSerializer}.
*/
@Internal
public class LockableTypeSerializerSnapshot<E> implements TypeSerializerSnapshot<Lockable<E>> {

private static final int CURRENT_VERSION = 1;

private CompositeSerializerSnapshot nestedElementSerializerSnapshot;

/**
* Constructor for read instantiation.
*/
public LockableTypeSerializerSnapshot() {}

/**
* Constructor to create the snapshot for writing.
*/
public LockableTypeSerializerSnapshot(TypeSerializer<E> elementSerializer) {
this.nestedElementSerializerSnapshot = new CompositeSerializerSnapshot(Preconditions.checkNotNull(elementSerializer));
}

@Override
public int getCurrentVersion() {
return CURRENT_VERSION;
}

@Override
public TypeSerializer<Lockable<E>> restoreSerializer() {
return new Lockable.LockableTypeSerializer<>(nestedElementSerializerSnapshot.getRestoreSerializer(0));
}

@Override
public TypeSerializerSchemaCompatibility<Lockable<E>> resolveSchemaCompatibility(TypeSerializer<Lockable<E>> newSerializer) {
checkState(nestedElementSerializerSnapshot != null);

if (newSerializer instanceof Lockable.LockableTypeSerializer) {
Lockable.LockableTypeSerializer<E> serializer = (Lockable.LockableTypeSerializer<E>) newSerializer;

return nestedElementSerializerSnapshot.resolveCompatibilityWithNested(
TypeSerializerSchemaCompatibility.compatibleAsIs(),
serializer.getElementSerializer());
}
else {
return TypeSerializerSchemaCompatibility.incompatible();
}
}

@Override
public void writeSnapshot(DataOutputView out) throws IOException {
nestedElementSerializerSnapshot.writeCompositeSnapshot(out);
}

@Override
public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
this.nestedElementSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cep.nfa.sharedbuffer;

import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
import org.apache.flink.api.common.typeutils.base.StringSerializer;

/**
* Migration test for the {@link LockableTypeSerializerSnapshot}.
*/
public class LockableTypeSerializerSnapshotMigrationTest extends TypeSerializerSnapshotMigrationTestBase<Lockable<String>> {

private static final String DATA = "flink-1.6-lockable-type-serializer-data";
private static final String SNAPSHOT = "flink-1.6-lockable-type-serializer-snapshot";

public LockableTypeSerializerSnapshotMigrationTest() {
super(
TestSpecification.<Lockable<String>>builder(
"1.6-lockable-type-serializer",
Lockable.LockableTypeSerializer.class,
LockableTypeSerializerSnapshot.class)
.withSerializerProvider(() -> new Lockable.LockableTypeSerializer<>(StringSerializer.INSTANCE))
.withSnapshotDataLocation(SNAPSHOT)
.withTestData(DATA, 10)
);
}
}
Binary file not shown.
Binary file not shown.

0 comments on commit 80db939

Please sign in to comment.