Skip to content

Commit

Permalink
[FLINK-22636][zk] Group job specific zNodes under /jobs zNode
Browse files Browse the repository at this point in the history
In order to better clean up job specific HA services, this commit changes the layout of the
zNode structure so that the JobMaster leader, checkpoints and checkpoint counter is now grouped
below the jobs/ zNode.

Moreover, this commit groups the leaders of the cluster components (Dispatcher, ResourceManager,
RestServer) under /leader/process/latch and /leader/process/connection-info.

This closes apache#15893.
  • Loading branch information
tillrohrmann committed May 18, 2021
1 parent 927a21c commit 79936be
Show file tree
Hide file tree
Showing 26 changed files with 382 additions and 387 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,36 +38,12 @@
<td>Integer</td>
<td>Defines the session timeout for the ZooKeeper session in ms.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.path.checkpoint-counter</h5></td>
<td style="word-wrap: break-word;">"/checkpoint-counter"</td>
<td>String</td>
<td>ZooKeeper root path (ZNode) for checkpoint counters.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.path.checkpoints</h5></td>
<td style="word-wrap: break-word;">"/checkpoints"</td>
<td>String</td>
<td>ZooKeeper root path (ZNode) for completed checkpoints.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.path.jobgraphs</h5></td>
<td style="word-wrap: break-word;">"/jobgraphs"</td>
<td>String</td>
<td>ZooKeeper root path (ZNode) for job graphs</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.path.latch</h5></td>
<td style="word-wrap: break-word;">"/leaderlatch"</td>
<td>String</td>
<td>Defines the znode of the leader latch which is used to elect the leader.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.path.leader</h5></td>
<td style="word-wrap: break-word;">"/leader"</td>
<td>String</td>
<td>Defines the znode of the leader which contains the URL to the leader and the current leader session ID.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.path.mesos-workers</h5></td>
<td style="word-wrap: break-word;">"/mesos-workers"</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,36 +62,12 @@
<td>Integer</td>
<td>Defines the session timeout for the ZooKeeper session in ms.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.path.checkpoint-counter</h5></td>
<td style="word-wrap: break-word;">"/checkpoint-counter"</td>
<td>String</td>
<td>ZooKeeper root path (ZNode) for checkpoint counters.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.path.checkpoints</h5></td>
<td style="word-wrap: break-word;">"/checkpoints"</td>
<td>String</td>
<td>ZooKeeper root path (ZNode) for completed checkpoints.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.path.jobgraphs</h5></td>
<td style="word-wrap: break-word;">"/jobgraphs"</td>
<td>String</td>
<td>ZooKeeper root path (ZNode) for job graphs</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.path.latch</h5></td>
<td style="word-wrap: break-word;">"/leaderlatch"</td>
<td>String</td>
<td>Defines the znode of the leader latch which is used to elect the leader.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.path.leader</h5></td>
<td style="word-wrap: break-word;">"/leader"</td>
<td>String</td>
<td>Defines the znode of the leader which contains the URL to the leader and the current leader session ID.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.path.mesos-workers</h5></td>
<td style="word-wrap: break-word;">"/mesos-workers"</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1013,7 +1013,7 @@ public final class ConfigConstants {
public static final String HA_ZOOKEEPER_NAMESPACE_KEY =
"high-availability.zookeeper.path.namespace";

/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LATCH_PATH}. */
/** @deprecated no longer used. */
@PublicEvolving @Deprecated
public static final String HA_ZOOKEEPER_LATCH_PATH = "high-availability.zookeeper.path.latch";

Expand All @@ -1026,14 +1026,14 @@ public final class ConfigConstants {
public static final String HA_ZOOKEEPER_JOBGRAPHS_PATH =
"high-availability.zookeeper.path.jobgraphs";

/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LEADER_PATH}. */
/** @deprecated no longer used. */
@PublicEvolving @Deprecated
public static final String HA_ZOOKEEPER_LEADER_PATH = "high-availability.zookeeper.path.leader";

