From 494212b37b64b45cd777cc09fc6d3d3d8fbf5999 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Thu, 14 Apr 2016 16:10:06 +0200 Subject: [PATCH] [FLINK-3760] Fix StateDescriptor.readObject --- .../api/common/state/StateDescriptor.java | 34 +++++++++---------- .../state/ValueStateDescriptorTest.java | 29 ++++++++++++++++ 2 files changed, 45 insertions(+), 18 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java index 10ac5bae0994d..243ebcd20bb66 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java @@ -61,12 +61,12 @@ public abstract class StateDescriptor implements Serializabl /** The type information describing the value type. Only used to lazily create the serializer * and dropped during serialization */ private transient TypeInformation typeInfo; - + // ------------------------------------------------------------------------ - + /** * Create a new {@code StateDescriptor} with the given name and the given type serializer. - * + * * @param name The name of the {@code StateDescriptor}. * @param serializer The type serializer for the values in the state. * @param defaultValue The default value that will be set when requesting state without setting @@ -94,7 +94,7 @@ protected StateDescriptor(String name, TypeInformation typeInfo, T defaultVal /** * Create a new {@code StateDescriptor} with the given name and the given type information. - * + * *

If this constructor fails (because it is not possible to describe the type via a class), * consider using the {@link #StateDescriptor(String, TypeInformation, Object)} constructor. * @@ -106,7 +106,7 @@ protected StateDescriptor(String name, TypeInformation typeInfo, T defaultVal protected StateDescriptor(String name, Class type, T defaultValue) { this.name = requireNonNull(name, "name must not be null"); requireNonNull(type, "type class must not be null"); - + try { this.typeInfo = TypeExtractor.createTypeInfo(type); } catch (Exception e) { @@ -117,7 +117,7 @@ protected StateDescriptor(String name, Class type, T defaultValue) { } // ------------------------------------------------------------------------ - + /** * Returns the name of this {@code StateDescriptor}. */ @@ -152,21 +152,21 @@ public TypeSerializer getSerializer() { throw new IllegalStateException("Serializer not yet initialized."); } } - + /** * Creates a new {@link State} on the given {@link StateBackend}. * * @param stateBackend The {@code StateBackend} on which to create the {@link State}. */ public abstract S bind(StateBackend stateBackend) throws Exception; - + // ------------------------------------------------------------------------ /** * Checks whether the serializer has been initialized. Serializer initialization is lazy, * to allow parametrization of serializers with an {@link ExecutionConfig} via * {@link #initializeSerializerUnlessSet(ExecutionConfig)}. - * + * * @return True if the serializers have been initialized, false otherwise. */ public boolean isSerializerInitialized() { @@ -175,7 +175,7 @@ public boolean isSerializerInitialized() { /** * Initializes the serializer, unless it has been initialized before. - * + * * @param executionConfig The execution config to use when creating the serializer. */ public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) { @@ -188,7 +188,7 @@ public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) { } } } - + /** * This method should be called by subclasses prior to serialization. Because the TypeInformation is * not always serializable, it is 'transient' and dropped during serialization. Hence, the descriptor @@ -204,7 +204,7 @@ private void ensureSerializerCreated() { } } } - + // ------------------------------------------------------------------------ // Standard Utils // ------------------------------------------------------------------------ @@ -230,7 +230,7 @@ else if (o == null || getClass() != o.getClass()) { @Override public String toString() { - return getClass().getSimpleName() + + return getClass().getSimpleName() + "{name=" + name + ", defaultValue=" + defaultValue + ", serializer=" + serializer + @@ -257,7 +257,7 @@ private void writeObject(final ObjectOutputStream out) throws IOException { out.writeBoolean(true); byte[] serializedDefaultValue; - try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(baos)) { TypeSerializer duplicateSerializer = serializer.duplicate(); @@ -284,12 +284,10 @@ private void readObject(final ObjectInputStream in) throws IOException, ClassNot boolean hasDefaultValue = in.readBoolean(); if (hasDefaultValue) { int size = in.readInt(); + byte[] buffer = new byte[size]; - int bytesRead = in.read(buffer); - if (bytesRead != size) { - throw new RuntimeException("Read size does not match expected size."); - } + in.readFully(buffer); try (ByteArrayInputStream bais = new ByteArrayInputStream(buffer); DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais)) diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java index d03cc4733ee46..655ffd50247ff 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java @@ -101,4 +101,33 @@ public void testValueStateDescriptorAutoSerializer() throws Exception { assertNotNull(copy.getSerializer()); assertEquals(StringSerializer.INSTANCE, copy.getSerializer()); } + + @Test + public void testVeryLargeDefaultValue() throws Exception { + // ensure that we correctly read very large data when deserializing the default value + + TypeSerializer serializer = new KryoSerializer<>(String.class, new ExecutionConfig()); + byte[] data = new byte[200000]; + for (int i = 0; i < 200000; i++) { + data[i] = 65; + } + data[199000] = '\0'; + + String defaultValue = new String(data); + + ValueStateDescriptor descr = + new ValueStateDescriptor("testName", serializer, defaultValue); + + assertEquals("testName", descr.getName()); + assertEquals(defaultValue, descr.getDefaultValue()); + assertNotNull(descr.getSerializer()); + assertEquals(serializer, descr.getSerializer()); + + ValueStateDescriptor copy = CommonTestUtils.createCopySerializable(descr); + + assertEquals("testName", copy.getName()); + assertEquals(defaultValue, copy.getDefaultValue()); + assertNotNull(copy.getSerializer()); + assertEquals(serializer, copy.getSerializer()); + } }