Skip to content

Commit

Permalink
[FLINK-5074] [runtime] Add a ZooKeeper-based RunningJobRegistry
Browse files Browse the repository at this point in the history
This closes apache#2903
  • Loading branch information
tiemsn authored and StephanEwen committed Feb 20, 2017
1 parent 8780cb6 commit 544f534
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.flink.runtime.highavailability;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;

public class HighAvailabilityServicesUtils {

Expand All @@ -32,8 +34,8 @@ public static HighAvailabilityServices createAvailableOrEmbeddedServices(Configu
return new EmbeddedNonHaServices();

case ZOOKEEPER:
throw new UnsupportedOperationException("ZooKeeper high availability services " +
"have not been implemented yet.");
return new ZookeeperHaServices(ZooKeeperUtils.startCuratorFramework(config),
Executors.directExecutor(), config);

default:
throw new Exception("High availability mode " + highAvailabilityMode + " is not supported.");
Expand All @@ -49,8 +51,8 @@ public static HighAvailabilityServices createHighAvailabilityServices(Configurat
final String resourceManagerAddress = null;
return new NonHaServices(resourceManagerAddress);
case ZOOKEEPER:
throw new UnsupportedOperationException("ZooKeeper high availability services " +
"have not been implemented yet.");
return new ZookeeperHaServices(ZooKeeperUtils.startCuratorFramework(configuration),
Executors.directExecutor(), configuration);
default:
throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,14 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
/** The runtime configuration */
private final Configuration configuration;

/** The zookeeper based running jobs registry */
private final RunningJobsRegistry runningJobsRegistry;

public ZookeeperHaServices(CuratorFramework client, Executor executor, Configuration configuration) {
this.client = checkNotNull(client);
this.executor = checkNotNull(executor);
this.configuration = checkNotNull(configuration);
this.runningJobsRegistry = new ZookeeperRegistry(client, configuration);
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -149,7 +153,7 @@ public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {

@Override
public RunningJobsRegistry getRunningJobsRegistry() {
throw new UnsupportedOperationException("not yet implemented");
return runningJobsRegistry;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.highavailability;

import org.apache.curator.framework.CuratorFramework;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A zookeeper based registry for running jobs, highly available.
*/
public class ZookeeperRegistry implements RunningJobsRegistry {

private static final String DEFAULT_HA_JOB_REGISTRY_PATH = "/running_job_registry/";

/** The ZooKeeper client to use */
private final CuratorFramework client;

private final String runningJobPath;

private static final String HA_JOB_REGISTRY_PATH = "high-availability.zookeeper.job.registry";

public ZookeeperRegistry(final CuratorFramework client, final Configuration configuration) {
this.client = client;
runningJobPath = configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT) +
configuration.getString(HA_JOB_REGISTRY_PATH, DEFAULT_HA_JOB_REGISTRY_PATH);
}

@Override
public void setJobRunning(JobID jobID) throws IOException {
checkNotNull(jobID);

try {
String zkPath = runningJobPath + jobID.toString();
this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
this.client.setData().forPath(zkPath);
}
catch (Exception e) {
throw new IOException("Set running state to zk fail for job " + jobID.toString(), e);
}
}

@Override
public void setJobFinished(JobID jobID) throws IOException {
checkNotNull(jobID);

try {
String zkPath = runningJobPath + jobID.toString();
this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
this.client.delete().forPath(zkPath);
}
catch (Exception e) {
throw new IOException("Set finished state to zk fail for job " + jobID.toString(), e);
}
}

@Override
public boolean isJobRunning(JobID jobID) throws IOException {
checkNotNull(jobID);

try {
Stat stat = client.checkExists().forPath(runningJobPath + jobID.toString());
if (stat != null) {
return true;
}
return false;
}
catch (Exception e) {
throw new IOException("Get running state from zk fail for job " + jobID.toString(), e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.highavailability;

import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.Assert.*;

public class ZooKeeperRegistryTest extends TestLogger {
private TestingServer testingServer;

private static Logger LOG = LoggerFactory.getLogger(ZooKeeperRegistryTest.class);

@Before
public void before() throws Exception {
testingServer = new TestingServer();
}

@After
public void after() throws Exception {
testingServer.stop();
testingServer = null;
}

/**
* Tests that the function of ZookeeperRegistry, setJobRunning(), setJobFinished(), isJobRunning()
*/
@Test
public void testZooKeeperRegistry() throws Exception {
Configuration configuration = new Configuration();
configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");

HighAvailabilityServices zkHaService = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration);
RunningJobsRegistry zkRegistry = zkHaService.getRunningJobsRegistry();

try {
JobID jobID = JobID.generate();
assertTrue(!zkRegistry.isJobRunning(jobID));

zkRegistry.setJobRunning(jobID);
assertTrue(zkRegistry.isJobRunning(jobID));

zkRegistry.setJobFinished(jobID);
assertTrue(!zkRegistry.isJobRunning(jobID));

} finally {
if (zkHaService != null) {
zkHaService.close();
}
}
}
}

0 comments on commit 544f534

Please sign in to comment.