/**
* ZooKeeper root path (ZNode) for completed checkpoints.
*
* @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINTS_PATH}.
* @deprecated no longer used.
*/
@PublicEvolving @Deprecated
public static final String HA_ZOOKEEPER_CHECKPOINTS_PATH =
Expand All @@ -1042,7 +1042,7 @@ public final class ConfigConstants {
/**
* ZooKeeper root path (ZNode) for checkpoint counters.
*
* @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH}.
* @deprecated no longer used.
*/
@PublicEvolving @Deprecated
public static final String HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH =
Expand Down Expand Up @@ -1691,21 +1691,19 @@ public final class ConfigConstants {
/** @deprecated in favor of {@link HighAvailabilityOptions#HA_CLUSTER_ID}. */
@Deprecated public static final String DEFAULT_ZOOKEEPER_NAMESPACE_KEY = "/default";

/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LATCH_PATH}. */
/** @deprecated no longer used. */
@Deprecated public static final String DEFAULT_ZOOKEEPER_LATCH_PATH = "/leaderlatch";

/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LEADER_PATH}. */
/** @deprecated no longer used. */
@Deprecated public static final String DEFAULT_ZOOKEEPER_LEADER_PATH = "/leader";

/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_JOBGRAPHS_PATH}. */
@Deprecated public static final String DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH = "/jobgraphs";

/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINTS_PATH}. */
/** @deprecated no longer used. */
@Deprecated public static final String DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH = "/checkpoints";

/**
* @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH}
*/
/** @deprecated no longer used. */
@Deprecated
public static final String DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "/checkpoint-counter";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,6 @@ public class HighAvailabilityOptions {
.withDescription(
"The root path under which Flink stores its entries in ZooKeeper.");

@Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY)
public static final ConfigOption<String> HA_ZOOKEEPER_LATCH_PATH =
key("high-availability.zookeeper.path.latch")
.defaultValue("/leaderlatch")
.withDeprecatedKeys("recovery.zookeeper.path.latch")
.withDescription(
"Defines the znode of the leader latch which is used to elect the leader.");

/** ZooKeeper root path (ZNode) for job graphs. */
@Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY)
public static final ConfigOption<String> HA_ZOOKEEPER_JOBGRAPHS_PATH =
Expand All @@ -131,31 +123,6 @@ public class HighAvailabilityOptions {
.withDeprecatedKeys("recovery.zookeeper.path.jobgraphs")
.withDescription("ZooKeeper root path (ZNode) for job graphs");

@Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY)
public static final ConfigOption<String> HA_ZOOKEEPER_LEADER_PATH =
key("high-availability.zookeeper.path.leader")
.defaultValue("/leader")
.withDeprecatedKeys("recovery.zookeeper.path.leader")
.withDescription(
"Defines the znode of the leader which contains the URL to the leader and the current"
+ " leader session ID.");

/** ZooKeeper root path (ZNode) for completed checkpoints. */
@Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY)
public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINTS_PATH =
key("high-availability.zookeeper.path.checkpoints")
.defaultValue("/checkpoints")
.withDeprecatedKeys("recovery.zookeeper.path.checkpoints")
.withDescription("ZooKeeper root path (ZNode) for completed checkpoints.");

/** ZooKeeper root path (ZNode) for checkpoint counters. */
@Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY)
public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH =
key("high-availability.zookeeper.path.checkpoint-counter")
.defaultValue("/checkpoint-counter")
.withDeprecatedKeys("recovery.zookeeper.path.checkpoint-counter")
.withDescription("ZooKeeper root path (ZNode) for checkpoint counters.");

/** ZooKeeper root path (ZNode) for Mesos workers. */
@PublicEvolving
@Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,20 +112,20 @@ public CheckpointRecoveryFactory createCheckpointRecoveryFactory() {
kubeClient,
configuration,
ioExecutor,
this::getLeaderNameForJobManager,
this::getLeaderPathForJobManager,
lockIdentity);
}

