Skip to content

Commit

Permalink
[FLINK-11328] [e2e] Do not use deprecated ParameterlessTypeSerializer…
Browse files Browse the repository at this point in the history
…Config in e2e tests
  • Loading branch information
tzulitai committed Jan 23, 2019
1 parent ef47848 commit eb1241e
Showing 1 changed file with 14 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@

package org.apache.flink.streaming.tests.artificialstate;

import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfig;
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
Expand Down Expand Up @@ -143,21 +141,8 @@ public int hashCode() {
}

@Override
public TypeSerializerConfigSnapshot<ComplexPayload> snapshotConfiguration() {
// type serializer singletons should always be parameter-less
return new ParameterlessTypeSerializerConfig<>(getSerializationFormatIdentifier());
}

@Override
public CompatibilityResult<ComplexPayload> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
if (configSnapshot instanceof ParameterlessTypeSerializerConfig
&& isCompatibleSerializationFormatIdentifier(
((ParameterlessTypeSerializerConfig<?>) configSnapshot).getSerializationFormatIdentifier())) {

return CompatibilityResult.compatible();
} else {
return CompatibilityResult.requiresMigration();
}
public Snapshot snapshotConfiguration() {
return new Snapshot();
}

private boolean isCompatibleSerializationFormatIdentifier(String identifier) {
Expand All @@ -167,4 +152,15 @@ private boolean isCompatibleSerializationFormatIdentifier(String identifier) {
private String getSerializationFormatIdentifier() {
return getClass().getCanonicalName();
}

// ----------------------------------------------------------------------------------------

/**
* Snapshot for the {@link StatefulComplexPayloadSerializer}.
*/
public static class Snapshot extends SimpleTypeSerializerSnapshot<ComplexPayload> {
public Snapshot() {
super(StatefulComplexPayloadSerializer::new);
}
}
}

0 comments on commit eb1241e

Please sign in to comment.