
Commit bd2b790
updating apache#640 to master branch
Shiti committed Oct 26, 2015
1 parent 8b05646 commit bd2b790
Showing 8 changed files with 77 additions and 88 deletions.
---- File 1 of 8 ----
@@ -193,7 +193,7 @@ public void scheduleUsingBacktracking() {
final IntermediateResultPartition resultRequired = taskRequirement.getNextRequirement();

if (resultRequired.isLocationAvailable()) {
-ActorRef taskManager = resultRequired.getLocation().getTaskManager();
+ActorRef taskManager = resultRequired.getLocation().getActorGateway().actor();

LOG.debug("Requesting availability of IntermediateResultPartition " + resultRequired.getPartitionId());
// pin ResultPartition for this intermediate result
@@ -226,7 +226,7 @@ public void onComplete(Throwable failure, Object success) {
// continue with backtracking
scheduleUsingBacktracking();
}
-}, AkkaUtils.globalExecutionContext());
+}, task.getExecutionGraph().getExecutionContext());
/** END Asynchronous callback **/

// interrupt backtracking here and continue once future is complete
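Taken together, the two hunks above show the master-branch migration this commit applies throughout: raw ActorRef handles are now obtained through an ActorGateway, and asynchronous callbacks run on the owning ExecutionGraph's execution context instead of AkkaUtils' global one. Below is a minimal self-contained sketch of that shape; Gateway and Graph are hypothetical stand-ins, not Flink's real classes, and only the migration pattern itself comes from the diff.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class GatewayMigrationSketch {

	// Stand-in for Flink's ActorGateway: callers hold the gateway and only
	// unwrap the raw actor handle where one is still unavoidable.
	interface Gateway {
		Object actor();
	}

	// Stand-in for the ExecutionGraph, which on master owns its own context.
	static class Graph {
		private final ExecutorService executionContext = Executors.newSingleThreadExecutor();

		Executor getExecutionContext() {
			return executionContext;
		}

		void shutdown() {
			executionContext.shutdown();
		}
	}

	public static void main(String[] args) {
		// Hunk 1: gateway.actor() replaces a direct getTaskManager() ActorRef.
		Gateway gateway = () -> "taskmanager-ref";
		System.out.println(gateway.actor());

		// Hunk 2: the callback is pinned to the graph's own execution context
		// rather than a process-wide global one, so one job's scheduling
		// callbacks cannot starve another's.
		Graph graph = new Graph();
		CompletableFuture
				.supplyAsync(() -> "partition available")
				.thenAcceptAsync(System.out::println, graph.getExecutionContext())
				.join();
		graph.shutdown();
	}
}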
---- File 2 of 8 ----
@@ -69,12 +69,9 @@
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
-import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

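The import churn here points to a concurrent-collection swap: CopyOnWriteArraySet (plus Callable/TimeUnit helpers) drops out and CopyOnWriteArrayList comes in. Both java.util.concurrent types snapshot on write, so iteration never throws ConcurrentModificationException; the list variant additionally preserves insertion order and allows duplicates. A small, self-contained demonstration of those semantics:

import java.util.concurrent.CopyOnWriteArrayList;

public class CowListDemo {
	public static void main(String[] args) {
		CopyOnWriteArrayList<String> listeners = new CopyOnWriteArrayList<>();
		listeners.add("first");
		listeners.add("second");

		// Iteration runs over an immutable snapshot: concurrent mutation can
		// never throw ConcurrentModificationException or appear mid-loop.
		for (String l : listeners) {
			listeners.add(l + "-copy"); // safe; visible only to later iterations
			System.out.println(l);      // prints in insertion order
		}
		System.out.println(listeners.size()); // 4
	}
}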
---- File 3 of 8 ----
@@ -544,7 +544,7 @@ public void uploadRequiredJarFiles(InetSocketAddress serverAddress) throws IOExc
* @param parallelism
*/
public void setParallelism(int parallelism) {
-for (AbstractJobVertex ejv : taskVertices.values()) {
+for (JobVertex ejv : taskVertices.values()) {
ejv.setParallelism(parallelism);
}
}
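Only the loop variable's type changes here, because master renamed AbstractJobVertex to JobVertex. For context, the surrounding method fans a single parallelism value out to every vertex in the graph; the sketch below mirrors that shape with hypothetical stand-ins (VertexStub and the map are not Flink's classes):

import java.util.HashMap;
import java.util.Map;

public class SetParallelismSketch {

	// Hypothetical stand-in for Flink's JobVertex.
	static class VertexStub {
		private int parallelism = 1;
		void setParallelism(int p) { parallelism = p; }
		int getParallelism() { return parallelism; }
	}

	private final Map<String, VertexStub> taskVertices = new HashMap<>();

	// Mirrors JobGraph.setParallelism above: one value applied to all vertices.
	public void setParallelism(int parallelism) {
		for (VertexStub ejv : taskVertices.values()) {
			ejv.setParallelism(parallelism);
		}
	}

	public static void main(String[] args) {
		SetParallelismSketch g = new SetParallelismSketch();
		g.taskVertices.put("source", new VertexStub());
		g.taskVertices.put("sink", new VertexStub());
		g.setParallelism(4);
		g.taskVertices.values().forEach(v -> System.out.println(v.getParallelism()));
	}
}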
---- File 4 of 8 ----
@@ -438,8 +438,6 @@ class JobManager(

Future {
-val result = graph.updateState(taskExecutionState)
-originalSender ! result
+val result = executionGraph.updateState(taskExecutionState)
+originalSender ! decorateMessage(result)
}(context.dispatcher)

@@ -585,7 +583,7 @@ class JobManager(
self ! decorateMessage(RemoveJob(jobID, true))
}

-case msg: BarrierAck =>
+/*case msg: BarrierAck =>
currentJobs.get(jobId) match {
case Some((executionGraph, jobInfo)) if executionGraph.getJobID == msg.jobID =>
executionGraph.getStateCheckpointerActor forward msg
@@ -596,7 +594,7 @@
case Some((executionGraph, jobInfo)) if executionGraph.getJobID == msg.jobID =>
executionGraph.getStateCheckpointerActor forward msg
case None =>
-}
+}*/

case ScheduleOrUpdateConsumers(jobId, partitionId) =>
currentJobs.get(jobId) match {
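The surviving change in this file's first hunk is easy to miss: the reply is now wrapped in decorateMessage, and, as before, the sender is captured into originalSender before the Future runs. That capture matters in Akka, since sender() read inside an asynchronous block may already belong to a different message. A hedged Java sketch of the same pattern follows; updateState and decorate are placeholders, while Futures.future and getContext().dispatcher() are real Akka 2.3 APIs.

import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.dispatch.Futures;
import scala.concurrent.ExecutionContext;

public class UpdateStateActor extends UntypedActor {

	@Override
	public void onReceive(Object msg) throws Exception {
		// Capture the sender NOW; inside the future the actor may already be
		// processing another message, so sender() would be wrong there.
		final ActorRef originalSender = getSender();
		final ExecutionContext ec = getContext().dispatcher();

		Futures.future(() -> {
			Object result = updateState(msg);                 // possibly slow work
			originalSender.tell(decorate(result), getSelf()); // reply from the future
			return null;
		}, ec);
	}

	private Object updateState(Object msg) { return Boolean.TRUE; } // placeholder
	private Object decorate(Object result) { return result; }       // placeholder
}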
---- File 5 of 8 ----
@@ -23,6 +23,7 @@
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.testkit.JavaTestKit;
+import org.apache.commons.math.stat.inference.TestUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
@@ -32,24 +33,28 @@
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.instance.InstanceDiedException;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.InputFormatVertex;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.messages.TaskMessages.*;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
+import scala.Option;

import java.util.ArrayList;
import java.util.HashSet;
@@ -108,33 +113,33 @@ public void onReceive(Object msg) throws Exception {
}

@Before
-public void setup(){
+public void setup() {
system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
-TestingUtils.setCallingThreadDispatcher(system);
+// TestingUtils.setCallingThreadDispatcher(system);
}

@After
-public void teardown(){
-TestingUtils.setGlobalExecutionContext();
+public void teardown() {
+// TestingUtils.setGlobalExecutionContext();
JavaTestKit.shutdownActorSystem(system);
}

-private AbstractJobVertex createNode(String name) {
-AbstractJobVertex abstractJobVertex = new AbstractJobVertex(name);
-abstractJobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
-return abstractJobVertex;
+private JobVertex createNode(String name) {
+JobVertex jobVertex = new JobVertex(name);
+jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+return jobVertex;
}

-private AbstractJobVertex createOutputNode(String name) {
-AbstractJobVertex abstractJobVertex = new OutputFormatVertex(name);
-abstractJobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
-return abstractJobVertex;
+private JobVertex createOutputNode(String name) {
+JobVertex jobVertex = new OutputFormatVertex(name);
+jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+return jobVertex;
}

-private AbstractJobVertex createInputNode(String name) {
-AbstractJobVertex abstractJobVertex = new InputFormatVertex(name);
-abstractJobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
-return abstractJobVertex;
+private JobVertex createInputNode(String name) {
+JobVertex jobVertex = new InputFormatVertex(name);
+jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+return jobVertex;
}

@Test
@@ -144,10 +149,10 @@ public void testBacktrackingIntermediateResults() throws InstanceDiedException,
final Configuration cfg = new Configuration();

// JobVertex which has intermediate results available
-final AbstractJobVertex resumePoint;
+final JobVertex resumePoint;

/*
(ASCII topology sketch, whitespace-collapsed in this view: sink1 and sink2 at the top, each fanning in through two child vertices)
@@ -171,19 +176,19 @@
*/

// topologically sorted list
-final List<AbstractJobVertex> list = new ArrayList<AbstractJobVertex>();
+final List<JobVertex> list = new ArrayList<JobVertex>();

-final AbstractJobVertex source = createInputNode("source1");
+final JobVertex source = createInputNode("source1");
list.add(source);

-AbstractJobVertex node1 = createOutputNode("sink1");
+JobVertex node1 = createOutputNode("sink1");
{
-AbstractJobVertex child1 = createNode("sink1-child1");
-AbstractJobVertex child2 = createNode("sink1-child2");
+JobVertex child1 = createNode("sink1-child1");
+JobVertex child2 = createNode("sink1-child2");
node1.connectNewDataSetAsInput(child1, DistributionPattern.ALL_TO_ALL);
node1.connectNewDataSetAsInput(child2, DistributionPattern.ALL_TO_ALL);

-AbstractJobVertex child1child2child = createNode("sink1-child1-child2-child");
+JobVertex child1child2child = createNode("sink1-child1-child2-child");
child1.connectNewDataSetAsInput(child1child2child, DistributionPattern.ALL_TO_ALL);
child2.connectNewDataSetAsInput(child1child2child, DistributionPattern.ALL_TO_ALL);

@@ -194,14 +199,14 @@ public void testBacktrackingIntermediateResults() throws InstanceDiedException,
list.add(child2);
}

-AbstractJobVertex node2 = createOutputNode("sink2");
-final AbstractJobVertex child1 = createNode("sink2-child1");
-final AbstractJobVertex child2 = createNode("sink2-child2");
+JobVertex node2 = createOutputNode("sink2");
+final JobVertex child1 = createNode("sink2-child1");
+final JobVertex child2 = createNode("sink2-child2");
node2.connectNewDataSetAsInput(child1, DistributionPattern.ALL_TO_ALL);
node2.connectNewDataSetAsInput(child2, DistributionPattern.ALL_TO_ALL);

// resume from this node
-AbstractJobVertex child1child2child = createNode("sink1-child1-child2-child");
+JobVertex child1child2child = createNode("sink1-child1-child2-child");
resumePoint = child1child2child;

child1.connectNewDataSetAsInput(child1child2child, DistributionPattern.ALL_TO_ALL);
@@ -216,13 +221,22 @@ public void testBacktrackingIntermediateResults() throws InstanceDiedException,
list.add(node1);
list.add(node2);

-final ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
+final ExecutionGraph eg = new ExecutionGraph(TestingUtils.defaultExecutionContext(),
+	jobId,
+	jobName,
+	cfg,
+	AkkaUtils.getDefaultTimeout(),
+	null,
+	null,
+	getClass().getClassLoader());

new JavaTestKit(system) {
{

final Props props = Props.create(TestTaskManager.class, getRef());
final ActorRef taskManagerActor = system.actorOf(props);
+Option<String> empty = Option.empty();
+ActorGateway actorGateway = TestingUtils.createForwardingJobManager(system, taskManagerActor, empty);

eg.setScheduleMode(ScheduleMode.BACKTRACKING);

@@ -243,7 +257,7 @@ public void testBacktrackingIntermediateResults() throws InstanceDiedException,
// mock an instance
Instance mockInstance = Mockito.mock(Instance.class);
Mockito.when(mockInstance.isAlive()).thenReturn(true);
-Mockito.when(mockInstance.getTaskManager()).thenReturn(taskManagerActor);
+Mockito.when(mockInstance.getActorGateway()).thenReturn(actorGateway);
InstanceConnectionInfo instanceConnectionInfo = new InstanceConnectionInfo();
Mockito.when(mockInstance.getInstanceConnectionInfo()).thenReturn(instanceConnectionInfo);
// set the mock as a location
@@ -262,11 +276,11 @@ public void testBacktrackingIntermediateResults() throws InstanceDiedException,
schedulePoints.add(child2.getID());
schedulePoints.add(source.getID());

-final Scheduler scheduler = new Scheduler(); //Mockito.mock(Scheduler.class);
+final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); //Mockito.mock(Scheduler.class);

Instance i1 = null;
try {
-i1 = ExecutionGraphTestUtils.getInstance(taskManagerActor, 10);
+i1 = ExecutionGraphTestUtils.getInstance(actorGateway, 10);
} catch (Exception e) {
e.printStackTrace();
fail("Couldn't get instance: " + e.getMessage());
@@ -318,18 +332,18 @@ public void testMassiveExecutionGraphExecutionVertexScheduling() {

//Random rand = new Random(System.currentTimeMillis());

-LinkedList<AbstractJobVertex> allNodes = new LinkedList<AbstractJobVertex>();
+LinkedList<JobVertex> allNodes = new LinkedList<JobVertex>();

for (int s = 0; s < numSinks; s++) {
-AbstractJobVertex node = new OutputFormatVertex("sink" + s);
+JobVertex node = new OutputFormatVertex("sink" + s);
node.setInvokableClass(Tasks.NoOpInvokable.class);

//node.setParallelism(rand.nextInt(maxParallelism) + 1);
node.setParallelism(parallelism);
allNodes.addLast(node);

for (int i = 0; i < depth; i++) {
-AbstractJobVertex other = new AbstractJobVertex("vertex" + i + " sink" + s);
+JobVertex other = new JobVertex("vertex" + i + " sink" + s);
other.setParallelism(parallelism);
other.setInvokableClass(Tasks.NoOpInvokable.class);
node.connectNewDataSetAsInput(other, DistributionPattern.ALL_TO_ALL);
@@ -339,7 +353,14 @@ public void testMassiveExecutionGraphExecutionVertexScheduling() {

}

-final ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
+final ExecutionGraph eg = new ExecutionGraph(TestingUtils.defaultExecutionContext(),
+	jobId,
+	jobName,
+	cfg,
+	AkkaUtils.getDefaultTimeout(),
+	null,
+	null,
+	ClassLoader.getSystemClassLoader());

eg.setScheduleMode(ScheduleMode.BACKTRACKING);

@@ -355,13 +376,14 @@

final Props props = Props.create(TestTaskManager.class, getRef());
final ActorRef taskManagerActor = system.actorOf(props);
+Option<String> empty = Option.empty();
+ActorGateway actorGateway = TestingUtils.createForwardingJobManager(system, taskManagerActor, empty);


-Scheduler scheduler = new Scheduler();
+Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());

Instance i1 = null;
try {
-i1 = ExecutionGraphTestUtils.getInstance(taskManagerActor, numSinks * parallelism);
+i1 = ExecutionGraphTestUtils.getInstance(actorGateway, numSinks * parallelism);
} catch (Exception e) {
e.printStackTrace();
}
@@ -375,7 +397,7 @@ public void testMassiveExecutionGraphExecutionVertexScheduling() {
fail("Failed to schedule ExecutionGraph");
}

-for (int i=0; i < numSinks * parallelism; i++) {
+for (int i = 0; i < numSinks * parallelism; i++) {
// all sources should be scheduled
expectMsgClass(duration("1 second"), TaskMessages.SubmitTask.class);
}
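Two test-infrastructure migrations repeat through this file: the ExecutionGraph constructor now takes an execution context, two extra (here null) arguments, and a class loader, and mocked Instances must stub getActorGateway() rather than getTaskManager(). The sketch below isolates the Mockito stubbing change with hypothetical stand-in interfaces (InstanceLike/ActorGatewayLike are not Flink's types), so it runs without Flink on the classpath:

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class MockInstanceSketch {

	// Hypothetical stand-ins mirroring Flink's Instance and ActorGateway.
	interface ActorGatewayLike {}
	interface InstanceLike {
		boolean isAlive();
		ActorGatewayLike getActorGateway();
	}

	public static void main(String[] args) {
		ActorGatewayLike gateway = mock(ActorGatewayLike.class);
		InstanceLike instance = mock(InstanceLike.class);

		when(instance.isAlive()).thenReturn(true);
		// Pre-commit code stubbed getTaskManager() to return a raw ActorRef;
		// master's scheduler asks the instance for its gateway instead.
		when(instance.getActorGateway()).thenReturn(gateway);

		System.out.println(instance.isAlive());                    // true
		System.out.println(instance.getActorGateway() == gateway); // true
	}
}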
---- File 6 of 8 ----
@@ -370,11 +370,16 @@ public void testPartitionReject() {
new JavaTestKit(system){{

ActorRef jobManager = system.actorOf(Props.create(SimpleJobManager.class));
-ActorRef taskManager = createTaskManager(jobManager, true);
+ActorGateway taskManager = TestingUtils.createTaskManager(
+	system,
+	jobManager,
+	new Configuration(),
+	true,
+	true);

// send a non-existing partition id to the task manager
IntermediateResultPartitionID partitionID = new IntermediateResultPartitionID();
-taskManager.tell(
+taskManager.actor().tell(
new TaskMessages.LockResultPartition(partitionID, 1),
getRef());

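Since TestingUtils.createTaskManager now returns an ActorGateway rather than an ActorRef, the raw reference for tell() comes from .actor(). The test's probe pattern itself is standard JavaTestKit usage: getRef() plays the sender, and the reply is asserted with an expect call. A minimal runnable version with a toy actor (Echo is hypothetical; the JavaTestKit calls are real Akka test-kit APIs):

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.testkit.JavaTestKit;

public class ProbeSketch {

	// Toy actor standing in for the task manager under test.
	public static class Echo extends UntypedActor {
		@Override
		public void onReceive(Object msg) {
			getSender().tell("ack: " + msg, getSelf());
		}
	}

	public static void main(String[] args) {
		final ActorSystem system = ActorSystem.create("TestingActorSystem");
		new JavaTestKit(system) {{
			ActorRef echo = system.actorOf(Props.create(Echo.class));
			// getRef() is the probe; it becomes the sender of the message...
			echo.tell("LockResultPartition", getRef());
			// ...so the reply can be asserted on the probe.
			expectMsgEquals(duration("1 second"), "ack: LockResultPartition");
		}};
		JavaTestKit.shutdownActorSystem(system);
	}
}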
---- File 7 of 8 ----
@@ -27,20 +27,6 @@ import org.apache.flink.runtime.instance.InstanceManager
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import akka.actor.{Cancellable, Terminated, ActorRef, Props}
-import akka.pattern.{ask, pipe}
-import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.ActorLogMessages
-import org.apache.flink.runtime.execution.ExecutionState
-import org.apache.flink.runtime.jobgraph.JobStatus
-import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist}
-import org.apache.flink.runtime.messages.ExecutionGraphMessages.{ExecutionStateChanged,
-JobStatusChanged}
-import org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers
-import org.apache.flink.runtime.messages.Messages.Disconnect
-import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
-import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
---- File 8 of 8 (diff not loaded in this view) ----