@Override
public JobGraphStore createJobGraphStore() throws Exception {
return KubernetesUtils.createJobGraphStore(
configuration, kubeClient, getLeaderNameForDispatcher(), lockIdentity);
configuration, kubeClient, getLeaderPathForDispatcher(), lockIdentity);
}

@Override
public RunningJobsRegistry createRunningJobsRegistry() {
return new KubernetesRunningJobsRegistry(
kubeClient, getLeaderNameForDispatcher(), lockIdentity);
kubeClient, getLeaderPathForDispatcher(), lockIdentity);
}

@Override
Expand All @@ -144,25 +144,25 @@ public void internalCleanup() throws Exception {

@Override
public void internalCleanupJobData(JobID jobID) throws Exception {
kubeClient.deleteConfigMap(getLeaderNameForJobManager(jobID)).get();
kubeClient.deleteConfigMap(getLeaderPathForJobManager(jobID)).get();
}

@Override
protected String getLeaderNameForResourceManager() {
protected String getLeaderPathForResourceManager() {
return getLeaderName(RESOURCE_MANAGER_NAME);
}

@Override
protected String getLeaderNameForDispatcher() {
protected String getLeaderPathForDispatcher() {
return getLeaderName(DISPATCHER_NAME);
}

public String getLeaderNameForJobManager(final JobID jobID) {
public String getLeaderPathForJobManager(final JobID jobID) {
return getLeaderName(jobID.toString() + NAME_SEPARATOR + JOB_MANAGER_NAME);
}

@Override
protected String getLeaderNameForRestServer() {
protected String getLeaderPathForRestServer() {
return getLeaderName(REST_SERVER_NAME);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void testInternalJobCleanupShouldCleanupConfigMaps() throws Exception {
new VoidBlobStore());
JobID jobID = new JobID();
String configMapName =
kubernetesHaServices.getLeaderNameForJobManager(jobID);
kubernetesHaServices.getLeaderPathForJobManager(jobID);
final KubernetesConfigMap configMap =
new TestingFlinkKubeClient.MockKubernetesConfigMap(
configMapName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

package org.apache.flink.runtime.checkpoint;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.util.ZooKeeperUtils;

import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.shared.SharedCount;
Expand Down Expand Up @@ -76,14 +78,11 @@ public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter {
* Creates a {@link ZooKeeperCheckpointIDCounter} instance.
*
* @param client Curator ZooKeeper client
* @param counterPath ZooKeeper path for the counter. It's sufficient to have a path per-job.
*/
public ZooKeeperCheckpointIDCounter(
CuratorFramework client,
String counterPath,
LastStateConnectionStateListener connectionStateListener) {
CuratorFramework client, LastStateConnectionStateListener connectionStateListener) {
this.client = checkNotNull(client, "Curator client");
this.counterPath = checkNotNull(counterPath, "Counter path");
this.counterPath = ZooKeeperUtils.getCheckpointIdCounterPath();
this.sharedCount = new SharedCount(client, counterPath, 1);
this.connectionStateListener = connectionStateListener;
}
Expand Down Expand Up @@ -176,4 +175,9 @@ private void checkConnectionState() {
}
});
}

@VisibleForTesting
String getPath() {
return counterPath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,17 @@ public CompletedCheckpointStore createCheckpointStore(
throws Exception {

return ZooKeeperUtils.createCompletedCheckpoints(
client, config, jobId, maxNumberOfCheckpointsToRetain, executor);
ZooKeeperUtils.useNamespaceAndEnsurePath(
client, ZooKeeperUtils.getPathForJob(jobId)),
config,
maxNumberOfCheckpointsToRetain,
executor);
}

@Override
public CheckpointIDCounter createCheckpointIDCounter(JobID jobID) throws Exception {
return ZooKeeperUtils.createCheckpointIDCounter(client, config, jobID);
return ZooKeeperUtils.createCheckpointIDCounter(
ZooKeeperUtils.useNamespaceAndEnsurePath(
client, ZooKeeperUtils.getPathForJob(jobID)));
}
}
Loading

0 comments on commit 79936be

Please sign in to comment.