diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModuleDescriptor.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModuleDescriptor.java deleted file mode 100644 index d605612e6cdf6..0000000000000 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModuleDescriptor.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.module.hive; - -import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.descriptors.ModuleDescriptor; - -import java.util.Map; - -import static org.apache.flink.table.module.hive.HiveModuleDescriptorValidator.MODULE_HIVE_VERSION; -import static org.apache.flink.table.module.hive.HiveModuleDescriptorValidator.MODULE_TYPE_HIVE; - -/** Module descriptor for {@link HiveModule}. */ -public class HiveModuleDescriptor extends ModuleDescriptor { - private String hiveVersion; - - public HiveModuleDescriptor() { - this(null); - } - - public HiveModuleDescriptor(String hiveVersion) { - super(MODULE_TYPE_HIVE); - this.hiveVersion = hiveVersion; - } - - @Override - protected Map toModuleProperties() { - final DescriptorProperties properties = new DescriptorProperties(); - - if (hiveVersion != null) { - properties.putString(MODULE_HIVE_VERSION, hiveVersion); - } - - return properties.asMap(); - } -} diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModuleFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModuleFactory.java index 5881debd8bc77..451ab52aaae87 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModuleFactory.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModuleFactory.java @@ -18,54 +18,51 @@ package org.apache.flink.table.module.hive; +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; import org.apache.flink.table.catalog.hive.client.HiveShimLoader; -import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.ModuleFactory; import org.apache.flink.table.module.Module; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.Collections; +import java.util.Set; -import static org.apache.flink.table.descriptors.ModuleDescriptorValidator.MODULE_TYPE; -import static org.apache.flink.table.module.hive.HiveModuleDescriptorValidator.MODULE_HIVE_VERSION; -import static org.apache.flink.table.module.hive.HiveModuleDescriptorValidator.MODULE_TYPE_HIVE; +import static org.apache.flink.table.module.hive.HiveModuleOptions.HIVE_VERSION; /** Factory for {@link HiveModule}. */ +@Internal public class HiveModuleFactory implements ModuleFactory { - @Override - public Module createModule(Map properties) { - final DescriptorProperties descProperties = getValidatedProperties(properties); - - final String hiveVersion = - descProperties - .getOptionalString(MODULE_HIVE_VERSION) - .orElse(HiveShimLoader.getHiveVersion()); + public static final String IDENTIFIER = "hive"; - return new HiveModule(hiveVersion); + @Override + public String factoryIdentifier() { + return IDENTIFIER; } - private static DescriptorProperties getValidatedProperties(Map properties) { - final DescriptorProperties descriptorProperties = new DescriptorProperties(true); - descriptorProperties.putProperties(properties); - - new HiveModuleDescriptorValidator().validate(descriptorProperties); - - return descriptorProperties; + @Override + public Set> requiredOptions() { + return Collections.emptySet(); } @Override - public Map requiredContext() { - Map context = new HashMap<>(); - context.put(MODULE_TYPE, MODULE_TYPE_HIVE); - - return context; + public Set> optionalOptions() { + return Collections.singleton(HIVE_VERSION); } @Override - public List supportedProperties() { - return Arrays.asList(MODULE_HIVE_VERSION); + public Module createModule(Context context) { + final FactoryUtil.ModuleFactoryHelper factoryHelper = + FactoryUtil.createModuleFactoryHelper(this, context); + factoryHelper.validate(); + + final String hiveVersion = + factoryHelper + .getOptions() + .getOptional(HIVE_VERSION) + .orElseGet(HiveShimLoader::getHiveVersion); + + return new HiveModule(hiveVersion); } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModuleDescriptorValidator.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModuleOptions.java similarity index 57% rename from flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModuleDescriptorValidator.java rename to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModuleOptions.java index aee1b5cc53972..2fcc852d830c8 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModuleDescriptorValidator.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModuleOptions.java @@ -18,18 +18,16 @@ package org.apache.flink.table.module.hive; -import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.descriptors.ModuleDescriptorValidator; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; -/** Validator for {@link HiveModuleDescriptor}. */ -public class HiveModuleDescriptorValidator extends ModuleDescriptorValidator { - public static final String MODULE_TYPE_HIVE = "hive"; - public static final String MODULE_HIVE_VERSION = "hive-version"; +/** Configuration options for the Hive module. */ +@PublicEvolving +public class HiveModuleOptions { - @Override - public void validate(DescriptorProperties properties) { - super.validate(properties); - properties.validateValue(MODULE_TYPE, MODULE_TYPE_HIVE, false); - properties.validateString(MODULE_HIVE_VERSION, true, 1); - } + public static final ConfigOption HIVE_VERSION = + ConfigOptions.key("hive-version").stringType().noDefaultValue(); + + private HiveModuleOptions() {} } diff --git a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index 27d69ee2ed0ed..e1871cfae2cb3 100644 --- a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -14,3 +14,4 @@ # limitations under the License. org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory +org.apache.flink.table.module.hive.HiveModuleFactory diff --git a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory index 5292f1062da66..cca4145575254 100644 --- a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory +++ b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -13,5 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.flink.table.module.hive.HiveModuleFactory org.apache.flink.table.planner.delegation.hive.HiveParserFactory diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleDescriptorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleDescriptorTest.java deleted file mode 100644 index 13690216a42d0..0000000000000 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleDescriptorTest.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.module.hive; - -import org.apache.flink.table.descriptors.Descriptor; -import org.apache.flink.table.descriptors.DescriptorTestBase; -import org.apache.flink.table.descriptors.DescriptorValidator; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** Tests for {@link HiveModuleDescriptor}. */ -public class HiveModuleDescriptorTest extends DescriptorTestBase { - - private final String hiveVersion = "2.3.4"; - - @Override - protected List descriptors() { - final Descriptor descriptor = new HiveModuleDescriptor(hiveVersion); - - return Arrays.asList(descriptor); - } - - @Override - protected List> properties() { - final Map props1 = new HashMap<>(); - props1.put("type", "hive"); - props1.put("hive-version", hiveVersion); - - return Arrays.asList(props1); - } - - @Override - protected DescriptorValidator validator() { - return new HiveModuleDescriptorValidator(); - } -} diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleFactoryTest.java index 18aa5b2a0f4a2..0c0129c62e1fb 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleFactoryTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleFactoryTest.java @@ -17,14 +17,13 @@ package org.apache.flink.table.module.hive; -import org.apache.flink.table.descriptors.ModuleDescriptor; -import org.apache.flink.table.factories.ModuleFactory; -import org.apache.flink.table.factories.TableFactoryService; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.module.Module; import org.junit.Test; -import java.util.Map; +import java.util.Collections; import static org.junit.Assert.assertEquals; @@ -34,12 +33,12 @@ public class HiveModuleFactoryTest { public void test() { final HiveModule expected = new HiveModule(); - final ModuleDescriptor moduleDescriptor = new HiveModuleDescriptor(); - - final Map properties = moduleDescriptor.toProperties(); - final Module actualModule = - TableFactoryService.find(ModuleFactory.class, properties).createModule(properties); + FactoryUtil.createModule( + HiveModuleFactory.IDENTIFIER, + Collections.emptyMap(), + new Configuration(), + Thread.currentThread().getContextClassLoader()); checkEquals(expected, (HiveModule) actualModule); } diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java index 0527fc044ce23..a7094e843fc7e 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java @@ -49,8 +49,6 @@ import org.apache.flink.table.factories.CatalogFactory; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; -import org.apache.flink.table.factories.ModuleFactory; -import org.apache.flink.table.module.Module; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.QueryOperation; import org.apache.flink.table.types.DataType; @@ -61,7 +59,6 @@ import java.net.URL; import java.nio.file.Paths; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -72,7 +69,6 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; -import static org.apache.flink.table.descriptors.ModuleDescriptorValidator.MODULE_TYPE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -84,7 +80,6 @@ public class DependencyTest { private static final String TEST_PROPERTY_VALUE = "test-value"; public static final String CATALOG_TYPE_TEST = "DependencyTest"; - public static final String MODULE_TYPE_TEST = "ModuleDependencyTest"; private static final String TABLE_FACTORY_JAR_FILE = "table-factories-test-jar.jar"; private static final List INIT_SQL = @@ -203,32 +198,6 @@ public DynamicTableSource createDynamicTableSource(Context context) { } } - /** Module that can be discovered if classloading is correct. */ - public static class TestModuleFactory implements ModuleFactory { - - @Override - public Module createModule(Map properties) { - return new TestModule(); - } - - @Override - public Map requiredContext() { - final Map context = new HashMap<>(); - context.put(MODULE_TYPE, MODULE_TYPE_TEST); - return context; - } - - @Override - public List supportedProperties() { - final List properties = new ArrayList<>(); - properties.add("test"); - return properties; - } - } - - /** Test module. */ - public static class TestModule implements Module {} - /** Catalog that can be discovered if classloading is correct. */ public static class TestCatalogFactory implements CatalogFactory { diff --git a/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory deleted file mode 100644 index 07e31cc2ae86d..0000000000000 --- a/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -org.apache.flink.table.client.gateway.local.DependencyTest$TestModuleFactory diff --git a/flink-table/flink-sql-client/src/test/resources/sql/module.q b/flink-table/flink-sql-client/src/test/resources/sql/module.q index 06a20551d3848..318d392dbbd2e 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/module.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/module.q @@ -72,24 +72,18 @@ Was expecting one of: # load hive module with module name capitalized LOAD MODULE Hive; [ERROR] Could not execute SQL statement. Reason: -org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.ModuleFactory' in -the classpath. +org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'Hive' that implements 'org.apache.flink.table.factories.ModuleFactory' in the classpath. -Reason: Required context properties mismatch. +Available factory identifiers are: -The following properties are requested: -type=Hive - -The following factories have been considered: -org.apache.flink.table.client.gateway.local.DependencyTest$TestModuleFactory -org.apache.flink.table.module.CoreModuleFactory -org.apache.flink.table.module.hive.HiveModuleFactory +core +hive !error # load hive module with specifying type LOAD MODULE myhive WITH ('type' = 'hive'); [ERROR] Could not execute SQL statement. Reason: -org.apache.flink.table.api.ValidationException: Property 'type' = 'hive' is not supported since module name is used to find module +org.apache.flink.table.api.ValidationException: Option 'type' = 'hive' is not supported since module name is used to find module !error LOAD MODULE hive; diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index e6d6b551f27cf..5730b7f9f62df 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -77,8 +77,6 @@ import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.factories.ComponentFactoryService; import org.apache.flink.table.factories.FactoryUtil; -import org.apache.flink.table.factories.ModuleFactory; -import org.apache.flink.table.factories.TableFactoryService; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.functions.UserDefinedFunction; import org.apache.flink.table.functions.UserDefinedFunctionHelper; @@ -165,7 +163,6 @@ import java.util.stream.StreamSupport; import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC; -import static org.apache.flink.table.descriptors.ModuleDescriptorValidator.MODULE_TYPE; /** * Implementation of {@link TableEnvironment} that works exclusively with Table API interfaces. Only @@ -1331,21 +1328,15 @@ private TableResult createCatalog(CreateCatalogOperation operation) { } private TableResult loadModule(LoadModuleOperation operation) { - String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString()); + final String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString()); try { - // find module by name - Map properties = new HashMap<>(operation.getProperties()); - if (properties.containsKey(MODULE_TYPE)) { - throw new ValidationException( - String.format( - "Property 'type' = '%s' is not supported since module name " - + "is used to find module", - properties.get(MODULE_TYPE))); - } - properties.put(MODULE_TYPE, operation.getModuleName()); - final ModuleFactory factory = - TableFactoryService.find(ModuleFactory.class, properties, userClassLoader); - moduleManager.loadModule(operation.getModuleName(), factory.createModule(properties)); + final Module module = + FactoryUtil.createModule( + operation.getModuleName(), + operation.getOptions(), + tableConfig.getConfiguration(), + userClassLoader); + moduleManager.loadModule(operation.getModuleName(), module); return TableResultImpl.TABLE_RESULT_OK; } catch (ValidationException e) { throw new ValidationException(String.format("%s. %s", exMsg, e.getMessage()), e); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/LoadModuleOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/LoadModuleOperation.java index 2409ca04046e7..c75eff207cd58 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/LoadModuleOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/LoadModuleOperation.java @@ -18,37 +18,57 @@ package org.apache.flink.table.operations; +import org.apache.flink.annotation.Internal; + import java.util.Collections; -import java.util.LinkedHashMap; import java.util.Map; +import java.util.stream.Collectors; +import static org.apache.flink.table.utils.EncodingUtils.escapeIdentifier; +import static org.apache.flink.table.utils.EncodingUtils.escapeSingleQuotes; import static org.apache.flink.util.Preconditions.checkNotNull; /** Operation to describe a LOAD MODULE statement. */ +@Internal public class LoadModuleOperation implements Operation { - private final String moduleName; - private final Map properties; + private final String moduleName; + private final Map options; - public LoadModuleOperation(String moduleName, Map properties) { + public LoadModuleOperation(String moduleName, Map options) { this.moduleName = checkNotNull(moduleName); - this.properties = checkNotNull(properties); + this.options = checkNotNull(options); } public String getModuleName() { return moduleName; } - public Map getProperties() { - return Collections.unmodifiableMap(properties); + public Map getOptions() { + return Collections.unmodifiableMap(options); } @Override public String asSummaryString() { - Map params = new LinkedHashMap<>(); - params.put("moduleName", moduleName); - params.put("properties", properties); - return OperationUtils.formatWithChildren( - "LOAD MODULE", params, Collections.emptyList(), Operation::asSummaryString); + final StringBuilder sb = new StringBuilder(); + sb.append("LOAD MODULE "); + sb.append(escapeIdentifier(moduleName)); + if (!options.isEmpty()) { + sb.append(" WITH ("); + + sb.append( + options.entrySet().stream() + .map( + entry -> + String.format( + "'%s' = '%s'", + escapeSingleQuotes(entry.getKey()), + escapeSingleQuotes(entry.getValue()))) + .collect(Collectors.joining(", "))); + + sb.append(")"); + } + + return sb.toString(); } } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/module/ModuleManagerTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/module/ModuleManagerTest.java index e8edf07c3738f..bc1c11a467557 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/module/ModuleManagerTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/module/ModuleManagerTest.java @@ -41,8 +41,10 @@ /** Tests for {@link ModuleManager}. */ public class ModuleManagerTest extends TestLogger { + + @Rule public final ExpectedException thrown = ExpectedException.none(); + private ModuleManager manager; - @Rule public ExpectedException thrown = ExpectedException.none(); @Before public void before() { diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/CatalogException.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/CatalogException.java index 2dcde9b2b207f..9c08c6477c55c 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/CatalogException.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/CatalogException.java @@ -18,7 +18,10 @@ package org.apache.flink.table.catalog.exceptions; +import org.apache.flink.annotation.PublicEvolving; + /** A catalog-related, runtime exception. */ +@PublicEvolving public class CatalogException extends RuntimeException { /** @param message the detail message. */ public CatalogException(String message) { diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ModuleDescriptor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ModuleDescriptor.java deleted file mode 100644 index cd966cb300721..0000000000000 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ModuleDescriptor.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.util.StringUtils; - -import java.util.Map; - -import static org.apache.flink.table.descriptors.ModuleDescriptorValidator.MODULE_TYPE; -import static org.apache.flink.util.Preconditions.checkArgument; - -/** - * Describes a {@link org.apache.flink.table.module.Module}. - * - * @deprecated See {@link Descriptor} for details. - */ -@PublicEvolving -@Deprecated -public abstract class ModuleDescriptor implements Descriptor { - - private final String type; - - /** - * Constructs a {@link ModuleDescriptor}. - * - * @param type string that identifies this catalog - */ - public ModuleDescriptor(String type) { - checkArgument(!StringUtils.isNullOrWhitespaceOnly(type), "type cannot be null or empty"); - - this.type = type; - } - - @Override - public final Map toProperties() { - final DescriptorProperties properties = new DescriptorProperties(); - properties.putString(MODULE_TYPE, type); - - properties.putProperties(toModuleProperties()); - return properties.asMap(); - } - - @Override - public String toString() { - return DescriptorProperties.toString(toProperties()); - } - - /** Converts this descriptor into a set of module properties. */ - protected abstract Map toModuleProperties(); -} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java index b617edff93fb4..6e02a90d0f8cf 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java @@ -18,6 +18,7 @@ package org.apache.flink.table.factories; +import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; @@ -37,6 +38,7 @@ import org.apache.flink.table.connector.format.EncodingFormat; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.module.Module; import org.apache.flink.table.utils.EncodingUtils; import org.apache.flink.util.Preconditions; @@ -48,6 +50,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -62,6 +65,7 @@ import static org.apache.flink.configuration.ConfigurationUtils.canBePrefixMap; import static org.apache.flink.configuration.ConfigurationUtils.filterPrefixMapKey; +import static org.apache.flink.table.module.CommonModuleOptions.MODULE_TYPE; /** Utility for working with {@link Factory}s. */ @PublicEvolving @@ -198,6 +202,16 @@ public static CatalogFactoryHelper createCatalogFactoryHelper( return new CatalogFactoryHelper(factory, context); } + /** + * Creates a utility that helps validating options for a {@link ModuleFactory}. + * + *

Note: This utility checks for left-over options in the final step. + */ + public static ModuleFactoryHelper createModuleFactoryHelper( + ModuleFactory factory, ModuleFactory.Context context) { + return new ModuleFactoryHelper(factory, context); + } + /** * Creates a utility that helps in discovering formats and validating all options for a {@link * DynamicTableFactory}. @@ -270,7 +284,6 @@ public static Catalog createCatalog( final DefaultCatalogContext context = new DefaultCatalogContext( catalogName, factoryOptions, configuration, classLoader); - return factory.createCatalog(context); } catch (Throwable t) { throw new ValidationException( @@ -290,6 +303,64 @@ public static Catalog createCatalog( } } + /** + * Discovers a matching module factory and creates an instance of it. + * + *

This first uses the legacy {@link TableFactory} stack to discover a matching {@link + * ModuleFactory}. If none is found, it falls back to the new stack using {@link Factory} + * instead. + */ + public static Module createModule( + String moduleName, + Map options, + ReadableConfig configuration, + ClassLoader classLoader) { + if (options.containsKey(MODULE_TYPE.key())) { + throw new ValidationException( + String.format( + "Option '%s' = '%s' is not supported since module name " + + "is used to find module", + MODULE_TYPE.key(), options.get(MODULE_TYPE.key()))); + } + + try { + final Map optionsWithType = new HashMap<>(options); + optionsWithType.put(MODULE_TYPE.key(), moduleName); + + final ModuleFactory legacyFactory = + TableFactoryService.find(ModuleFactory.class, optionsWithType, classLoader); + return legacyFactory.createModule(optionsWithType); + } catch (NoMatchingTableFactoryException e) { + final DefaultModuleContext discoveryContext = + new DefaultModuleContext(options, configuration, classLoader); + try { + final ModuleFactory factory = + discoverFactory( + ((ModuleFactory.Context) discoveryContext).getClassLoader(), + ModuleFactory.class, + moduleName); + + final DefaultModuleContext context = + new DefaultModuleContext(options, configuration, classLoader); + return factory.createModule(context); + } catch (Throwable t) { + throw new ValidationException( + String.format( + "Unable to create module '%s'.%n%nModule options are:%n%s", + moduleName, + options.entrySet().stream() + .map( + optionEntry -> + stringifyOption( + optionEntry.getKey(), + optionEntry.getValue())) + .sorted() + .collect(Collectors.joining("\n"))), + t); + } + } + } + /** * Discovers a factory using the given factory base class and identifier. * @@ -691,6 +762,17 @@ public CatalogFactoryHelper(CatalogFactory catalogFactory, CatalogFactory.Contex } } + /** + * Helper utility for validating all options for a {@link ModuleFactory}. + * + * @see #createModuleFactoryHelper(ModuleFactory, ModuleFactory.Context) + */ + public static class ModuleFactoryHelper extends FactoryHelper { + public ModuleFactoryHelper(ModuleFactory moduleFactory, ModuleFactory.Context context) { + super(moduleFactory, context.getOptions(), PROPERTY_VERSION); + } + } + /** * Helper utility for discovering formats and validating all options for a {@link * DynamicTableFactory}. @@ -834,6 +916,7 @@ private ReadableConfig projectOptions(String formatPrefix) { } /** Default implementation of {@link DynamicTableFactory.Context}. */ + @Internal public static class DefaultDynamicTableContext implements DynamicTableFactory.Context { private final ObjectIdentifier objectIdentifier; @@ -882,6 +965,7 @@ public boolean isTemporary() { } /** Default implementation of {@link CatalogFactory.Context}. */ + @Internal public static class DefaultCatalogContext implements CatalogFactory.Context { private final String name; private final Map options; @@ -920,6 +1004,38 @@ public ClassLoader getClassLoader() { } } + /** Default implementation of {@link ModuleFactory.Context}. */ + @Internal + public static class DefaultModuleContext implements ModuleFactory.Context { + private final Map options; + private final ReadableConfig configuration; + private final ClassLoader classLoader; + + public DefaultModuleContext( + Map options, + ReadableConfig configuration, + ClassLoader classLoader) { + this.options = options; + this.configuration = configuration; + this.classLoader = classLoader; + } + + @Override + public Map getOptions() { + return options; + } + + @Override + public ReadableConfig getConfiguration() { + return configuration; + } + + @Override + public ClassLoader getClassLoader() { + return classLoader; + } + } + // -------------------------------------------------------------------------------------------- private FactoryUtil() { diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ModuleFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ModuleFactory.java index c76577053fee6..0bc2ebd791107 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ModuleFactory.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ModuleFactory.java @@ -19,22 +19,103 @@ package org.apache.flink.table.factories; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.module.Module; +import org.apache.flink.table.module.ModuleException; +import java.util.List; import java.util.Map; +import java.util.Set; /** * A factory to create configured module instances based on string-based properties. See also {@link - * TableFactory} for more information. + * Factory} for more information. + * + *

Note that this interface supports the {@link TableFactory} stack for compatibility purposes. + * This is deprecated, however, and new implementations should implement the {@link Factory} stack + * instead. */ @PublicEvolving -public interface ModuleFactory extends TableFactory { +public interface ModuleFactory extends TableFactory, Factory { /** * Creates and configures a {@link Module} using the given properties. * * @param properties normalized properties describing a module. * @return the configured module. + * @deprecated Use {@link #createModule(Context)} instead and implement {@link Factory} instead + * of {@link TableFactory}. */ - Module createModule(Map properties); + @Deprecated + default Module createModule(Map properties) { + throw new ModuleException("Module factories must implement createModule()."); + } + + /** Creates and configures a {@link Module}. */ + default Module createModule(Context context) { + throw new ModuleException("Module factories must implement createModule(Context)."); + } + + /** Context provided when a module is created. */ + interface Context { + /** + * Returns the options with which the module is created. + * + *

An implementation should perform validation of these options. + */ + Map getOptions(); + + /** Gives read-only access to the configuration of the current session. */ + ReadableConfig getConfiguration(); + + /** + * Returns the class loader of the current session. + * + *

The class loader is in particular useful for discovering further (nested) factories. + */ + ClassLoader getClassLoader(); + } + + default String factoryIdentifier() { + if (requiredContext() == null || supportedProperties() == null) { + throw new ModuleException("Module factories must implement factoryIdentifier()"); + } + + return null; + } + + default Set> requiredOptions() { + if (requiredContext() == null || supportedProperties() == null) { + throw new ModuleException("Module factories must implement requiredOptions()"); + } + + return null; + } + + default Set> optionalOptions() { + if (requiredContext() == null || supportedProperties() == null) { + throw new ModuleException("Module factories must implement optionalOptions()"); + } + + return null; + } + + // -------------------------------------------------------------------------------------------- + // Default implementations for legacy {@link TableFactory} stack. + // -------------------------------------------------------------------------------------------- + + /** @deprecated Implement the {@link Factory} based stack instead. */ + @Deprecated + default Map requiredContext() { + // Default implementation for modules implementing the new {@link Factory} stack instead. + return null; + } + + /** @deprecated Implement the {@link Factory} based stack instead. */ + @Deprecated + default List supportedProperties() { + // Default implementation for modules implementing the new {@link Factory} stack instead. + return null; + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/CommonModuleOptions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/CommonModuleOptions.java new file mode 100644 index 0000000000000..afe9f16d58b32 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/CommonModuleOptions.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.module; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.table.factories.Factory; + +/** A collection of {@link ConfigOption} which are consistently used in multiple modules. */ +@Internal +public class CommonModuleOptions { + + /** + * {@link ConfigOption} which is used during module discovery to match it against {@link + * Factory#factoryIdentifier()}. + * + * @deprecated This is only required for the legacy factory stack + */ + @Deprecated + public static final ConfigOption MODULE_TYPE = + ConfigOptions.key("type").stringType().noDefaultValue(); +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/CoreModuleFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/CoreModuleFactory.java index 90f01d6f8be37..aa0ff971426ca 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/CoreModuleFactory.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/CoreModuleFactory.java @@ -19,14 +19,11 @@ package org.apache.flink.table.module; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; import org.apache.flink.table.factories.ModuleFactory; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.apache.flink.table.descriptors.ModuleDescriptorValidator.MODULE_TYPE; +import java.util.Collections; +import java.util.Set; /** Factory for {@link CoreModule}. */ @Internal @@ -35,20 +32,22 @@ public class CoreModuleFactory implements ModuleFactory { public static final String IDENTIFIER = "core"; @Override - public Module createModule(Map properties) { - return CoreModule.INSTANCE; + public String factoryIdentifier() { + return IDENTIFIER; } @Override - public Map requiredContext() { - Map context = new HashMap<>(); - context.put(MODULE_TYPE, IDENTIFIER); + public Set> requiredOptions() { + return Collections.emptySet(); + } - return context; + @Override + public Set> optionalOptions() { + return Collections.emptySet(); } @Override - public List supportedProperties() { - return new ArrayList<>(); + public Module createModule(Context context) { + return CoreModule.INSTANCE; } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ModuleDescriptorValidator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/ModuleException.java similarity index 61% rename from flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ModuleDescriptorValidator.java rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/ModuleException.java index 7b8929eba2ac8..f6b87a6442432 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ModuleDescriptorValidator.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/ModuleException.java @@ -16,20 +16,20 @@ * limitations under the License. */ -package org.apache.flink.table.descriptors; +package org.apache.flink.table.module; -import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; -/** Validator for {@link ModuleDescriptor}. */ -@Internal -@Deprecated -public abstract class ModuleDescriptorValidator implements DescriptorValidator { - - /** Key for describing the type of the module. Used for factory discovery. */ - public static final String MODULE_TYPE = "type"; +/** Exception related to modules. */ +@PublicEvolving +public class ModuleException extends RuntimeException { + /** Creates a new {@link ModuleException}. */ + public ModuleException(String message) { + super(message); + } - @Override - public void validate(DescriptorProperties properties) { - properties.validateString(MODULE_TYPE, false, 1); + /** Creates a new {@link ModuleException}. */ + public ModuleException(String message, Throwable throwable) { + super(message, throwable); } } diff --git a/flink-table/flink-table-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-table-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory similarity index 100% rename from flink-table/flink-table-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory rename to flink-table/flink-table-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/module/CoreModuleFactoryTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/module/CoreModuleFactoryTest.java index b4ad4c92e2038..babb85e75e7b5 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/module/CoreModuleFactoryTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/module/CoreModuleFactoryTest.java @@ -18,15 +18,15 @@ package org.apache.flink.table.factories.module; -import org.apache.flink.table.factories.ModuleFactory; -import org.apache.flink.table.factories.TableFactoryService; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.module.CoreModule; +import org.apache.flink.table.module.CoreModuleFactory; import org.apache.flink.table.module.Module; import org.junit.Test; -import java.util.HashMap; -import java.util.Map; +import java.util.Collections; import static org.junit.Assert.assertEquals; @@ -40,11 +40,12 @@ public class CoreModuleFactoryTest { public void test() { final CoreModule expectedModule = CoreModule.INSTANCE; - final Map properties = new HashMap<>(); - properties.put("type", "core"); - final Module actualModule = - TableFactoryService.find(ModuleFactory.class, properties).createModule(properties); + FactoryUtil.createModule( + CoreModuleFactory.IDENTIFIER, + Collections.emptyMap(), + new Configuration(), + Thread.currentThread().getContextClassLoader()); assertEquals(expectedModule, actualModule); } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/module/DummyModuleFactory.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/module/DummyModuleFactory.java index 65e61008f0f9f..faf30b715fabc 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/module/DummyModuleFactory.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/module/DummyModuleFactory.java @@ -18,49 +18,39 @@ package org.apache.flink.table.factories.module; -import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.descriptors.ModuleDescriptorValidator; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.table.factories.ModuleFactory; import org.apache.flink.table.module.Module; import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.apache.flink.table.descriptors.ModuleDescriptorValidator.MODULE_TYPE; +import java.util.Set; /** Test implementation for {@link ModuleFactory}. */ public class DummyModuleFactory implements ModuleFactory { + public static final String IDENTIFIER = "dummy"; + + private static final ConfigOption DUMMY_VERSION = + ConfigOptions.key("dummy-version").stringType().noDefaultValue(); + @Override - public Module createModule(Map properties) { - return new Module() {}; + public String factoryIdentifier() { + return IDENTIFIER; } @Override - public Map requiredContext() { - Map context = new HashMap<>(); - context.put(MODULE_TYPE, DummyModuleDescriptorValidator.MODULE_TYPE_DUMMY); - - return context; + public Set> requiredOptions() { + return Collections.emptySet(); } @Override - public List supportedProperties() { - return Collections.singletonList(DummyModuleDescriptorValidator.MODULE_DUMMY_VERSION); + public Set> optionalOptions() { + return Collections.singleton(DUMMY_VERSION); } - /** Test implementation for {@link ModuleDescriptorValidator}. */ - public static class DummyModuleDescriptorValidator extends ModuleDescriptorValidator { - public static final String MODULE_TYPE_DUMMY = "dummy"; - public static final String MODULE_DUMMY_VERSION = "dummy-version"; - - @Override - public void validate(DescriptorProperties properties) { - super.validate(properties); - properties.validateValue(MODULE_TYPE, MODULE_TYPE_DUMMY, false); - properties.validateString(MODULE_DUMMY_VERSION, true, 1); - } + @Override + public Module createModule(Context context) { + return new Module() {}; } } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/module/LegacyDummyModuleFactory.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/module/LegacyDummyModuleFactory.java new file mode 100644 index 0000000000000..26664f5566cab --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/module/LegacyDummyModuleFactory.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories.module; + +import org.apache.flink.table.factories.ModuleFactory; +import org.apache.flink.table.module.CommonModuleOptions; +import org.apache.flink.table.module.Module; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** Test implementation of a {@link ModuleFactory} using the legacy stack. */ +@Deprecated +public class LegacyDummyModuleFactory implements ModuleFactory { + + public static final String IDENTIFIER = "LegacyModule"; + + @Override + public Map requiredContext() { + return Collections.singletonMap(CommonModuleOptions.MODULE_TYPE.key(), IDENTIFIER); + } + + @Override + public List supportedProperties() { + return Collections.emptyList(); + } + + @Override + public Module createModule(Map options) { + return new Module() {}; + } +} diff --git a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory index 8a7f7f2bb594f..8f50817d8a0ac 100644 --- a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -22,3 +22,4 @@ org.apache.flink.table.factories.TestDynamicTableSourceOnlyFactory org.apache.flink.table.factories.TestConflictingDynamicTableFactory1 org.apache.flink.table.factories.TestConflictingDynamicTableFactory2 org.apache.flink.table.factories.TestCatalogFactory +org.apache.flink.table.factories.module.DummyModuleFactory diff --git a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory index 625488de1671b..8664f58ca68f4 100644 --- a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory +++ b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -13,5 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.flink.table.factories.module.DummyModuleFactory org.apache.flink.table.factories.TestTableSinkFactory +org.apache.flink.table.factories.module.LegacyDummyModuleFactory diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java index e2e06f1cb74a9..133c86fbb3e45 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java @@ -311,16 +311,16 @@ public void testAlterDatabase() throws Exception { public void testLoadModule() { final String sql = "LOAD MODULE dummy WITH ('k1' = 'v1', 'k2' = 'v2')"; final String expectedModuleName = "dummy"; - final Map expectedProperties = new HashMap<>(); - expectedProperties.put("k1", "v1"); - expectedProperties.put("k2", "v2"); + final Map expectedOptions = new HashMap<>(); + expectedOptions.put("k1", "v1"); + expectedOptions.put("k2", "v2"); Operation operation = parse(sql, SqlDialect.DEFAULT); assert operation instanceof LoadModuleOperation; final LoadModuleOperation loadModuleOperation = (LoadModuleOperation) operation; assertEquals(expectedModuleName, loadModuleOperation.getModuleName()); - assertEquals(expectedProperties, loadModuleOperation.getProperties()); + assertEquals(expectedOptions, loadModuleOperation.getOptions()); } @Test diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala index 7b9f0c85858fb..997eb0cfa5dfa 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala @@ -641,12 +641,12 @@ class TableEnvironmentTest { val statement = """ |LOAD MODULE dummy WITH ( - |'type' = 'dummy' + | 'type' = 'dummy' |) """.stripMargin expectedException.expect(classOf[ValidationException]) expectedException.expectMessage( - "Property 'type' = 'dummy' is not supported since module name is used to find module") + "Option 'type' = 'dummy' is not supported since module name is used to find module") tableEnv.executeSql(statement) } @@ -666,12 +666,12 @@ class TableEnvironmentTest { val statement2 = """ |LOAD MODULE dummy WITH ( - |'dummy-version' = '2' + | 'dummy-version' = '2' |) """.stripMargin expectedException.expect(classOf[ValidationException]) expectedException.expectMessage( - "Could not execute LOAD MODULE: (moduleName: [dummy], properties: [{dummy-version=2}])." + + "Could not execute LOAD MODULE `dummy` WITH ('dummy-version' = '2')." + " A module with name 'dummy' already exists") tableEnv.executeSql(statement2) } @@ -690,15 +690,15 @@ class TableEnvironmentTest { fail("Expected an exception") } catch { case t: Throwable => - assertThat(t, containsMessage("Could not execute LOAD MODULE: (moduleName: [Dummy], " + - "properties: [{dummy-version=1}]). Could not find a suitable table factory for " + - "'org.apache.flink.table.factories.ModuleFactory' in\nthe classpath.")) + assertThat(t, containsMessage( + "Could not execute LOAD MODULE `Dummy` WITH ('dummy-version' = '1')." + + " Unable to create module 'Dummy'.")) } val statement2 = """ |LOAD MODULE dummy WITH ( - |'dummy-version' = '2' + | 'dummy-version' = '2' |) """.stripMargin val result = tableEnv.executeSql(statement2) @@ -788,6 +788,12 @@ class TableEnvironmentTest { validateShowModules(("core", false)) } + @Test + def testLegacyModule(): Unit = { + tableEnv.executeSql("LOAD MODULE LegacyModule") + validateShowModules(("core", true), ("LegacyModule", true)) + } + @Test def testExecuteSqlWithCreateDropView(): Unit = { val createTableStmt =