Skip to content

Commit

Permalink
[FLINK-13369] Track references of already visited object in ClosureCl…
Browse files Browse the repository at this point in the history
…eaner
  • Loading branch information
David Moravek authored and kl0u committed Jul 23, 2019
1 parent 898b190 commit e568639
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
import java.io.ObjectOutputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

/**
* The closure cleaner is a utility that tries to truncate the closure (enclosing instance)
Expand Down Expand Up @@ -65,10 +68,18 @@ public class ClosureCleaner {
* be loaded, in order to process during the closure cleaning.
*/
public static void clean(Object func, ExecutionConfig.ClosureCleanerLevel level, boolean checkSerializable) {
clean(func, level, checkSerializable, Collections.newSetFromMap(new IdentityHashMap<>()));
}

private static void clean(Object func, ExecutionConfig.ClosureCleanerLevel level, boolean checkSerializable, Set<Object> visited) {
if (func == null) {
return;
}

if (!visited.add(func)) {
return;
}

final Class<?> cls = func.getClass();

if (ClassUtils.isPrimitiveOrWrapper(cls)) {
Expand Down Expand Up @@ -112,7 +123,7 @@ public static void clean(Object func, ExecutionConfig.ClosureCleanerLevel level,
LOG.debug("Dig to clean the {}", fieldObject.getClass().getName());
}

clean(fieldObject, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
clean(fieldObject, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true, visited);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.api.java.functions;

import java.util.function.Supplier;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.MapFunction;
Expand Down Expand Up @@ -133,6 +134,12 @@ public void testComplexInnerClassClean() throws Exception {
Assert.assertEquals(result, 4);
}

@Test
public void testSelfReferencingClean() {
final NestedSelfReferencing selfReferencing = new NestedSelfReferencing();
ClosureCleaner.clean(selfReferencing, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
}

class InnerCustomMap implements MapFunction<Integer, Integer> {

@Override
Expand Down Expand Up @@ -421,3 +428,21 @@ public MapFunction<Integer, Integer> getMap() {
}
}

@FunctionalInterface
interface SerializableSupplier<T> extends Supplier<T>, Serializable {

}

class NestedSelfReferencing implements Serializable {

private final SerializableSupplier<NestedSelfReferencing> cycle;

NestedSelfReferencing() {
this.cycle = () -> this;
}

public SerializableSupplier<NestedSelfReferencing> getCycle() {
return cycle;
}
}

0 comments on commit e568639

Please sign in to comment.