Skip to content

Commit

Permalink
[FLINK-12688] [state] Make serializer lazy initialization thread safe…
Browse files Browse the repository at this point in the history
… in StateDescriptor

This closes apache#8570.
  • Loading branch information
carp84 authored and tzulitai committed Jun 1, 2019
1 parent 323531a commit b48f17b
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
Expand All @@ -28,6 +29,9 @@
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand All @@ -37,6 +41,7 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
Expand All @@ -52,6 +57,7 @@
*/
@PublicEvolving
public abstract class StateDescriptor<S extends State, T> implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(StateDescriptor.class);

/**
* An enumeration of the types of supported states. Used to identify the state type
Expand Down Expand Up @@ -82,8 +88,7 @@ public enum Type {
/** The serializer for the type. May be eagerly initialized in the constructor,
* or lazily once the {@link #initializeSerializerUnlessSet(ExecutionConfig)} method
* is called. */
@Nullable
protected TypeSerializer<T> serializer;
private final AtomicReference<TypeSerializer<T>> serializerAtomicReference = new AtomicReference<>();

/** The type information describing the value type. Only used if the serializer
* is created lazily. */
Expand Down Expand Up @@ -114,7 +119,7 @@ public enum Type {
*/
protected StateDescriptor(String name, TypeSerializer<T> serializer, @Nullable T defaultValue) {
this.name = checkNotNull(name, "name must not be null");
this.serializer = checkNotNull(serializer, "serializer must not be null");
this.serializerAtomicReference.set(checkNotNull(serializer, "serializer must not be null"));
this.defaultValue = defaultValue;
}

Expand Down Expand Up @@ -175,6 +180,7 @@ public String getName() {
*/
public T getDefaultValue() {
if (defaultValue != null) {
TypeSerializer<T> serializer = serializerAtomicReference.get();
if (serializer != null) {
return serializer.copy(defaultValue);
} else {
Expand All @@ -191,13 +197,24 @@ public T getDefaultValue() {
* calling {@link #initializeSerializerUnlessSet(ExecutionConfig)}.
*/
public TypeSerializer<T> getSerializer() {
	final TypeSerializer<T> current = serializerAtomicReference.get();
	// guard: nothing has been set eagerly or lazily yet
	if (current == null) {
		throw new IllegalStateException("Serializer not yet initialized.");
	}
	// hand out the result of duplicate() rather than the stored reference itself
	return current.duplicate();
}

/**
 * Returns the serializer instance currently held by this descriptor as is,
 * i.e. without calling {@code duplicate()} on it. Exposed for tests only.
 *
 * @return the stored serializer instance
 * @throws IllegalStateException if no serializer has been set yet
 */
@VisibleForTesting
final TypeSerializer<T> getOriginalSerializer() {
	final TypeSerializer<T> current = serializerAtomicReference.get();
	if (current == null) {
		throw new IllegalStateException("Serializer not yet initialized.");
	}
	return current;
}

/**
* Sets the name for queries of state created from this descriptor.
*
Expand Down Expand Up @@ -272,7 +289,7 @@ public StateTtlConfig getTtlConfig() {
* @return True if the serializers have been initialized, false otherwise.
*/
public boolean isSerializerInitialized() {
return serializer != null;
return serializerAtomicReference.get() != null;
}

/**
Expand All @@ -281,14 +298,14 @@ public boolean isSerializerInitialized() {
* @param executionConfig The execution config to use when creating the serializer.
*/
public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
if (serializer == null) {
if (serializerAtomicReference.get() == null) {
checkState(typeInfo != null, "no serializer and no type info");

// instantiate the serializer
serializer = typeInfo.createSerializer(executionConfig);

// we can drop the type info now, no longer needed
typeInfo = null;
// try to instantiate and set the serializer
TypeSerializer<T> serializer = typeInfo.createSerializer(executionConfig);
// use CAS so that only the first serializer to be set is kept
if (!serializerAtomicReference.compareAndSet(null, serializer)) {
LOG.debug("Someone else beat us at initializing the serializer.");
}
}
}

Expand Down Expand Up @@ -320,7 +337,7 @@ public String toString() {
return getClass().getSimpleName() +
"{name=" + name +
", defaultValue=" + defaultValue +
", serializer=" + serializer +
", serializer=" + serializerAtomicReference.get() +
(isQueryable() ? ", queryableStateName=" + queryableStateName + "" : "") +
'}';
}
Expand All @@ -340,6 +357,9 @@ private void writeObject(final ObjectOutputStream out) throws IOException {
// we don't have a default value
out.writeBoolean(false);
} else {
TypeSerializer<T> serializer = serializerAtomicReference.get();
checkNotNull(serializer, "Serializer not initialized.");

// we have a default value
out.writeBoolean(true);

Expand Down Expand Up @@ -370,6 +390,9 @@ private void readObject(final ObjectInputStream in) throws IOException, ClassNot
// read the default value field
boolean hasDefaultValue = in.readBoolean();
if (hasDefaultValue) {
TypeSerializer<T> serializer = serializerAtomicReference.get();
checkNotNull(serializer, "Serializer not initialized.");

int size = in.readInt();

byte[] buffer = new byte[size];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.CommonTestUtils;

import org.junit.Test;

import java.io.File;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -200,6 +204,34 @@ public void testEqualsSameNameAndTypeDifferentClass() throws Exception {
assertNotEquals(descr1, descr2);
}

/**
 * Calls {@code initializeSerializerUnlessSet} from several threads at once and
 * checks that every thread ends up observing the same serializer instance.
 */
@Test
public void testSerializerLazyInitializeInParallel() throws Exception {
	final String name = "testSerializerLazyInitializeInParallel";
	final int threadNumber = 20;
	final ExecutionConfig executionConfig = new ExecutionConfig();

	// PojoTypeInfo is used because it will create a new serializer whenever createSerializer is invoked.
	final TestStateDescriptor<String> descriptor =
		new TestStateDescriptor<>(name, new PojoTypeInfo<>(String.class, new ArrayList<>()));

	// observed serializers, keyed by their identity hash code
	final ConcurrentHashMap<Integer, TypeSerializer<String>> observed = new ConcurrentHashMap<>();

	final ArrayList<CheckedThread> workers = new ArrayList<>(threadNumber);
	for (int idx = 0; idx < threadNumber; idx++) {
		final CheckedThread worker = new CheckedThread() {
			@Override
			public void go() {
				descriptor.initializeSerializerUnlessSet(executionConfig);
				final TypeSerializer<String> seen = descriptor.getOriginalSerializer();
				observed.put(System.identityHashCode(seen), seen);
			}
		};
		workers.add(worker);
	}

	// start all workers first, then wait for each of them to finish
	for (CheckedThread worker : workers) {
		worker.start();
	}
	for (CheckedThread worker : workers) {
		worker.sync();
	}

	assertEquals("Should use only one serializer but actually: " + observed, 1, observed.size());
	workers.clear();
}

// ------------------------------------------------------------------------
// Mock implementations and test types
// ------------------------------------------------------------------------
Expand Down

0 comments on commit b48f17b

Please sign in to comment.