From abf3091e9da92660490e600b7605f3e2d2d53ad4 Mon Sep 17 00:00:00 2001 From: Dawid Wysakowicz Date: Fri, 3 Jul 2020 16:30:34 +0200 Subject: [PATCH] [FLINK-18419] Make user ClassLoader available in TableEnvironment --- .../client/gateway/local/ExecutionContext.java | 9 ++++++--- .../internal/StreamTableEnvironmentImpl.java | 16 +++++++++++++--- .../internal/StreamTableEnvironmentImplTest.java | 3 ++- .../table/api/internal/TableEnvironmentImpl.java | 8 ++++++-- .../flink/table/utils/TableEnvironmentMock.java | 10 +++++++++- .../internal/StreamTableEnvironmentImpl.scala | 9 ++++++--- .../StreamTableEnvironmentImplTest.scala | 3 ++- .../table/planner/utils/TableTestBase.scala | 9 ++++++--- .../api/stream/StreamTableEnvironmentTest.scala | 3 ++- .../table/api/stream/sql/AggregateTest.scala | 3 ++- .../apache/flink/table/utils/TableTestBase.scala | 6 ++++-- 11 files changed, 58 insertions(+), 21 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java index 0a444eb2f6214..60e7736c313ad 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java @@ -424,7 +424,8 @@ private static TableEnvironment createStreamTableEnvironment( Executor executor, CatalogManager catalogManager, ModuleManager moduleManager, - FunctionCatalog functionCatalog) { + FunctionCatalog functionCatalog, + ClassLoader userClassLoader) { final Map plannerProperties = settings.toPlannerProperties(); final Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) @@ -438,7 +439,8 @@ private static TableEnvironment createStreamTableEnvironment( env, planner, executor, - settings.isStreamingMode()); + settings.isStreamingMode(), + userClassLoader); } private static Executor lookupExecutor( @@ -599,7 +601,8 @@ private void createTableEnvironment( executor, catalogManager, moduleManager, - functionCatalog); + functionCatalog, + classLoader); } else if (environment.getExecution().isBatchPlanner()) { streamExecEnv = null; execEnv = ExecutionEnvironment.getExecutionEnvironment(); diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java index 15877ad52c96c..05c2f202db11d 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java @@ -89,8 +89,17 @@ public StreamTableEnvironmentImpl( StreamExecutionEnvironment executionEnvironment, Planner planner, Executor executor, - boolean isStreamingMode) { - super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode); + boolean isStreamingMode, + ClassLoader userClassLoader) { + super( + catalogManager, + moduleManager, + tableConfig, + executor, + functionCatalog, + planner, + isStreamingMode, + userClassLoader); this.executionEnvironment = executionEnvironment; } @@ -137,7 +146,8 @@ public static StreamTableEnvironment create( executionEnvironment, planner, executor, - settings.isStreamingMode() + settings.isStreamingMode(), + classLoader ); } diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java index 954fc45678b84..7fcbf80ae0f88 100644 --- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java +++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java @@ -101,7 +101,8 @@ private StreamTableEnvironmentImpl getStreamTableEnvironment( env, new TestPlanner(elements.getTransformation()), new ExecutorMock(), - true + true, + this.getClass().getClassLoader() ); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 05c578b280c0b..954edb54a8db8 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -164,6 +164,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { protected final Planner planner; protected final Parser parser; private final boolean isStreamingMode; + private final ClassLoader userClassLoader; private static final String UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG = "Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type " + "INSERT, CREATE TABLE, DROP TABLE, ALTER TABLE, USE CATALOG, USE [CATALOG.]DATABASE, " + @@ -197,7 +198,8 @@ protected TableEnvironmentImpl( Executor executor, FunctionCatalog functionCatalog, Planner planner, - boolean isStreamingMode) { + boolean isStreamingMode, + ClassLoader userClassLoader) { this.catalogManager = catalogManager; this.catalogManager.setCatalogTableSchemaResolver( new CatalogTableSchemaResolver(planner.getParser(), isStreamingMode)); @@ -210,6 +212,7 @@ protected TableEnvironmentImpl( this.planner = planner; this.parser = planner.getParser(); this.isStreamingMode = isStreamingMode; + this.userClassLoader = userClassLoader; this.operationTreeBuilder = OperationTreeBuilder.create( tableConfig, functionCatalog.asLookup(parser::parseIdentifier), @@ -272,7 +275,8 @@ public static TableEnvironmentImpl create(EnvironmentSettings settings) { executor, functionCatalog, planner, - settings.isStreamingMode() + settings.isStreamingMode(), + classLoader ); } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java index dcbec07fa775e..5d0f021c5733e 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java @@ -46,7 +46,15 @@ protected TableEnvironmentMock( FunctionCatalog functionCatalog, PlannerMock planner, boolean isStreamingMode) { - super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode); + super( + catalogManager, + moduleManager, + tableConfig, + executor, + functionCatalog, + planner, + isStreamingMode, + TableEnvironmentMock.class.getClassLoader()); this.catalogManager = catalogManager; this.executor = executor; diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala index 7c99e37f97968..317cb6a20aae8 100644 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala +++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala @@ -58,7 +58,8 @@ class StreamTableEnvironmentImpl ( scalaExecutionEnvironment: StreamExecutionEnvironment, planner: Planner, executor: Executor, - isStreaming: Boolean) + isStreaming: Boolean, + userClassLoader: ClassLoader) extends TableEnvironmentImpl( catalogManager, moduleManager, @@ -66,7 +67,8 @@ class StreamTableEnvironmentImpl ( executor, functionCatalog, planner, - isStreaming) + isStreaming, + userClassLoader) with org.apache.flink.table.api.bridge.scala.StreamTableEnvironment { override def fromDataStream[T](dataStream: DataStream[T]): Table = { @@ -299,7 +301,8 @@ object StreamTableEnvironmentImpl { executionEnvironment, planner, executor, - settings.isStreamingMode + settings.isStreamingMode, + classLoader ) } diff --git a/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala b/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala index 0f84d9b65daed..7f67ebba73397 100644 --- a/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala +++ b/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala @@ -91,7 +91,8 @@ class StreamTableEnvironmentImplTest { env, new TestPlanner(elements.javaStream.getTransformation), new ExecutorMock, - true) + true, + this.getClass.getClassLoader) } private class TestPlanner(transformation: Transformation[_]) extends PlannerMock { diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala index 05b54c7228f40..c9429116cd107 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala @@ -959,7 +959,8 @@ class TestingTableEnvironment private( executor: Executor, functionCatalog: FunctionCatalog, planner: PlannerBase, - isStreamingMode: Boolean) + isStreamingMode: Boolean, + userClassLoader: ClassLoader) extends TableEnvironmentImpl( catalogManager, moduleManager, @@ -967,7 +968,8 @@ class TestingTableEnvironment private( executor, functionCatalog, planner, - isStreamingMode) { + isStreamingMode, + userClassLoader) { // just for testing, remove this method while // ` void registerFunction(String name, AggregateFunction aggregateFunction);` @@ -1118,7 +1120,8 @@ object TestingTableEnvironment { executor, functionCatalog, planner, - settings.isStreamingMode) + settings.isStreamingMode, + classLoader) } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala index 5ef115852be1c..406e6b45dcc45 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala @@ -425,7 +425,8 @@ class StreamTableEnvironmentTest extends TableTestBase { jStreamExecEnv, streamPlanner, executor, - true) + true, + Thread.currentThread().getContextClassLoader) val sType = new TupleTypeInfo(Types.LONG, Types.INT, Types.STRING, Types.INT, Types.LONG) .asInstanceOf[TupleTypeInfo[JTuple5[JLong, JInt, String, JInt, JLong]]] diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala index f7da2d682202f..835a26fd926c4 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala @@ -79,7 +79,8 @@ class AggregateTest extends TableTestBase { Mockito.mock(classOf[StreamExecutionEnvironment]), new PlannerMock, Mockito.mock(classOf[Executor]), - true + true, + Thread.currentThread().getContextClassLoader ) tablEnv.registerFunction("udag", new MyAgg) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala index 44e0bb952c276..562a2402454ba 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala @@ -347,7 +347,8 @@ case class StreamTableTestUtil( javaEnv, streamPlanner, executor, - true) + true, + Thread.currentThread().getContextClassLoader) val env = new StreamExecutionEnvironment(javaEnv) val tableEnv = new ScalaStreamTableEnvironmentImpl( @@ -358,7 +359,8 @@ case class StreamTableTestUtil( env, streamPlanner, executor, - true) + true, + Thread.currentThread().getContextClassLoader) def addTable[T: TypeInformation]( name: String,