Skip to content

Commit

Permalink
[FLINK-33376][coordination] Extend ZooKeeper Curator configurations
Browse files Browse the repository at this point in the history
Co-authored-by: Matthias Pohl <[email protected]>
  • Loading branch information
JTaky and XComp committed Mar 28, 2024
1 parent f31c128 commit 83f82ab
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@
<td>String</td>
<td>Defines the ACL (open|creator) to be configured on ZK node. The configuration value can be set to “creator” if the ZooKeeper server configuration has the “authProvider” property mapped to use SASLAuthenticationProvider and the cluster is configured to run in secure mode (Kerberos).</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.client.authorization</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Map</td>
<td>Add connection authorization Subsequent calls to this method overwrite the prior calls. In certain cases ZooKeeper requires additional Authorization information. For example list of valid names for ensemble in order to prevent accidentally connecting to a wrong ensemble. Each entry of type Map.Entry&lt;String, String&gt; will be transformed into an AuthInfo object with the constructor AuthInfo(String, byte[]). The field entry.key() will serve as the String scheme value, while the field entry.getValue() will be initially converted to a byte[] using the String#getBytes() method with UTF-8 encoding. If not set the default configuration for a Curator would be applied.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.client.connection-timeout</h5></td>
<td style="word-wrap: break-word;">15000</td>
Expand All @@ -50,6 +56,12 @@
<td>Boolean</td>
<td>Defines whether Curator should enable ensemble tracker. This can be useful in certain scenarios in which CuratorFramework is accessing to ZK clusters via load balancer or Virtual IPs. Default Curator EnsembleTracking logic watches CuratorEventType.GET_CONFIG events and changes ZooKeeper connection string. It is not desired behaviour when ZooKeeper is running under the Virtual IPs. Under certain configurations EnsembleTracking can lead to setting of ZooKeeper connection string with unresolvable hostnames.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.client.max-close-wait</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
<td>Defines the time Curator should wait during close to join background threads. If not set the default configuration for a Curator would be applied.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.client.max-retry-attempts</h5></td>
<td style="word-wrap: break-word;">3</td>
Expand All @@ -68,6 +80,12 @@
<td>Integer</td>
<td>Defines the session timeout for the ZooKeeper session in ms.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.client.simulated-session-expiration-percent</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>The percentage set by this method determines how and if Curator will check for session expiration. See Curator documentation for <a href="https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#simulatedSessionExpirationPercent(int)">simulatedSessionExpirationPercent</a> property for more information.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.client.tolerate-suspended-connections</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@
import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.TextElement;

import java.time.Duration;
import java.util.Map;

import static org.apache.flink.configuration.ConfigOptions.key;
import static org.apache.flink.configuration.description.LinkElement.link;
import static org.apache.flink.configuration.description.TextElement.text;

/** The set of configuration options relating to high-availability settings. */
public class HighAvailabilityOptions {
Expand Down Expand Up @@ -220,6 +225,50 @@ public class HighAvailabilityOptions {
+ "with unresolvable hostnames.")
.build());

public static final ConfigOption<Map<String, String>> ZOOKEEPER_CLIENT_AUTHORIZATION =
key("high-availability.zookeeper.client.authorization")
.mapType()
.noDefaultValue()
.withDescription(
Description.builder()
.text(
"Add connection authorization Subsequent calls to this method overwrite the prior calls. "
+ "In certain cases ZooKeeper requires additional Authorization information. "
+ "For example list of valid names for ensemble in order to prevent accidentally connecting to a wrong ensemble. "
+ "Each entry of type Map.Entry<String, String> will be transformed "
+ "into an AuthInfo object with the constructor AuthInfo(String, byte[]). "
+ "The field entry.key() will serve as the String scheme value, while the field entry.getValue() "
+ "will be initially converted to a byte[] using the String#getBytes() method with %s encoding. "
+ "If not set the default configuration for a Curator would be applied.",
text(ConfigConstants.DEFAULT_CHARSET.displayName()))
.build());

public static final ConfigOption<Duration> ZOOKEEPER_MAX_CLOSE_WAIT =
key("high-availability.zookeeper.client.max-close-wait")
.durationType()
.noDefaultValue()
.withDescription(
Description.builder()
.text(
"Defines the time Curator should wait during close to join background threads. "
+ "If not set the default configuration for a Curator would be applied.")
.build());

public static final ConfigOption<Integer> ZOOKEEPER_SIMULATED_SESSION_EXP_PERCENT =
key("high-availability.zookeeper.client.simulated-session-expiration-percent")
.intType()
.noDefaultValue()
.withDescription(
Description.builder()
.text(
"The percentage set by this method determines how and if Curator will check for session expiration. "
+ "See Curator documentation for %s property for more information.",
link(
"https://curator.apache.org/apidocs/org/apache/curator/framework/"
+ "CuratorFrameworkFactory.Builder.html#simulatedSessionExpirationPercent(int)",
"simulatedSessionExpirationPercent"))
.build());

// ------------------------------------------------------------------------
// Deprecated options
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
Expand Down Expand Up @@ -53,6 +54,7 @@
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.function.RunnableWithException;

import org.apache.flink.shaded.curator5.org.apache.curator.framework.AuthInfo;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.api.ACLProvider;
Expand Down Expand Up @@ -83,6 +85,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.function.BooleanSupplier;
Expand Down Expand Up @@ -246,6 +249,43 @@ public static CuratorFrameworkWithUnhandledErrorListener startCuratorFramework(
.ensembleTracker(ensembleTracking)
.aclProvider(aclProvider);

if (configuration.contains(HighAvailabilityOptions.ZOOKEEPER_CLIENT_AUTHORIZATION)) {
Map<String, String> authMap =
configuration.get(HighAvailabilityOptions.ZOOKEEPER_CLIENT_AUTHORIZATION);
List<AuthInfo> authInfos =
authMap.entrySet().stream()
.map(
entry ->
new AuthInfo(
entry.getKey(),
entry.getValue()
.getBytes(
ConfigConstants
.DEFAULT_CHARSET)))
.collect(Collectors.toList());
curatorFrameworkBuilder.authorization(authInfos);
}

if (configuration.contains(HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT)) {
long maxCloseWait =
configuration.get(HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT).toMillis();
if (maxCloseWait < 0 || maxCloseWait > Integer.MAX_VALUE) {
throw new IllegalConfigurationException(
"The value (%d ms) is out-of-range for %s. The milliseconds timeout is expected to be between 0 and %d ms.",
maxCloseWait,
HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT.key(),
Integer.MAX_VALUE);
}
curatorFrameworkBuilder.maxCloseWaitMs((int) maxCloseWait);
}

if (configuration.contains(
HighAvailabilityOptions.ZOOKEEPER_SIMULATED_SESSION_EXP_PERCENT)) {
curatorFrameworkBuilder.simulatedSessionExpirationPercent(
configuration.get(
HighAvailabilityOptions.ZOOKEEPER_SIMULATED_SESSION_EXP_PERCENT));
}

if (configuration.get(HighAvailabilityOptions.ZOOKEEPER_TOLERATE_SUSPENDED_CONNECTIONS)) {
curatorFrameworkBuilder.connectionStateErrorPolicy(
new SessionConnectionStateErrorPolicy());
Expand Down

0 comments on commit 83f82ab

Please sign in to comment.