Skip to content

Commit

Permalink
[fixup] Create local variable for reuse in SingleInputGate
Browse files Browse the repository at this point in the history
  • Loading branch information
zhijiangW committed Apr 3, 2019
1 parent a157462 commit 8295390
Showing 1 changed file with 10 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.runtime.taskmanager.TaskActions;

import org.slf4j.Logger;
Expand Down Expand Up @@ -676,9 +677,11 @@ public static SingleInputGate create(

final InputChannelDeploymentDescriptor[] icdd = checkNotNull(igdd.getInputChannelDeploymentDescriptors());

final NetworkEnvironmentConfiguration networkConfig = networkEnvironment.getConfiguration();

final SingleInputGate inputGate = new SingleInputGate(
owningTaskName, jobId, consumedResultId, consumedPartitionType, consumedSubpartitionIndex,
icdd.length, taskActions, metrics, networkEnvironment.getConfiguration().isCreditBased());
icdd.length, taskActions, metrics, networkConfig.isCreditBased());

// Create the input channels. There is one input channel for each consumed partition.
final InputChannel[] inputChannels = new InputChannel[icdd.length];
Expand All @@ -695,8 +698,8 @@ public static SingleInputGate create(
inputChannels[i] = new LocalInputChannel(inputGate, i, partitionId,
networkEnvironment.getResultPartitionManager(),
networkEnvironment.getTaskEventDispatcher(),
networkEnvironment.getConfiguration().partitionRequestInitialBackoff(),
networkEnvironment.getConfiguration().partitionRequestMaxBackoff(),
networkConfig.partitionRequestInitialBackoff(),
networkConfig.partitionRequestMaxBackoff(),
metrics
);

Expand All @@ -706,8 +709,8 @@ else if (partitionLocation.isRemote()) {
inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId,
partitionLocation.getConnectionId(),
networkEnvironment.getConnectionManager(),
networkEnvironment.getConfiguration().partitionRequestInitialBackoff(),
networkEnvironment.getConfiguration().partitionRequestMaxBackoff(),
networkConfig.partitionRequestInitialBackoff(),
networkConfig.partitionRequestMaxBackoff(),
metrics
);

Expand All @@ -718,8 +721,8 @@ else if (partitionLocation.isUnknown()) {
networkEnvironment.getResultPartitionManager(),
networkEnvironment.getTaskEventDispatcher(),
networkEnvironment.getConnectionManager(),
networkEnvironment.getConfiguration().partitionRequestInitialBackoff(),
networkEnvironment.getConfiguration().partitionRequestMaxBackoff(),
networkConfig.partitionRequestInitialBackoff(),
networkConfig.partitionRequestMaxBackoff(),
metrics
);

Expand Down

0 comments on commit 8295390

Please sign in to comment.