Skip to content

Commit

Permalink
[FLINK-14642] Add support for copying null values to the TupleSeriali…
Browse files Browse the repository at this point in the history
…zer and CaseClassSerializer
  • Loading branch information
jiasheng55 authored and aljoscha committed Nov 14, 2019
1 parent a8868dd commit 84c96b3
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ public T createOrReuseInstance(Object[] fields, T reuse) {

@Override
public T copy(T from) {
if (from == null) {
return null;
}

T target = instantiateRaw();
for (int i = 0; i < arity; i++) {
Object copy = fieldSerializers[i].copy(from.getField(i));
Expand All @@ -116,6 +120,10 @@ public T copy(T from) {

@Override
public T copy(T from, T reuse) {
if (from == null) {
return null;
}

for (int i = 0; i < arity; i++) {
Object copy = fieldSerializers[i].copy((Object)from.getField(i), reuse.getField(i));
reuse.setField(copy, i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,18 @@ abstract class CaseClassSerializer[T <: Product](
}

def copy(from: T): T = {
initArray()
var i = 0
while (i < arity) {
fields(i) = fieldSerializers(i).copy(from.productElement(i).asInstanceOf[AnyRef])
i += 1
if (from == null) {
null.asInstanceOf[T]
}
else {
initArray()
var i = 0
while (i < arity) {
fields(i) = fieldSerializers(i).copy(from.productElement(i).asInstanceOf[AnyRef])
i += 1
}
createInstance(fields)
}
createInstance(fields)
}

def serialize(value: T, target: DataOutputView) {
Expand Down

0 comments on commit 84c96b3

Please sign in to comment.