Skip to content

Commit

Permalink
[FLINK-5254] [yarn] Implement YARN High-Availability Services
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Dec 23, 2016
1 parent e2922ad commit 2a7dbda
Show file tree
Hide file tree
Showing 33 changed files with 2,537 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,22 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
implements IOReadableWritable, java.io.Serializable, Cloneable {

private static final long serialVersionUID = 1L;

private static final byte TYPE_STRING = 0;
private static final byte TYPE_INT = 1;
private static final byte TYPE_LONG = 2;
private static final byte TYPE_BOOLEAN = 3;
private static final byte TYPE_FLOAT = 4;
private static final byte TYPE_DOUBLE = 5;
private static final byte TYPE_BYTES = 6;

/** The log object used for debugging. */
private static final Logger LOG = LoggerFactory.getLogger(Configuration.class);


/** Stores the concrete key/value pairs of this configuration object. */
protected final HashMap<String, Object> confData;

// --------------------------------------------------------------------------------------------

/**
Expand Down Expand Up @@ -639,12 +639,16 @@ private Object getRawValueFromOption(ConfigOption<?> configOption) {
Object o = getRawValue(configOption.key());

if (o != null) {
// found a value for the current proper key
return o;
}
else if (configOption.hasDeprecatedKeys()) {
// try the deprecated keys
for (String deprecatedKey : configOption.deprecatedKeys()) {
Object oo = getRawValue(deprecatedKey);
if (oo != null) {
LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
deprecatedKey, configOption.key());
return oo;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.hdfstests;

import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.highavailability.FsNegativeRunningJobsRegistry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class FsNegativeRunningJobsRegistryTest {

@ClassRule
public static final TemporaryFolder TEMP_DIR = new TemporaryFolder();

private static MiniDFSCluster HDFS_CLUSTER;

private static Path HDFS_ROOT_PATH;

// ------------------------------------------------------------------------
// startup / shutdown
// ------------------------------------------------------------------------

@BeforeClass
public static void createHDFS() throws Exception {
final File tempDir = TEMP_DIR.newFolder();

Configuration hdConf = new Configuration();
hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tempDir.getAbsolutePath());

MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
HDFS_CLUSTER = builder.build();

HDFS_ROOT_PATH = new Path("hdfs:https://" + HDFS_CLUSTER.getURI().getHost() + ":"
+ HDFS_CLUSTER.getNameNodePort() + "/");
}

@AfterClass
public static void destroyHDFS() {
if (HDFS_CLUSTER != null) {
HDFS_CLUSTER.shutdown();
}
HDFS_CLUSTER = null;
HDFS_ROOT_PATH = null;
}

// ------------------------------------------------------------------------
// Tests
// ------------------------------------------------------------------------

@Test
public void testCreateAndSetFinished() throws Exception {
final Path workDir = new Path(HDFS_ROOT_PATH, "test-work-dir");
final JobID jid = new JobID();

FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(workDir);

// initially, without any call, the job is considered running
assertTrue(registry.isJobRunning(jid));

// repeated setting should not affect the status
registry.setJobRunning(jid);
assertTrue(registry.isJobRunning(jid));

// set the job to finished and validate
registry.setJobFinished(jid);
assertFalse(registry.isJobRunning(jid));

// another registry should pick this up
FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(workDir);
assertFalse(otherRegistry.isJobRunning(jid));
}

@Test
public void testSetFinishedAndRunning() throws Exception {
final Path workDir = new Path(HDFS_ROOT_PATH, "änother_wörk_directörü");
final JobID jid = new JobID();

FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(workDir);

// set the job to finished and validate
registry.setJobFinished(jid);
assertFalse(registry.isJobRunning(jid));

// set the job to back to running and validate
registry.setJobRunning(jid);
assertTrue(registry.isJobRunning(jid));

// another registry should pick this up
FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(workDir);
assertTrue(otherRegistry.isJobRunning(jid));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import java.net.URI;
import java.net.UnknownHostException;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Concrete implementation of the {@link FileSystem} base class for the Hadoop File System. The
* class is a wrapper class which encapsulated the original Hadoop HDFS API.
Expand All @@ -60,7 +62,8 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst


/**
* Creates a new DistributedFileSystem object to access HDFS
* Creates a new DistributedFileSystem object to access HDFS, based on a class name
* and picking up the configuration from the class path or the Flink configuration.
*
* @throws IOException
* throw if the required HDFS classes cannot be instantiated
Expand All @@ -76,6 +79,21 @@ public HadoopFileSystem(Class<? extends org.apache.hadoop.fs.FileSystem> fsClass
this.fs = instantiateFileSystem(fsClass);
}

/**
* Creates a new DistributedFileSystem that uses the given Hadoop
* {@link org.apache.hadoop.fs.FileSystem} under the hood.
*
* @param hadoopConfig The Hadoop configuration that the FileSystem is based on.
* @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood.
*/
public HadoopFileSystem(
org.apache.hadoop.conf.Configuration hadoopConfig,
org.apache.hadoop.fs.FileSystem hadoopFileSystem) {

this.conf = checkNotNull(hadoopConfig, "hadoopConfig");
this.fs = checkNotNull(hadoopFileSystem, "hadoopFileSystem");
}

private Class<? extends org.apache.hadoop.fs.FileSystem> getDefaultHDFSClass() throws IOException {
Class<? extends org.apache.hadoop.fs.FileSystem> fsClass = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ public EmbeddedNonHaServices() {

// ------------------------------------------------------------------------

@Override
public String getResourceManagerEndpointName() {
// dynamic actor name
return null;
}

@Override
public LeaderRetrievalService getResourceManagerLeaderRetriever() {
return resourceManagerLeaderService.createLeaderRetrievalService();
Expand All @@ -55,11 +61,16 @@ public LeaderElectionService getResourceManagerLeaderElectionService() {
// ------------------------------------------------------------------------

@Override
public void shutdown() throws Exception {
public void close() throws Exception {
try {
super.shutdown();
super.close();
} finally {
resourceManagerLeaderService.shutdown();
}
}

@Override
public void closeAndCleanupAllData() throws Exception {
close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.highavailability;

import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

import java.io.FileNotFoundException;
import java.io.IOException;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* This {@link RunningJobsRegistry} tracks the status jobs via marker files,
* marking finished jobs via marker files.
*
* <p>The general contract is the following:
* <ul>
* <li>Initially, a marker file does not exist (no one created it, yet), which means
* the specific job is assumed to be running</li>
* <li>The JobManager that finishes calls this service to create the marker file,
* which marks the job as finished.</li>
* <li>If a JobManager gains leadership at some point when shutdown is in progress,
* it will see the marker file and realize that the job is finished.</li>
* <li>The application framework is expected to clean the file once the application
* is completely shut down. At that point, no JobManager will attempt to
* start the job, even if it gains leadership.</li>
* </ul>
*
* <p>It is especially tailored towards deployment modes like for example
* YARN, where HDFS is available as a persistent file system, and the YARN
* application's working directories on HDFS are automatically cleaned
* up after the application completed.
*/
public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry {

private static final String PREFIX = ".job_complete_";

private final FileSystem fileSystem;

private final Path basePath;

/**
* Creates a new registry that writes to the FileSystem and working directory
* denoted by the given path.
*
* <p>The initialization will attempt to write to the given working directory, in
* order to catch setup/configuration errors early.
*
* @param workingDirectory The working directory for files to track the job status.
*
* @throws IOException Thrown, if the specified directory cannot be accessed.
*/
public FsNegativeRunningJobsRegistry(Path workingDirectory) throws IOException {
this(workingDirectory.getFileSystem(), workingDirectory);
}

/**
* Creates a new registry that writes its files to the given FileSystem at
* the given working directory path.
*
* <p>The initialization will attempt to write to the given working directory, in
* order to catch setup/configuration errors early.
*
* @param fileSystem The FileSystem to use for the marker files.
* @param workingDirectory The working directory for files to track the job status.
*
* @throws IOException Thrown, if the specified directory cannot be accessed.
*/
public FsNegativeRunningJobsRegistry(FileSystem fileSystem, Path workingDirectory) throws IOException {
this.fileSystem = checkNotNull(fileSystem, "fileSystem");
this.basePath = checkNotNull(workingDirectory, "workingDirectory");

// to be safe, attempt to write to the working directory, to
// catch problems early
final Path testFile = new Path(workingDirectory, ".registry_test");
try (FSDataOutputStream out = fileSystem.create(testFile, false)) {
out.write(42);
}
catch (IOException e) {
throw new IOException("Unable to write to working directory: " + workingDirectory, e);
}
finally {
fileSystem.delete(testFile, false);
}
}

// ------------------------------------------------------------------------

@Override
public void setJobRunning(JobID jobID) throws IOException {
checkNotNull(jobID, "jobID");
final Path filePath = createMarkerFilePath(jobID);

// delete the marker file, if it exists
try {
fileSystem.delete(filePath, false);
}
catch (FileNotFoundException e) {
// apparently job was already considered running
}
}

@Override
public void setJobFinished(JobID jobID) throws IOException {
checkNotNull(jobID, "jobID");
final Path filePath = createMarkerFilePath(jobID);

// create the file
// to avoid an exception if the job already exists, set overwrite=true
try (FSDataOutputStream out = fileSystem.create(filePath, true)) {
out.write(42);
}
}

@Override
public boolean isJobRunning(JobID jobID) throws IOException {
checkNotNull(jobID, "jobID");

// check for the existence of the file
try {
fileSystem.getFileStatus(createMarkerFilePath(jobID));
// file was found --> job is terminated
return false;
}
catch (FileNotFoundException e) {
// file does not exist, job is still running
return true;
}
}

private Path createMarkerFilePath(JobID jobId) {
return new Path(basePath, PREFIX + jobId.toString());
}
}
Loading

0 comments on commit 2a7dbda

Please sign in to comment.