Skip to content

Commit

Permalink
[FLINK-10816][cep] Fix LockableTypeSerializer.duplicate() to consider…
Browse files Browse the repository at this point in the history
… deep duplication of element serializer

This closes apache#7049.
  • Loading branch information
StefanRRichter committed Nov 8, 2018
1 parent ad17588 commit 997fa55
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.cep.nfa.sharedbuffer;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
Expand Down Expand Up @@ -109,8 +110,10 @@ public boolean isImmutableType() {
}

@Override
public TypeSerializer<Lockable<E>> duplicate() {
return new LockableTypeSerializer<>(elementSerializer);
public LockableTypeSerializer<E> duplicate() {
TypeSerializer<E> elementSerializerCopy = elementSerializer.duplicate();
return elementSerializerCopy == elementSerializer ?
this : new LockableTypeSerializer<>(elementSerializerCopy);
}

@Override
Expand All @@ -120,7 +123,7 @@ public Lockable<E> createInstance() {

@Override
public Lockable<E> copy(Lockable<E> from) {
return new Lockable<E>(elementSerializer.copy(from.element), from.refCounter);
return new Lockable<>(elementSerializer.copy(from.element), from.refCounter);
}

@Override
Expand Down Expand Up @@ -219,5 +222,10 @@ public CompatibilityResult<Lockable<E>> ensureCompatibility(TypeSerializerConfig
: CompatibilityResult.compatible();
}
}

@VisibleForTesting
TypeSerializer<E> getElementSerializer() {
return elementSerializer;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cep.nfa.sharedbuffer;

import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.state.heap.TestDuplicateSerializer;

import org.junit.Assert;
import org.junit.Test;

/**
* Tests for {@link org.apache.flink.cep.nfa.sharedbuffer.Lockable.LockableTypeSerializer}.
*/
public class LockableTypeSerializerTest {

/**
* This tests that {@link Lockable.LockableTypeSerializer#duplicate()} works as expected.
*/
@Test
public void testDuplicate() {
IntSerializer nonDuplicatingInnerSerializer = IntSerializer.INSTANCE;
Assert.assertSame(nonDuplicatingInnerSerializer, nonDuplicatingInnerSerializer.duplicate());
Lockable.LockableTypeSerializer<Integer> candidateTestShallowDuplicate =
new Lockable.LockableTypeSerializer<>(nonDuplicatingInnerSerializer);
Assert.assertSame(candidateTestShallowDuplicate, candidateTestShallowDuplicate.duplicate());

TestDuplicateSerializer duplicatingInnerSerializer = new TestDuplicateSerializer();
Assert.assertNotSame(duplicatingInnerSerializer, duplicatingInnerSerializer.duplicate());

Lockable.LockableTypeSerializer<Integer> candidateTestDeepDuplicate =
new Lockable.LockableTypeSerializer<>(duplicatingInnerSerializer);

Lockable.LockableTypeSerializer<Integer> deepDuplicate = candidateTestDeepDuplicate.duplicate();
Assert.assertNotSame(candidateTestDeepDuplicate, deepDuplicate);
Assert.assertNotSame(candidateTestDeepDuplicate.getElementSerializer(), deepDuplicate.getElementSerializer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,11 @@
package org.apache.flink.runtime.state.heap;

import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.ArrayListSerializer;
import org.apache.flink.runtime.state.KeyGroupRange;
Expand Down Expand Up @@ -564,103 +560,4 @@ public TypeSerializer<T> getKeySerializer() {
}
}

/**
* Serializer that can be disabled. Duplicates are still enabled, so we can check that
* serializers are duplicated.
*/
static class TestDuplicateSerializer extends TypeSerializer<Integer> {

private static final long serialVersionUID = 1L;

private static final Integer ZERO = 0;

private boolean disabled;

public TestDuplicateSerializer() {
this.disabled = false;
}

@Override
public boolean isImmutableType() {
return true;
}

@Override
public TypeSerializer<Integer> duplicate() {
return new TestDuplicateSerializer();
}

@Override
public Integer createInstance() {
return ZERO;
}

@Override
public Integer copy(Integer from) {
return from;
}

@Override
public Integer copy(Integer from, Integer reuse) {
return from;
}

@Override
public int getLength() {
return 4;
}

@Override
public void serialize(Integer record, DataOutputView target) throws IOException {
Assert.assertFalse(disabled);
target.writeInt(record);
}

@Override
public Integer deserialize(DataInputView source) throws IOException {
Assert.assertFalse(disabled);
return source.readInt();
}

@Override
public Integer deserialize(Integer reuse, DataInputView source) throws IOException {
Assert.assertFalse(disabled);
return deserialize(source);
}

@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
Assert.assertFalse(disabled);
target.writeInt(source.readInt());
}

@Override
public boolean equals(Object obj) {
return obj instanceof TestDuplicateSerializer;
}

@Override
public boolean canEqual(Object obj) {
return obj instanceof TestDuplicateSerializer;
}

@Override
public int hashCode() {
return getClass().hashCode();
}

public void disable() {
this.disabled = true;
}

@Override
public TypeSerializerConfigSnapshot<Integer> snapshotConfiguration() {
throw new UnsupportedOperationException();
}

@Override
public CompatibilityResult<Integer> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
throw new UnsupportedOperationException();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.heap;

import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

import org.junit.Assert;

import java.io.IOException;

/**
* This serializer can be used to test serializer duplication. The serializer that can be disabled. Duplicates are still
* enabled, so we can check that serializers are duplicated.
*/
public class TestDuplicateSerializer extends TypeSerializer<Integer> {

private static final long serialVersionUID = 1L;

private static final Integer ZERO = 0;

private boolean disabled;

public TestDuplicateSerializer() {
this.disabled = false;
}

@Override
public boolean isImmutableType() {
return true;
}

@Override
public TypeSerializer<Integer> duplicate() {
return new TestDuplicateSerializer();
}

@Override
public Integer createInstance() {
return ZERO;
}

@Override
public Integer copy(Integer from) {
return from;
}

@Override
public Integer copy(Integer from, Integer reuse) {
return from;
}

@Override
public int getLength() {
return 4;
}

@Override
public void serialize(Integer record, DataOutputView target) throws IOException {
Assert.assertFalse(disabled);
target.writeInt(record);
}

@Override
public Integer deserialize(DataInputView source) throws IOException {
Assert.assertFalse(disabled);
return source.readInt();
}

@Override
public Integer deserialize(Integer reuse, DataInputView source) throws IOException {
Assert.assertFalse(disabled);
return deserialize(source);
}

@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
Assert.assertFalse(disabled);
target.writeInt(source.readInt());
}

@Override
public boolean equals(Object obj) {
return obj instanceof TestDuplicateSerializer;
}

@Override
public boolean canEqual(Object obj) {
return obj instanceof TestDuplicateSerializer;
}

@Override
public int hashCode() {
return getClass().hashCode();
}

public void disable() {
this.disabled = true;
}

@Override
public TypeSerializerConfigSnapshot<Integer> snapshotConfiguration() {
throw new UnsupportedOperationException();
}

@Override
public CompatibilityResult<Integer> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
throw new UnsupportedOperationException();
}
}

0 comments on commit 997fa55

Please sign in to comment.