Skip to content

Commit

Permalink
[FLINK-10436] Add ConfigOption#withFallbackKeys (apache#6872)
Browse files Browse the repository at this point in the history
* [FLINK-10436] Add ConfigOption#withFallbackKeys

* [hotfix] correct javadoc

* [FLINK-10436] Code quality improvement
  • Loading branch information
tisonkun authored and tillrohrmann committed Jan 9, 2019
1 parent 0315f20 commit 1e2aa8e
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@
@PublicEvolving
public class ConfigOption<T> {

private static final String[] EMPTY = new String[0];
private static final FallbackKey[] EMPTY = new FallbackKey[0];

// ------------------------------------------------------------------------

/** The current key for that config option. */
private final String key;

/** The list of deprecated keys, in the order to be checked. */
private final String[] deprecatedKeys;
private final FallbackKey[] fallbackKeys;

/** The default value for this option. */
private final T defaultValue;
Expand All @@ -58,52 +58,70 @@ public class ConfigOption<T> {
// ------------------------------------------------------------------------

/**
* Creates a new config option with no deprecated keys.
* Creates a new config option with no fallback keys.
*
* @param key The current key for that config option
* @param defaultValue The default value for this option
* @param key The current key for that config option
* @param defaultValue The default value for this option
*/
ConfigOption(String key, T defaultValue) {
this.key = checkNotNull(key);
this.description = Description.builder().text("").build();
this.defaultValue = defaultValue;
this.deprecatedKeys = EMPTY;
this.fallbackKeys = EMPTY;
}

/**
* Creates a new config option with deprecated keys.
* Creates a new config option with fallback keys.
*
* @param key The current key for that config option
* @param description Description for that option
* @param defaultValue The default value for this option
* @param deprecatedKeys The list of deprecated keys, in the order to be checked
* @param key The current key for that config option
* @param description Description for that option
* @param defaultValue The default value for this option
* @param fallbackKeys The list of fallback keys, in the order to be checked
* @deprecated use version with {@link Description} instead
*/
@Deprecated
ConfigOption(String key, String description, T defaultValue, String... deprecatedKeys) {
ConfigOption(String key, String description, T defaultValue, FallbackKey... fallbackKeys) {
this.key = checkNotNull(key);
this.description = Description.builder().text(description).build();
this.defaultValue = defaultValue;
this.deprecatedKeys = deprecatedKeys == null || deprecatedKeys.length == 0 ? EMPTY : deprecatedKeys;
this.fallbackKeys = fallbackKeys == null || fallbackKeys.length == 0 ? EMPTY : fallbackKeys;
}

/**
* Creates a new config option with deprecated keys.
* Creates a new config option with fallback keys.
*
* @param key The current key for that config option
* @param description Description for that option
* @param defaultValue The default value for this option
* @param deprecatedKeys The list of deprecated keys, in the order to be checked
* @param key The current key for that config option
* @param description Description for that option
* @param defaultValue The default value for this option
* @param fallbackKeys The list of fallback keys, in the order to be checked
*/
ConfigOption(String key, Description description, T defaultValue, String... deprecatedKeys) {
ConfigOption(String key, Description description, T defaultValue, FallbackKey... fallbackKeys) {
this.key = checkNotNull(key);
this.description = description;
this.defaultValue = defaultValue;
this.deprecatedKeys = deprecatedKeys == null || deprecatedKeys.length == 0 ? EMPTY : deprecatedKeys;
this.fallbackKeys = fallbackKeys == null || fallbackKeys.length == 0 ? EMPTY : fallbackKeys;
}

// ------------------------------------------------------------------------

/**
* Creates a new config option, using this option's key and default value, and
* adding the given fallback keys.
*
* <p>When obtaining a value from the configuration via {@link Configuration#getValue(ConfigOption)},
* the fallback keys will be checked in the order provided to this method. The first key for which
* a value is found will be used - that value will be returned.
*
* @param fallbackKeys The fallback keys, in the order in which they should be checked.
* @return A new config options, with the given fallback keys.
*/
public ConfigOption<T> withFallbackKeys(String... fallbackKeys) {
FallbackKey[] fallbackKeyArray = Arrays.stream(fallbackKeys)
.map(FallbackKey::createFallbackKey)
.toArray(FallbackKey[]::new);
return new ConfigOption<>(key, description, defaultValue, fallbackKeyArray);
}

/**
* Creates a new config option, using this option's key and default value, and
* adding the given deprecated keys.
Expand All @@ -116,7 +134,10 @@ public class ConfigOption<T> {
* @return A new config options, with the given deprecated keys.
*/
public ConfigOption<T> withDeprecatedKeys(String... deprecatedKeys) {
return new ConfigOption<>(key, description, defaultValue, deprecatedKeys);
FallbackKey[] fallbackKeys = Arrays.stream(deprecatedKeys)
.map(FallbackKey::createDeprecatedKey)
.toArray(FallbackKey[]::new);
return new ConfigOption<>(key, description, defaultValue, fallbackKeys);
}

/**
Expand All @@ -138,7 +159,7 @@ public ConfigOption<T> withDescription(final String description) {
* @return A new config option, with given description.
*/
public ConfigOption<T> withDescription(final Description description) {
return new ConfigOption<>(key, description, defaultValue, deprecatedKeys);
return new ConfigOption<>(key, description, defaultValue, fallbackKeys);
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -168,19 +189,19 @@ public T defaultValue() {
}

/**
* Checks whether this option has deprecated keys.
* @return True if the option has deprecated keys, false if not.
* Checks whether this option has fallback keys.
* @return True if the option has fallback keys, false if not.
*/
public boolean hasDeprecatedKeys() {
return deprecatedKeys != EMPTY;
public boolean hasFallbackKeys() {
return fallbackKeys != EMPTY;
}

/**
* Gets the deprecated keys, in the order to be checked.
* @return The option's deprecated keys.
* Gets the fallback keys, in the order to be checked.
* @return The option's fallback keys.
*/
public Iterable<String> deprecatedKeys() {
return deprecatedKeys == EMPTY ? Collections.<String>emptyList() : Arrays.asList(deprecatedKeys);
public Iterable<FallbackKey> fallbackKeys() {
return (fallbackKeys == EMPTY) ? Collections.emptyList() : Arrays.asList(fallbackKeys);
}

/**
Expand All @@ -201,7 +222,7 @@ public boolean equals(Object o) {
else if (o != null && o.getClass() == ConfigOption.class) {
ConfigOption<?> that = (ConfigOption<?>) o;
return this.key.equals(that.key) &&
Arrays.equals(this.deprecatedKeys, that.deprecatedKeys) &&
Arrays.equals(this.fallbackKeys, that.fallbackKeys) &&
(this.defaultValue == null ? that.defaultValue == null :
(that.defaultValue != null && this.defaultValue.equals(that.defaultValue)));
}
Expand All @@ -213,13 +234,13 @@ else if (o != null && o.getClass() == ConfigOption.class) {
@Override
public int hashCode() {
return 31 * key.hashCode() +
17 * Arrays.hashCode(deprecatedKeys) +
17 * Arrays.hashCode(fallbackKeys) +
(defaultValue != null ? defaultValue.hashCode() : 0);
}

@Override
public String toString() {
return String.format("Key: '%s' , default: %s (deprecated keys: %s)",
key, defaultValue, Arrays.toString(deprecatedKeys));
return String.format("Key: '%s' , default: %s (fallback keys: %s)",
key, defaultValue, Arrays.toString(fallbackKeys));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -701,12 +701,11 @@ public boolean contains(ConfigOption<?> configOption) {
if (this.confData.containsKey(configOption.key())) {
return true;
}
else if (configOption.hasDeprecatedKeys()) {
// try the deprecated keys
for (String deprecatedKey : configOption.deprecatedKeys()) {
if (this.confData.containsKey(deprecatedKey)) {
LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
deprecatedKey, configOption.key());
else if (configOption.hasFallbackKeys()) {
// try the fallback keys
for (FallbackKey fallbackKey : configOption.fallbackKeys()) {
if (this.confData.containsKey(fallbackKey.getKey())) {
loggingFallback(fallbackKey, configOption);
return true;
}
}
Expand Down Expand Up @@ -741,11 +740,10 @@ public <T> boolean removeConfig(ConfigOption<T> configOption){
// try the current key
Object oldValue = this.confData.remove(configOption.key());
if (oldValue == null){
for (String deprecatedKey : configOption.deprecatedKeys()){
oldValue = this.confData.remove(deprecatedKey);
for (FallbackKey fallbackKey : configOption.fallbackKeys()){
oldValue = this.confData.remove(fallbackKey.getKey());
if (oldValue != null){
LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
deprecatedKey, configOption.key());
loggingFallback(fallbackKey, configOption);
return true;
}
}
Expand Down Expand Up @@ -789,13 +787,12 @@ private Object getRawValueFromOption(ConfigOption<?> configOption) {
// found a value for the current proper key
return o;
}
else if (configOption.hasDeprecatedKeys()) {
else if (configOption.hasFallbackKeys()) {
// try the deprecated keys
for (String deprecatedKey : configOption.deprecatedKeys()) {
Object oo = getRawValue(deprecatedKey);
for (FallbackKey fallbackKey : configOption.fallbackKeys()) {
Object oo = getRawValue(fallbackKey.getKey());
if (oo != null) {
LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
deprecatedKey, configOption.key());
loggingFallback(fallbackKey, configOption);
return oo;
}
}
Expand All @@ -809,6 +806,16 @@ private Object getValueOrDefaultFromOption(ConfigOption<?> configOption) {
return o != null ? o : configOption.defaultValue();
}

private void loggingFallback(FallbackKey fallbackKey, ConfigOption<?> configOption) {
if (fallbackKey.isDeprecated()) {
LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
fallbackKey.getKey(), configOption.key());
} else {
LOG.info("Config uses fallback configuration key '{}' instead of key '{}'",
fallbackKey.getKey(), configOption.key());
}
}

// --------------------------------------------------------------------------------------------
// Type conversion
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import java.util.Properties;
import java.util.Set;

import static org.apache.flink.configuration.FallbackKey.createDeprecatedKey;

/**
* A configuration that manages a subset of keys with a common prefix from a given configuration.
*/
Expand Down Expand Up @@ -361,17 +363,17 @@ public boolean equals(Object obj) {
private static <T> ConfigOption<T> prefixOption(ConfigOption<T> option, String prefix) {
String key = prefix + option.key();

List<String> deprecatedKeys;
if (option.hasDeprecatedKeys()) {
List<FallbackKey> deprecatedKeys;
if (option.hasFallbackKeys()) {
deprecatedKeys = new ArrayList<>();
for (String dk : option.deprecatedKeys()) {
deprecatedKeys.add(prefix + dk);
for (FallbackKey dk : option.fallbackKeys()) {
deprecatedKeys.add(createDeprecatedKey(prefix + dk.getKey()));
}
} else {
deprecatedKeys = Collections.emptyList();
}

String[] deprecated = deprecatedKeys.toArray(new String[deprecatedKeys.size()]);
FallbackKey[] deprecated = deprecatedKeys.toArray(new FallbackKey[0]);
return new ConfigOption<>(key,
option.description(),
option.defaultValue(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.configuration;

/**
* A key with FallbackKeys will fall back to the FallbackKeys if it itself is not configured.
*/
public class FallbackKey {

// -------------------------
// Factory methods
// -------------------------

static FallbackKey createFallbackKey(String key) {
return new FallbackKey(key, false);
}

static FallbackKey createDeprecatedKey(String key) {
return new FallbackKey(key, true);
}

// ------------------------------------------------------------------------

private final String key;

private final boolean isDeprecated;

public String getKey() {
return key;
}

public boolean isDeprecated() {
return isDeprecated;
}

private FallbackKey(String key, boolean isDeprecated) {
this.key = key;
this.isDeprecated = isDeprecated;
}

// ------------------------------------------------------------------------

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
} else if (o != null && o.getClass() == FallbackKey.class) {
FallbackKey that = (FallbackKey) o;
return this.key.equals(that.key) && (this.isDeprecated == that.isDeprecated);
} else {
return false;
}
}

@Override
public int hashCode() {
return 31 * key.hashCode() + (isDeprecated ? 1 : 0);
}

@Override
public String toString() {
return String.format("{key=%s, isDeprecated=%s}", key, isDeprecated);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class RestOptions {
public static final ConfigOption<String> ADDRESS =
key("rest.address")
.noDefaultValue()
.withDeprecatedKeys(JobManagerOptions.ADDRESS.key())
.withFallbackKeys(JobManagerOptions.ADDRESS.key())
.withDescription("The address that should be used by clients to connect to the server.");

/**
Expand Down
Loading

0 comments on commit 1e2aa8e

Please sign in to comment.