Skip to content

Commit

Permalink
[FLINK-8705] [flip6] Add DispatcherRestEndpoint to MiniCluster
Browse files Browse the repository at this point in the history
In order to properly support the RemoteEnvironment, the Flip-6 MiniCluster
needs a REST endpoint to receive requests from the RestClusterClient.

This closes apache#5527.
  • Loading branch information
tillrohrmann committed Feb 21, 2018
1 parent facf2ac commit 2a18f05
Show file tree
Hide file tree
Showing 19 changed files with 421 additions and 719 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.dag.DataSinkNode;
Expand Down Expand Up @@ -125,14 +125,11 @@ public void start() throws Exception {
private JobExecutorService createJobExecutorService(Configuration configuration) throws Exception {
final JobExecutorService newJobExecutorService;
if (CoreOptions.FLIP6_MODE.equals(configuration.getString(CoreOptions.MODE))) {

configuration.setInteger(RestOptions.REST_PORT, 0);

final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
.setConfiguration(configuration)
.setNumJobManagers(
configuration.getInteger(
ConfigConstants.LOCAL_NUMBER_JOB_MANAGER,
ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER))
.setNumResourceManagers(
configuration.getInteger(ResourceManagerOptions.LOCAL_NUMBER_RESOURCE_MANAGER))
.setNumTaskManagers(
configuration.getInteger(
ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
Expand All @@ -146,6 +143,8 @@ private JobExecutorService createJobExecutorService(Configuration configuration)
final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
miniCluster.start();

configuration.setInteger(RestOptions.REST_PORT, miniCluster.getRestAddress().getPort());

newJobExecutorService = miniCluster;
} else {
final LocalFlinkMiniCluster localFlinkMiniCluster = new LocalFlinkMiniCluster(configuration, true);
Expand All @@ -161,7 +160,7 @@ private JobExecutorService createJobExecutorService(Configuration configuration)
public void stop() throws Exception {
synchronized (lock) {
if (jobExecutorService != null) {
jobExecutorService.terminate().get();
jobExecutorService.close();
jobExecutorService = null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
Expand Down Expand Up @@ -112,6 +113,7 @@ public RemoteExecutor(InetSocketAddress inet, Configuration clientConfiguration,

clientConfiguration.setString(JobManagerOptions.ADDRESS, inet.getHostName());
clientConfiguration.setInteger(JobManagerOptions.PORT, inet.getPort());
clientConfiguration.setInteger(RestOptions.REST_PORT, inet.getPort());
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
package org.apache.flink.client.program.rest;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ProgramInvocationException;
Expand Down Expand Up @@ -78,7 +76,6 @@
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.function.CheckedSupplier;

import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
Expand Down Expand Up @@ -219,20 +216,11 @@ protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoad
throw new ProgramInvocationException("Could not retrieve the execution result.", ExceptionUtils.stripExecutionException(e));
}

if (jobResult.getSerializedThrowable().isPresent()) {
final SerializedThrowable serializedThrowable = jobResult.getSerializedThrowable().get();
final Throwable throwable = serializedThrowable.deserializeError(classLoader);
throw new ProgramInvocationException(throwable);
}

try {
this.lastJobExecutionResult = new JobExecutionResult(
jobResult.getJobId(),
jobResult.getNetRuntime(),
AccumulatorHelper.deserializeAccumulators(
jobResult.getAccumulatorResults(),
classLoader));
this.lastJobExecutionResult = jobResult.toJobExecutionResult(classLoader);
return lastJobExecutionResult;
} catch (JobResult.WrappedJobException we) {
throw new ProgramInvocationException(we.getCause());
} catch (IOException | ClassNotFoundException e) {
throw new ProgramInvocationException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@
package org.apache.flink.runtime.jobmaster;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.SerializedValue;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
Expand Down Expand Up @@ -98,6 +102,29 @@ public Optional<SerializedThrowable> getSerializedThrowable() {
return Optional.ofNullable(serializedThrowable);
}

/**
 * Turns this {@link JobResult} into a {@link JobExecutionResult}.
 *
 * <p>If this result carries a serialized throwable, the throwable is deserialized with the
 * given class loader and rethrown wrapped in a {@link WrappedJobException}. Otherwise the
 * serialized accumulator results are deserialized and returned together with the job id and
 * the net runtime.
 *
 * @param classLoader to use for deserializing the failure cause and the accumulators
 * @return JobExecutionResult built from the job id, net runtime and deserialized accumulators
 * @throws WrappedJobException if the JobResult contains a serialized exception
 * @throws IOException if the accumulators could not be deserialized
 * @throws ClassNotFoundException if the accumulators could not be deserialized
 */
public JobExecutionResult toJobExecutionResult(ClassLoader classLoader) throws WrappedJobException, IOException, ClassNotFoundException {
	// A recorded failure takes precedence over building a result.
	if (serializedThrowable != null) {
		throw new WrappedJobException(serializedThrowable.deserializeError(classLoader));
	}

	return new JobExecutionResult(
		jobId,
		netRuntime,
		AccumulatorHelper.deserializeAccumulators(accumulatorResults, classLoader));
}

/**
* Builder for {@link JobResult}.
*/
Expand Down Expand Up @@ -175,4 +202,16 @@ public static JobResult createFrom(AccessExecutionGraph accessExecutionGraph) {
return builder.build();
}

/**
* Exception which indicates that the job has finished with an {@link Exception}.
*
* <p>The failure the job finished with is handed to the superclass constructor as the cause
* of this exception.
*/
public static final class WrappedJobException extends FlinkException {

private static final long serialVersionUID = 6535061898650156019L;

/**
* Creates a new exception wrapping the given job failure.
*
* @param cause failure with which the job finished
*/
public WrappedJobException(Throwable cause) {
super(cause);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,10 @@

package org.apache.flink.runtime.minicluster;

import java.util.concurrent.CompletableFuture;
import org.apache.flink.util.AutoCloseableAsync;

/**
* Interface to control {@link JobExecutor}.
*/
public interface JobExecutorService extends JobExecutor {

/**
* Terminate the given JobExecutorService.
*
* <p>This method can be implemented asynchronously. Therefore it returns a future
* which is completed once the termination has been done.
*
* @return Termination future which can also contain an exception if the termination went wrong
*/
CompletableFuture<?> terminate();
public interface JobExecutorService extends JobExecutor, AutoCloseableAsync {
}
Loading

0 comments on commit 2a18f05

Please sign in to comment.