Skip to content

Commit

Permalink
[FLINK-14580][hive] add HiveModuleFactory, HiveModuleDescriptor, and …
Browse files Browse the repository at this point in the history
…HiveModuleDescriptorValidator

add HiveModuleFactory, HiveModuleDescriptor, and HiveModuleDescriptorValidator for HiveModule.

This closes apache#10092.
  • Loading branch information
bowenli86 committed Nov 6, 2019
1 parent b9e75dd commit 3a39d9c
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.module.hive;

import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.ModuleDescriptor;
import org.apache.flink.util.StringUtils;

import java.util.Map;

import static org.apache.flink.table.module.hive.HiveModuleDescriptorValidator.MODULE_HIVE_VERSION;
import static org.apache.flink.table.module.hive.HiveModuleDescriptorValidator.MODULE_TYPE_HIVE;
import static org.apache.flink.util.Preconditions.checkArgument;

/**
* Module descriptor for {@link HiveModule}.
*/
public class HiveModuleDescriptor extends ModuleDescriptor {
private String hiveVersion;

public HiveModuleDescriptor() {
super(MODULE_TYPE_HIVE);
}

public HiveModuleDescriptor hiveVersion(String hiveVersion) {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveVersion));
this.hiveVersion = hiveVersion;
return this;
}

@Override
protected Map<String, String> toModuleProperties() {
final DescriptorProperties properties = new DescriptorProperties();

if (hiveVersion != null) {
properties.putString(MODULE_HIVE_VERSION, hiveVersion);
}

return properties.asMap();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.module.hive;

import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.ModuleDescriptorValidator;

/**
* Validator for {@link HiveModuleDescriptor}.
*/
public class HiveModuleDescriptorValidator extends ModuleDescriptorValidator {
public static final String MODULE_TYPE_HIVE = "hive";
public static final String MODULE_HIVE_VERSION = "hive-version";

@Override
public void validate(DescriptorProperties properties) {
super.validate(properties);
properties.validateValue(MODULE_TYPE, MODULE_TYPE_HIVE, false);
properties.validateString(MODULE_HIVE_VERSION, true, 1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.module.hive;

import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.factories.ModuleFactory;
import org.apache.flink.table.module.Module;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.flink.table.descriptors.ModuleDescriptorValidator.MODULE_TYPE;
import static org.apache.flink.table.module.hive.HiveModuleDescriptorValidator.MODULE_HIVE_VERSION;
import static org.apache.flink.table.module.hive.HiveModuleDescriptorValidator.MODULE_TYPE_HIVE;

/**
* Factory for {@link HiveModule}.
*/
public class HiveModuleFactory implements ModuleFactory {

@Override
public Module createModule(Map<String, String> properties) {
final DescriptorProperties descProperties = getValidatedProperties(properties);

final String hiveVersion = descProperties.getOptionalString(MODULE_HIVE_VERSION)
.orElse(HiveShimLoader.getHiveVersion());

return new HiveModule(hiveVersion);
}

private static DescriptorProperties getValidatedProperties(Map<String, String> properties) {
final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
descriptorProperties.putProperties(properties);

new HiveModuleDescriptorValidator().validate(descriptorProperties);

return descriptorProperties;
}

@Override
public Map<String, String> requiredContext() {
Map<String, String> context = new HashMap<>();
context.put(MODULE_TYPE, MODULE_TYPE_HIVE);

return context;
}

@Override
public List<String> supportedProperties() {
return new ArrayList<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@
# limitations under the License.

org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory
org.apache.flink.table.module.hive.HiveModuleFactory
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.module.hive;

import org.apache.flink.table.descriptors.Descriptor;
import org.apache.flink.table.descriptors.DescriptorTestBase;
import org.apache.flink.table.descriptors.DescriptorValidator;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Tests for {@link HiveModuleDescriptor}.
*/
public class HiveModuleDescriptorTest extends DescriptorTestBase {

@Override
protected List<Descriptor> descriptors() {
final Descriptor descriptor = new HiveModuleDescriptor();

return Arrays.asList(descriptor);
}

@Override
protected List<Map<String, String>> properties() {
final Map<String, String> props1 = new HashMap<>();
props1.put("type", "hive");

return Arrays.asList(props1);
}

@Override
protected DescriptorValidator validator() {
return new HiveModuleDescriptorValidator();
}
}

0 comments on commit 3a39d9c

Please sign in to comment.