diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml index 21d64c1e48ed7..bb3572da87dbd 100644 --- a/flink-connectors/flink-connector-hive/pom.xml +++ b/flink-connectors/flink-connector-hive/pom.xml @@ -35,11 +35,6 @@ under the License. jar - - 2.3.4 - 2.7.2 - - @@ -73,6 +68,36 @@ under the License. hadoop-common ${hivemetastore.hadoop.version} provided + + + com.google.guava + guava + + + org.apache.curator + curator-client + + + org.apache.curator + curator-recipes + + + org.apache.avro + avro + + + commons-digester + commons-digester + + + com.sun.jersey + jersey-json + + + org.codehaus.jackson + jackson-mapper-asl + + @@ -80,6 +105,24 @@ under the License. hadoop-mapreduce-client-core ${hivemetastore.hadoop.version} provided + + + org.apache.curator + curator-test + + + org.apache.hadoop + hadoop-yarn-common + + + org.apache.avro + avro + + + org.codehaus.jackson + jackson-core-asl + + @@ -108,10 +151,6 @@ under the License. com.google.protobuf protobuf-java - - org.apache.derby - derby - org.apache.hbase hbase-client @@ -338,13 +377,6 @@ under the License. - - org.apache.derby - derby - 10.10.2.0 - test - - org.apache.flink flink-table-common diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java index 3cd67b01c76ae..2068f683bb1df 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java @@ -79,8 +79,6 @@ import javax.annotation.Nullable; -import java.io.File; -import java.net.MalformedURLException; import java.net.URL; import java.util.ArrayList; import java.util.HashMap; @@ -95,8 +93,10 @@ * A catalog implementation for Hive. 
*/ public class HiveCatalog extends AbstractCatalog { + // Default database of Hive metastore + public static final String DEFAULT_DB = "default"; + private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class); - private static final String DEFAULT_DB = "default"; private static final StorageFormatFactory storageFormatFactory = new StorageFormatFactory(); private static final String DEFAULT_HIVE_TABLE_STORAGE_FORMAT = "TextFile"; @@ -114,46 +114,22 @@ public class HiveCatalog extends AbstractCatalog { private HiveMetastoreClientWrapper client; - public HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable String hiveSiteFilePath) { - this(catalogName, - defaultDatabase == null ? DEFAULT_DB : defaultDatabase, - getHiveConf(loadHiveSiteUrl(hiveSiteFilePath))); - } - public HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable URL hiveSiteUrl) { this(catalogName, defaultDatabase == null ? DEFAULT_DB : defaultDatabase, - getHiveConf(hiveSiteUrl)); + createHiveConf(hiveSiteUrl)); } - public HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable HiveConf hiveConf) { + @VisibleForTesting + protected HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf) { super(catalogName, defaultDatabase == null ? DEFAULT_DB : defaultDatabase); - this.hiveConf = hiveConf == null ? getHiveConf(null) : hiveConf; + this.hiveConf = hiveConf == null ? 
createHiveConf(null) : hiveConf; LOG.info("Created HiveCatalog '{}'", catalogName); } - private static URL loadHiveSiteUrl(String filePath) { - - URL url = null; - - if (!StringUtils.isNullOrWhitespaceOnly(filePath)) { - try { - url = new File(filePath).toURI().toURL(); - - LOG.info("Successfully loaded '{}'", filePath); - - } catch (MalformedURLException e) { - throw new CatalogException( - String.format("Failed to get hive-site.xml from the given path '%s'", filePath), e); - } - } - - return url; - } - - private static HiveConf getHiveConf(URL hiveSiteUrl) { + private static HiveConf createHiveConf(URL hiveSiteUrl) { LOG.info("Setting hive-site location as {}", hiveSiteUrl); HiveConf.setHiveSiteLocation(hiveSiteUrl); diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogDescriptor.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogDescriptor.java new file mode 100644 index 0000000000000..07e6ba0c10415 --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogDescriptor.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.hive.descriptors; + +import org.apache.flink.table.catalog.hive.HiveCatalog; +import org.apache.flink.table.descriptors.CatalogDescriptor; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import java.util.Map; + +import static org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator.CATALOG_HIVE_SITE_PATH; +import static org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator.CATALOG_TYPE_VALUE_HIVE; + +/** + * Catalog descriptor for {@link HiveCatalog}. + */ +public class HiveCatalogDescriptor extends CatalogDescriptor { + + private String hiveSitePath; + + // TODO : set default database + public HiveCatalogDescriptor() { + super(CATALOG_TYPE_VALUE_HIVE, 1); + } + + public HiveCatalogDescriptor hiveSitePath(String hiveSitePath) { + Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveSitePath)); + this.hiveSitePath = hiveSitePath; + + return this; + } + + @Override + protected Map toCatalogProperties() { + final DescriptorProperties properties = new DescriptorProperties(); + + if (hiveSitePath != null) { + properties.putString(CATALOG_HIVE_SITE_PATH, hiveSitePath); + } + + return properties.asMap(); + } +} diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogValidator.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogValidator.java new file mode 100644 index 0000000000000..7f8ad6c4e44f3 --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogValidator.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.hive.descriptors; + +import org.apache.flink.table.descriptors.CatalogDescriptorValidator; +import org.apache.flink.table.descriptors.DescriptorProperties; + +/** + * Validator for {@link HiveCatalogDescriptor}. + */ +public class HiveCatalogValidator extends CatalogDescriptorValidator { + public static final String CATALOG_TYPE_VALUE_HIVE = "hive"; + public static final String CATALOG_HIVE_SITE_PATH = "hive-site-path"; + + @Override + public void validate(DescriptorProperties properties) { + super.validate(properties); + properties.validateValue(CATALOG_TYPE, CATALOG_TYPE_VALUE_HIVE, false); + properties.validateString(CATALOG_HIVE_SITE_PATH, true, 1); + } +} diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java new file mode 100644 index 0000000000000..59e2df1c32b3e --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.hive.factories; + +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.hive.HiveCatalog; +import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.factories.CatalogFactory; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator.CATALOG_HIVE_SITE_PATH; +import static org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator.CATALOG_TYPE_VALUE_HIVE; +import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE; +import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION; +import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE; + +/** + * Catalog factory for {@link HiveCatalog}. + */ +public class HiveCatalogFactory implements CatalogFactory { + private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogFactory.class); + + @Override + public Map requiredContext() { + Map context = new HashMap<>(); + context.put(CATALOG_TYPE, CATALOG_TYPE_VALUE_HIVE); // hive + context.put(CATALOG_PROPERTY_VERSION, "1"); // backwards compatibility + return context; + } + + @Override + public List supportedProperties() { + List properties = new ArrayList<>(); + + // default database + properties.add(CATALOG_DEFAULT_DATABASE); + + properties.add(CATALOG_HIVE_SITE_PATH); + + return properties; + } + + @Override + public Catalog createCatalog(String name, Map properties) { + final DescriptorProperties descriptorProperties = getValidatedProperties(properties); + + final String defaultDatabase = + descriptorProperties.getOptionalString(CATALOG_DEFAULT_DATABASE) + .orElse(HiveCatalog.DEFAULT_DB); + + final Optional hiveSitePath = descriptorProperties.getOptionalString(CATALOG_HIVE_SITE_PATH); + + return new HiveCatalog(name, defaultDatabase, loadHiveSiteUrl(hiveSitePath.orElse(null))); + } + + private static URL loadHiveSiteUrl(String filePath) { + + URL url = null; + + if (!StringUtils.isNullOrWhitespaceOnly(filePath)) { + try { + url = new File(filePath).toURI().toURL(); + + LOG.info("Successfully loaded '{}'", filePath); + + } catch (MalformedURLException e) { + throw new CatalogException( + String.format("Failed to get hive-site.xml from the given path '%s'", filePath), e); + } + } + + return url; + } + + private static DescriptorProperties getValidatedProperties(Map properties) { + final DescriptorProperties descriptorProperties = new DescriptorProperties(true); + descriptorProperties.putProperties(properties); + + new HiveCatalogValidator().validate(descriptorProperties); + + return descriptorProperties; + } +} diff --git 
a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory new file mode 100644 index 0000000000000..27d69ee2ed0ed --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java index 80ff6debd26a2..1f9aaa70ef591 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java @@ -60,7 +60,7 @@ public class HiveInputFormatTest { @BeforeClass public static void createCatalog() throws IOException { - hiveConf = HiveTestUtils.getHiveConf(); + hiveConf = HiveTestUtils.createHiveConf(); hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf); hiveCatalog.open(); } diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java index 5754d59270b13..e6d79bdc4ef74 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java @@ -60,7 +60,7 @@ public class HiveTableOutputFormatTest { @BeforeClass public static void createCatalog() throws IOException { - hiveConf = HiveTestUtils.getHiveConf(); + hiveConf = HiveTestUtils.createHiveConf(); hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf); hiveCatalog.open(); } diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java index 
f621c6cd6182f..993ba2d69c0cf 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java @@ -19,6 +19,7 @@ package org.apache.flink.table.catalog.hive; import org.apache.flink.table.catalog.CatalogTestBase; +import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.hadoop.hive.conf.HiveConf; import org.junit.rules.TemporaryFolder; @@ -36,25 +37,34 @@ public class HiveTestUtils { /** * Create a HiveCatalog with an embedded Hive Metastore. */ - public static HiveCatalog createHiveCatalog() throws IOException { - return new HiveCatalog(CatalogTestBase.TEST_CATALOG_NAME, null, getHiveConf()); + public static HiveCatalog createHiveCatalog() { + return createHiveCatalog(CatalogTestBase.TEST_CATALOG_NAME); + } + + public static HiveCatalog createHiveCatalog(String catalogName) { + return new HiveCatalog(catalogName, null, createHiveConf()); } public static HiveCatalog createHiveCatalog(HiveConf hiveConf) { return new HiveCatalog(CatalogTestBase.TEST_CATALOG_NAME, null, hiveConf); } - public static HiveConf getHiveConf() throws IOException { + public static HiveConf createHiveConf() { ClassLoader classLoader = new HiveTestUtils().getClass().getClassLoader(); HiveConf.setHiveSiteLocation(classLoader.getResource(HIVE_SITE_XML)); - TEMPORARY_FOLDER.create(); - String warehouseDir = TEMPORARY_FOLDER.newFolder().getAbsolutePath() + "/metastore_db"; - String warehouseUri = String.format(HIVE_WAREHOUSE_URI_FORMAT, warehouseDir); - HiveConf hiveConf = new HiveConf(); - hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEMPORARY_FOLDER.newFolder("hive_warehouse").getAbsolutePath()); - hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, warehouseUri); + try { + TEMPORARY_FOLDER.create(); + String warehouseDir = TEMPORARY_FOLDER.newFolder().getAbsolutePath() + "/metastore_db"; 
+ String warehouseUri = String.format(HIVE_WAREHOUSE_URI_FORMAT, warehouseDir); - return hiveConf; + HiveConf hiveConf = new HiveConf(); + hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEMPORARY_FOLDER.newFolder("hive_warehouse").getAbsolutePath()); + hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, warehouseUri); + return hiveConf; + } catch (IOException e) { + throw new CatalogException( + "Failed to create test HiveConf to HiveCatalog.", e); + } } } diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogDescriptorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogDescriptorTest.java new file mode 100644 index 0000000000000..3465534215485 --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogDescriptorTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.hive.descriptors; + +import org.apache.flink.table.descriptors.Descriptor; +import org.apache.flink.table.descriptors.DescriptorTestBase; +import org.apache.flink.table.descriptors.DescriptorValidator; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Tests for {@link HiveCatalogDescriptor}. + */ +public class HiveCatalogDescriptorTest extends DescriptorTestBase { + + @Override + protected List descriptors() { + final Descriptor descriptor = new HiveCatalogDescriptor(); + + return Arrays.asList(descriptor); + } + + @Override + protected List> properties() { + final Map props1 = new HashMap<>(); + props1.put("type", "hive"); + props1.put("property-version", "1"); + + return Arrays.asList(props1); + } + + @Override + protected DescriptorValidator validator() { + return new HiveCatalogValidator(); + } +} diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java new file mode 100644 index 0000000000000..e8b1dc485f18f --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.hive.factories; + +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.hive.HiveCatalog; +import org.apache.flink.table.catalog.hive.HiveTestUtils; +import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogDescriptor; +import org.apache.flink.table.descriptors.CatalogDescriptor; +import org.apache.flink.table.factories.CatalogFactory; +import org.apache.flink.table.factories.TableFactoryService; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +/** + * Test for {@link HiveCatalog} created by {@link HiveCatalogFactory}. 
+ */ +public class HiveCatalogFactoryTest extends TestLogger { + + @Test + public void test() { + final String catalogName = "mycatalog"; + + final HiveCatalog expectedCatalog = HiveTestUtils.createHiveCatalog(catalogName); + + final CatalogDescriptor catalogDescriptor = new HiveCatalogDescriptor(); + + final Map properties = catalogDescriptor.toProperties(); + + final Catalog actualCatalog = TableFactoryService.find(CatalogFactory.class, properties) + .createCatalog(catalogName, properties); + + checkEquals(expectedCatalog, (HiveCatalog) actualCatalog); + } + + private static void checkEquals(HiveCatalog c1, HiveCatalog c2) { + // Only assert a few selected properties for now + assertEquals(c1.getName(), c2.getName()); + assertEquals(c1.getDefaultDatabase(), c2.getDefaultDatabase()); + } +} diff --git a/flink-table/flink-sql-client/pom.xml b/flink-table/flink-sql-client/pom.xml index 80ee42bcb5350..528e57616246c 100644 --- a/flink-table/flink-sql-client/pom.xml +++ b/flink-table/flink-sql-client/pom.xml @@ -154,6 +154,339 @@ under the License. 
test + + org.apache.flink + flink-connector-hive_${scala.binary.version} + ${project.version} + provided + true + + + + + + org.apache.flink + flink-connector-hive_${scala.binary.version} + ${project.version} + test-jar + test + + + + org.apache.flink + flink-hadoop-compatibility_${scala.binary.version} + ${project.version} + test + + + + org.apache.hive + hive-metastore + ${hive.version} + test + + + org.apache.hive + hive-shims + + + javolution + javolution + + + com.google.guava + guava + + + com.google.protobuf + protobuf-java + + + org.apache.hbase + hbase-client + + + commons-lang + commons-lang + + + com.zaxxer + HikariCP + + + co.cask.tephra + tephra-api + + + co.cask.tephra + tephra-core + + + co.cask.tephra + tephra-hbase-compat-1.0 + + + commons-cli + commons-cli + + + org.apache.thrift + libfb303 + + + javax.transaction + transaction-api + + + org.apache.orc + orc-core + + + joda-time + joda-time + + + org.apache.logging.log4j + log4j-1.2-api + + + org.apache.logging.log4j + log4j-slf4j-impl + + + org.apache.ant + ant + + + com.tdunning + json + + + jline + jline + + + org.eclipse.jetty.aggregate + jetty-all + + + org.eclipse.jetty.orbit + javax.servlet + + + org.apache.logging.log4j + log4j-web + + + io.dropwizard.metrics + metrics-core + + + io.dropwizard.metrics + metrics-jvm + + + io.dropwizard.metrics + metrics-json + + + com.github.joshelser + dropwizard-metrics-hadoop-metrics2-reporter + + + + + tomcat + jasper-compiler + + + tomcat + jasper-runtime + + + org.apache.httpcomponents + httpclient + + + org.apache.httpcomponents + httpcore + + + + + commons-codec + commons-codec + + + org.apache.avro + avro + + + net.sf.opencsv + opencsv + + + org.apache.parquet + parquet-hadoop-bundle + + + + + + org.apache.hive + hive-exec + ${hive.version} + test + + + org.apache.hive + hive-vector-code-gen + + + org.apache.hive + hive-llap-tez + + + org.apache.hive + hive-shims + + + commons-codec + commons-codec + + + commons-httpclient + commons-httpclient + + 
+ org.apache.logging.log4j + log4j-1.2-api + + + org.apache.logging.log4j + log4j-slf4j-impl + + + org.antlr + antlr-runtime + + + org.antlr + ST4 + + + org.apache.ant + ant + + + org.apache.commons + commons-compress + + + org.apache.ivy + ivy + + + org.apache.zookeeper + zookeeper + + + org.apache.curator + apache-curator + + + org.apache.curator + curator-framework + + + org.codehaus.groovy + groovy-all + + + org.apache.calcite + calcite-core + + + org.apache.calcite + calcite-druid + + + org.apache.calcite.avatica + avatica + + + com.google.code.gson + gson + + + stax + stax-api + + + + + + org.apache.hadoop + hadoop-common + ${hivemetastore.hadoop.version} + test + + + com.google.guava + guava + + + org.apache.curator + curator-client + + + org.apache.curator + curator-recipes + + + org.apache.avro + avro + + + commons-digester + commons-digester + + + com.sun.jersey + jersey-json + + + org.codehaus.jackson + jackson-mapper-asl + + + + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hivemetastore.hadoop.version} + provided + + + org.apache.curator + curator-test + + + org.apache.hadoop + hadoop-yarn-common + + + org.apache.avro + avro + + + org.codehaus.jackson + jackson-core-asl + + + + diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java index c3d0ba0735ec7..d12f0eac83d1c 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java @@ -24,6 +24,9 @@ import org.apache.flink.table.api.Types; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.catalog.hive.HiveCatalog; +import 
org.apache.flink.table.catalog.hive.HiveTestUtils; +import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory; import org.apache.flink.table.client.config.Environment; import org.apache.flink.table.client.gateway.SessionContext; import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil; @@ -46,6 +49,7 @@ import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE; import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * Dependency tests for {@link LocalExecutor}. Mainly for testing classloading of dependencies. @@ -112,7 +116,7 @@ public TestTableSinkFactory() { } /** - * External catalog that can be discovered if classloading is correct. + * Catalog that can be discovered if classloading is correct. */ public static class TestCatalogFactory implements CatalogFactory { @@ -126,7 +130,6 @@ public Map requiredContext() { @Override public List supportedProperties() { final List properties = new ArrayList<>(); - properties.add(TEST_PROPERTY); properties.add(CATALOG_DEFAULT_DATABASE); return properties; } @@ -150,4 +153,32 @@ public TestCatalog(String name, String defaultDatabase) { super(name, defaultDatabase); } } + + /** + * A test factory that is the same as {@link HiveCatalogFactory} + * except returning a {@link HiveCatalog} always with an embedded Hive metastore + * to test logic of {@link HiveCatalogFactory}. 
+ public static class TestHiveCatalogFactory extends HiveCatalogFactory { + @Override + public Map requiredContext() { + Map context = super.requiredContext(); + + // For factory discovery service to distinguish TestHiveCatalogFactory from HiveCatalogFactory + context.put("test", "test"); + return context; + } + + @Override + public Catalog createCatalog(String name, Map properties) { + // Exercise HiveCatalogFactory.createCatalog to verify that it succeeds, + // but do not use the catalog it creates for this test + assertTrue(super.createCatalog(name, properties) != null); + + // Developers may already have their own production/testing hive-site.xml set in their environment, + // and Flink tests should avoid using those hive-site.xml. + // Thus, explicitly create a testing HiveConf for unit tests here + return HiveTestUtils.createHiveCatalog(name); + } + } } diff --git a/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory index 7d81c83850079..b4e3095367f77 100644 --- a/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory +++ b/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -16,3 +16,4 @@ org.apache.flink.table.client.gateway.utils.DummyTableSinkFactory org.apache.flink.table.client.gateway.utils.DummyTableSourceFactory org.apache.flink.table.client.gateway.local.DependencyTest$TestCatalogFactory +org.apache.flink.table.client.gateway.local.DependencyTest$TestHiveCatalogFactory diff --git a/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml b/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml index a1b2dcbc6341d..6b86ce7300eb7 100644 --- a/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml +++ 
b/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml @@ -127,3 +127,6 @@ catalogs: - name: inmemorycatalog type: generic_in_memory default-database: mydatabase + - name: mycatalog + type: hive + test: test diff --git a/pom.xml b/pom.xml index eba6d161a92f1..fdb7e7576a18f 100644 --- a/pom.xml +++ b/pom.xml @@ -140,6 +140,8 @@ under the License. --> 2.7.2 ./docs/_includes/generated + 2.3.4 + 2.7.2