Skip to content

Commit

Permalink
[FLINK-25969][table-runtime] Clean up caches in CompileUtils more aggressively
Browse files Browse the repository at this point in the history

This closes apache#18971.
  • Loading branch information
twalthr committed Mar 7, 2022
1 parent 47c599c commit fb05fab
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class BatchPlanner(
}

override protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]] = {
validateAndOverrideConfiguration()
beforeTranslation()
val planner = createDummyPlanner()

val transformations = execGraph.getRootNodes.map {
Expand All @@ -85,7 +85,7 @@ class BatchPlanner(
throw new TableException("Cannot generate BoundedStream due to an invalid logical plan. " +
"This is a bug and should not happen. Please file an issue.")
}
cleanupInternalConfigurations()
afterTranslation()
transformations
}

Expand Down Expand Up @@ -152,8 +152,8 @@ class BatchPlanner(
throw new UnsupportedOperationException(
"The compiled plan feature is not supported in batch mode.")

override def validateAndOverrideConfiguration(): Unit = {
super.validateAndOverrideConfiguration()
override def beforeTranslation(): Unit = {
super.beforeTranslation()
val runtimeMode = getConfiguration.get(ExecutionOptions.RUNTIME_MODE)
if (runtimeMode != RuntimeExecutionMode.BATCH) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import org.apache.flink.table.planner.sinks.DataStreamTableSink
import org.apache.flink.table.planner.sinks.TableSinkUtils.{inferSinkPhysicalSchema, validateLogicalPhysicalTypesCompatible, validateTableSink}
import org.apache.flink.table.planner.utils.InternalConfigOptions.{TABLE_QUERY_START_EPOCH_TIME, TABLE_QUERY_START_LOCAL_TIME}
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.{toJava, toScala}
import org.apache.flink.table.runtime.generated.CompileUtils
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter

Expand Down Expand Up @@ -183,7 +184,7 @@ abstract class PlannerBase(

override def translate(
modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
validateAndOverrideConfiguration()
beforeTranslation()
if (modifyOperations.isEmpty) {
return List.empty[Transformation[_]]
}
Expand All @@ -192,7 +193,7 @@ abstract class PlannerBase(
val optimizedRelNodes = optimize(relNodes)
val execGraph = translateToExecNodeGraph(optimizedRelNodes)
val transformations = translateToPlan(execGraph)
cleanupInternalConfigurations()
afterTranslation()
transformations
}

Expand Down Expand Up @@ -477,11 +478,7 @@ abstract class PlannerBase(
)
}

/**
* Different planner has different rules. Validate the planner and runtime mode is consistent with
* the configuration before planner do optimization with [[ModifyOperation]] or other works.
*/
protected def validateAndOverrideConfiguration(): Unit = {
protected def beforeTranslation(): Unit = {
val configuration = tableConfig.getConfiguration

// Add query start time to TableConfig, these config are used internally,
Expand All @@ -504,13 +501,14 @@ abstract class PlannerBase(
}
}

/**
* Cleanup all internal configuration after plan translation finished.
*/
protected def cleanupInternalConfigurations(): Unit = {
protected def afterTranslation(): Unit = {
// Cleanup all internal configuration after plan translation finished.
val configuration = tableConfig.getConfiguration
configuration.removeConfig(TABLE_QUERY_START_EPOCH_TIME)
configuration.removeConfig(TABLE_QUERY_START_LOCAL_TIME)

// Clean caches that might have filled up during optimization
CompileUtils.cleanUp()
}

/**
Expand All @@ -519,7 +517,7 @@ abstract class PlannerBase(
private[flink] def getExplainGraphs(operations: util.List[Operation]
): (mutable.Buffer[RelNode], Seq[RelNode], ExecNodeGraph, StreamGraph) = {
require(operations.nonEmpty, "operations should not be empty")
validateAndOverrideConfiguration()
beforeTranslation()
val sinkRelNodes = operations.map {
case queryOperation: QueryOperation =>
val relNode = getRelBuilder.queryOperation(queryOperation).build()
Expand Down Expand Up @@ -548,7 +546,7 @@ abstract class PlannerBase(
val execGraph = translateToExecNodeGraph(optimizedRelNodes)

val transformations = translateToPlan(execGraph)
cleanupInternalConfigurations()
afterTranslation()

val streamGraph = executor.createPipeline(transformations, tableConfig.getConfiguration, null)
.asInstanceOf[StreamGraph]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,15 @@ class StreamPlanner(
override protected def getExecNodeGraphProcessors: Seq[ExecNodeGraphProcessor] = Seq()

override protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]] = {
validateAndOverrideConfiguration()
beforeTranslation()
val planner = createDummyPlanner()
val transformations = execGraph.getRootNodes.map {
case node: StreamExecNode[_] => node.translateToPlan(planner)
case _ =>
throw new TableException("Cannot generate DataStream due to an invalid logical plan. " +
"This is a bug and should not happen. Please file an issue.")
}
cleanupInternalConfigurations()
afterTranslation()
transformations
}

Expand Down Expand Up @@ -163,11 +163,11 @@ class StreamPlanner(

override def compilePlan(
modifyOperations: util.List[ModifyOperation]): InternalPlan = {
validateAndOverrideConfiguration()
beforeTranslation()
val relNodes = modifyOperations.map(translateToRel)
val optimizedRelNodes = optimize(relNodes)
val execGraph = translateToExecNodeGraph(optimizedRelNodes)
cleanupInternalConfigurations()
afterTranslation()

new ExecNodeGraphInternalPlan(
JsonSerdeUtil.createObjectWriter(createSerdeContext)
Expand All @@ -177,18 +177,18 @@ class StreamPlanner(
}

override def translatePlan(plan: InternalPlan): util.List[Transformation[_]] = {
validateAndOverrideConfiguration()
beforeTranslation()
val execGraph = plan.asInstanceOf[ExecNodeGraphInternalPlan].getExecNodeGraph
val transformations = translateToPlan(execGraph)
cleanupInternalConfigurations()
afterTranslation()
transformations
}

override def explainPlan(plan: InternalPlan, extraDetails: ExplainDetail*): String = {
validateAndOverrideConfiguration()
beforeTranslation()
val execGraph = plan.asInstanceOf[ExecNodeGraphInternalPlan].getExecNodeGraph
val transformations = translateToPlan(execGraph)
cleanupInternalConfigurations()
afterTranslation()

val streamGraph = executor.createPipeline(transformations, tableConfig.getConfiguration, null)
.asInstanceOf[StreamGraph]
Expand All @@ -208,8 +208,8 @@ class StreamPlanner(
sb.toString()
}

override def validateAndOverrideConfiguration(): Unit = {
super.validateAndOverrideConfiguration()
override def beforeTranslation(): Unit = {
super.beforeTranslation()
val runtimeMode = getConfiguration.get(ExecutionOptions.RUNTIME_MODE)
if (runtimeMode != RuntimeExecutionMode.STREAMING) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.List;
import java.util.Objects;

Expand All @@ -48,16 +49,30 @@ public final class CompileUtils {
* Meta zone GC (class unloading), resulting in performance bottlenecks. So we add a cache to
* avoid this problem.
*/
protected static final Cache<String, Cache<ClassLoader, Class>> COMPILED_CACHE =
static final Cache<ClassKey, Class<?>> COMPILED_CLASS_CACHE =
CacheBuilder.newBuilder()
.maximumSize(100) // estimated cache size
// estimated maximum planning/startup time
.expireAfterAccess(Duration.ofMinutes(5))
// estimated cache size
.maximumSize(300)
.softValues()
.build();

protected static final Cache<ExpressionEntry, ExpressionEvaluator> COMPILED_EXPRESSION_CACHE =
static final Cache<ExpressionKey, ExpressionEvaluator> COMPILED_EXPRESSION_CACHE =
CacheBuilder.newBuilder()
.maximumSize(100) // estimated cache size
// estimated maximum planning/startup time
.expireAfterAccess(Duration.ofMinutes(5))
// estimated cache size
.maximumSize(100)
.softValues()
.build();

/**
 * Triggers the internal clean-up of expired entries in both compilation caches.
 *
 * <p>Delegates to {@code cleanUp()} on {@link #COMPILED_CLASS_CACHE} first and on {@link
 * #COMPILED_EXPRESSION_CACHE} second.
 */
public static void cleanUp() {
    // Compiled classes first, compiled expressions second.
    COMPILED_CLASS_CACHE.cleanUp();
    COMPILED_EXPRESSION_CACHE.cleanUp();
}

/**
* Compiles a generated code to a Class.
*
Expand All @@ -70,18 +85,11 @@ public final class CompileUtils {
@SuppressWarnings("unchecked")
public static <T> Class<T> compile(ClassLoader cl, String name, String code) {
try {
Cache<ClassLoader, Class> compiledClasses =
COMPILED_CACHE.get(
// "code" as a key should be sufficient as the class name
// is part of the Java code
code,
() ->
CacheBuilder.newBuilder()
.maximumSize(5)
.weakKeys()
.softValues()
.build());
return compiledClasses.get(cl, () -> doCompile(cl, name, code));
// The class name is part of the "code" and makes the string unique.
// To prevent class leaks, we don't cache the class loader directly
// but only its hash code.
final ClassKey classKey = new ClassKey(cl.hashCode(), code);
return (Class<T>) COMPILED_CLASS_CACHE.get(classKey, () -> doCompile(cl, name, code));
} catch (Exception e) {
throw new FlinkRuntimeException(e.getMessage(), e);
}
Expand Down Expand Up @@ -135,8 +143,8 @@ public static ExpressionEvaluator compileExpression(
List<Class<?>> argumentClasses,
Class<?> returnClass) {
try {
ExpressionEntry key =
new ExpressionEntry(code, argumentNames, argumentClasses, returnClass);
ExpressionKey key =
new ExpressionKey(code, argumentNames, argumentClasses, returnClass);
return COMPILED_EXPRESSION_CACHE.get(
key,
() -> {
Expand All @@ -163,14 +171,42 @@ public static ExpressionEvaluator compileExpression(
}
}

/**
 * Key type of the {@link #COMPILED_CLASS_CACHE}.
 *
 * <p>A key pairs an integer identifying a class loader with the source code that was compiled.
 * Two keys are equal only when both parts are equal.
 */
private static class ClassKey {
    private final int classLoaderId;
    private final String code;

    private ClassKey(int classLoaderId, String code) {
        this.classLoaderId = classLoaderId;
        this.code = code;
    }

    @Override
    public boolean equals(Object other) {
        if (other == this) {
            return true;
        }
        if (other == null || other.getClass() != getClass()) {
            return false;
        }
        final ClassKey that = (ClassKey) other;
        // Compare the cheap int first, the (potentially long) code string last.
        if (classLoaderId != that.classLoaderId) {
            return false;
        }
        return code.equals(that.code);
    }

    @Override
    public int hashCode() {
        // Same value as Objects.hash(classLoaderId, code), written out step by step.
        int result = 1;
        result = 31 * result + classLoaderId;
        result = 31 * result + Objects.hashCode(code);
        return result;
    }
}

/** Class to use as key for the {@link #COMPILED_EXPRESSION_CACHE}. */
private static class ExpressionEntry {
private static class ExpressionKey {
private final String code;
private final List<String> argumentNames;
private final List<Class<?>> argumentClasses;
private final Class<?> returnClass;

private ExpressionEntry(
private ExpressionKey(
String code,
List<String> argumentNames,
List<Class<?>> argumentClasses,
Expand All @@ -189,7 +225,7 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
ExpressionEntry that = (ExpressionEntry) o;
ExpressionKey that = (ExpressionKey) o;
return code.equals(that.code)
&& argumentNames.equals(that.argumentNames)
&& argumentClasses.equals(that.argumentClasses)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public class CompileUtilsTest {
@Before
public void before() {
// cleanup cached class before tests
CompileUtils.COMPILED_CACHE.invalidateAll();
CompileUtils.COMPILED_CLASS_CACHE.invalidateAll();
CompileUtils.COMPILED_EXPRESSION_CACHE.invalidateAll();
}

@Test
Expand Down

0 comments on commit fb05fab

Please sign in to comment.