Skip to content

Commit

Permalink
[FLINK-5114] [network] Handle partition producer state check for unre…
Browse files Browse the repository at this point in the history
…gistered executions

This closes apache#2912.
  • Loading branch information
uce committed Dec 12, 2016
1 parent 47db9cb commit a078666
Show file tree
Hide file tree
Showing 25 changed files with 623 additions and 289 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;

import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.flink.util.Preconditions.checkArgument;
Expand All @@ -34,6 +36,14 @@ public class IntermediateResult {

private final IntermediateResultPartition[] partitions;

/**
* Maps intermediate result partition IDs to a partition index. This is
* used for ID lookups of intermediate results. I didn't dare to change the
* partition connect logic in other places that is tightly coupled to the
* partitions being held as an array.
*/
private final HashMap<IntermediateResultPartitionID, Integer> partitionLookupHelper = new HashMap<>();

private final int numParallelProducers;

private final AtomicInteger numberOfRunningProducers;
Expand All @@ -54,10 +64,12 @@ public IntermediateResult(

this.id = checkNotNull(id);
this.producer = checkNotNull(producer);
this.partitions = new IntermediateResultPartition[numParallelProducers];

checkArgument(numParallelProducers >= 1);
this.numParallelProducers = numParallelProducers;

this.partitions = new IntermediateResultPartition[numParallelProducers];

this.numberOfRunningProducers = new AtomicInteger(numParallelProducers);

// we do not set the intermediate result partitions here, because we let them be initialized by
Expand All @@ -80,6 +92,7 @@ public void setPartition(int partitionNumber, IntermediateResultPartition partit
}

partitions[partitionNumber] = partition;
partitionLookupHelper.put(partition.getPartitionId(), partitionNumber);
partitionsAssigned++;
}

Expand All @@ -95,6 +108,28 @@ public IntermediateResultPartition[] getPartitions() {
return partitions;
}

/**
 * Looks up a partition of this intermediate result by its ID.
 *
 * @param resultPartitionId ID of the partition to look up
 * @throws NullPointerException If the partition ID is <code>null</code>
 * @throws IllegalArgumentException If the partition ID is unknown
 * @return Intermediate result partition with the given ID
 */
public IntermediateResultPartition getPartitionById(IntermediateResultPartitionID resultPartitionId) {
    // Reject null IDs up front so that the map is only queried with a valid key.
    checkNotNull(resultPartitionId, "IntermediateResultPartitionID");

    // Looks up the partition number via the helper map and returns the
    // partition. Currently, this happens infrequently enough that we could
    // consider removing the map and scanning the partitions on every lookup.
    // The lookup (currently) only happens when the producer of an intermediate
    // result cannot be found via its registered execution.
    final Integer index = partitionLookupHelper.get(resultPartitionId);
    if (index == null) {
        throw new IllegalArgumentException("Unknown intermediate result partition ID " + resultPartitionId);
    }

    return partitions[index];
}

/**
 * Returns the current value of the assigned-partitions counter.
 *
 * @return Number of partitions counted as assigned
 */
public int getNumberOfAssignedPartitions() {
    return this.partitionsAssigned;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.io.network.netty;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;

/**
 * Checker that queries the JobManager about the state of the producer of a
 * result partition.
 *
 * <p>A check is triggered when a partition request is answered with a
 * PartitionNotFound event. This usually means that the producer of the
 * partition has either not registered itself with the network stack yet
 * or has already terminated.
 */
public interface PartitionProducerStateChecker {

    /**
     * Requests the execution state of the execution that produces a result
     * partition.
     *
     * @param jobId ID of the job the partition belongs to.
     * @param intermediateDataSetId ID of the parent intermediate data set.
     * @param resultPartitionId ID of the result partition to check. It
     * identifies both the producing execution and the partition.
     *
     * @return Future holding the execution state of the producing execution.
     */
    Future<ExecutionState> requestPartitionProducerState(JobID jobId, IntermediateDataSetID intermediateDataSetId, ResultPartitionID resultPartitionId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,6 @@ public class SingleInputGate implements InputGate {
/** The job ID of the owning task. */
private final JobID jobId;

/** The execution attempt ID of the owning task. */
private final ExecutionAttemptID executionId;

/**
* The ID of the consumed intermediate result. Each input gate consumes partitions of the
* intermediate result specified by this ID. This ID also identifies the input gate at the
Expand Down Expand Up @@ -168,7 +165,6 @@ public class SingleInputGate implements InputGate {
public SingleInputGate(
String owningTaskName,
JobID jobId,
ExecutionAttemptID executionId,
IntermediateDataSetID consumedResultId,
int consumedSubpartitionIndex,
int numberOfInputChannels,
Expand All @@ -177,7 +173,6 @@ public SingleInputGate(

this.owningTaskName = checkNotNull(owningTaskName);
this.jobId = checkNotNull(jobId);
this.executionId = checkNotNull(executionId);

this.consumedResultId = checkNotNull(consumedResultId);

Expand Down Expand Up @@ -530,11 +525,7 @@ void notifyChannelNonEmpty(InputChannel channel) {
}

void triggerPartitionStateCheck(ResultPartitionID partitionId) {
taskActions.triggerPartitionStateCheck(
jobId,
executionId,
consumedResultId,
partitionId);
taskActions.triggerPartitionProducerStateCheck(jobId, consumedResultId, partitionId);
}

private void queueChannel(InputChannel channel) {
Expand Down Expand Up @@ -587,7 +578,7 @@ public static SingleInputGate create(
final InputChannelDeploymentDescriptor[] icdd = checkNotNull(igdd.getInputChannelDeploymentDescriptors());

final SingleInputGate inputGate = new SingleInputGate(
owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex,
owningTaskName, jobId, consumedResultId, consumedSubpartitionIndex,
icdd.length, taskActions, metrics);

// Create the input channels. There is one input channel for each consumed partition.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,21 @@
* limitations under the License.
*/

package org.apache.flink.runtime.io.network.netty;
package org.apache.flink.runtime.jobmanager;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.PartitionState;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState;

public interface PartitionStateChecker {
Future<PartitionState> requestPartitionState(
JobID jobId,
ExecutionAttemptID executionId,
IntermediateDataSetID resultId,
ResultPartitionID partitionId);
/**
 * Exception returned to a TaskManager in response to a
 * {@link RequestPartitionProducerState} message when the producer of the
 * requested partition has already been disposed.
 */
public class PartitionProducerDisposedException extends Exception {

    /**
     * Creates the exception for the given result partition.
     *
     * @param resultPartitionID ID of the partition whose producer is gone; its
     * producer ID and partition ID are included in the exception message.
     */
    public PartitionProducerDisposedException(ResultPartitionID resultPartitionID) {
        super(describe(resultPartitionID));
    }

    /** Builds the exception message from the producer and partition IDs. */
    private static String describe(ResultPartitionID resultPartitionID) {
        return String.format(
            "Execution %s producing partition %s has already been disposed.",
            resultPartitionID.getProducerId(),
            resultPartitionID.getPartitionId());
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.network.PartitionState;
import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.messages.JobManagerMessages;
Expand All @@ -36,32 +35,32 @@
* This implementation uses {@link ActorGateway} to trigger the partition state check at the job
* manager.
*/
public class ActorGatewayPartitionStateChecker implements PartitionStateChecker {
public class ActorGatewayPartitionProducerStateChecker implements PartitionProducerStateChecker {

private final ActorGateway jobManager;
private final FiniteDuration timeout;

public ActorGatewayPartitionStateChecker(ActorGateway jobManager, FiniteDuration timeout) {
public ActorGatewayPartitionProducerStateChecker(ActorGateway jobManager, FiniteDuration timeout) {
this.jobManager = Preconditions.checkNotNull(jobManager);
this.timeout = Preconditions.checkNotNull(timeout);
}

@Override
public Future<PartitionState> requestPartitionState(
public Future<ExecutionState> requestPartitionProducerState(
JobID jobId,
ExecutionAttemptID executionAttemptId,
IntermediateDataSetID resultId,
ResultPartitionID partitionId) {
JobManagerMessages.RequestPartitionState msg = new JobManagerMessages.RequestPartitionState(
IntermediateDataSetID intermediateDataSetId,
ResultPartitionID resultPartitionId) {

JobManagerMessages.RequestPartitionProducerState msg = new JobManagerMessages.RequestPartitionProducerState(
jobId,
partitionId,
executionAttemptId,
resultId);
intermediateDataSetId, resultPartitionId
);

scala.concurrent.Future<PartitionState> futureResponse = jobManager
scala.concurrent.Future<ExecutionState> futureResponse = jobManager
.ask(msg, timeout)
.mapTo(ClassTag$.MODULE$.<PartitionState>apply(PartitionState.class));
.mapTo(ClassTag$.MODULE$.<ExecutionState>apply(ExecutionState.class));

return new FlinkFuture<>(futureResponse);
}

}
Loading

0 comments on commit a078666

Please sign in to comment.