Skip to content

Commit

Permalink
[FLINK-15552][table] Use thread context classloader to find factories…
Browse files Browse the repository at this point in the history
… in TableFactoryService

This fixes the problem that `--library` and `--jar` doesn't work for DDL in SQL CLI, because the user's classloader is not passed to the TableFactoryService. A long-term solution is not to use the context-classloader which is tracked by FLINK-15635.

This closes apache#10874
  • Loading branch information
leonardBang authored and wuchong committed Jan 23, 2020
1 parent b2fda00 commit 338ae70
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ class ScalaShellITCase extends TestLogger {
@Rule
def temporaryFolder = _temporaryFolder

@After
def resetClassLoder(): Unit = {
// The Scala interpreter changes current class loader to ScalaClassLoader in every execution
// refer to [[ILoop.process()]]. So, we need reset it to original class loader after every Test.
Thread.currentThread().setContextClassLoader(classOf[ScalaShellITCase].getClassLoader)
}

/** Prevent re-creation of environment */
@Test
def testPreventRecreationBatch(): Unit = {
Expand Down Expand Up @@ -459,12 +466,6 @@ object ScalaShellITCase {
@ClassRule
def clusterResource = _clusterResource

@AfterClass
def afterAll(): Unit = {
// The Scala interpreter somehow changes the class loader. Therefore, we have to reset it
Thread.currentThread().setContextClassLoader(classOf[ScalaShellITCase].getClassLoader)
}

/**
* Run the input using a Scala Shell and return the output of the shell.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ public void createTable(String sessionId, String ddl) throws SqlExecutionExcepti
final ExecutionContext<?> context = getExecutionContext(sessionId);
final TableEnvironment tEnv = context.getTableEnvironment();
try {
tEnv.sqlUpdate(ddl);
context.wrapClassLoader(() -> tEnv.sqlUpdate(ddl));
} catch (Exception e) {
throw new SqlExecutionException("Could not create a table from statement: " + ddl, e);
}
Expand All @@ -364,7 +364,7 @@ public void dropTable(String sessionId, String ddl) throws SqlExecutionException
final ExecutionContext<?> context = getExecutionContext(sessionId);
final TableEnvironment tEnv = context.getTableEnvironment();
try {
tEnv.sqlUpdate(ddl);
context.wrapClassLoader(() -> tEnv.sqlUpdate(ddl));
} catch (Exception e) {
throw new SqlExecutionException("Could not drop table from statement: " + ddl, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
*/
public class TableFactoryService {

private static final ServiceLoader<TableFactory> defaultLoader = ServiceLoader.load(TableFactory.class);
private static final Logger LOG = LoggerFactory.getLogger(TableFactoryService.class);

/**
Expand Down Expand Up @@ -208,14 +207,11 @@ private static <T extends TableFactory> List<T> filter(
private static List<TableFactory> discoverFactories(Optional<ClassLoader> classLoader) {
try {
List<TableFactory> result = new LinkedList<>();
if (classLoader.isPresent()) {
ServiceLoader
.load(TableFactory.class, classLoader.get())
.iterator()
.forEachRemaining(result::add);
} else {
defaultLoader.iterator().forEachRemaining(result::add);
}
ClassLoader cl = classLoader.orElse(Thread.currentThread().getContextClassLoader());
ServiceLoader
.load(TableFactory.class, cl)
.iterator()
.forEachRemaining(result::add);
return result;
} catch (ServiceConfigurationError e) {
LOG.error("Could not load service provider for table factories.", e);
Expand Down

0 comments on commit 338ae70

Please sign in to comment.