diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java index 229f3090229fa..9bc7838d0bb94 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java @@ -45,7 +45,7 @@ import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper; import org.apache.flink.table.catalog.hive.client.HiveShim; import org.apache.flink.table.catalog.hive.client.HiveShimLoader; -import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator; +import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions; import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DataStreamSinkProvider; @@ -129,7 +129,7 @@ public HiveTableSink( this.catalogTable = table; hiveVersion = Preconditions.checkNotNull( - jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION), + jobConf.get(HiveCatalogFactoryOptions.HIVE_VERSION.key()), "Hive version is not defined"); hiveShim = HiveShimLoader.loadHiveShim(hiveVersion); tableSchema = TableSchemaUtils.getPhysicalSchema(table.getSchema()); diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java index 582c926248e54..7cbc5928ca07c 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java @@ -37,7 +37,7 @@ import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.hive.client.HiveShim; import org.apache.flink.table.catalog.hive.client.HiveShimLoader; -import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator; +import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.source.DataStreamScanProvider; import org.apache.flink.table.connector.source.DynamicTableSource; @@ -110,7 +110,7 @@ public HiveTableSource( this.catalogTable = Preconditions.checkNotNull(catalogTable); this.hiveVersion = Preconditions.checkNotNull( - jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION), + jobConf.get(HiveCatalogFactoryOptions.HIVE_VERSION.key()), "Hive version is not defined"); this.hiveShim = HiveShimLoader.loadHiveShim(hiveVersion); } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java index 38befdfa1aaa2..9cd75689d4833 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java @@ -60,7 +60,7 @@ import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper; import org.apache.flink.table.catalog.hive.client.HiveShim; import org.apache.flink.table.catalog.hive.client.HiveShimLoader; -import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator; +import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions; import org.apache.flink.table.catalog.hive.factories.HiveFunctionDefinitionFactory; import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils; import org.apache.flink.table.catalog.hive.util.HiveStatsUtil; @@ -203,7 +203,7 @@ protected HiveCatalog( hiveShim = HiveShimLoader.loadHiveShim(hiveVersion); // add this to hiveConf to make sure table factory and source/sink see the same Hive version // as HiveCatalog - this.hiveConf.set(HiveCatalogValidator.CATALOG_HIVE_VERSION, hiveVersion); + this.hiveConf.set(HiveCatalogFactoryOptions.HIVE_VERSION.key(), hiveVersion); LOG.info("Created HiveCatalog '{}'", catalogName); } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogDescriptor.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogDescriptor.java deleted file mode 100644 index e6f947051fd98..0000000000000 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogDescriptor.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.catalog.hive.descriptors; - -import org.apache.flink.table.catalog.hive.HiveCatalog; -import org.apache.flink.table.descriptors.CatalogDescriptor; -import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.util.Preconditions; -import org.apache.flink.util.StringUtils; - -import java.util.Map; - -import static org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator.CATALOG_HIVE_CONF_DIR; -import static org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator.CATALOG_HIVE_VERSION; -import static org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator.CATALOG_TYPE_VALUE_HIVE; - -/** Catalog descriptor for {@link HiveCatalog}. */ -public class HiveCatalogDescriptor extends CatalogDescriptor { - - private String hiveSitePath; - private String hiveVersion; - - // TODO : set default database - public HiveCatalogDescriptor() { - super(CATALOG_TYPE_VALUE_HIVE, 1); - } - - public HiveCatalogDescriptor hiveSitePath(String hiveSitePath) { - Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveSitePath)); - this.hiveSitePath = hiveSitePath; - - return this; - } - - public HiveCatalogDescriptor hiveVersion(String hiveVersion) { - Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveVersion)); - this.hiveVersion = hiveVersion; - return this; - } - - @Override - protected Map toCatalogProperties() { - final DescriptorProperties properties = new DescriptorProperties(); - - if (hiveSitePath != null) { - properties.putString(CATALOG_HIVE_CONF_DIR, hiveSitePath); - } - - if (hiveVersion != null) { - properties.putString(CATALOG_HIVE_VERSION, hiveVersion); - } - - return properties.asMap(); - } -} diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogValidator.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogValidator.java deleted file mode 100644 index 75399c94686a2..0000000000000 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogValidator.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.catalog.hive.descriptors; - -import org.apache.flink.table.descriptors.CatalogDescriptorValidator; -import org.apache.flink.table.descriptors.DescriptorProperties; - -/** Validator for {@link HiveCatalogDescriptor}. */ -public class HiveCatalogValidator extends CatalogDescriptorValidator { - public static final String CATALOG_TYPE_VALUE_HIVE = "hive"; - public static final String CATALOG_HIVE_CONF_DIR = "hive-conf-dir"; - public static final String CATALOG_HIVE_VERSION = "hive-version"; - public static final String CATALOG_HADOOP_CONF_DIR = "hadoop-conf-dir"; - - @Override - public void validate(DescriptorProperties properties) { - super.validate(properties); - properties.validateValue(CATALOG_TYPE, CATALOG_TYPE_VALUE_HIVE, false); - properties.validateString(CATALOG_HIVE_CONF_DIR, true, 1); - properties.validateString(CATALOG_HIVE_VERSION, true, 1); - properties.validateString(CATALOG_HADOOP_CONF_DIR, true, 1); - } -} diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java index ace74eec0e159..c1dce0620bb80 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java @@ -18,92 +18,88 @@ package org.apache.flink.table.catalog.hive.factories; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.hive.HiveCatalog; -import org.apache.flink.table.catalog.hive.client.HiveShimLoader; -import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator; -import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.factories.CatalogFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; -import static org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator.CATALOG_HADOOP_CONF_DIR; -import static org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator.CATALOG_HIVE_CONF_DIR; -import static org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator.CATALOG_HIVE_VERSION; -import static org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator.CATALOG_TYPE_VALUE_HIVE; -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE; -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION; -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE; +import static org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions.DEFAULT_DATABASE; +import static org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions.HADOOP_CONF_DIR; +import static org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions.HIVE_CONF_DIR; +import static org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions.HIVE_VERSION; +import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION; /** Catalog factory for {@link HiveCatalog}. */ public class HiveCatalogFactory implements CatalogFactory { private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogFactory.class); @Override - public Map requiredContext() { - Map context = new HashMap<>(); - context.put(CATALOG_TYPE, CATALOG_TYPE_VALUE_HIVE); // hive - context.put(CATALOG_PROPERTY_VERSION, "1"); // backwards compatibility - return context; + public String factoryIdentifier() { + return HiveCatalogFactoryOptions.IDENTIFIER; } @Override - public List supportedProperties() { - List properties = new ArrayList<>(); - - // default database - properties.add(CATALOG_DEFAULT_DATABASE); - - properties.add(CATALOG_HIVE_CONF_DIR); - - properties.add(CATALOG_HIVE_VERSION); - - properties.add(CATALOG_HADOOP_CONF_DIR); - - return properties; + public Set> requiredOptions() { + return Collections.emptySet(); } @Override - public Catalog createCatalog(String name, Map properties) { - final DescriptorProperties descriptorProperties = getValidatedProperties(properties); - - final String defaultDatabase = - descriptorProperties - .getOptionalString(CATALOG_DEFAULT_DATABASE) - .orElse(HiveCatalog.DEFAULT_DB); - - final Optional hiveConfDir = - descriptorProperties.getOptionalString(CATALOG_HIVE_CONF_DIR); - - final Optional hadoopConfDir = - descriptorProperties.getOptionalString(CATALOG_HADOOP_CONF_DIR); + public Set> optionalOptions() { + final Set> options = new HashSet<>(); + options.add(DEFAULT_DATABASE); + options.add(PROPERTY_VERSION); + options.add(HIVE_CONF_DIR); + options.add(HIVE_VERSION); + options.add(HADOOP_CONF_DIR); + return options; + } - final String version = - descriptorProperties - .getOptionalString(CATALOG_HIVE_VERSION) - .orElse(HiveShimLoader.getHiveVersion()); + @Override + public Catalog createCatalog(Context context) { + final Configuration configuration = Configuration.fromMap(context.getOptions()); + validateConfiguration(configuration); return new HiveCatalog( - name, - defaultDatabase, - hiveConfDir.orElse(null), - hadoopConfDir.orElse(null), - version); + context.getName(), + configuration.getString(DEFAULT_DATABASE), + configuration.getString(HIVE_CONF_DIR), + configuration.getString(HADOOP_CONF_DIR), + configuration.getString(HIVE_VERSION)); } - private static DescriptorProperties getValidatedProperties(Map properties) { - final DescriptorProperties descriptorProperties = new DescriptorProperties(true); - descriptorProperties.putProperties(properties); - - new HiveCatalogValidator().validate(descriptorProperties); - - return descriptorProperties; + private void validateConfiguration(Configuration configuration) { + final String defaultDatabase = configuration.getString(DEFAULT_DATABASE); + if (defaultDatabase != null && defaultDatabase.isEmpty()) { + throw new ValidationException( + String.format( + "Option '%s' was provided, but is empty", DEFAULT_DATABASE.key())); + } + + final String hiveConfDir = configuration.getString(HIVE_CONF_DIR); + if (hiveConfDir != null && hiveConfDir.isEmpty()) { + throw new ValidationException( + String.format("Option '%s' was provided, but is empty", HIVE_CONF_DIR.key())); + } + + final String hadoopConfDir = configuration.getString(HADOOP_CONF_DIR); + if (hadoopConfDir != null && hadoopConfDir.isEmpty()) { + throw new ValidationException( + String.format("Option '%s' was provided, but is empty", HADOOP_CONF_DIR.key())); + } + + final String hiveVersion = configuration.getString(HIVE_VERSION); + if (hiveVersion != null && hiveVersion.isEmpty()) { + throw new ValidationException( + String.format("Option '%s' was provided, but is empty", HIVE_VERSION.key())); + } } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryOptions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryOptions.java new file mode 100644 index 0000000000000..59348cf497a6f --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryOptions.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.hive.factories; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.hive.HiveCatalog; +import org.apache.flink.table.catalog.hive.client.HiveShimLoader; + +/** {@link ConfigOption}s for {@link HiveCatalog}. */ +@Internal +public final class HiveCatalogFactoryOptions { + + public static final String IDENTIFIER = "hive"; + + public static final ConfigOption DEFAULT_DATABASE = + ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY) + .stringType() + .defaultValue(HiveCatalog.DEFAULT_DB); + + public static final ConfigOption HIVE_CONF_DIR = + ConfigOptions.key("hive-conf-dir").stringType().noDefaultValue(); + + public static final ConfigOption HIVE_VERSION = + ConfigOptions.key("hive-version") + .stringType() + .defaultValue(HiveShimLoader.getHiveVersion()); + + public static final ConfigOption HADOOP_CONF_DIR = + ConfigOptions.key("hadoop-conf-dir").stringType().noDefaultValue(); + + private HiveCatalogFactoryOptions() {} +} diff --git a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000000000..27d69ee2ed0ed --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory diff --git a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory index 0e5b4574d4e5b..5292f1062da66 100644 --- a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory +++ b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -13,6 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory org.apache.flink.table.module.hive.HiveModuleFactory org.apache.flink.table.planner.delegation.hive.HiveParserFactory diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogDescriptorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogDescriptorTest.java deleted file mode 100644 index e6f50a2f2b228..0000000000000 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogDescriptorTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.catalog.hive.descriptors; - -import org.apache.flink.table.descriptors.Descriptor; -import org.apache.flink.table.descriptors.DescriptorTestBase; -import org.apache.flink.table.descriptors.DescriptorValidator; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** Tests for {@link HiveCatalogDescriptor}. */ -public class HiveCatalogDescriptorTest extends DescriptorTestBase { - - @Override - protected List descriptors() { - final Descriptor descriptor = new HiveCatalogDescriptor(); - - return Arrays.asList(descriptor); - } - - @Override - protected List> properties() { - final Map props1 = new HashMap<>(); - props1.put("type", "hive"); - props1.put("property-version", "1"); - - return Arrays.asList(props1); - } - - @Override - protected DescriptorValidator validator() { - return new HiveCatalogValidator(); - } -} diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java index 9314875a62100..c6ce2a337dd6b 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java @@ -19,12 +19,12 @@ package org.apache.flink.table.catalog.hive.factories; import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CommonCatalogOptions; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.catalog.hive.HiveTestUtils; -import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogDescriptor; -import org.apache.flink.table.factories.CatalogFactory; -import org.apache.flink.table.factories.TableFactoryService; +import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.util.TestLogger; import org.apache.hadoop.hive.conf.HiveConf; @@ -45,7 +45,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; -import static org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator.CATALOG_HADOOP_CONF_DIR; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -66,14 +65,13 @@ public void testCreateHiveCatalog() { final HiveCatalog expectedCatalog = HiveTestUtils.createHiveCatalog(catalogName, null); - final HiveCatalogDescriptor catalogDescriptor = new HiveCatalogDescriptor(); - catalogDescriptor.hiveSitePath(CONF_DIR.getPath()); - - final Map properties = catalogDescriptor.toProperties(); + final Map options = new HashMap<>(); + options.put(CommonCatalogOptions.CATALOG_TYPE.key(), HiveCatalogFactoryOptions.IDENTIFIER); + options.put(HiveCatalogFactoryOptions.HIVE_CONF_DIR.key(), CONF_DIR.getPath()); final Catalog actualCatalog = - TableFactoryService.find(CatalogFactory.class, properties) - .createCatalog(catalogName, properties); + FactoryUtil.createCatalog( + catalogName, options, null, Thread.currentThread().getContextClassLoader()); assertEquals( "dummy-hms", @@ -97,15 +95,14 @@ public void testCreateHiveCatalogWithHadoopConfDir() throws IOException { HiveTestUtils.createHiveCatalog( catalogName, CONF_DIR.getPath(), hadoopConfDir, null); - final HiveCatalogDescriptor catalogDescriptor = new HiveCatalogDescriptor(); - catalogDescriptor.hiveSitePath(CONF_DIR.getPath()); - - final Map properties = new HashMap<>(catalogDescriptor.toProperties()); - properties.put(CATALOG_HADOOP_CONF_DIR, hadoopConfDir); + final Map options = new HashMap<>(); + options.put(CommonCatalogOptions.CATALOG_TYPE.key(), HiveCatalogFactoryOptions.IDENTIFIER); + options.put(HiveCatalogFactoryOptions.HIVE_CONF_DIR.key(), CONF_DIR.getPath()); + options.put(HiveCatalogFactoryOptions.HADOOP_CONF_DIR.key(), hadoopConfDir); final Catalog actualCatalog = - TableFactoryService.find(CatalogFactory.class, properties) - .createCatalog(catalogName, properties); + FactoryUtil.createCatalog( + catalogName, options, null, Thread.currentThread().getContextClassLoader()); checkEquals(expectedCatalog, (HiveCatalog) actualCatalog); assertEquals(mapredVal, ((HiveCatalog) actualCatalog).getHiveConf().get(mapredKey)); @@ -138,15 +135,21 @@ public void testLoadHadoopConfigFromEnv() throws IOException { CommonTestUtils.setEnv(newEnv); // create HiveCatalog use the Hadoop Configuration - final HiveCatalogDescriptor catalogDescriptor = new HiveCatalogDescriptor(); - catalogDescriptor.hiveSitePath(CONF_DIR.getPath()); - final Map properties = catalogDescriptor.toProperties(); final HiveConf hiveConf; try { + final Map options = new HashMap<>(); + options.put( + CommonCatalogOptions.CATALOG_TYPE.key(), HiveCatalogFactoryOptions.IDENTIFIER); + options.put(HiveCatalogFactoryOptions.HIVE_CONF_DIR.key(), CONF_DIR.getPath()); + final HiveCatalog hiveCatalog = (HiveCatalog) - TableFactoryService.find(CatalogFactory.class, properties) - .createCatalog(catalogName, properties); + FactoryUtil.createCatalog( + catalogName, + options, + null, + Thread.currentThread().getContextClassLoader()); + hiveConf = hiveCatalog.getHiveConf(); } finally { // set the Env back @@ -160,40 +163,50 @@ public void testLoadHadoopConfigFromEnv() throws IOException { @Test public void testDisallowEmbedded() { - expectedException.expect(IllegalArgumentException.class); - final Map properties = new HiveCatalogDescriptor().toProperties(); + expectedException.expect(ValidationException.class); + + final Map options = new HashMap<>(); + options.put(CommonCatalogOptions.CATALOG_TYPE.key(), HiveCatalogFactoryOptions.IDENTIFIER); - TableFactoryService.find(CatalogFactory.class, properties) - .createCatalog("my_catalog", properties); + FactoryUtil.createCatalog( + "my_catalog", options, null, Thread.currentThread().getContextClassLoader()); } @Test public void testCreateMultipleHiveCatalog() throws Exception { - final HiveCatalogDescriptor descriptor1 = new HiveCatalogDescriptor(); - descriptor1.hiveSitePath( + final Map props1 = new HashMap<>(); + props1.put(CommonCatalogOptions.CATALOG_TYPE.key(), HiveCatalogFactoryOptions.IDENTIFIER); + props1.put( + HiveCatalogFactoryOptions.HIVE_CONF_DIR.key(), Thread.currentThread() .getContextClassLoader() .getResource("test-multi-hive-conf1") .getPath()); - Map props1 = descriptor1.toProperties(); - final HiveCatalogDescriptor descriptor2 = new HiveCatalogDescriptor(); - descriptor2.hiveSitePath( + final Map props2 = new HashMap<>(); + props2.put(CommonCatalogOptions.CATALOG_TYPE.key(), HiveCatalogFactoryOptions.IDENTIFIER); + props2.put( + HiveCatalogFactoryOptions.HIVE_CONF_DIR.key(), Thread.currentThread() .getContextClassLoader() .getResource("test-multi-hive-conf2") .getPath()); - Map props2 = descriptor2.toProperties(); Callable callable1 = () -> - TableFactoryService.find(CatalogFactory.class, props1) - .createCatalog("cat1", props1); + FactoryUtil.createCatalog( + "cat1", + props1, + null, + Thread.currentThread().getContextClassLoader()); Callable callable2 = () -> - TableFactoryService.find(CatalogFactory.class, props2) - .createCatalog("cat2", props2); + FactoryUtil.createCatalog( + "cat2", + props2, + null, + Thread.currentThread().getContextClassLoader()); ExecutorService executorService = Executors.newFixedThreadPool(2); Future future1 = executorService.submit(callable1); diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java index 8238b98cdb4b5..3b361ad170f4d 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java @@ -18,27 +18,24 @@ package org.apache.flink.connector.jdbc.catalog.factory; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.jdbc.catalog.JdbcCatalog; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Catalog; -import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.descriptors.JdbcCatalogValidator; import org.apache.flink.table.factories.CatalogFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.HashSet; +import java.util.Set; -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE; -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION; -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE; -import static org.apache.flink.table.descriptors.JdbcCatalogValidator.CATALOG_JDBC_BASE_URL; -import static org.apache.flink.table.descriptors.JdbcCatalogValidator.CATALOG_JDBC_PASSWORD; -import static org.apache.flink.table.descriptors.JdbcCatalogValidator.CATALOG_JDBC_USERNAME; -import static org.apache.flink.table.descriptors.JdbcCatalogValidator.CATALOG_TYPE_VALUE_JDBC; +import static org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.BASE_URL; +import static org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.DEFAULT_DATABASE; +import static org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.PASSWORD; +import static org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.USERNAME; +import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION; /** Factory for {@link JdbcCatalog}. */ public class JdbcCatalogFactory implements CatalogFactory { @@ -46,45 +43,63 @@ public class JdbcCatalogFactory implements CatalogFactory { private static final Logger LOG = LoggerFactory.getLogger(JdbcCatalogFactory.class); @Override - public Map requiredContext() { - Map context = new HashMap<>(); - context.put(CATALOG_TYPE, CATALOG_TYPE_VALUE_JDBC); // jdbc - context.put(CATALOG_PROPERTY_VERSION, "1"); // backwards compatibility - return context; + public String factoryIdentifier() { + return JdbcCatalogFactoryOptions.IDENTIFIER; } @Override - public List supportedProperties() { - List properties = new ArrayList<>(); - - // default database - properties.add(CATALOG_DEFAULT_DATABASE); - - properties.add(CATALOG_JDBC_BASE_URL); - properties.add(CATALOG_JDBC_USERNAME); - properties.add(CATALOG_JDBC_PASSWORD); + public Set> requiredOptions() { + final Set> options = new HashSet<>(); + options.add(DEFAULT_DATABASE); + options.add(USERNAME); + options.add(PASSWORD); + options.add(BASE_URL); + return options; + } - return properties; + @Override + public Set> optionalOptions() { + final Set> options = new HashSet<>(); + options.add(PROPERTY_VERSION); + return options; } @Override - public Catalog createCatalog(String name, Map properties) { - final DescriptorProperties prop = getValidatedProperties(properties); + public Catalog createCatalog(Context context) { + final Configuration configuration = Configuration.fromMap(context.getOptions()); + validateConfiguration(configuration); return new JdbcCatalog( - name, - prop.getString(CATALOG_DEFAULT_DATABASE), - prop.getString(CATALOG_JDBC_USERNAME), - prop.getString(CATALOG_JDBC_PASSWORD), - prop.getString(CATALOG_JDBC_BASE_URL)); + context.getName(), + configuration.getString(DEFAULT_DATABASE), + configuration.getString(USERNAME), + configuration.getString(PASSWORD), + configuration.getString(BASE_URL)); } - private static DescriptorProperties getValidatedProperties(Map properties) { - final DescriptorProperties descriptorProperties = new DescriptorProperties(true); - descriptorProperties.putProperties(properties); - - new JdbcCatalogValidator().validate(descriptorProperties); - - return descriptorProperties; + private void validateConfiguration(Configuration configuration) { + final String defaultDatabase = configuration.getString(DEFAULT_DATABASE); + if (defaultDatabase == null || defaultDatabase.isEmpty()) { + throw new ValidationException( + String.format("Missing or empty value for '%s'", DEFAULT_DATABASE.key())); + } + + final String username = configuration.getString(USERNAME); + if (username == null || username.isEmpty()) { + throw new ValidationException( + String.format("Missing or empty value for '%s'", USERNAME.key())); + } + + final String password = configuration.getString(PASSWORD); + if (password == null || password.isEmpty()) { + throw new ValidationException( + String.format("Missing or empty value for '%s'", PASSWORD.key())); + } + + final String baseUrl = configuration.getString(BASE_URL); + if (baseUrl == null || baseUrl.isEmpty()) { + throw new ValidationException( + String.format("Missing or empty value for '%s'", BASE_URL.key())); + } } } diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryOptions.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryOptions.java new file mode 100644 index 0000000000000..0cdde626409d2 --- /dev/null +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryOptions.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.catalog.factory; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.connector.jdbc.catalog.JdbcCatalog; +import org.apache.flink.table.catalog.CommonCatalogOptions; + +/** {@link ConfigOption}s for {@link JdbcCatalog}. */ +@Internal +public class JdbcCatalogFactoryOptions { + + public static final String IDENTIFIER = "jdbc"; + + public static final ConfigOption DEFAULT_DATABASE = + ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY) + .stringType() + .noDefaultValue(); + + public static final ConfigOption USERNAME = + ConfigOptions.key("username").stringType().noDefaultValue(); + + public static final ConfigOption PASSWORD = + ConfigOptions.key("password").stringType().noDefaultValue(); + + public static final ConfigOption BASE_URL = + ConfigOptions.key("base-url").stringType().noDefaultValue(); + + private JdbcCatalogFactoryOptions() {} +} diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcCatalogDescriptor.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcCatalogDescriptor.java deleted file mode 100644 index 0033a2c50f266..0000000000000 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcCatalogDescriptor.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors; - -import org.apache.flink.util.StringUtils; - -import java.util.Map; - -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE; -import static org.apache.flink.table.descriptors.JdbcCatalogValidator.CATALOG_JDBC_BASE_URL; -import static org.apache.flink.table.descriptors.JdbcCatalogValidator.CATALOG_JDBC_PASSWORD; -import static org.apache.flink.table.descriptors.JdbcCatalogValidator.CATALOG_JDBC_USERNAME; -import static org.apache.flink.table.descriptors.JdbcCatalogValidator.CATALOG_TYPE_VALUE_JDBC; -import static org.apache.flink.util.Preconditions.checkArgument; - -/** Descriptor for {@link org.apache.flink.connector.jdbc.catalog.JdbcCatalog}. */ -public class JdbcCatalogDescriptor extends CatalogDescriptor { - - private final String defaultDatabase; - private final String username; - private final String pwd; - private final String baseUrl; - - public JdbcCatalogDescriptor( - String defaultDatabase, String username, String pwd, String baseUrl) { - - super(CATALOG_TYPE_VALUE_JDBC, 1); - - checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase)); - checkArgument(!StringUtils.isNullOrWhitespaceOnly(username)); - checkArgument(!StringUtils.isNullOrWhitespaceOnly(pwd)); - checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl)); - - this.defaultDatabase = defaultDatabase; - this.username = username; - this.pwd = pwd; - this.baseUrl = baseUrl; - } - - @Override - protected Map toCatalogProperties() { - final DescriptorProperties properties = new DescriptorProperties(); - - properties.putString(CATALOG_DEFAULT_DATABASE, defaultDatabase); - properties.putString(CATALOG_JDBC_USERNAME, username); - properties.putString(CATALOG_JDBC_PASSWORD, pwd); - properties.putString(CATALOG_JDBC_BASE_URL, baseUrl); - - return properties.asMap(); - } -} diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcCatalogValidator.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcCatalogValidator.java deleted file mode 100644 index ce48ca2ace52e..0000000000000 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcCatalogValidator.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors; - -import org.apache.flink.connector.jdbc.catalog.JdbcCatalog; - -/** Validator for {@link JdbcCatalog}. */ -public class JdbcCatalogValidator extends CatalogDescriptorValidator { - - public static final String CATALOG_TYPE_VALUE_JDBC = "jdbc"; - - public static final String CATALOG_JDBC_USERNAME = "username"; - public static final String CATALOG_JDBC_PASSWORD = "password"; - public static final String CATALOG_JDBC_BASE_URL = "base-url"; - - @Override - public void validate(DescriptorProperties properties) { - super.validate(properties); - properties.validateValue(CATALOG_TYPE, CATALOG_TYPE_VALUE_JDBC, false); - properties.validateString(CATALOG_JDBC_BASE_URL, false, 1); - properties.validateString(CATALOG_JDBC_USERNAME, false, 1); - properties.validateString(CATALOG_JDBC_PASSWORD, false, 1); - properties.validateString(CATALOG_DEFAULT_DATABASE, false, 1); - } -} diff --git a/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index 86713a6e46ed8..a54d9f57f6721 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -14,3 +14,4 @@ # limitations under the License. org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory +org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactory diff --git a/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory index ecc4e8c380308..6892b6d738e7f 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory +++ b/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -13,5 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactory org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java index 45380ee729587..bce5495ff16ce 100644 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java @@ -21,10 +21,8 @@ import org.apache.flink.connector.jdbc.catalog.JdbcCatalog; import org.apache.flink.connector.jdbc.catalog.PostgresCatalog; import org.apache.flink.table.catalog.Catalog; -import org.apache.flink.table.descriptors.CatalogDescriptor; -import org.apache.flink.table.descriptors.JdbcCatalogDescriptor; -import org.apache.flink.table.factories.CatalogFactory; -import org.apache.flink.table.factories.TableFactoryService; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.factories.FactoryUtil; import com.opentable.db.postgres.junit.EmbeddedPostgresRules; import com.opentable.db.postgres.junit.SingleInstancePostgresRule; @@ -33,6 +31,7 @@ import org.junit.Test; import java.sql.SQLException; +import java.util.HashMap; import java.util.Map; import static org.junit.Assert.assertEquals; @@ -67,15 +66,20 @@ public static void setup() throws SQLException { @Test public void test() { - final CatalogDescriptor catalogDescriptor = - new JdbcCatalogDescriptor( - PostgresCatalog.DEFAULT_DATABASE, TEST_USERNAME, TEST_PWD, baseUrl); - - final Map properties = catalogDescriptor.toProperties(); + final Map options = new HashMap<>(); + options.put(CommonCatalogOptions.CATALOG_TYPE.key(), JdbcCatalogFactoryOptions.IDENTIFIER); + options.put( + JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(), PostgresCatalog.DEFAULT_DATABASE); + options.put(JdbcCatalogFactoryOptions.USERNAME.key(), TEST_USERNAME); + options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), TEST_PWD); + options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl); final Catalog actualCatalog = - TableFactoryService.find(CatalogFactory.class, properties) - .createCatalog(TEST_CATALOG_NAME, properties); + FactoryUtil.createCatalog( + TEST_CATALOG_NAME, + options, + null, + Thread.currentThread().getContextClassLoader()); checkEquals(catalog, (JdbcCatalog) actualCatalog); diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/table/descriptors/JdbcCatalogDescriptorTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/table/descriptors/JdbcCatalogDescriptorTest.java deleted file mode 100644 index 078e9d1047e95..0000000000000 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/table/descriptors/JdbcCatalogDescriptorTest.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.apache.flink.table.descriptors.JdbcCatalogValidator.CATALOG_TYPE_VALUE_JDBC; - -/** Test for {@link JdbcCatalogDescriptor}. */ -public class JdbcCatalogDescriptorTest extends DescriptorTestBase { - - private static final String TEST_DB = "db"; - private static final String TEST_USERNAME = "user"; - private static final String TEST_PWD = "pwd"; - private static final String TEST_BASE_URL = "xxx"; - - @Override - protected List descriptors() { - final Descriptor descriptor = - new JdbcCatalogDescriptor(TEST_DB, TEST_USERNAME, TEST_PWD, TEST_BASE_URL); - - return Arrays.asList(descriptor); - } - - @Override - protected List> properties() { - final Map props = new HashMap<>(); - props.put("type", CATALOG_TYPE_VALUE_JDBC); - props.put("property-version", "1"); - props.put("default-database", TEST_DB); - props.put("username", TEST_USERNAME); - props.put("password", TEST_PWD); - props.put("base-url", TEST_BASE_URL); - - return Arrays.asList(props); - } - - @Override - protected DescriptorValidator validator() { - return new JdbcCatalogValidator(); - } -} diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/CatalogEntry.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/CatalogEntry.java index 99cf3ce0e58b0..693f97a1291cc 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/CatalogEntry.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/CatalogEntry.java @@ -18,15 +18,14 @@ package org.apache.flink.table.client.config.entries; +import org.apache.flink.table.catalog.CommonCatalogOptions; import org.apache.flink.table.client.config.ConfigUtil; import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.factories.FactoryUtil; import java.util.Collections; import java.util.Map; -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION; -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE; - /** * Describes a catalog configuration entry. * @@ -51,8 +50,8 @@ public String getName() { @Override protected void validate(DescriptorProperties properties) { - properties.validateString(CATALOG_TYPE, false, 1); - properties.validateInt(CATALOG_PROPERTY_VERSION, true, 0); + properties.validateString(CommonCatalogOptions.CATALOG_TYPE.key(), false, 1); + properties.validateInt(FactoryUtil.PROPERTY_VERSION.key(), true, 0); // further validation is performed by the discovered factory } diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/ExecutionContextTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/ExecutionContextTest.java index 79865a1740b9d..8c4fdfad935fc 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/ExecutionContextTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/ExecutionContextTest.java @@ -20,6 +20,7 @@ import org.apache.flink.client.cli.DefaultCLI; import org.apache.flink.client.python.PythonFunctionFactory; +import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.PipelineOptions; @@ -63,7 +64,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; @@ -493,20 +493,23 @@ private void verifyUserClassLoader() { public static class TestClassLoaderCatalogFactory implements CatalogFactory { @Override - public Catalog createCatalog(String name, Map properties) { - return new TestClassLoaderCatalog("test_cl"); + public String factoryIdentifier() { + return "test_cl_catalog"; } @Override - public Map requiredContext() { - Map context = new HashMap<>(); - context.put("type", "test_cl_catalog"); - return context; + public Set> requiredOptions() { + return Collections.emptySet(); } @Override - public List supportedProperties() { - return Collections.emptyList(); + public Set> optionalOptions() { + return Collections.emptySet(); + } + + @Override + public Catalog createCatalog(Context context) { + return new TestClassLoaderCatalog("test_cl"); } } } diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java index 16134885151f2..df582d3d6f79d 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java @@ -19,6 +19,8 @@ package org.apache.flink.table.client.gateway.local; import org.apache.flink.client.cli.DefaultCLI; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; @@ -28,6 +30,7 @@ import org.apache.flink.table.catalog.CatalogPropertiesUtil; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.CommonCatalogOptions; import org.apache.flink.table.catalog.GenericInMemoryCatalog; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.ResolvedCatalogTable; @@ -38,15 +41,14 @@ import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.catalog.hive.HiveTestUtils; -import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator; import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory; +import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions; import org.apache.flink.table.client.config.Environment; import org.apache.flink.table.client.gateway.context.DefaultContext; import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil; import org.apache.flink.table.client.gateway.utils.TestTableSinkFactoryBase; import org.apache.flink.table.client.gateway.utils.TestTableSourceFactoryBase; import org.apache.flink.table.delegation.Parser; -import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.factories.CatalogFactory; import org.apache.flink.table.factories.ModuleFactory; import org.apache.flink.table.module.Module; @@ -63,14 +65,13 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE; -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE; import static org.apache.flink.table.descriptors.ModuleDescriptorValidator.MODULE_TYPE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -205,29 +206,32 @@ public static class TestModule implements Module {} /** Catalog that can be discovered if classloading is correct. */ public static class TestCatalogFactory implements CatalogFactory { + private static final ConfigOption DEFAULT_DATABASE = + ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY) + .stringType() + .defaultValue(GenericInMemoryCatalog.DEFAULT_DB); + @Override - public Map requiredContext() { - final Map context = new HashMap<>(); - context.put(CATALOG_TYPE, CATALOG_TYPE_TEST); - return context; + public String factoryIdentifier() { + return CATALOG_TYPE_TEST; } @Override - public List supportedProperties() { - final List properties = new ArrayList<>(); - properties.add(CATALOG_DEFAULT_DATABASE); - return properties; + public Set> requiredOptions() { + return Collections.emptySet(); } @Override - public Catalog createCatalog(String name, Map properties) { - final DescriptorProperties params = new DescriptorProperties(true); - params.putProperties(properties); - - final Optional defaultDatabase = - params.getOptionalString(CATALOG_DEFAULT_DATABASE); + public Set> optionalOptions() { + final Set> options = new HashSet<>(); + options.add(DEFAULT_DATABASE); + return options; + } - return new TestCatalog(name, defaultDatabase.orElse(GenericInMemoryCatalog.DEFAULT_DB)); + @Override + public Catalog createCatalog(Context context) { + final Configuration configuration = Configuration.fromMap(context.getOptions()); + return new TestCatalog(context.getName(), configuration.getString(DEFAULT_DATABASE)); } } @@ -249,32 +253,22 @@ public static class TestHiveCatalogFactory extends HiveCatalogFactory { static final String TABLE_WITH_PARAMETERIZED_TYPES = "param_types_table"; @Override - public Map requiredContext() { - Map context = super.requiredContext(); - - // For factory discovery service to distinguish TestHiveCatalogFactory from - // HiveCatalogFactory - context.put("test", "test"); - return context; + public String factoryIdentifier() { + return "hive-test"; } @Override - public List supportedProperties() { - List list = super.supportedProperties(); - list.add(CatalogPropertiesUtil.IS_GENERIC); - - return list; - } + public Catalog createCatalog(Context context) { + final Configuration configuration = Configuration.fromMap(context.getOptions()); - @Override - public Catalog createCatalog(String name, Map properties) { // Developers may already have their own production/testing hive-site.xml set in their // environment, // and Flink tests should avoid using those hive-site.xml. // Thus, explicitly create a testing HiveConf for unit tests here Catalog hiveCatalog = HiveTestUtils.createHiveCatalog( - name, properties.get(HiveCatalogValidator.CATALOG_HIVE_VERSION)); + context.getName(), + configuration.getString(HiveCatalogFactoryOptions.HIVE_VERSION)); // Creates an additional database to test tableEnv.useDatabase() will switch current // database of the catalog diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java index cf59d27324edc..ea5ec578ba9e0 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java @@ -18,6 +18,7 @@ package org.apache.flink.table.client.gateway.local; +import org.apache.flink.table.catalog.CommonCatalogOptions; import org.apache.flink.table.client.SqlClientException; import org.apache.flink.table.client.config.Environment; import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil; @@ -35,7 +36,6 @@ import static org.apache.flink.table.client.config.entries.CatalogEntry.CATALOG_NAME; import static org.apache.flink.table.client.config.entries.ModuleEntry.MODULE_NAME; -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE; import static org.apache.flink.table.descriptors.ModuleDescriptorValidator.MODULE_TYPE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -130,7 +130,7 @@ private static Map createCatalog(String name, String type) { Map prop = new HashMap<>(); prop.put(CATALOG_NAME, name); - prop.put(CATALOG_TYPE, type); + prop.put(CommonCatalogOptions.CATALOG_TYPE.key(), type); return prop; } diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/SimpleCatalogFactory.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/SimpleCatalogFactory.java index ffad5aee91915..e24979c34e8ed 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/SimpleCatalogFactory.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/SimpleCatalogFactory.java @@ -20,16 +20,19 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.Types; import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CommonCatalogOptions; import org.apache.flink.table.catalog.ConnectorCatalogTable; import org.apache.flink.table.catalog.GenericInMemoryCatalog; import org.apache.flink.table.catalog.ObjectPath; -import org.apache.flink.table.descriptors.CatalogDescriptorValidator; import org.apache.flink.table.factories.CatalogFactory; import org.apache.flink.table.sources.StreamTableSource; import org.apache.flink.table.types.DataType; @@ -37,9 +40,10 @@ import org.apache.flink.util.WrappingRuntimeException; import java.util.Arrays; -import java.util.HashMap; +import java.util.Collections; +import java.util.HashSet; import java.util.List; -import java.util.Map; +import java.util.Set; /** * Catalog factory for an in-memory catalog that contains a single non-empty table. The contents of @@ -47,22 +51,46 @@ */ public class SimpleCatalogFactory implements CatalogFactory { - public static final String CATALOG_TYPE_VALUE = "simple-catalog"; - - public static final String TEST_TABLE_NAME = "test-table"; + public static final String IDENTIFIER = "simple-catalog"; public static final List TABLE_CONTENTS = Arrays.asList( Row.of(1, "Hello"), Row.of(2, "Hello world"), Row.of(3, "Hello world! Hello!")); + private static final ConfigOption DEFAULT_DATABASE = + ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY) + .stringType() + .defaultValue("default_database"); + + private static final ConfigOption TABLE_NAME = + ConfigOptions.key("test-table").stringType().defaultValue("test-table"); + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + return Collections.emptySet(); + } + @Override - public Catalog createCatalog(String name, Map properties) { - String database = - properties.getOrDefault( - CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE, "default_database"); - GenericInMemoryCatalog genericInMemoryCatalog = new GenericInMemoryCatalog(name, database); + public Set> optionalOptions() { + final Set> options = new HashSet<>(); + options.add(DEFAULT_DATABASE); + options.add(TABLE_NAME); + return options; + } + + @Override + public Catalog createCatalog(Context context) { + final Configuration configuration = Configuration.fromMap(context.getOptions()); + final String database = configuration.getString(DEFAULT_DATABASE); + final String tableName = configuration.getString(TABLE_NAME); - String tableName = properties.getOrDefault(TEST_TABLE_NAME, TEST_TABLE_NAME); + final GenericInMemoryCatalog genericInMemoryCatalog = + new GenericInMemoryCatalog(context.getName(), database); StreamTableSource tableSource = new StreamTableSource() { @Override @@ -102,16 +130,4 @@ public DataType getProducedDataType() { return genericInMemoryCatalog; } - - @Override - public Map requiredContext() { - Map context = new HashMap<>(); - context.put(CatalogDescriptorValidator.CATALOG_TYPE, CATALOG_TYPE_VALUE); - return context; - } - - @Override - public List supportedProperties() { - return Arrays.asList(CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE, TEST_TABLE_NAME); - } } diff --git a/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000000000..d033f0af09f45 --- /dev/null +++ b/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.table.client.gateway.utils.SimpleCatalogFactory +org.apache.flink.table.client.gateway.local.DependencyTest$TestHiveCatalogFactory +org.apache.flink.table.client.gateway.local.DependencyTest$TestCatalogFactory +org.apache.flink.table.client.gateway.context.ExecutionContextTest$TestClassLoaderCatalogFactory diff --git a/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory index a0d1c61fcfcbf..2c2b86bb26dbe 100644 --- a/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory +++ b/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -15,8 +15,4 @@ org.apache.flink.table.client.gateway.utils.DummyTableSinkFactory org.apache.flink.table.client.gateway.utils.DummyTableSourceFactory -org.apache.flink.table.client.gateway.utils.SimpleCatalogFactory -org.apache.flink.table.client.gateway.local.DependencyTest$TestCatalogFactory -org.apache.flink.table.client.gateway.local.DependencyTest$TestHiveCatalogFactory org.apache.flink.table.client.gateway.local.DependencyTest$TestModuleFactory -org.apache.flink.table.client.gateway.context.ExecutionContextTest$TestClassLoaderCatalogFactory diff --git a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q index 633e5c1bdb042..1d291c59c43f0 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q @@ -191,8 +191,7 @@ drop catalog `mod`; # ========================================================================== create catalog hivecatalog with ( - 'type' = 'hive', - 'test' = 'test', -- this makes sure we use TestHiveCatalogFactory + 'type' = 'hive-test', 'hive-version' = '2.3.4' ); [INFO] Catalog has been created. diff --git a/flink-table/flink-sql-client/src/test/resources/sql/function.q b/flink-table/flink-sql-client/src/test/resources/sql/function.q index fea362b83b84a..ddbf1ba1589c8 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/function.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/function.q @@ -205,7 +205,7 @@ org.apache.flink.table.api.ValidationException: Alter temporary catalog function # test function with hive catalog # ========================================================================== -create catalog hivecatalog with ('type'='hive', 'hive-version'='2.3.4','test'='test'); +create catalog hivecatalog with ('type'='hive-test', 'hive-version'='2.3.4'); [INFO] Catalog has been created. !info diff --git a/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml b/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml index c88d06b208d22..51eb80f63385c 100644 --- a/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml +++ b/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml @@ -150,9 +150,7 @@ catalogs: type: generic_in_memory default-database: mydatabase - name: hivecatalog - type: hive - test: test + type: hive-test hive-version: 2.3.4 - name: hivedefaultversion - type: hive - test: test + type: hive-test diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactory.java index 27c7af51a961e..6ee2c1427b748 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactory.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactory.java @@ -18,59 +18,47 @@ package org.apache.flink.table.catalog; -import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.descriptors.GenericInMemoryCatalogValidator; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.factories.CatalogFactory; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE; -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION; -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE; -import static org.apache.flink.table.descriptors.GenericInMemoryCatalogValidator.CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY; +import static org.apache.flink.table.catalog.GenericInMemoryCatalogFactoryOptions.DEFAULT_DATABASE; +import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION; /** Catalog factory for {@link GenericInMemoryCatalog}. */ public class GenericInMemoryCatalogFactory implements CatalogFactory { @Override - public Map requiredContext() { - Map context = new HashMap<>(); - context.put(CATALOG_TYPE, CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY); // generic_in_memory - context.put(CATALOG_PROPERTY_VERSION, "1"); // backwards compatibility - return context; + public String factoryIdentifier() { + return GenericInMemoryCatalogFactoryOptions.IDENTIFIER; } @Override - public List supportedProperties() { - List properties = new ArrayList<>(); - - // default database - properties.add(CATALOG_DEFAULT_DATABASE); - - return properties; + public Set> requiredOptions() { + return Collections.emptySet(); } @Override - public Catalog createCatalog(String name, Map properties) { - final DescriptorProperties descriptorProperties = getValidatedProperties(properties); - - final Optional defaultDatabase = - descriptorProperties.getOptionalString(CATALOG_DEFAULT_DATABASE); - - return new GenericInMemoryCatalog( - name, defaultDatabase.orElse(GenericInMemoryCatalog.DEFAULT_DB)); + public Set> optionalOptions() { + final Set> options = new HashSet<>(); + options.add(DEFAULT_DATABASE); + options.add(PROPERTY_VERSION); + return options; } - private static DescriptorProperties getValidatedProperties(Map properties) { - final DescriptorProperties descriptorProperties = new DescriptorProperties(true); - descriptorProperties.putProperties(properties); - - new GenericInMemoryCatalogValidator().validate(descriptorProperties); + @Override + public Catalog createCatalog(Context context) { + final Configuration configuration = Configuration.fromMap(context.getOptions()); + final String defaultDatabase = configuration.getString(DEFAULT_DATABASE); + if (defaultDatabase == null || defaultDatabase.isEmpty()) { + throw new ValidationException("The default database must not be empty"); + } - return descriptorProperties; + return new GenericInMemoryCatalog(context.getName(), defaultDatabase); } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/GenericInMemoryCatalogValidator.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactoryOptions.java similarity index 54% rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/GenericInMemoryCatalogValidator.java rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactoryOptions.java index 61a5a1a70e184..f8fd4f67a21bd 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/GenericInMemoryCatalogValidator.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactoryOptions.java @@ -16,15 +16,22 @@ * limitations under the License. */ -package org.apache.flink.table.descriptors; +package org.apache.flink.table.catalog; -/** Validator for {@link GenericInMemoryCatalogDescriptor}. */ -public class GenericInMemoryCatalogValidator extends CatalogDescriptorValidator { - public static final String CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY = "generic_in_memory"; +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; - @Override - public void validate(DescriptorProperties properties) { - super.validate(properties); - properties.validateValue(CATALOG_TYPE, CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY, false); - } +/** {@link ConfigOption}s for {@link GenericInMemoryCatalog}. */ +@Internal +public class GenericInMemoryCatalogFactoryOptions { + + public static final String IDENTIFIER = "generic_in_memory"; + + public static final ConfigOption DEFAULT_DATABASE = + ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY) + .stringType() + .defaultValue(GenericInMemoryCatalog.DEFAULT_DB); + + private GenericInMemoryCatalogFactoryOptions() {} } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/GenericInMemoryCatalogDescriptor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/GenericInMemoryCatalogDescriptor.java deleted file mode 100644 index 7970a916b72b7..0000000000000 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/GenericInMemoryCatalogDescriptor.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import static org.apache.flink.table.descriptors.GenericInMemoryCatalogValidator.CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY; - -/** Catalog descriptor for the generic in memory catalog. */ -public class GenericInMemoryCatalogDescriptor extends CatalogDescriptor { - - public GenericInMemoryCatalogDescriptor() { - super(CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY, 1); - } - - public GenericInMemoryCatalogDescriptor(String defaultDatabase) { - super(CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY, 1, defaultDatabase); - } - - @Override - protected Map toCatalogProperties() { - return Collections.unmodifiableMap(new HashMap<>()); - } -} diff --git a/flink-table/flink-table-api-java/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-table-api-java/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory similarity index 100% rename from flink-table/flink-table-api-java/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory rename to flink-table/flink-table-api-java/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactoryTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactoryTest.java index bfca122d90c71..1184936cd0964 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactoryTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactoryTest.java @@ -18,14 +18,12 @@ package org.apache.flink.table.catalog; -import org.apache.flink.table.descriptors.CatalogDescriptor; -import org.apache.flink.table.descriptors.GenericInMemoryCatalogDescriptor; -import org.apache.flink.table.factories.CatalogFactory; -import org.apache.flink.table.factories.TableFactoryService; +import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.util.TestLogger; import org.junit.Test; +import java.util.HashMap; import java.util.Map; import static org.junit.Assert.assertEquals; @@ -41,14 +39,15 @@ public void test() throws Exception { final GenericInMemoryCatalog expectedCatalog = new GenericInMemoryCatalog(catalogName, databaseName); - final CatalogDescriptor catalogDescriptor = - new GenericInMemoryCatalogDescriptor(databaseName); - - final Map properties = catalogDescriptor.toProperties(); + final Map options = new HashMap<>(); + options.put( + CommonCatalogOptions.CATALOG_TYPE.key(), + GenericInMemoryCatalogFactoryOptions.IDENTIFIER); + options.put(GenericInMemoryCatalogFactoryOptions.DEFAULT_DATABASE.key(), databaseName); final Catalog actualCatalog = - TableFactoryService.find(CatalogFactory.class, properties) - .createCatalog(catalogName, properties); + FactoryUtil.createCatalog( + catalogName, options, null, Thread.currentThread().getContextClassLoader()); checkEquals(expectedCatalog, (GenericInMemoryCatalog) actualCatalog); } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/descriptor/GenericInMemoryCatalogDescriptorTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/descriptor/GenericInMemoryCatalogDescriptorTest.java deleted file mode 100644 index a71c3636eb886..0000000000000 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/descriptor/GenericInMemoryCatalogDescriptorTest.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptor; - -import org.apache.flink.table.descriptors.Descriptor; -import org.apache.flink.table.descriptors.DescriptorTestBase; -import org.apache.flink.table.descriptors.DescriptorValidator; -import org.apache.flink.table.descriptors.GenericInMemoryCatalogDescriptor; -import org.apache.flink.table.descriptors.GenericInMemoryCatalogValidator; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** Tests for the {@link GenericInMemoryCatalogDescriptor} descriptor. */ -public class GenericInMemoryCatalogDescriptorTest extends DescriptorTestBase { - - private static final String TEST_DATABASE = "test"; - - @Override - protected List descriptors() { - final Descriptor withoutDefaultDB = new GenericInMemoryCatalogDescriptor(); - - final Descriptor withDefaultDB = new GenericInMemoryCatalogDescriptor(TEST_DATABASE); - - return Arrays.asList(withoutDefaultDB, withDefaultDB); - } - - @Override - protected List> properties() { - final Map props1 = new HashMap<>(); - props1.put("type", "generic_in_memory"); - props1.put("property-version", "1"); - - final Map props2 = new HashMap<>(); - props2.put("type", "generic_in_memory"); - props2.put("property-version", "1"); - props2.put("default-database", TEST_DATABASE); - - return Arrays.asList(props1, props2); - } - - @Override - protected DescriptorValidator validator() { - return new GenericInMemoryCatalogValidator(); - } -} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CommonCatalogOptions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CommonCatalogOptions.java new file mode 100644 index 0000000000000..8641ff910419d --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CommonCatalogOptions.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.table.factories.Factory; + +/** A collection of {@link ConfigOption} which are consistently used in multiple catalogs. */ +@Internal +public class CommonCatalogOptions { + + /** + * Key used for specifying a default database {@link ConfigOption}. + * + *

Note that we cannot expose an actual instance of {@link ConfigOption} here as the default + * values differ between catalogs. + */ + public static final String DEFAULT_DATABASE_KEY = "default-database"; + + /** + * {@link ConfigOption} which is used during catalog discovery to match it against {@link + * Factory#factoryIdentifier()}. + */ + public static final ConfigOption CATALOG_TYPE = + ConfigOptions.key("type").stringType().noDefaultValue(); +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptor.java deleted file mode 100644 index 112c70eef9aa4..0000000000000 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptor.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.util.StringUtils; - -import java.util.Map; - -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE; -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION; -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE; -import static org.apache.flink.util.Preconditions.checkArgument; - -/** Describes a catalog of tables, views, and functions. */ -@PublicEvolving -public abstract class CatalogDescriptor extends DescriptorBase { - - private final String type; - - private final int propertyVersion; - - private final String defaultDatabase; - - /** - * Constructs a {@link CatalogDescriptor}. - * - * @param type string that identifies this catalog - * @param propertyVersion property version for backwards compatibility - */ - public CatalogDescriptor(String type, int propertyVersion) { - this(type, propertyVersion, null); - } - - /** - * Constructs a {@link CatalogDescriptor}. - * - * @param type string that identifies this catalog - * @param propertyVersion property version for backwards compatibility - * @param defaultDatabase default database of the catalog - */ - public CatalogDescriptor(String type, int propertyVersion, String defaultDatabase) { - checkArgument(!StringUtils.isNullOrWhitespaceOnly(type), "type cannot be null or empty"); - - this.type = type; - this.propertyVersion = propertyVersion; - this.defaultDatabase = defaultDatabase; - } - - @Override - public final Map toProperties() { - final DescriptorProperties properties = new DescriptorProperties(); - properties.putString(CATALOG_TYPE, type); - properties.putLong(CATALOG_PROPERTY_VERSION, propertyVersion); - - if (defaultDatabase != null) { - properties.putString(CATALOG_DEFAULT_DATABASE, defaultDatabase); - } - - properties.putProperties(toCatalogProperties()); - return properties.asMap(); - } - - public String getDefaultDatabase() { - return defaultDatabase; - } - - /** Converts this descriptor into a set of catalog properties. */ - protected abstract Map toCatalogProperties(); -} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptorValidator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptorValidator.java deleted file mode 100644 index 384c7bcdd52c1..0000000000000 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptorValidator.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors; - -import org.apache.flink.annotation.Internal; - -/** Validator for {@link CatalogDescriptor}. */ -@Internal -public abstract class CatalogDescriptorValidator implements DescriptorValidator { - - /** Key for describing the type of the catalog. Usually used for factory discovery.ca */ - public static final String CATALOG_TYPE = "type"; - - /** - * Key for describing the property version. This property can be used for backwards - * compatibility in case the property format changes. - */ - public static final String CATALOG_PROPERTY_VERSION = "property-version"; - - /** Key for describing the default database of the catalog. */ - public static final String CATALOG_DEFAULT_DATABASE = "default-database"; - - @Override - public void validate(DescriptorProperties properties) { - properties.validateString(CATALOG_TYPE, false, 1); - properties.validateInt(CATALOG_PROPERTY_VERSION, true, 0); - properties.validateString(CATALOG_DEFAULT_DATABASE, true, 1); - } -} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java index c78d0861c0256..413193d6e3497 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java @@ -29,6 +29,7 @@ import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CommonCatalogOptions; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.connector.format.DecodingFormat; @@ -60,6 +61,10 @@ public final class FactoryUtil { private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class); + /** + * Describes the property version. This can be used for backwards compatibility in case the + * property format changes. + */ public static final ConfigOption PROPERTY_VERSION = ConfigOptions.key("property-version") .intType() @@ -75,13 +80,6 @@ public final class FactoryUtil { "Uniquely identifies the connector of a dynamic table that is used for accessing data in " + "an external system. Its value is used during table source and table sink discovery."); - public static final ConfigOption CATALOG_TYPE = - ConfigOptions.key("type") - .stringType() - .noDefaultValue() - .withDescription( - "Uniquely identifies the catalog. Its value is used during catalog discovery."); - public static final ConfigOption FORMAT = ConfigOptions.key("format") .stringType() @@ -239,7 +237,11 @@ public static Catalog createCatalog( // to the catalog factory itself. final Map factoryOptions = options.entrySet().stream() - .filter(entry -> !CATALOG_TYPE.key().equals(entry.getKey())) + .filter( + entry -> + !CommonCatalogOptions.CATALOG_TYPE + .key() + .equals(entry.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); final DefaultCatalogContext context = new DefaultCatalogContext( @@ -258,7 +260,8 @@ public static Catalog createCatalog( optionEntry.getKey(), optionEntry.getValue())) .sorted() - .collect(Collectors.joining("\n")))); + .collect(Collectors.joining("\n"))), + t); } } } @@ -420,12 +423,13 @@ private static T getDynamicTableFactory( } private static CatalogFactory getCatalogFactory(CatalogFactory.Context context) { - final String catalogType = context.getOptions().get(CATALOG_TYPE.key()); + final String catalogType = + context.getOptions().get(CommonCatalogOptions.CATALOG_TYPE.key()); if (catalogType == null) { throw new ValidationException( String.format( "Catalog options do not contain an option key '%s' for discovering a catalog.", - CATALOG_TYPE.key())); + CommonCatalogOptions.CATALOG_TYPE.key())); } return discoverFactory(context.getClassLoader(), CatalogFactory.class, catalogType); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactoryService.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactoryService.java index 51ea695520ccd..9bfae3ea47020 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactoryService.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactoryService.java @@ -42,7 +42,6 @@ import java.util.Set; import java.util.stream.Collectors; -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION; import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION; import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION; @@ -240,7 +239,7 @@ private static List filterByContext( // with the version we can provide mappings in case the format changes plainContext.remove(CONNECTOR_PROPERTY_VERSION); plainContext.remove(FORMAT_PROPERTY_VERSION); - plainContext.remove(CATALOG_PROPERTY_VERSION); + plainContext.remove(FactoryUtil.PROPERTY_VERSION.key()); // check if required context is met Map> mismatchedProperties = new HashMap<>(); diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/CatalogDescriptorTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/CatalogDescriptorTest.java deleted file mode 100644 index 1f7ff63444cbf..0000000000000 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/CatalogDescriptorTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors; - -import org.apache.flink.table.api.ValidationException; - -import org.junit.Test; - -import javax.annotation.Nullable; - -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION; -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE; - -/** - * Tests for the {@link CatalogDescriptor} descriptor and {@link CatalogDescriptorValidator} - * validator. - */ -public class CatalogDescriptorTest extends DescriptorTestBase { - - private static final String CATALOG_TYPE_VALUE = "CatalogDescriptorTest"; - private static final int CATALOG_PROPERTY_VERSION_VALUE = 1; - private static final String CATALOG_FOO = "foo"; - private static final String CATALOG_FOO_VALUE = "foo-1"; - - @Test(expected = ValidationException.class) - public void testMissingCatalogType() { - removePropertyAndVerify(descriptors().get(0), CATALOG_TYPE); - } - - @Test(expected = ValidationException.class) - public void testMissingFoo() { - removePropertyAndVerify(descriptors().get(0), CATALOG_FOO); - } - - @Override - protected List descriptors() { - final Descriptor minimumDesc = new TestCatalogDescriptor(CATALOG_FOO_VALUE); - return Collections.singletonList(minimumDesc); - } - - @Override - protected List> properties() { - final Map minimumProps = new HashMap<>(); - minimumProps.put(CATALOG_TYPE, CATALOG_TYPE_VALUE); - minimumProps.put(CATALOG_PROPERTY_VERSION, "" + CATALOG_PROPERTY_VERSION_VALUE); - minimumProps.put(CATALOG_FOO, CATALOG_FOO_VALUE); - return Collections.singletonList(minimumProps); - } - - @Override - protected DescriptorValidator validator() { - return new TestCatalogDescriptorValidator(); - } - - /** CatalogDescriptor for test. */ - private class TestCatalogDescriptor extends CatalogDescriptor { - private String foo; - - public TestCatalogDescriptor(@Nullable String foo) { - super(CATALOG_TYPE_VALUE, CATALOG_PROPERTY_VERSION_VALUE); - this.foo = foo; - } - - @Override - protected Map toCatalogProperties() { - DescriptorProperties properties = new DescriptorProperties(); - if (foo != null) { - properties.putString(CATALOG_FOO, foo); - } - return properties.asMap(); - } - } - - /** CatalogDescriptorValidator for test. */ - private class TestCatalogDescriptorValidator extends CatalogDescriptorValidator { - @Override - public void validate(DescriptorProperties properties) { - super.validate(properties); - properties.validateString(CATALOG_FOO, false, 1); - } - } -} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java index bb478f68ed594..7b04ea05b3b1a 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java @@ -20,8 +20,10 @@ import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CommonCatalogOptions; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.factories.TestDynamicTableFactory.DynamicTableSinkMock; @@ -46,6 +48,7 @@ import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** Tests for {@link FactoryUtil}. */ @@ -273,6 +276,27 @@ public void testConnectorErrorHint() { } } + @Test + public void testCreateCatalog() { + final Map options = new HashMap<>(); + options.put(CommonCatalogOptions.CATALOG_TYPE.key(), TestCatalogFactory.IDENTIFIER); + options.put(TestCatalogFactory.DEFAULT_DATABASE.key(), "my-database"); + + final Catalog catalog = + FactoryUtil.createCatalog( + "my-catalog", + options, + null, + Thread.currentThread().getContextClassLoader()); + assertTrue(catalog instanceof TestCatalogFactory.TestCatalog); + + final TestCatalogFactory.TestCatalog testCatalog = (TestCatalogFactory.TestCatalog) catalog; + assertEquals(testCatalog.getName(), "my-catalog"); + assertEquals( + testCatalog.getOptions().get(TestCatalogFactory.DEFAULT_DATABASE.key()), + "my-database"); + } + // -------------------------------------------------------------------------------------------- private void expectError(String message) { diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestCatalogFactory.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestCatalogFactory.java new file mode 100644 index 0000000000000..ee9cad594b0a3 --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestCatalogFactory.java @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** Test catalog factory for catalog discovery tests. */ +public class TestCatalogFactory implements CatalogFactory { + + public static final String IDENTIFIER = "test-catalog"; + + public static final ConfigOption DEFAULT_DATABASE = + ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY) + .stringType() + .noDefaultValue(); + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set> optionalOptions() { + final Set> options = new HashSet<>(); + options.add(DEFAULT_DATABASE); + return options; + } + + @Override + public Catalog createCatalog(Context context) { + return new TestCatalog(context.getName(), context.getOptions()); + } + + /** Test catalog for discovery testing. */ + public static class TestCatalog implements Catalog { + private final String name; + private final Map options; + + public TestCatalog(String name, Map options) { + this.name = name; + this.options = options; + } + + public String getName() { + return name; + } + + public Map getOptions() { + return options; + } + + @Override + public void open() throws CatalogException {} + + @Override + public void close() throws CatalogException {} + + @Override + public String getDefaultDatabase() throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List listDatabases() throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public CatalogDatabase getDatabase(String databaseName) + throws DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean databaseExists(String databaseName) throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) + throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterDatabase( + String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) + throws DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List listTables(String databaseName) + throws DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List listViews(String databaseName) + throws DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean tableExists(ObjectPath tablePath) throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void renameTable( + ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void createTable( + ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterTable( + ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List listPartitions(ObjectPath tablePath) + throws TableNotExistException, TableNotPartitionedException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List listPartitions( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws TableNotExistException, TableNotPartitionedException, + PartitionSpecInvalidException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List listPartitionsByFilter( + ObjectPath tablePath, List filters) + throws TableNotExistException, TableNotPartitionedException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public CatalogPartition getPartition( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void createPartition( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogPartition partition, + boolean ignoreIfExists) + throws TableNotExistException, TableNotPartitionedException, + PartitionSpecInvalidException, PartitionAlreadyExistsException, + CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void dropPartition( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterPartition( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogPartition newPartition, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List listFunctions(String dbName) + throws DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public CatalogFunction getFunction(ObjectPath functionPath) + throws FunctionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean functionExists(ObjectPath functionPath) throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void createFunction( + ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) + throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterFunction( + ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) + throws FunctionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) + throws FunctionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public CatalogTableStatistics getPartitionStatistics( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public CatalogColumnStatistics getPartitionColumnStatistics( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterTableStatistics( + ObjectPath tablePath, + CatalogTableStatistics tableStatistics, + boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterTableColumnStatistics( + ObjectPath tablePath, + CatalogColumnStatistics columnStatistics, + boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException, TablePartitionedException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterPartitionStatistics( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogTableStatistics partitionStatistics, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterPartitionColumnStatistics( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogColumnStatistics columnStatistics, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + } +} diff --git a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory index 9c66d01e91d0c..8a7f7f2bb594f 100644 --- a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -21,3 +21,4 @@ org.apache.flink.table.factories.TestDynamicTableSinkOnlyFactory org.apache.flink.table.factories.TestDynamicTableSourceOnlyFactory org.apache.flink.table.factories.TestConflictingDynamicTableFactory1 org.apache.flink.table.factories.TestConflictingDynamicTableFactory2 +org.apache.flink.table.factories.TestCatalogFactory diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java index faeb218a285f3..0426985724177 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java @@ -23,6 +23,7 @@ import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.catalog.GenericInMemoryCatalogFactoryOptions; import org.apache.flink.testutils.ClassLoaderUtils; import org.apache.flink.util.TemporaryClassLoaderContext; @@ -32,7 +33,6 @@ import java.net.URLClassLoader; -import static org.apache.flink.table.descriptors.GenericInMemoryCatalogValidator.CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -48,7 +48,7 @@ public void testCreateCatalog() { String ddl = String.format( "create catalog %s with('type'='%s')", - name, CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY); + name, GenericInMemoryCatalogFactoryOptions.IDENTIFIER); tableEnv.executeSql(ddl); @@ -64,7 +64,7 @@ public void testDropCatalog() { String ddl = String.format( "create catalog %s with('type'='%s')", - name, CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY); + name, GenericInMemoryCatalogFactoryOptions.IDENTIFIER); tableEnv.executeSql(ddl); assertTrue(tableEnv.getCatalog(name).isPresent()); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/factories/CatalogFactoryServiceTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/factories/CatalogFactoryServiceTest.java index d2dd64ca50729..98a775cf366a3 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/factories/CatalogFactoryServiceTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/factories/CatalogFactoryServiceTest.java @@ -19,22 +19,24 @@ package org.apache.flink.table.factories; import org.apache.flink.table.api.NoMatchingTableFactoryException; -import org.apache.flink.table.factories.utils.TestCatalogFactory; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.factories.utils.TestLegacyCatalogFactory; import org.junit.Test; import java.util.HashMap; import java.util.Map; -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION; -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE; -import static org.apache.flink.table.factories.utils.TestCatalogFactory.CATALOG_TYPE_TEST; +import static org.apache.flink.table.factories.utils.TestLegacyCatalogFactory.CATALOG_TYPE_TEST_LEGACY; import static org.junit.Assert.assertEquals; /** * Tests for testing external catalog discovery using {@link TableFactoryService}. The tests assume * the catalog factory {@link CatalogFactory} is registered. + * + * @deprecated These tests are for the legacy factory stack */ +@Deprecated public class CatalogFactoryServiceTest { @Test public void testValidProperties() { @@ -42,25 +44,25 @@ public void testValidProperties() { assertEquals( TableFactoryService.find(CatalogFactory.class, props).getClass(), - TestCatalogFactory.class); + TestLegacyCatalogFactory.class); } @Test(expected = NoMatchingTableFactoryException.class) public void testInvalidContext() { Map props = properties(); - props.put(CATALOG_TYPE, "unknown-catalog-type"); + props.put(CommonCatalogOptions.CATALOG_TYPE.key(), "unknown-catalog-type"); TableFactoryService.find(CatalogFactory.class, props); } @Test public void testDifferentContextVersion() { Map props = properties(); - props.put(CATALOG_PROPERTY_VERSION, "2"); + props.put(FactoryUtil.PROPERTY_VERSION.key(), "2"); // the catalog should still be found assertEquals( TableFactoryService.find(CatalogFactory.class, props).getClass(), - TestCatalogFactory.class); + TestLegacyCatalogFactory.class); } @Test(expected = NoMatchingTableFactoryException.class) @@ -73,8 +75,8 @@ public void testUnsupportedProperty() { private Map properties() { Map properties = new HashMap<>(); - properties.put(CATALOG_TYPE, CATALOG_TYPE_TEST); - properties.put(CATALOG_PROPERTY_VERSION, "1"); + properties.put(CommonCatalogOptions.CATALOG_TYPE.key(), CATALOG_TYPE_TEST_LEGACY); + properties.put(FactoryUtil.PROPERTY_VERSION.key(), "1"); return properties; } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/factories/utils/TestCatalogFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/factories/utils/TestLegacyCatalogFactory.java similarity index 77% rename from flink-table/flink-table-planner/src/test/java/org/apache/flink/table/factories/utils/TestCatalogFactory.java rename to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/factories/utils/TestLegacyCatalogFactory.java index fec1de3c3fcb5..786c6abd0d8d1 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/factories/utils/TestCatalogFactory.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/factories/utils/TestLegacyCatalogFactory.java @@ -19,33 +19,26 @@ package org.apache.flink.table.factories.utils; import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CommonCatalogOptions; import org.apache.flink.table.catalog.GenericInMemoryCatalog; import org.apache.flink.table.factories.CatalogFactory; +import org.apache.flink.table.factories.FactoryUtil; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION; -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE; +/** Legacy catalog factory for testing. */ +public class TestLegacyCatalogFactory implements CatalogFactory { -/** Catalog factory for testing. */ -public class TestCatalogFactory implements CatalogFactory { - - public static final String CATALOG_TYPE_TEST = "test"; - - @Override - public Catalog createCatalog(String name, Map properties) { - return new GenericInMemoryCatalog(name); - } + public static final String CATALOG_TYPE_TEST_LEGACY = "test-legacy"; @Override public Map requiredContext() { Map context = new HashMap<>(); - context.put(CATALOG_TYPE, CATALOG_TYPE_TEST); - context.put(CATALOG_PROPERTY_VERSION, "1"); - + context.put(CommonCatalogOptions.CATALOG_TYPE.key(), CATALOG_TYPE_TEST_LEGACY); + context.put(FactoryUtil.PROPERTY_VERSION.key(), "1"); return context; } @@ -53,4 +46,9 @@ public Map requiredContext() { public List supportedProperties() { return Collections.emptyList(); } + + @Override + public Catalog createCatalog(String name, Map properties) { + return new GenericInMemoryCatalog(name); + } } diff --git a/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory index d9aab281faf8f..17edf0790643f 100644 --- a/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory +++ b/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -19,5 +19,5 @@ org.apache.flink.table.factories.utils.TestTableSourceFactory org.apache.flink.table.factories.utils.TestTableFormatFactory org.apache.flink.table.factories.utils.TestAmbiguousTableFormatFactory org.apache.flink.table.catalog.TestExternalTableSourceFactory -org.apache.flink.table.factories.utils.TestCatalogFactory org.apache.flink.table.factories.utils.TestCollectionTableFactory +org.apache.flink.table.factories.utils.TestLegacyCatalogFactory