Skip to content

Commit

Permalink
Implemented resource check during task deployment
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Warneke committed Aug 12, 2012
1 parent aadf6ce commit 5515b02
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@
*
* @author warneke
*/
public interface Environment
{
public interface Environment {
/**
* Returns the ID of the job from the original job graph. It is used by the library cache manager to find the
* required
Expand Down Expand Up @@ -148,16 +147,29 @@ public interface Environment
*/
int getNumberOfInputGates();

/**
* Returns the number of output channels attached to this environment.
*
* @return the number of output channels attached to this environment
*/
int getNumberOfOutputChannels();

/**
* Returns the number of input channels attached to this environment.
*
* @return the number of input channels attached to this environment
*/
int getNumberOfInputChannels();

/**
* Creates an output gate.
*
* @param gateID
* @param outputClass
* @param selector
* @param isBroadcast
*
* @param <T> The type of the record consumed by the output gate.
*
* @param <T>
* The type of the record consumed by the output gate.
* @return The created output gate.
*/
<T extends Record> OutputGate<T> createOutputGate(GateID gateID, Class<T> outputClass,
Expand All @@ -169,9 +181,8 @@ <T extends Record> OutputGate<T> createOutputGate(GateID gateID, Class<T> output
* @param gateID
* @param deserializer
* @param distributionPattern
*
* @param <T> The type of the record read from the input gate.
*
* @param <T>
* The type of the record read from the input gate.
* @return The created input gate.
*/
<T extends Record> InputGate<T> createInputGate(GateID gateID, RecordDeserializerFactory<T> deserializerFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@
import eu.stratosphere.nephele.template.InputSplitProvider;
import eu.stratosphere.nephele.types.Record;

final class CheckpointEnvironment implements Environment
{
final class CheckpointEnvironment implements Environment {
private final ExecutionVertexID vertexID;

private final Environment environment;
Expand Down Expand Up @@ -209,13 +208,31 @@ public int getNumberOfInputGates() {
return this.environment.getNumberOfInputGates();
}

/**
* {@inheritDoc}
*/
@Override
public int getNumberOfOutputChannels() {

return this.environment.getNumberOfOutputChannels();
}

/**
* {@inheritDoc}
*/
@Override
public int getNumberOfInputChannels() {

return this.environment.getNumberOfInputChannels();
}

/**
* {@inheritDoc}
*/
@Override
public <T extends Record> OutputGate<T> createOutputGate(final GateID gateID,
final Class<T> outputClass, final ChannelSelector<T> selector, final boolean isBroadcast)
{
final Class<T> outputClass, final ChannelSelector<T> selector, final boolean isBroadcast) {

throw new IllegalStateException("Checkpoint replay task called createOutputGate");
}

Expand All @@ -224,8 +241,8 @@ public <T extends Record> OutputGate<T> createOutputGate(final GateID gateID,
*/
@Override
public <T extends Record> InputGate<T> createInputGate(final GateID gateID,
final RecordDeserializerFactory<T> deserializer)
{
final RecordDeserializerFactory<T> deserializer) {

throw new IllegalStateException("Checkpoint replay task called createInputGate");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,34 @@ public int getNumberOfInputGates() {
return this.inputGates.size();
}

/**
* {@inheritDoc}
*/
@Override
public int getNumberOfOutputChannels() {

int numberOfOutputChannels = 0;
for (int i = 0; i < this.outputGates.size(); ++i) {
numberOfOutputChannels += this.outputGates.get(i).getNumberOfOutputChannels();
}

return numberOfOutputChannels;
}

/**
* {@inheritDoc}
*/
@Override
public int getNumberOfInputChannels() {

int numberOfInputChannels = 0;
for (int i = 0; i < this.inputGates.size(); ++i) {
numberOfInputChannels += this.inputGates.get(i).getNumberOfInputChannels();
}

return numberOfInputChannels;
}

/**
* Returns the registered input gate with index <code>pos</code>.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
public abstract class AbstractTaskResult implements IOReadableWritable {

public enum ReturnCode {
SUCCESS, DEPLOYMENT_ERROR, IPC_ERROR, NO_INSTANCE, ILLEGAL_STATE, TASK_NOT_FOUND
SUCCESS, DEPLOYMENT_ERROR, IPC_ERROR, NO_INSTANCE, ILLEGAL_STATE, TASK_NOT_FOUND, INSUFFICIENT_RESOURCES
};

private ExecutionVertexID vertexID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.services.memorymanager.spi.DefaultMemoryManager;
import eu.stratosphere.nephele.taskmanager.bytebuffered.ByteBufferedChannelManager;
import eu.stratosphere.nephele.taskmanager.bytebuffered.InsufficientResourcesException;
import eu.stratosphere.nephele.taskmanager.runtime.EnvelopeConsumptionLog;
import eu.stratosphere.nephele.taskmanager.runtime.RuntimeTask;
import eu.stratosphere.nephele.util.SerializableArrayList;
Expand Down Expand Up @@ -540,18 +541,30 @@ public List<TaskSubmissionResult> submitTasks(final List<TaskDeploymentDescripto
final Set<ChannelID> activeOutputChannels = null; // TODO: Fix me

// Register the task
final Task task = createAndRegisterTask(vertexID, jobConfiguration, re, initialCheckpointState,
activeOutputChannels);
Task task;
try {
task = createAndRegisterTask(vertexID, jobConfiguration, re, initialCheckpointState,
activeOutputChannels);
} catch (InsufficientResourcesException e) {
final TaskSubmissionResult result = new TaskSubmissionResult(vertexID,
AbstractTaskResult.ReturnCode.INSUFFICIENT_RESOURCES);
result.setDescription(e.getMessage());
LOG.error(result.getDescription());
submissionResultList.add(result);
continue;
}

if (task == null) {
final TaskSubmissionResult result = new TaskSubmissionResult(vertexID,
AbstractTaskResult.ReturnCode.TASK_NOT_FOUND);
result.setDescription("Task " + re.getTaskNameWithIndex() + " (" + vertexID + ") was already running");
LOG.error(result.getDescription());
submissionResultList.add(result);
} else {
submissionResultList.add(new TaskSubmissionResult(vertexID, AbstractTaskResult.ReturnCode.SUCCESS));
tasksToStart.add(task);
continue;
}

submissionResultList.add(new TaskSubmissionResult(vertexID, AbstractTaskResult.ReturnCode.SUCCESS));
tasksToStart.add(task);
}

// Now start the tasks
Expand Down Expand Up @@ -579,7 +592,7 @@ public List<TaskSubmissionResult> submitTasks(final List<TaskDeploymentDescripto
*/
private Task createAndRegisterTask(final ExecutionVertexID id, final Configuration jobConfiguration,
final RuntimeEnvironment environment, final CheckpointState initialCheckpointState,
final Set<ChannelID> activeOutputChannels) throws IOException {
final Set<ChannelID> activeOutputChannels) throws InsufficientResourcesException, IOException {

if (id == null) {
throw new IllegalArgumentException("Argument id is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public final class ByteBufferedChannelManager implements TransferEnvelopeDispatc

private static final boolean DEFAULT_MERGE_SPILLED_BUFFERS = true;

// TODO: Make this configurable
private static final int NUMBER_OF_CHANNELS_FOR_MULTICAST = 10;

private final Map<ChannelID, ChannelContext> registeredChannels = new ConcurrentHashMap<ChannelID, ChannelContext>();

private final Map<AbstractID, LocalBufferPoolOwner> localBufferPoolOwner = new ConcurrentHashMap<AbstractID, LocalBufferPoolOwner>();
Expand Down Expand Up @@ -119,8 +122,14 @@ public ByteBufferedChannelManager(final ChannelLookupProtocol channelLookupServi
* the task to be registered
* @param the
* set of output channels which are initially active
* @throws InsufficientResourcesException
* thrown if the channel manager does not have enough memory buffers to safely run this task
*/
public void register(final Task task, final Set<ChannelID> activeOutputChannels) {
public void register(final Task task, final Set<ChannelID> activeOutputChannels)
throws InsufficientResourcesException {

// Check if we can safely run this task with the given resources
checkBufferAvailability(task);

final Environment environment = task.getEnvironment();

Expand Down Expand Up @@ -738,14 +747,54 @@ public BufferProvider getBufferProvider(final JobID jobID, final ChannelID sourc
return this.transitBufferPool;
}

private void redistributeGlobalBuffers() {
/**
* Checks if the byte buffered channel manager has enough resources available to safely execute the given task.
*
* @param task
* the task to be executed
* @throws InsufficientResourcesException
* thrown if the byte buffered manager currently does not have enough resources available to execute the
* task
*/
private void checkBufferAvailability(final Task task) throws InsufficientResourcesException {

final int totalNumberOfBuffers = GlobalBufferPool.getInstance().getTotalNumberOfBuffers();
int numberOfAlreadyRegisteredChannels = this.registeredChannels.size();
if (this.multicastEnabled) {
numberOfAlreadyRegisteredChannels += NUMBER_OF_CHANNELS_FOR_MULTICAST;
}

final Environment env = task.getEnvironment();

final int numberOfChannelsForMulticast = 10; // TODO: Make this configurable
final int numberOfNewChannels = env.getNumberOfOutputChannels() + env.getNumberOfInputChannels();
final int totalNumberOfChannels = numberOfAlreadyRegisteredChannels + numberOfNewChannels;

final double buffersPerChannel = (double) totalNumberOfBuffers
/ (double) totalNumberOfChannels;

if (buffersPerChannel < 1.0) {

// Construct error message
final StringBuilder sb = new StringBuilder(this.localConnectionInfo.getHostName());
sb.append(" has not enough buffers available to safely execute ");
sb.append(env.getTaskName());
sb.append(" (");
sb.append(totalNumberOfChannels - totalNumberOfBuffers);
sb.append(" buffers are currently missing)");

throw new InsufficientResourcesException(sb.toString());
}
}

/**
* Redistributes the global buffers among the registered tasks.
*/
private void redistributeGlobalBuffers() {

final int totalNumberOfBuffers = GlobalBufferPool.getInstance().getTotalNumberOfBuffers();
int totalNumberOfChannels = this.registeredChannels.size();
if (this.multicastEnabled) {
totalNumberOfChannels += numberOfChannelsForMulticast;
totalNumberOfChannels += NUMBER_OF_CHANNELS_FOR_MULTICAST;
}
final double buffersPerChannel = (double) totalNumberOfBuffers / (double) totalNumberOfChannels;
if (buffersPerChannel < 1.0) {
Expand All @@ -769,7 +818,7 @@ private void redistributeGlobalBuffers() {

if (this.multicastEnabled) {
this.transitBufferPool.setDesignatedNumberOfBuffers((int) Math.ceil(buffersPerChannel
* numberOfChannelsForMulticast));
* NUMBER_OF_CHANNELS_FOR_MULTICAST));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (https://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/

package eu.stratosphere.nephele.taskmanager.bytebuffered;

/**
* This exception is thrown by the {@link ByteBufferedChannelManager} to indicate that a task cannot be accepted because
* there are not enough resources available to safely execute it.
*
* @author warneke
*/
public final class InsufficientResourcesException extends Exception {

/**
* The generated serial version UID.
*/
private static final long serialVersionUID = -8977049569413215169L;

/**
* Constructs a new insufficient resources exception.
*
* @param msg
* the message describing the exception
*/
InsufficientResourcesException(final String msg) {
super(msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package eu.stratosphere.nephele.jobmanager;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.BufferedReader;
Expand Down Expand Up @@ -1079,13 +1078,11 @@ public void testExecutionWithLargeDoP() {
try {
jobClient.submitJobAndWait();
} catch (JobExecutionException e) {
fail(e.getMessage());
// Job execution should lead to an error due to lack of resources
return;
}

// Finally, make sure Nephele created a directory as output
assertTrue(outputFile.isDirectory());

// Make s
fail("Undetected lack of resources");

} catch (JobGraphDefinitionException jgde) {
fail(jgde.getMessage());
Expand Down

0 comments on commit 5515b02

Please sign in to comment.