Skip to content

Commit

Permalink
[FLINK-1478] [jobmanager] Add deterministic strictly local split assi…
Browse files Browse the repository at this point in the history
…gnment (part 2)

This closes apache#375
  • Loading branch information
StephanEwen committed Feb 9, 2015
1 parent 6fcef7d commit 4386620
Show file tree
Hide file tree
Showing 3 changed files with 677 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
Expand Down Expand Up @@ -77,7 +78,7 @@ public class ExecutionJobVertex implements Serializable {

private final InputSplit[] inputSplits;

private List<InputSplit>[] inputSplitsPerSubtask;
private List<LocatableInputSplit>[] inputSplitsPerSubtask;

private InputSplitAssigner splitAssigner;

Expand Down Expand Up @@ -139,134 +140,48 @@ public ExecutionJobVertex(ExecutionGraph graph, AbstractJobVertex jobVertex,
try {
@SuppressWarnings("unchecked")
InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();

if (splitSource != null) {
this.inputSplits = splitSource.createInputSplits(numTaskVertices);
inputSplits = splitSource.createInputSplits(numTaskVertices);

if (splitSource instanceof StrictlyLocalAssignment) {

// group the splits by host while preserving order per host
Map<String, List<LocatableInputSplit>> splitsByHost = new HashMap<String, List<LocatableInputSplit>>();
for(int i=0; i<this.inputSplits.length; i++) {

// check that split has exactly one local host
LocatableInputSplit lis;
InputSplit is = this.inputSplits[i];
if(!(is instanceof LocatableInputSplit)) {
new JobException("Invalid InputSplit type "+is.getClass().getCanonicalName()+". " +
"Strictly local assignment requires LocatableInputSplit");
}
lis = (LocatableInputSplit) is;

if(lis.getHostnames() == null) {
throw new JobException("LocatableInputSplit has no host information. " +
"Strictly local assignment requires exactly one hostname for each LocatableInputSplit.");
} else if (lis.getHostnames().length != 1) {
throw new JobException("Strictly local assignment requires exactly one hostname for each LocatableInputSplit.");
}
String hostName = lis.getHostnames()[0];

List<LocatableInputSplit> hostSplits = splitsByHost.get(hostName);
if(hostSplits == null) {
hostSplits = new ArrayList<LocatableInputSplit>();
splitsByHost.put(hostName, hostSplits);
}
hostSplits.add(lis);
}

// assign subtasks to hosts
// get list of hosts in deterministic order
List<String> hosts = new ArrayList<String>(splitsByHost.keySet());
Collections.sort(hosts);
int numSubTasks = this.getParallelism();
int numHosts = hosts.size();
if(numSubTasks < numHosts) {
throw new JobException("Strictly local split assignment requires at least as " +
"many parallel subtasks as distinct split hosts. Please increase the parallelism " +
"of DataSource "+this.getJobVertex().getName()+" to at least "+numHosts+".");
}

int numSubTasksPerHost = numSubTasks / numHosts;
int numHostWithOneMore = numSubTasks % numHosts;

Map<String, int[]> subTaskHostAssignment = new HashMap<String, int[]>(numHosts);
int assignedHostsCnt = 0;
int assignedTasksCnt = 0;
for(String host : hosts) {
int numTasksToAssign = assignedHostsCnt < numHostWithOneMore ? numSubTasksPerHost + 1 : numSubTasksPerHost;
int[] subTasks = new int[numTasksToAssign];
for(int i=0; i<numTasksToAssign; i++) {
subTasks[i] = assignedTasksCnt++;
}
subTaskHostAssignment.put(host, subTasks);
assignedHostsCnt++;
if (inputSplits != null) {
if (splitSource instanceof StrictlyLocalAssignment) {
inputSplitsPerSubtask = computeLocalInputSplitsPerTask(inputSplits);
splitAssigner = new PredeterminedInputSplitAssigner(inputSplitsPerSubtask);
} else {
splitAssigner = splitSource.getInputSplitAssigner(inputSplits);
}

// attach locality constraint to subtask
for(String host : hosts) {
int[] subTasks = subTaskHostAssignment.get(host);

for(int taskId : subTasks) {
this.getTaskVertices()[taskId].setTargetHostConstraint(host);
}
}

// assign splits to subtasks
this.inputSplitsPerSubtask = (List<InputSplit>[])new List[numSubTasks];
for(String host : hosts) {
List<LocatableInputSplit> localSplits = splitsByHost.get(host);
int[] localSubTasks = subTaskHostAssignment.get(host);

// init lists
for(int i=0; i<localSubTasks.length; i++) {
this.inputSplitsPerSubtask[localSubTasks[i]] = new ArrayList<InputSplit>();
}

int subTaskIdx = 0;
while(!localSplits.isEmpty()) {
int subTask = localSubTasks[subTaskIdx++];
this.inputSplitsPerSubtask[subTask].add(localSplits.remove(localSplits.size() - 1));
if(subTaskIdx == localSubTasks.length) {
subTaskIdx = 0;
}
}
}

// create predetermined split assigner
this.splitAssigner = new PredeterminedInputSplitAssigner(this.inputSplitsPerSubtask);

} else {
this.splitAssigner = splitSource.getInputSplitAssigner(this.inputSplits);
}
} else {
this.inputSplits = null;
this.splitAssigner = null;
}
else {
inputSplits = null;
}
}
catch (Throwable t) {
throw new JobException("Creating the input splits caused an error: " + t.getMessage(), t);
}

this.finishedSubtasks = new boolean[parallelism];
finishedSubtasks = new boolean[parallelism];
}

public ExecutionGraph getGraph() {
return this.graph;
return graph;
}

public AbstractJobVertex getJobVertex() {
return this.jobVertex;
return jobVertex;
}

public int getParallelism() {
return this.parallelism;
return parallelism;
}

public JobID getJobId() {
return this.graph.getJobID();
return graph.getJobID();
}

public JobVertexID getJobVertexId() {
return this.jobVertex.getID();
return jobVertex.getID();
}

public ExecutionVertex[] getTaskVertices() {
Expand Down Expand Up @@ -345,15 +260,49 @@ public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult>

public void scheduleAll(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {

// ExecutionVertex[] vertices = this.taskVertices;
//
// for (int i = 0; i < vertices.length; i++) {
// ExecutionVertex v = vertices[i];
//
// if (v.get
// }
ExecutionVertex[] vertices = this.taskVertices;

for (ExecutionVertex ev : getTaskVertices()) {
// check if we need to do pre-assignment of tasks
if (inputSplitsPerSubtask != null) {

final Map<String, List<Instance>> instances = scheduler.getInstancesByHost();
final Map<String, Integer> assignments = new HashMap<String, Integer>();

for (int i = 0; i < vertices.length; i++) {
List<LocatableInputSplit> splitsForHost = inputSplitsPerSubtask[i];
if (splitsForHost == null || splitsForHost.isEmpty()) {
continue;
}

String[] hostNames = splitsForHost.get(0).getHostnames();
if (hostNames == null || hostNames.length == 0 || hostNames[0] == null) {
continue;
}

String host = hostNames[0];
ExecutionVertex v = vertices[i];

List<Instance> instancesOnHost = instances.get(host);

if (instancesOnHost == null || instancesOnHost.isEmpty()) {
throw new NoResourceAvailableException("Cannot schedule a strictly local task to host " + host
+ ". No TaskManager available on that host.");
}

Integer pos = assignments.get(host);
if (pos == null) {
pos = 0;
assignments.put(host, 0);
} else {
assignments.put(host, pos + 1 % instancesOnHost.size());
}

v.setLocationConstraintHosts(Collections.singletonList(instancesOnHost.get(pos)));
}
}

// kick off the tasks
for (ExecutionVertex ev : vertices) {
ev.scheduleForExecution(scheduler, queued);
}
}
Expand Down Expand Up @@ -504,30 +453,132 @@ private void subtaskInFinalState(int subtask) {
}
}

private List<LocatableInputSplit>[] computeLocalInputSplitsPerTask(InputSplit[] splits) throws JobException {

final int numSubTasks = getParallelism();

// sanity check
if (numSubTasks > splits.length) {
throw new JobException("Strictly local assignment requires at least as many splits as subtasks.");
}

// group the splits by host while preserving order per host
Map<String, List<LocatableInputSplit>> splitsByHost = new HashMap<String, List<LocatableInputSplit>>();

for (InputSplit split : splits) {
// check that split has exactly one local host
if(!(split instanceof LocatableInputSplit)) {
new JobException("Invalid InputSplit type " + split.getClass().getCanonicalName() + ". " +
"Strictly local assignment requires LocatableInputSplit");
}
LocatableInputSplit lis = (LocatableInputSplit) split;

if (lis.getHostnames() == null) {
throw new JobException("LocatableInputSplit has no host information. " +
"Strictly local assignment requires exactly one hostname for each LocatableInputSplit.");
}
else if (lis.getHostnames().length != 1) {
throw new JobException("Strictly local assignment requires exactly one hostname for each LocatableInputSplit.");
}
String hostName = lis.getHostnames()[0];

List<LocatableInputSplit> hostSplits = splitsByHost.get(hostName);
if (hostSplits == null) {
hostSplits = new ArrayList<LocatableInputSplit>();
splitsByHost.put(hostName, hostSplits);
}
hostSplits.add(lis);
}


int numHosts = splitsByHost.size();

if (numSubTasks < numHosts) {
throw new JobException("Strictly local split assignment requires at least as " +
"many parallel subtasks as distinct split hosts. Please increase the parallelism " +
"of DataSource "+this.getJobVertex().getName()+" to at least "+numHosts+".");
}

// get list of hosts in deterministic order
List<String> hosts = new ArrayList<String>(splitsByHost.keySet());
Collections.sort(hosts);

@SuppressWarnings("unchecked")
List<LocatableInputSplit>[] subTaskSplitAssignment = (List<LocatableInputSplit>[]) new List<?>[numSubTasks];

final int subtasksPerHost = numSubTasks / numHosts;
final int hostsWithOneMore = numSubTasks % numHosts;

int subtaskNum = 0;

// we go over all hosts and distribute the hosts' input splits
// over the subtasks
for (int hostNum = 0; hostNum < numHosts; hostNum++) {
String host = hosts.get(hostNum);
List<LocatableInputSplit> splitsOnHost = splitsByHost.get(host);

int numSplitsOnHost = splitsOnHost.size();

// the number of subtasks to split this over.
// NOTE: if the host has few splits, some subtasks will not get anything.
int subtasks = Math.min(numSplitsOnHost,
hostNum < hostsWithOneMore ? subtasksPerHost + 1 : subtasksPerHost);

int splitsPerSubtask = numSplitsOnHost / subtasks;
int subtasksWithOneMore = numSplitsOnHost % subtasks;

int splitnum = 0;

// go over the subtasks and grab a subrange of the input splits
for (int i = 0; i < subtasks; i++) {
int numSplitsForSubtask = (i < subtasksWithOneMore ? splitsPerSubtask + 1 : splitsPerSubtask);

List<LocatableInputSplit> splitList;

if (numSplitsForSubtask == numSplitsOnHost) {
splitList = splitsOnHost;
}
else {
splitList = new ArrayList<LocatableInputSplit>(numSplitsForSubtask);
for (int k = 0; k < numSplitsForSubtask; k++) {
splitList.add(splitsOnHost.get(splitnum++));
}
}

subTaskSplitAssignment[subtaskNum++] = splitList;
}
}

return subTaskSplitAssignment;
}

//---------------------------------------------------------------------------------------------
// Predetermined InputSplitAssigner
//---------------------------------------------------------------------------------------------

public static class PredeterminedInputSplitAssigner implements InputSplitAssigner {

private List<InputSplit>[] inputSplitsPerSubtask;
private List<LocatableInputSplit>[] inputSplitsPerSubtask;

public PredeterminedInputSplitAssigner(List<InputSplit>[] inputSplitsPerSubtask) {
@SuppressWarnings("unchecked")
public PredeterminedInputSplitAssigner(List<LocatableInputSplit>[] inputSplitsPerSubtask) {
// copy input split assignment
this.inputSplitsPerSubtask = (List<InputSplit>[])new List[inputSplitsPerSubtask.length];
for(int i=0; i<inputSplitsPerSubtask.length; i++) {
this.inputSplitsPerSubtask[i] = new ArrayList<InputSplit>(inputSplitsPerSubtask[i].size());
this.inputSplitsPerSubtask[i].addAll(inputSplitsPerSubtask[i]);
this.inputSplitsPerSubtask = (List<LocatableInputSplit>[]) new List[inputSplitsPerSubtask.length];
for (int i = 0; i < inputSplitsPerSubtask.length; i++) {
List<LocatableInputSplit> next = inputSplitsPerSubtask[i];

this.inputSplitsPerSubtask[i] = next == null || next.isEmpty() ?
Collections.<LocatableInputSplit>emptyList() :
new ArrayList<LocatableInputSplit>(inputSplitsPerSubtask[i]);
}
}

@Override
public InputSplit getNextInputSplit(String host, int taskId) {
if(inputSplitsPerSubtask[taskId].isEmpty()) {
if (inputSplitsPerSubtask[taskId].isEmpty()) {
return null;
} else {
InputSplit is = inputSplitsPerSubtask[taskId].remove(inputSplitsPerSubtask[taskId].size() - 1);
return is;
return inputSplitsPerSubtask[taskId].remove(inputSplitsPerSubtask[taskId].size() - 1);
}
}
}
Expand Down
Loading

0 comments on commit 4386620

Please sign in to comment.