[FLINK-4689] [cluster management] Implement a simple slot provider for the new job manager
KurtYoung authored and StephanEwen committed Dec 23, 2016
1 parent 106cb9e commit 5cbec02
Showing 3 changed files with 89 additions and 9 deletions.
SlotPool.java
@@ -135,7 +135,6 @@ Future<SimpleSlot> allocateSimpleSlot(
 
 		internalAllocateSlot(jobID, allocationID, resourceProfile, future);
 
-		final SlotOwner owner = this;
 		return future.thenApplyAsync(
 			new ApplyFunction<SlotDescriptor, SimpleSlot>() {
 				@Override
PooledSlotProvider.java (new file)
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.jobmanager.slots;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotPool;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A simple pool based slot provider with {@link SlotPool} as the underlying storage.
*/
public class PooledSlotProvider implements SlotProvider {

	/** The pool which holds all the slots. */
	private final SlotPool slotPool;

	/** The timeout for allocation. */
	private final Time timeout;

	public PooledSlotProvider(final SlotPool slotPool, final Time timeout) {
		this.slotPool = slotPool;
		this.timeout = timeout;
	}

	@Override
	public Future<SimpleSlot> allocateSlot(ScheduledUnit task,
			boolean allowQueued) throws NoResourceAvailableException
	{
		checkNotNull(task);

		final JobID jobID = task.getTaskToExecute().getVertex().getJobId();
		final Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, ResourceProfile.UNKNOWN);
		try {
			final SimpleSlot slot = future.get(timeout.getSize(), timeout.getUnit());
			return FlinkCompletableFuture.completed(slot);
		} catch (InterruptedException e) {
			throw new NoResourceAvailableException("Could not allocate a slot because it's interrupted.");
		} catch (ExecutionException e) {
			throw new NoResourceAvailableException("Could not allocate a slot because some error occurred " +
					"during allocation, " + e.getMessage());
		} catch (TimeoutException e) {
			throw new NoResourceAvailableException("Could not allocate a slot within time limit: " + timeout);
		}
	}
}
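For orientation only (not part of the commit), a minimal sketch of how the new provider could be driven directly. The helper class and method below are hypothetical, and a live SlotPool plus a ScheduledUnit are assumed to exist already; only the PooledSlotProvider and SlotPool calls mirror the code above.

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotPool;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.PooledSlotProvider;

import java.util.concurrent.TimeUnit;

class PooledSlotProviderUsageSketch {

	// Hypothetical helper: request one slot for the given task through the provider.
	static SimpleSlot allocateOnce(SlotPool slotPool, ScheduledUnit task) throws Exception {
		// Same construction as JobMaster.scheduleForExecution(...) further down in this commit.
		SlotProvider provider = new PooledSlotProvider(slotPool, Time.of(5, TimeUnit.SECONDS));

		// allocateSlot(...) blocks internally until the pool serves the request or the
		// timeout fires, so on success the returned future is already completed.
		Future<SimpleSlot> future = provider.allocateSlot(task, false);
		return future.get(5, TimeUnit.SECONDS);
	}
}

Note that the allowQueued flag is not consulted by this provider: it either returns an already-completed future or throws NoResourceAvailableException.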
JobMaster.java
@@ -50,14 +50,15 @@
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.instance.SlotPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.jobmanager.slots.PooledSlotProvider;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -84,7 +85,6 @@
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.SerializedValue;
-
 import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
@@ -93,6 +93,7 @@
 import java.util.UUID;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -145,6 +146,9 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	/** The execution graph of this job */
 	private final ExecutionGraph executionGraph;
 
+	private final SlotPool slotPool;
+
+	private final Time allocationTimeout;
 
 	private volatile UUID leaderSessionID;
 
@@ -156,8 +160,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	/** Connection with ResourceManager, null if not located address yet or we close it initiative */
 	private ResourceManagerConnection resourceManagerConnection;
 
-	// TODO - we need to replace this with the slot pool
-	private final Scheduler scheduler;
 
 	// ------------------------------------------------------------------------
 
@@ -240,8 +242,8 @@ public JobMaster(
 				-1,
 				log);
 
-		// TODO - temp fix
-		this.scheduler = new Scheduler(executorService);
+		this.slotPool = new SlotPool(executorService);
+		this.allocationTimeout = Time.of(5, TimeUnit.SECONDS);
 	}
 
 	//----------------------------------------------------------------------------------------------
@@ -263,6 +265,7 @@ public void start(final UUID leaderSessionID) {
 		if (LEADER_ID_UPDATER.compareAndSet(this, null, leaderSessionID)) {
 			super.start();
 
+			slotPool.setJobManagerLeaderId(leaderSessionID);
 			log.info("Starting JobManager for job {} ({})", jobGraph.getName(), jobGraph.getJobID());
 			getSelf().startJobExecution();
 		} else {
@@ -338,7 +341,7 @@ public void run() {
 			@Override
 			public void run() {
 				try {
-					executionGraph.scheduleForExecution(scheduler);
+					executionGraph.scheduleForExecution(new PooledSlotProvider(slotPool, allocationTimeout));
 				} catch (Throwable t) {
 					executionGraph.fail(t);
 				}
@@ -366,6 +369,7 @@ public void suspendExecution(final Throwable cause) {
 		((StartStoppable) getSelf()).stop();
 
 		leaderSessionID = null;
+		slotPool.setJobManagerLeaderId(null);
 		executionGraph.suspend(cause);
 
 		// disconnect from resource manager:
@@ -777,9 +781,12 @@ public void run() {
 					// TODO - add tests for comment in https://github.com/apache/flink/pull/2565
 					// verify the response with current connection
 					if (resourceManagerConnection != null
-							&& resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId())) {
+							&& resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId()))
+					{
 						log.info("JobManager successfully registered at ResourceManager, leader id: {}.",
 							success.getResourceManagerLeaderId());
+						slotPool.setResourceManager(success.getResourceManagerLeaderId(),
+							resourceManagerConnection.getTargetGateway());
 					}
 				}
 			});
@@ -790,6 +797,7 @@ private void closeResourceManagerConnection() {
 			resourceManagerConnection.close();
 			resourceManagerConnection = null;
 		}
+		slotPool.disconnectResourceManager();
 	}
 
 	//----------------------------------------------------------------------------------------------
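Taken together, the hunks above thread the SlotPool through the JobMaster lifecycle: it is created in the constructor, told about the leader session on start(), handed the ResourceManager connection on successful registration, used for scheduling via a fresh PooledSlotProvider, and detached again on suspend and when the ResourceManager connection closes. Below is a condensed, hypothetical sketch of that wiring; the class and method names are placeholders, only the slotPool calls mirror the diff, and the ResourceManagerGateway parameter type is an assumption from context.

import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.instance.SlotPool;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.jobmanager.slots.PooledSlotProvider;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;

// Hypothetical glue class summarizing the JobMaster changes in this commit.
class JobMasterSlotPoolWiringSketch {

	private final SlotPool slotPool;
	private final Time allocationTimeout;

	JobMasterSlotPoolWiringSketch(ExecutorService executorService) {
		// Constructor: the pool replaces the old Scheduler, with a fixed 5 second allocation timeout.
		this.slotPool = new SlotPool(executorService);
		this.allocationTimeout = Time.of(5, TimeUnit.SECONDS);
	}

	void onStart(UUID leaderSessionID) {
		// start(): tell the pool which job manager leader session it now serves.
		slotPool.setJobManagerLeaderId(leaderSessionID);
	}

	SlotProvider slotProviderForScheduling() {
		// scheduleForExecution(): each scheduling attempt gets a fresh provider over the same pool.
		return new PooledSlotProvider(slotPool, allocationTimeout);
	}

	void onResourceManagerRegistered(UUID resourceManagerLeaderId, ResourceManagerGateway gateway) {
		// Successful registration: the pool can now request slots from the resource manager.
		slotPool.setResourceManager(resourceManagerLeaderId, gateway);
	}

	void onSuspend() {
		// suspendExecution(): clear the leader id so the pool stops acting for this session.
		slotPool.setJobManagerLeaderId(null);
	}

	void onResourceManagerConnectionClosed() {
		// closeResourceManagerConnection(): drop the resource manager reference.
		slotPool.disconnectResourceManager();
	}
}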
