Skip to content

Commit

Permalink
[FLINK-17407] Add utility class for external resource framework
Browse files Browse the repository at this point in the history
  • Loading branch information
KarmaGYZ authored and tillrohrmann committed May 17, 2020
1 parent e725526 commit d4d3747
Show file tree
Hide file tree
Showing 2 changed files with 227 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.externalresource;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExternalResourceOptions;
import org.apache.flink.util.StringUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static org.apache.flink.configuration.ConfigOptions.key;

/**
* Utility class for external resource framework.
*/
public class ExternalResourceUtils {

private static final Logger LOG = LoggerFactory.getLogger(ExternalResourceUtils.class);

private ExternalResourceUtils() {
throw new UnsupportedOperationException("This class should never be instantiated.");
}

/**
* Get the enabled external resource list from configuration.
*/
private static Set<String> getExternalResourceSet(Configuration config) {
return new HashSet<>(config.get(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST));
}

/**
* Get the external resources map.
*/
public static Map<String, Long> getExternalResources(Configuration config, String suffix) {
final Set<String> resourceSet = getExternalResourceSet(config);
LOG.info("Enabled external resources: {}", resourceSet);

if (resourceSet.isEmpty()) {
return Collections.emptyMap();
}

final Map<String, Long> externalResourceConfigs = new HashMap<>();
for (String resourceName: resourceSet) {
final ConfigOption<Long> amountOption =
key(ExternalResourceOptions.getAmountConfigOptionForResource(resourceName))
.longType()
.noDefaultValue();
final ConfigOption<String> configKeyOption =
key(ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource(resourceName, suffix))
.stringType()
.noDefaultValue();
final String configKey = config.getString(configKeyOption);
final Optional<Long> amountOpt = config.getOptional(amountOption);

if (StringUtils.isNullOrWhitespaceOnly(configKey)) {
LOG.warn("Could not find valid {} for {}. Will ignore that resource.", configKeyOption.key(), resourceName);
continue;
}
if (!amountOpt.isPresent()) {
LOG.warn("The amount of the {} should be configured. Will ignore that resource.", resourceName);
continue;
} else if (amountOpt.get() <= 0) {
LOG.warn("The amount of the {} should be positive while finding {}. Will ignore that resource.", amountOpt.get(), resourceName);
continue;
}

if (externalResourceConfigs.put(configKey, amountOpt.get()) != null) {
LOG.warn("Duplicate config key {} occurred for external resources, the one named {} with amount {} will overwrite the value.", configKey, resourceName, amountOpt);
} else {
LOG.info("Add external resource config for {} with key {} value {}.", resourceName, configKey, amountOpt);
}
}

return externalResourceConfigs;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.externalresource;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExternalResourceOptions;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.empty;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

/**
* Tests for the {@link ExternalResourceUtils} class.
*/
public class ExternalResourceUtilsTest extends TestLogger {

private static final String RESOURCE_NAME_1 = "foo";
private static final String RESOURCE_NAME_2 = "bar";
private static final List<String> RESOURCE_LIST = Arrays.asList(RESOURCE_NAME_1, RESOURCE_NAME_2);
private static final long RESOURCE_AMOUNT_1 = 2L;
private static final long RESOURCE_AMOUNT_2 = 1L;
private static final String RESOURCE_CONFIG_KEY_1 = "flink1";
private static final String RESOURCE_CONFIG_KEY_2 = "flink2";
private static final String SUFFIX = "flink.config-key";

@Test
public void testGetExternalResourcesWithConfigKeyNotSpecifiedOrEmpty() {
final Configuration config = new Configuration();
final String resourceConfigKey = "";

config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST);
config.setLong(ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1), RESOURCE_AMOUNT_1);
config.setLong(ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_2), RESOURCE_AMOUNT_2);
config.setString(ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource(RESOURCE_NAME_1, SUFFIX), resourceConfigKey);

final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX);

assertThat(configMap.entrySet(), is(empty()));
}

@Test
public void testGetExternalResourcesWithIllegalAmount() {
final Configuration config = new Configuration();
final long resourceAmount = 0L;

config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST);
config.setLong(ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1), resourceAmount);
config.setString(ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource(RESOURCE_NAME_1, SUFFIX), RESOURCE_CONFIG_KEY_1);

final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX);

assertThat(configMap.entrySet(), is(empty()));
}

@Test
public void testGetExternalResourcesWithoutConfigAmount() {
final Configuration config = new Configuration();

config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST);
config.setString(ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource(RESOURCE_NAME_1, SUFFIX), RESOURCE_CONFIG_KEY_1);

final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX);

assertThat(configMap.entrySet(), is(empty()));
}

@Test
public void testGetExternalResourcesWithConflictConfigKey() {
final Configuration config = new Configuration();

config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST);
config.setLong(ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1), RESOURCE_AMOUNT_1);
config.setLong(ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_2), RESOURCE_AMOUNT_2);
config.setString(ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource(RESOURCE_NAME_1, SUFFIX), RESOURCE_CONFIG_KEY_1);
config.setString(ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource(RESOURCE_NAME_2, SUFFIX), RESOURCE_CONFIG_KEY_1);

final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX);

// Only one of the config key would be kept.
assertThat(configMap.size(), is(1));
assertTrue(configMap.containsKey(RESOURCE_CONFIG_KEY_1));
}

@Test
public void testGetExternalResourcesWithMultipleExternalResource() {
final Configuration config = new Configuration();

config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST);
config.setLong(ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1), RESOURCE_AMOUNT_1);
config.setLong(ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_2), RESOURCE_AMOUNT_2);
config.setString(ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource(RESOURCE_NAME_1, SUFFIX), RESOURCE_CONFIG_KEY_1);
config.setString(ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource(RESOURCE_NAME_2, SUFFIX), RESOURCE_CONFIG_KEY_2);

final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX);

assertThat(configMap.size(), is(2));
assertTrue(configMap.containsKey(RESOURCE_CONFIG_KEY_1));
assertTrue(configMap.containsKey(RESOURCE_CONFIG_KEY_2));
assertThat(configMap.get(RESOURCE_CONFIG_KEY_1), is(RESOURCE_AMOUNT_1));
assertThat(configMap.get(RESOURCE_CONFIG_KEY_2), is(RESOURCE_AMOUNT_2));
}
}

0 comments on commit d4d3747

Please sign in to comment.