Skip to content

Commit

Permalink
[FLINK-18419] Create catalog in TableEnvironment using user ClassLoader
Browse files Browse the repository at this point in the history
  • Loading branch information
dawidwys committed Jul 10, 2020
1 parent abf3091 commit 69ef4b7
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@
import org.apache.flink.table.descriptors.StreamTableDescriptor;
import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.functions.UserDefinedFunctionHelper;
Expand Down Expand Up @@ -983,15 +985,7 @@ private TableResult executeOperation(Operation operation) {
} else if (operation instanceof AlterCatalogFunctionOperation) {
return alterCatalogFunction((AlterCatalogFunctionOperation) operation);
} else if (operation instanceof CreateCatalogOperation) {
CreateCatalogOperation createCatalogOperation = (CreateCatalogOperation) operation;
String exMsg = getDDLOpExecuteErrorMsg(createCatalogOperation.asSummaryString());
try {
catalogManager.registerCatalog(
createCatalogOperation.getCatalogName(), createCatalogOperation.getCatalog());
return TableResultImpl.TABLE_RESULT_OK;
} catch (CatalogException e) {
throw new ValidationException(exMsg, e);
}
return createCatalog((CreateCatalogOperation) operation);
} else if (operation instanceof DropCatalogOperation) {
DropCatalogOperation dropCatalogOperation = (DropCatalogOperation) operation;
String exMsg = getDDLOpExecuteErrorMsg(dropCatalogOperation.asSummaryString());
Expand Down Expand Up @@ -1047,6 +1041,24 @@ private TableResult executeOperation(Operation operation) {
}
}

private TableResult createCatalog(CreateCatalogOperation operation) {
String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());
try {
String catalogName = operation.getCatalogName();
Map<String, String> properties = operation.getProperties();
final CatalogFactory factory = TableFactoryService.find(
CatalogFactory.class,
properties,
userClassLoader);

Catalog catalog = factory.createCatalog(catalogName, properties);
catalogManager.registerCatalog(catalogName, catalog);
return TableResultImpl.TABLE_RESULT_OK;
} catch (CatalogException e) {
throw new ValidationException(exMsg, e);
}
}

private TableResult buildShowResult(String columnName, String[] objects) {
return buildResult(
new String[]{columnName},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.table.operations.ddl;

import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.OperationUtils;

Expand All @@ -33,25 +32,26 @@
*/
public class CreateCatalogOperation implements CreateOperation {
private final String catalogName;
private final Catalog catalog;
private final Map<String, String> properties;

public CreateCatalogOperation(String catalogName, Catalog catalog) {
public CreateCatalogOperation(String catalogName, Map<String, String> properties) {
this.catalogName = checkNotNull(catalogName);
this.catalog = checkNotNull(catalog);
this.properties = checkNotNull(properties);
}

public String getCatalogName() {
return catalogName;
}

public Catalog getCatalog() {
return catalog;
public Map<String, String> getProperties() {
return Collections.unmodifiableMap(properties);
}

@Override
public String asSummaryString() {
Map<String, Object> params = new LinkedHashMap<>();
params.put("catalogName", catalogName);
params.put("properties", properties);

return OperationUtils.formatWithChildren(
"CREATE CATALOG",
Expand Down
8 changes: 8 additions & 0 deletions flink-table/flink-table-planner-blink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,14 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- SuccessException used in TestValuesTableFactory -->
<dependency>
<groupId>org.apache.flink</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.DescribeTableOperation;
import org.apache.flink.table.operations.ExplainOperation;
Expand Down Expand Up @@ -553,11 +551,7 @@ private Operation convertCreateCatalog(SqlCreateCatalog sqlCreateCatalog) {
sqlCreateCatalog.getPropertyList().getList().forEach(p ->
properties.put(((SqlTableOption) p).getKeyString(), ((SqlTableOption) p).getValueString()));

final CatalogFactory factory =
TableFactoryService.find(CatalogFactory.class, properties, this.getClass().getClassLoader());

Catalog catalog = factory.createCatalog(catalogName, properties);
return new CreateCatalogOperation(catalogName, catalog);
return new CreateCatalogOperation(catalogName, properties);
}

/** Convert DROP CATALOG statement. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,14 @@
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.testutils.ClassLoaderUtils;
import org.apache.flink.util.TemporaryClassLoaderContext;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.net.URLClassLoader;

import static org.apache.flink.table.descriptors.GenericInMemoryCatalogValidator.CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY;
import static org.junit.Assert.assertFalse;
Expand All @@ -35,6 +41,9 @@
*/
public class CatalogITCase {

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

@Test
public void testCreateCatalog() {
String name = "c1";
Expand All @@ -61,9 +70,54 @@ public void testDropCatalog() {
assertFalse(tableEnv.getCatalog(name).isPresent());
}

@Test
public void testCreateCatalogFromUserClassLoader() throws Exception {
final String className = "UserCatalogFactory";
URLClassLoader classLoader = ClassLoaderUtils.withRoot(temporaryFolder.newFolder())
.addResource("META-INF/services/org.apache.flink.table.factories.TableFactory", "UserCatalogFactory")
.addClass(
className,
"import org.apache.flink.table.catalog.GenericInMemoryCatalog;\n" +
"import org.apache.flink.table.factories.CatalogFactory;\n" +
"import java.util.Collections;\n" +
"import org.apache.flink.table.catalog.Catalog;\n" +
"import java.util.HashMap;\n" +
"import java.util.List;\n" +
"import java.util.Map;\n" +
"\tpublic class UserCatalogFactory implements CatalogFactory {\n" +
"\t\t@Override\n" +
"\t\tpublic Catalog createCatalog(\n" +
"\t\t\t\tString name,\n" +
"\t\t\t\tMap<String, String> properties) {\n" +
"\t\t\treturn new GenericInMemoryCatalog(name);\n" +
"\t\t}\n" +
"\n" +
"\t\t@Override\n" +
"\t\tpublic Map<String, String> requiredContext() {\n" +
"\t\t\tHashMap<String, String> hashMap = new HashMap<>();\n" +
"\t\t\thashMap.put(\"type\", \"userCatalog\");\n" +
"\t\t\treturn hashMap;\n" +
"\t\t}\n" +
"\n" +
"\t\t@Override\n" +
"\t\tpublic List<String> supportedProperties() {\n" +
"\t\t\treturn Collections.emptyList();\n" +
"\t\t}\n" +
"\t}"
).build();

try (TemporaryClassLoaderContext context = TemporaryClassLoaderContext.of(classLoader)) {
TableEnvironment tableEnvironment = getTableEnvironment();
tableEnvironment.executeSql("CREATE CATALOG cat WITH ('type'='userCatalog')");

assertTrue(tableEnvironment.getCatalog("cat").isPresent());
}
}

private TableEnvironment getTableEnvironment() {
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
return StreamTableEnvironment.create(env, settings);
}

}

0 comments on commit 69ef4b7

Please sign in to comment.