Skip to content

Commit

Permalink
[FLINK-20580][core] Does not accept null value for SerializedValue
Browse files Browse the repository at this point in the history
This closes apache#14936.
  • Loading branch information
kezhuw authored and tillrohrmann committed Feb 18, 2021
1 parent bbdd769 commit ed981be
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 32 deletions.
42 changes: 26 additions & 16 deletions flink-core/src/main/java/org/apache/flink/util/SerializedValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

import org.apache.flink.annotation.Internal;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Arrays;

Expand All @@ -43,35 +41,50 @@ public class SerializedValue<T> implements java.io.Serializable {
private static final long serialVersionUID = -3564011643393683761L;

/** The serialized data. */
@Nullable private final byte[] serializedData;
private final byte[] serializedData;

private SerializedValue(byte[] serializedData) {
Preconditions.checkNotNull(serializedData, "Serialized data");
Preconditions.checkNotNull(serializedData, "Serialized data must not be null");
Preconditions.checkArgument(
serializedData.length != 0, "Serialized data must not be empty");
this.serializedData = serializedData;
}

/**
* Constructs a serialized value.
*
* @param value value to serialize
* @throws NullPointerException if value is null
* @throws IOException exception during serialization
*/
public SerializedValue(T value) throws IOException {
this.serializedData = value == null ? null : InstantiationUtil.serializeObject(value);
Preconditions.checkNotNull(value, "Value must not be null");
this.serializedData = InstantiationUtil.serializeObject(value);
}

@SuppressWarnings("unchecked")
public T deserializeValue(ClassLoader loader) throws IOException, ClassNotFoundException {
Preconditions.checkNotNull(loader, "No classloader has been passed");
return serializedData == null
? null
: (T) InstantiationUtil.deserializeObject(serializedData, loader);
return InstantiationUtil.deserializeObject(serializedData, loader);
}

/**
* Returns the serialized value or <code>null</code> if no value is set.
* Returns byte array for serialized data.
*
* @return Serialized data.
*/
@Nullable
public byte[] getByteArray() {
return serializedData;
}

/**
* Constructs serialized value from serialized data.
*
* @param serializedData serialized data
* @param <T> type
* @return serialized value
* @throws NullPointerException if serialized data is null
* @throws IllegalArgumentException if serialized data is empty
*/
public static <T> SerializedValue<T> fromBytes(byte[] serializedData) {
return new SerializedValue<>(serializedData);
}
Expand All @@ -80,17 +93,14 @@ public static <T> SerializedValue<T> fromBytes(byte[] serializedData) {

@Override
public int hashCode() {
return serializedData == null ? 0 : Arrays.hashCode(serializedData);
return Arrays.hashCode(serializedData);
}

@Override
public boolean equals(Object obj) {
if (obj instanceof SerializedValue) {
SerializedValue<?> other = (SerializedValue<?>) obj;
return this.serializedData == null
? other.serializedData == null
: (other.serializedData != null
&& Arrays.equals(this.serializedData, other.serializedData));
return Arrays.equals(this.serializedData, other.serializedData);
} else {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@

import org.junit.Test;

import java.util.Arrays;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;

/** Tests for the {@link SerializedValue}. */
Expand All @@ -47,26 +50,32 @@ public void testSimpleValue() {
assertNotNull(v.toString());
assertNotNull(copy.toString());

assertNotEquals(0, v.getByteArray().length);
assertArrayEquals(v.getByteArray(), copy.getByteArray());

byte[] bytes = v.getByteArray();
SerializedValue<String> saved =
SerializedValue.fromBytes(Arrays.copyOf(bytes, bytes.length));
assertEquals(v, saved);
assertArrayEquals(v.getByteArray(), saved.getByteArray());
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

@Test
public void testNullValue() {
try {
SerializedValue<Object> v = new SerializedValue<>(null);
SerializedValue<Object> copy = CommonTestUtils.createCopySerializable(v);
@Test(expected = NullPointerException.class)
public void testNullValue() throws Exception {
new SerializedValue<>(null);
}

assertNull(copy.deserializeValue(getClass().getClassLoader()));
@Test(expected = NullPointerException.class)
public void testFromNullBytes() {
SerializedValue.fromBytes(null);
}

assertEquals(v, copy);
assertEquals(v.hashCode(), copy.hashCode());
assertEquals(v.toString(), copy.toString());
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
@Test(expected = IllegalArgumentException.class)
public void testFromEmptyBytes() {
SerializedValue.fromBytes(new byte[0]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ static <T> Either<SerializedValue<T>, PermanentBlobKey> serializeAndTryOffload(
final SerializedValue<T> serializedValue = new SerializedValue<>(value);

if (serializedValue.getByteArray().length < blobWriter.getMinOffloadingSize()) {
return Either.Left(new SerializedValue<>(value));
return Either.Left(serializedValue);
} else {
try {
final PermanentBlobKey permanentBlobKey =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void eventHandlingInTaskFailureFailsTask() throws Exception {
final TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
final CompletableFuture<?> resultFuture =
tmGateway.sendOperatorEventToTask(
eid, new OperatorID(), new SerializedValue<>(null));
eid, new OperatorID(), new SerializedValue<>(new TestOperatorEvent()));

assertThat(
resultFuture,
Expand Down

0 comments on commit ed981be

Please sign in to comment.