Skip to content

Commit

Permalink
[FLINK-3760] Fix StateDescriptor.readObject
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Apr 14, 2016
1 parent 1a34f21 commit 494212b
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
/** The type information describing the value type. Only used to lazily create the serializer
* and dropped during serialization */
private transient TypeInformation<T> typeInfo;

// ------------------------------------------------------------------------

/**
* Create a new {@code StateDescriptor} with the given name and the given type serializer.
*
*
* @param name The name of the {@code StateDescriptor}.
* @param serializer The type serializer for the values in the state.
* @param defaultValue The default value that will be set when requesting state without setting
Expand Down Expand Up @@ -94,7 +94,7 @@ protected StateDescriptor(String name, TypeInformation<T> typeInfo, T defaultVal

/**
* Create a new {@code StateDescriptor} with the given name and the given type information.
*
*
* <p>If this constructor fails (because it is not possible to describe the type via a class),
* consider using the {@link #StateDescriptor(String, TypeInformation, Object)} constructor.
*
Expand All @@ -106,7 +106,7 @@ protected StateDescriptor(String name, TypeInformation<T> typeInfo, T defaultVal
protected StateDescriptor(String name, Class<T> type, T defaultValue) {
this.name = requireNonNull(name, "name must not be null");
requireNonNull(type, "type class must not be null");

try {
this.typeInfo = TypeExtractor.createTypeInfo(type);
} catch (Exception e) {
Expand All @@ -117,7 +117,7 @@ protected StateDescriptor(String name, Class<T> type, T defaultValue) {
}

// ------------------------------------------------------------------------

/**
* Returns the name of this {@code StateDescriptor}.
*/
Expand Down Expand Up @@ -152,21 +152,21 @@ public TypeSerializer<T> getSerializer() {
throw new IllegalStateException("Serializer not yet initialized.");
}
}

/**
* Creates a new {@link State} on the given {@link StateBackend}.
*
* @param stateBackend The {@code StateBackend} on which to create the {@link State}.
*/
public abstract S bind(StateBackend stateBackend) throws Exception;

// ------------------------------------------------------------------------

/**
* Checks whether the serializer has been initialized. Serializer initialization is lazy,
* to allow parametrization of serializers with an {@link ExecutionConfig} via
* {@link #initializeSerializerUnlessSet(ExecutionConfig)}.
*
*
* @return True if the serializers have been initialized, false otherwise.
*/
public boolean isSerializerInitialized() {
Expand All @@ -175,7 +175,7 @@ public boolean isSerializerInitialized() {

/**
* Initializes the serializer, unless it has been initialized before.
*
*
* @param executionConfig The execution config to use when creating the serializer.
*/
public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
Expand All @@ -188,7 +188,7 @@ public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
}
}
}

/**
* This method should be called by subclasses prior to serialization. Because the TypeInformation is
* not always serializable, it is 'transient' and dropped during serialization. Hence, the descriptor
Expand All @@ -204,7 +204,7 @@ private void ensureSerializerCreated() {
}
}
}

// ------------------------------------------------------------------------
// Standard Utils
// ------------------------------------------------------------------------
Expand All @@ -230,7 +230,7 @@ else if (o == null || getClass() != o.getClass()) {

@Override
public String toString() {
return getClass().getSimpleName() +
return getClass().getSimpleName() +
"{name=" + name +
", defaultValue=" + defaultValue +
", serializer=" + serializer +
Expand All @@ -257,7 +257,7 @@ private void writeObject(final ObjectOutputStream out) throws IOException {
out.writeBoolean(true);

byte[] serializedDefaultValue;
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(baos))
{
TypeSerializer<T> duplicateSerializer = serializer.duplicate();
Expand All @@ -284,12 +284,10 @@ private void readObject(final ObjectInputStream in) throws IOException, ClassNot
boolean hasDefaultValue = in.readBoolean();
if (hasDefaultValue) {
int size = in.readInt();

byte[] buffer = new byte[size];
int bytesRead = in.read(buffer);

if (bytesRead != size) {
throw new RuntimeException("Read size does not match expected size.");
}
in.readFully(buffer);

try (ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,33 @@ public void testValueStateDescriptorAutoSerializer() throws Exception {
assertNotNull(copy.getSerializer());
assertEquals(StringSerializer.INSTANCE, copy.getSerializer());
}

@Test
public void testVeryLargeDefaultValue() throws Exception {
// ensure that we correctly read very large data when deserializing the default value

TypeSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig());
byte[] data = new byte[200000];
for (int i = 0; i < 200000; i++) {
data[i] = 65;
}
data[199000] = '\0';

String defaultValue = new String(data);

ValueStateDescriptor<String> descr =
new ValueStateDescriptor<String>("testName", serializer, defaultValue);

assertEquals("testName", descr.getName());
assertEquals(defaultValue, descr.getDefaultValue());
assertNotNull(descr.getSerializer());
assertEquals(serializer, descr.getSerializer());

ValueStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr);

assertEquals("testName", copy.getName());
assertEquals(defaultValue, copy.getDefaultValue());
assertNotNull(copy.getSerializer());
assertEquals(serializer, copy.getSerializer());
}
}

0 comments on commit 494212b

Please sign in to comment.