Skip to content

Commit

Permalink
[FLINK-23720][table] Migrate ModuleFactory to the new stack
Browse files Browse the repository at this point in the history
- [table] Migrate HiveModule
- [table] Migrate CoreModule
- [table] Add a module factory helper
- [table] Remove ModuleDescriptor(Validator)
- [sql-client] Remove unsued test module
- [table] Move deprecated logic into FactoryUtil for easier removal
- [table] Make LOAD MODULE summary string more SQL-like

This closes apache#16781.
  • Loading branch information
Airblader authored and twalthr committed Aug 16, 2021
1 parent 6d89c1d commit 6d0263f
Show file tree
Hide file tree
Showing 28 changed files with 458 additions and 393 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,54 +18,51 @@

package org.apache.flink.table.module.hive;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.ModuleFactory;
import org.apache.flink.table.module.Module;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Collections;
import java.util.Set;

import static org.apache.flink.table.descriptors.ModuleDescriptorValidator.MODULE_TYPE;
import static org.apache.flink.table.module.hive.HiveModuleDescriptorValidator.MODULE_HIVE_VERSION;
import static org.apache.flink.table.module.hive.HiveModuleDescriptorValidator.MODULE_TYPE_HIVE;
import static org.apache.flink.table.module.hive.HiveModuleOptions.HIVE_VERSION;

