diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java index 91133090b18d4..fe180ded56203 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java @@ -19,8 +19,10 @@ package org.apache.flink.runtime.highavailability; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.ZooKeeperUtils; public class HighAvailabilityServicesUtils { @@ -32,8 +34,8 @@ public static HighAvailabilityServices createAvailableOrEmbeddedServices(Configu return new EmbeddedNonHaServices(); case ZOOKEEPER: - throw new UnsupportedOperationException("ZooKeeper high availability services " + - "have not been implemented yet."); + return new ZookeeperHaServices(ZooKeeperUtils.startCuratorFramework(config), + Executors.directExecutor(), config); default: throw new Exception("High availability mode " + highAvailabilityMode + " is not supported."); @@ -49,8 +51,8 @@ public static HighAvailabilityServices createHighAvailabilityServices(Configurat final String resourceManagerAddress = null; return new NonHaServices(resourceManagerAddress); case ZOOKEEPER: - throw new UnsupportedOperationException("ZooKeeper high availability services " + - "have not been implemented yet."); + return new ZookeeperHaServices(ZooKeeperUtils.startCuratorFramework(configuration), + Executors.directExecutor(), configuration); default: throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported."); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java index ed0ad171a6ddc..741f9e6ba2f35 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java @@ -97,10 +97,14 @@ public class ZookeeperHaServices implements HighAvailabilityServices { /** The runtime configuration */ private final Configuration configuration; + /** The zookeeper based running jobs registry */ + private final RunningJobsRegistry runningJobsRegistry; + public ZookeeperHaServices(CuratorFramework client, Executor executor, Configuration configuration) { this.client = checkNotNull(client); this.executor = checkNotNull(executor); this.configuration = checkNotNull(configuration); + this.runningJobsRegistry = new ZookeeperRegistry(client, configuration); } // ------------------------------------------------------------------------ @@ -149,7 +153,7 @@ public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception { @Override public RunningJobsRegistry getRunningJobsRegistry() { - throw new UnsupportedOperationException("not yet implemented"); + return runningJobsRegistry; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java new file mode 100644 index 0000000000000..c0621afd8b1a1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.zookeeper.data.Stat; + +import java.io.IOException; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A zookeeper based registry for running jobs, highly available. + */ +public class ZookeeperRegistry implements RunningJobsRegistry { + + private static final String DEFAULT_HA_JOB_REGISTRY_PATH = "/running_job_registry/"; + + /** The ZooKeeper client to use */ + private final CuratorFramework client; + + private final String runningJobPath; + + private static final String HA_JOB_REGISTRY_PATH = "high-availability.zookeeper.job.registry"; + + public ZookeeperRegistry(final CuratorFramework client, final Configuration configuration) { + this.client = client; + runningJobPath = configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT) + + configuration.getString(HA_JOB_REGISTRY_PATH, DEFAULT_HA_JOB_REGISTRY_PATH); + } + + @Override + public void setJobRunning(JobID jobID) throws IOException { + checkNotNull(jobID); + + try { + String zkPath = runningJobPath + jobID.toString(); + this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient()); + this.client.setData().forPath(zkPath); + } + catch (Exception e) { + throw new IOException("Set running state to zk fail for job " + jobID.toString(), e); + } + } + + @Override + public void setJobFinished(JobID jobID) throws IOException { + checkNotNull(jobID); + + try { + String zkPath = runningJobPath + jobID.toString(); + this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient()); + this.client.delete().forPath(zkPath); + } + catch (Exception e) { + throw new IOException("Set finished state to zk fail for job " + jobID.toString(), e); + } + } + + @Override + public boolean isJobRunning(JobID jobID) throws IOException { + checkNotNull(jobID); + + try { + Stat stat = client.checkExists().forPath(runningJobPath + jobID.toString()); + if (stat != null) { + return true; + } + return false; + } + catch (Exception e) { + throw new IOException("Get running state from zk fail for job " + jobID.toString(), e); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java new file mode 100644 index 0000000000000..72982c83bcaf3 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.curator.test.TestingServer; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.util.TestLogger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.*; + +public class ZooKeeperRegistryTest extends TestLogger { + private TestingServer testingServer; + + private static Logger LOG = LoggerFactory.getLogger(ZooKeeperRegistryTest.class); + + @Before + public void before() throws Exception { + testingServer = new TestingServer(); + } + + @After + public void after() throws Exception { + testingServer.stop(); + testingServer = null; + } + + /** + * Tests that the function of ZookeeperRegistry, setJobRunning(), setJobFinished(), isJobRunning() + */ + @Test + public void testZooKeeperRegistry() throws Exception { + Configuration configuration = new Configuration(); + configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString()); + configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); + + HighAvailabilityServices zkHaService = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration); + RunningJobsRegistry zkRegistry = zkHaService.getRunningJobsRegistry(); + + try { + JobID jobID = JobID.generate(); + assertTrue(!zkRegistry.isJobRunning(jobID)); + + zkRegistry.setJobRunning(jobID); + assertTrue(zkRegistry.isJobRunning(jobID)); + + zkRegistry.setJobFinished(jobID); + assertTrue(!zkRegistry.isJobRunning(jobID)); + + } finally { + if (zkHaService != null) { + zkHaService.close(); + } + } + } +}