Skip to content

Commit

Permalink
[FLINK-18419] Make user ClassLoader available in TableEnvironment
Browse files Browse the repository at this point in the history
  • Loading branch information
dawidwys committed Jul 10, 2020
1 parent 8158421 commit abf3091
Show file tree
Hide file tree
Showing 11 changed files with 58 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,8 @@ private static TableEnvironment createStreamTableEnvironment(
Executor executor,
CatalogManager catalogManager,
ModuleManager moduleManager,
FunctionCatalog functionCatalog) {
FunctionCatalog functionCatalog,
ClassLoader userClassLoader) {

final Map<String, String> plannerProperties = settings.toPlannerProperties();
final Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
Expand All @@ -438,7 +439,8 @@ private static TableEnvironment createStreamTableEnvironment(
env,
planner,
executor,
settings.isStreamingMode());
settings.isStreamingMode(),
userClassLoader);
}

private static Executor lookupExecutor(
Expand Down Expand Up @@ -599,7 +601,8 @@ private void createTableEnvironment(
executor,
catalogManager,
moduleManager,
functionCatalog);
functionCatalog,
classLoader);
} else if (environment.getExecution().isBatchPlanner()) {
streamExecEnv = null;
execEnv = ExecutionEnvironment.getExecutionEnvironment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,17 @@ public StreamTableEnvironmentImpl(
StreamExecutionEnvironment executionEnvironment,
Planner planner,
Executor executor,
boolean isStreamingMode) {
super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode);
boolean isStreamingMode,
ClassLoader userClassLoader) {
super(
catalogManager,
moduleManager,
tableConfig,
executor,
functionCatalog,
planner,
isStreamingMode,
userClassLoader);
this.executionEnvironment = executionEnvironment;
}

Expand Down Expand Up @@ -137,7 +146,8 @@ public static StreamTableEnvironment create(
executionEnvironment,
planner,
executor,
settings.isStreamingMode()
settings.isStreamingMode(),
classLoader
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ private StreamTableEnvironmentImpl getStreamTableEnvironment(
env,
new TestPlanner(elements.getTransformation()),
new ExecutorMock(),
true
true,
this.getClass().getClassLoader()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
protected final Planner planner;
protected final Parser parser;
private final boolean isStreamingMode;
private final ClassLoader userClassLoader;
private static final String UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG =
"Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type " +
"INSERT, CREATE TABLE, DROP TABLE, ALTER TABLE, USE CATALOG, USE [CATALOG.]DATABASE, " +
Expand Down Expand Up @@ -197,7 +198,8 @@ protected TableEnvironmentImpl(
Executor executor,
FunctionCatalog functionCatalog,
Planner planner,
boolean isStreamingMode) {
boolean isStreamingMode,
ClassLoader userClassLoader) {
this.catalogManager = catalogManager;
this.catalogManager.setCatalogTableSchemaResolver(
new CatalogTableSchemaResolver(planner.getParser(), isStreamingMode));
Expand All @@ -210,6 +212,7 @@ protected TableEnvironmentImpl(
this.planner = planner;
this.parser = planner.getParser();
this.isStreamingMode = isStreamingMode;
this.userClassLoader = userClassLoader;
this.operationTreeBuilder = OperationTreeBuilder.create(
tableConfig,
functionCatalog.asLookup(parser::parseIdentifier),
Expand Down Expand Up @@ -272,7 +275,8 @@ public static TableEnvironmentImpl create(EnvironmentSettings settings) {
executor,
functionCatalog,
planner,
settings.isStreamingMode()
settings.isStreamingMode(),
classLoader
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,15 @@ protected TableEnvironmentMock(
FunctionCatalog functionCatalog,
PlannerMock planner,
boolean isStreamingMode) {
super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode);
super(
catalogManager,
moduleManager,
tableConfig,
executor,
functionCatalog,
planner,
isStreamingMode,
TableEnvironmentMock.class.getClassLoader());

this.catalogManager = catalogManager;
this.executor = executor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,17 @@ class StreamTableEnvironmentImpl (
scalaExecutionEnvironment: StreamExecutionEnvironment,
planner: Planner,
executor: Executor,
isStreaming: Boolean)
isStreaming: Boolean,
userClassLoader: ClassLoader)
extends TableEnvironmentImpl(
catalogManager,
moduleManager,
config,
executor,
functionCatalog,
planner,
isStreaming)
isStreaming,
userClassLoader)
with org.apache.flink.table.api.bridge.scala.StreamTableEnvironment {

override def fromDataStream[T](dataStream: DataStream[T]): Table = {
Expand Down Expand Up @@ -299,7 +301,8 @@ object StreamTableEnvironmentImpl {
executionEnvironment,
planner,
executor,
settings.isStreamingMode
settings.isStreamingMode,
classLoader
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ class StreamTableEnvironmentImplTest {
env,
new TestPlanner(elements.javaStream.getTransformation),
new ExecutorMock,
true)
true,
this.getClass.getClassLoader)
}

private class TestPlanner(transformation: Transformation[_]) extends PlannerMock {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -959,15 +959,17 @@ class TestingTableEnvironment private(
executor: Executor,
functionCatalog: FunctionCatalog,
planner: PlannerBase,
isStreamingMode: Boolean)
isStreamingMode: Boolean,
userClassLoader: ClassLoader)
extends TableEnvironmentImpl(
catalogManager,
moduleManager,
tableConfig,
executor,
functionCatalog,
planner,
isStreamingMode) {
isStreamingMode,
userClassLoader) {

// just for testing, remove this method while
// `<T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction);`
Expand Down Expand Up @@ -1118,7 +1120,8 @@ object TestingTableEnvironment {
executor,
functionCatalog,
planner,
settings.isStreamingMode)
settings.isStreamingMode,
classLoader)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,8 @@ class StreamTableEnvironmentTest extends TableTestBase {
jStreamExecEnv,
streamPlanner,
executor,
true)
true,
Thread.currentThread().getContextClassLoader)

val sType = new TupleTypeInfo(Types.LONG, Types.INT, Types.STRING, Types.INT, Types.LONG)
.asInstanceOf[TupleTypeInfo[JTuple5[JLong, JInt, String, JInt, JLong]]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ class AggregateTest extends TableTestBase {
Mockito.mock(classOf[StreamExecutionEnvironment]),
new PlannerMock,
Mockito.mock(classOf[Executor]),
true
true,
Thread.currentThread().getContextClassLoader
)

tablEnv.registerFunction("udag", new MyAgg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,8 @@ case class StreamTableTestUtil(
javaEnv,
streamPlanner,
executor,
true)
true,
Thread.currentThread().getContextClassLoader)

val env = new StreamExecutionEnvironment(javaEnv)
val tableEnv = new ScalaStreamTableEnvironmentImpl(
Expand All @@ -358,7 +359,8 @@ case class StreamTableTestUtil(
env,
streamPlanner,
executor,
true)
true,
Thread.currentThread().getContextClassLoader)

def addTable[T: TypeInformation](
name: String,
Expand Down

0 comments on commit abf3091

Please sign in to comment.