Skip to content

Commit

Permalink
[refactor][tests] Remove unnecessary wrapping with ConsumableNotifyin…
Browse files Browse the repository at this point in the history
…gResultPartitionWriterDecorator
  • Loading branch information
StephanEwen committed Oct 6, 2020
1 parent 86d299e commit 91af74e
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.runtime.io.network.api.writer;

import org.apache.flink.api.common.JobID;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
Expand All @@ -40,7 +39,6 @@
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartition;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView;
Expand All @@ -54,8 +52,6 @@
import org.apache.flink.runtime.io.network.util.DeserializationUtils;
import org.apache.flink.runtime.operators.shipping.OutputEmitter;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.taskmanager.ConsumableNotifyingResultPartitionWriterDecorator;
import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
import org.apache.flink.testutils.serialization.types.SerializationTestType;
import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
import org.apache.flink.testutils.serialization.types.Util;
Expand Down Expand Up @@ -305,12 +301,8 @@ public void testIsAvailableOrNot() throws Exception {
.setBufferPoolFactory(() -> localPool)
.build();
resultPartition.setup();
final ResultPartitionWriter partitionWrapper = new ConsumableNotifyingResultPartitionWriterDecorator(
new NoOpTaskActions(),
new JobID(),
resultPartition,
new NoOpResultPartitionConsumableNotifier());
final RecordWriter recordWriter = createRecordWriter(partitionWrapper);

final RecordWriter<?> recordWriter = createRecordWriter(resultPartition);

try {
// record writer is available because of initial available global pool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
Expand All @@ -42,9 +41,7 @@
import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.taskmanager.ConsumableNotifyingResultPartitionWriterDecorator;
import org.apache.flink.runtime.taskmanager.InputGateWithMetrics;
import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
Expand Down Expand Up @@ -188,15 +185,9 @@ public ResultPartitionWriter createResultPartitionWriter(int partitionIndex) thr
.setupBufferPoolFactoryFromNettyShuffleEnvironment(senderEnv)
.build();

ResultPartitionWriter consumableNotifyingPartitionWriter = new ConsumableNotifyingResultPartitionWriterDecorator(
new NoOpTaskActions(),
jobId,
resultPartitionWriter,
new NoOpResultPartitionConsumableNotifier());
resultPartitionWriter.setup();

consumableNotifyingPartitionWriter.setup();

return consumableNotifyingPartitionWriter;
return resultPartitionWriter;
}

private void generatePartitionIds() throws Exception {
Expand Down

0 comments on commit 91af74e

Please sign in to comment.