Skip to content

Commit

Permalink
[FLINK-11420][datastream] Fix duplicate and createInstance methods of…
Browse files Browse the repository at this point in the history
… CoGroupedStreams.UnionSerializer

UnionSerializer did not perform a proper duplication of inner serializers. It also violated the assumption that createInstance never produces null.
  • Loading branch information
dawidwys committed Mar 11, 2019
1 parent 1ec833a commit 3133a6d
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -523,12 +523,22 @@ public boolean isImmutableType() {

@Override
public TypeSerializer<TaggedUnion<T1, T2>> duplicate() {
return this;
TypeSerializer<T1> duplicateOne = oneSerializer.duplicate();
TypeSerializer<T2> duplicateTwo = twoSerializer.duplicate();

// compare reference of nested serializers, if same instances returned, we can reuse
// this instance as well
if (duplicateOne != oneSerializer || duplicateTwo != twoSerializer) {
return new UnionSerializer<>(duplicateOne, duplicateTwo);
} else {
return this;
}
}

@Override
public TaggedUnion<T1, T2> createInstance() {
return null;
//we arbitrarily always create instance of one
return TaggedUnion.one(oneSerializer.createInstance());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.datastream;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.SerializerTestBase;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion;
import org.apache.flink.streaming.api.datastream.CoGroupedStreams.UnionSerializer;
import org.apache.flink.testutils.DeeplyEqualsChecker;

/**
* Serializer tests for {@link UnionSerializer}.
*/
public class UnionSerializerTest extends SerializerTestBase<TaggedUnion<Object, Object>> {

public UnionSerializerTest() {
super(new DeeplyEqualsChecker()
.withCustomCheck(
(o1, o2) -> o1 instanceof TaggedUnion && o2 instanceof TaggedUnion,
(o1, o2, checker) -> {
TaggedUnion union1 = (TaggedUnion) o1;
TaggedUnion union2 = (TaggedUnion) o2;

if (union1.isOne() && union2.isOne()) {
return checker.deepEquals(union1.getOne(), union2.getOne());
} else if (union1.isTwo() && union2.isTwo()) {
return checker.deepEquals(union1.getTwo(), union2.getTwo());
} else {
return false;
}
}
));
}

@Override
protected TypeSerializer<TaggedUnion<Object, Object>> createSerializer() {
return new UnionSerializer<>(
new KryoSerializer<>(Object.class, new ExecutionConfig()),
new KryoSerializer<>(Object.class, new ExecutionConfig())
);
}

@Override
protected int getLength() {
return -1;
}

@Override
@SuppressWarnings("unchecked")
protected Class<TaggedUnion<Object, Object>> getTypeClass() {
return (Class<TaggedUnion<Object, Object>>) (Class<?>) TaggedUnion.class;
}

@Override
@SuppressWarnings("unchecked")
protected TaggedUnion<Object, Object>[] getTestData() {
return new TaggedUnion[]{
TaggedUnion.one(1),
TaggedUnion.two("A"),
TaggedUnion.one("C")
};
}
}

0 comments on commit 3133a6d

Please sign in to comment.