/** Factory for {@link HiveModule}. */
@Internal
public class HiveModuleFactory implements ModuleFactory {

@Override
public Module createModule(Map<String, String> properties) {
final DescriptorProperties descProperties = getValidatedProperties(properties);

final String hiveVersion =
descProperties
.getOptionalString(MODULE_HIVE_VERSION)
.orElse(HiveShimLoader.getHiveVersion());
public static final String IDENTIFIER = "hive";

return new HiveModule(hiveVersion);
@Override
public String factoryIdentifier() {
return IDENTIFIER;
}

private static DescriptorProperties getValidatedProperties(Map<String, String> properties) {
final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
descriptorProperties.putProperties(properties);

new HiveModuleDescriptorValidator().validate(descriptorProperties);

return descriptorProperties;
@Override
public Set<ConfigOption<?>> requiredOptions() {
return Collections.emptySet();
}

@Override
public Map<String, String> requiredContext() {
Map<String, String> context = new HashMap<>();
context.put(MODULE_TYPE, MODULE_TYPE_HIVE);

return context;
public Set<ConfigOption<?>> optionalOptions() {
return Collections.singleton(HIVE_VERSION);
}

@Override
public List<String> supportedProperties() {
return Arrays.asList(MODULE_HIVE_VERSION);
public Module createModule(Context context) {
final FactoryUtil.ModuleFactoryHelper factoryHelper =
FactoryUtil.createModuleFactoryHelper(this, context);
factoryHelper.validate();

final String hiveVersion =
factoryHelper
.getOptions()
.getOptional(HIVE_VERSION)
.orElseGet(HiveShimLoader::getHiveVersion);

return new HiveModule(hiveVersion);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,16 @@

package org.apache.flink.table.module.hive;

import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.ModuleDescriptorValidator;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/** Validator for {@link HiveModuleDescriptor}. */
public class HiveModuleDescriptorValidator extends ModuleDescriptorValidator {
public static final String MODULE_TYPE_HIVE = "hive";
public static final String MODULE_HIVE_VERSION = "hive-version";
/** Configuration options for the Hive module. */
@PublicEvolving
public class HiveModuleOptions {

@Override
public void validate(DescriptorProperties properties) {
super.validate(properties);
properties.validateValue(MODULE_TYPE, MODULE_TYPE_HIVE, false);
properties.validateString(MODULE_HIVE_VERSION, true, 1);
}
public static final ConfigOption<String> HIVE_VERSION =
ConfigOptions.key("hive-version").stringType().noDefaultValue();

private HiveModuleOptions() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@
# limitations under the License.

org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory
org.apache.flink.table.module.hive.HiveModuleFactory
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.flink.table.module.hive.HiveModuleFactory
org.apache.flink.table.planner.delegation.hive.HiveParserFactory

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@

package org.apache.flink.table.module.hive;

import org.apache.flink.table.descriptors.ModuleDescriptor;
import org.apache.flink.table.factories.ModuleFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.module.Module;

import org.junit.Test;

import java.util.Map;
import java.util.Collections;

import static org.junit.Assert.assertEquals;

Expand All @@ -34,12 +33,12 @@ public class HiveModuleFactoryTest {
public void test() {
final HiveModule expected = new HiveModule();

final ModuleDescriptor moduleDescriptor = new HiveModuleDescriptor();

final Map<String, String> properties = moduleDescriptor.toProperties();

final Module actualModule =
TableFactoryService.find(ModuleFactory.class, properties).createModule(properties);
FactoryUtil.createModule(
HiveModuleFactory.IDENTIFIER,
Collections.emptyMap(),
new Configuration(),
Thread.currentThread().getContextClassLoader());

checkEquals(expected, (HiveModule) actualModule);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.ModuleFactory;
import org.apache.flink.table.module.Module;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.types.DataType;
Expand All @@ -61,7 +59,6 @@

import java.net.URL;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -72,7 +69,6 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.flink.table.descriptors.ModuleDescriptorValidator.MODULE_TYPE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

Expand All @@ -84,7 +80,6 @@ public class DependencyTest {
private static final String TEST_PROPERTY_VALUE = "test-value";

public static final String CATALOG_TYPE_TEST = "DependencyTest";
public static final String MODULE_TYPE_TEST = "ModuleDependencyTest";

private static final String TABLE_FACTORY_JAR_FILE = "table-factories-test-jar.jar";
private static final List<String> INIT_SQL =
Expand Down Expand Up @@ -203,32 +198,6 @@ public DynamicTableSource createDynamicTableSource(Context context) {
}
}

/** Module that can be discovered if classloading is correct. */
public static class TestModuleFactory implements ModuleFactory {

@Override
public Module createModule(Map<String, String> properties) {
return new TestModule();
}

@Override
public Map<String, String> requiredContext() {
final Map<String, String> context = new HashMap<>();
context.put(MODULE_TYPE, MODULE_TYPE_TEST);
return context;
}

@Override
public List<String> supportedProperties() {
final List<String> properties = new ArrayList<>();
properties.add("test");
return properties;
}
}

/** Test module. */
public static class TestModule implements Module {}

/** Catalog that can be discovered if classloading is correct. */
public static class TestCatalogFactory implements CatalogFactory {

Expand Down

This file was deleted.

16 changes: 5 additions & 11 deletions flink-table/flink-sql-client/src/test/resources/sql/module.q
Original file line number Diff line number Diff line change
Expand Up @@ -72,24 +72,18 @@ Was expecting one of:
# load hive module with module name capitalized
LOAD MODULE Hive;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.ModuleFactory' in
the classpath.
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'Hive' that implements 'org.apache.flink.table.factories.ModuleFactory' in the classpath.

Reason: Required context properties mismatch.
Available factory identifiers are:

The following properties are requested:
type=Hive

The following factories have been considered:
org.apache.flink.table.client.gateway.local.DependencyTest$TestModuleFactory
org.apache.flink.table.module.CoreModuleFactory
org.apache.flink.table.module.hive.HiveModuleFactory
core
hive
!error

# load hive module with specifying type
LOAD MODULE myhive WITH ('type' = 'hive');
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Property 'type' = 'hive' is not supported since module name is used to find module
org.apache.flink.table.api.ValidationException: Option 'type' = 'hive' is not supported since module name is used to find module
!error

LOAD MODULE hive;
Expand Down
Loading

0 comments on commit 6d0263f

Please sign in to comment.