Skip to content

Commit

Permalink
[hotfix][runtime] Remove StreamGraphGenerator#isSlotSharingEnabled
Browse files Browse the repository at this point in the history
The field has been superseded by ExecutionConfig#allVerticesInSameSlotSharingGroupByDefault.

This field was always true. So the removal does not affect anything in production.
  • Loading branch information
zhuzhurk authored and tillrohrmann committed Nov 5, 2019
1 parent 047ba24 commit 503d5a2
Show file tree
Hide file tree
Showing 3 changed files with 0 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,6 @@ public class StreamGraphGenerator {

private boolean chaining = true;

private boolean isSlotSharingEnabled = true;

private ScheduleMode scheduleMode = DEFAULT_SCHEDULE_MODE;

private Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts;
Expand Down Expand Up @@ -161,11 +159,6 @@ public StreamGraphGenerator setChaining(boolean chaining) {
return this;
}

public StreamGraphGenerator setSlotSharingEnabled(boolean isSlotSharingEnabled) {
this.isSlotSharingEnabled = isSlotSharingEnabled;
return this;
}

public StreamGraphGenerator setScheduleMode(ScheduleMode scheduleMode) {
this.scheduleMode = scheduleMode;
return this;
Expand Down Expand Up @@ -749,10 +742,6 @@ private <IN1, IN2, OUT> Collection<Integer> transformTwoInputTransform(TwoInputT
* @param inputIds The IDs of the input operations.
*/
private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds) {
if (!isSlotSharingEnabled) {
return null;
}

if (specifiedGroup != null) {
return specifiedGroup;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -471,39 +470,6 @@ public void testIteration() {
}
}

/**
* Test iteration job when disable slot sharing, check slot sharing group and co-location group.
*/
@Test
public void testIterationWithSlotSharingDisabled() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Integer> source = env.fromElements(1, 2, 3).name("source");
IterativeStream<Integer> iteration = source.iterate(3000);
iteration.name("iteration").setParallelism(2);
DataStream<Integer> map = iteration.map(x -> x + 1).name("map").setParallelism(2);
DataStream<Integer> filter = map.filter((x) -> false).name("filter").setParallelism(2);
iteration.closeWith(filter).print();

List<Transformation<?>> transformations = new ArrayList<>();
transformations.add(source.getTransformation());
transformations.add(iteration.getTransformation());
transformations.add(map.getTransformation());
transformations.add(filter.getTransformation());

StreamGraphGenerator generator = new StreamGraphGenerator(transformations, env.getConfig(), env.getCheckpointConfig());
generator.setSlotSharingEnabled(false);
StreamGraph streamGraph = generator.generate();

for (Tuple2<StreamNode, StreamNode> iterationPair : streamGraph.getIterationSourceSinkPairs()) {
assertNotNull(iterationPair.f0.getCoLocationGroup());
assertEquals(iterationPair.f0.getCoLocationGroup(), iterationPair.f1.getCoLocationGroup());

assertNotNull(iterationPair.f0.getSlotSharingGroup());
assertEquals(iterationPair.f0.getSlotSharingGroup(), iterationPair.f1.getSlotSharingGroup());
}
}

/**
* Test slot sharing is enabled.
*/
Expand All @@ -528,31 +494,6 @@ public void testEnableSlotSharing() {
}
}

/**
* Test slot sharing is disabled.
*/
@Test
public void testDisableSlotSharing() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 3);
DataStream<Integer> mapDataStream = sourceDataStream.map(x -> x + 1);

final List<Transformation<?>> transformations = new ArrayList<>();
transformations.add(sourceDataStream.getTransformation());
transformations.add(mapDataStream.getTransformation());

// all stream nodes would have no group if slot sharing group is disabled
StreamGraph streamGraph = new StreamGraphGenerator(
transformations, env.getConfig(), env.getCheckpointConfig())
.setSlotSharingEnabled(false)
.generate();

Collection<StreamNode> streamNodes = streamGraph.getStreamNodes();
for (StreamNode streamNode : streamNodes) {
assertNull(streamNode.getSlotSharingGroup());
}
}

private static class OutputTypeConfigurableOperationWithTwoInputs
extends AbstractStreamOperator<Integer>
implements TwoInputStreamOperator<Integer, Integer, Integer>, OutputTypeConfigurable<Integer> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.io.TypeSerializerInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
Expand Down Expand Up @@ -65,7 +64,6 @@
import org.junit.Test;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -538,60 +536,6 @@ public void testIteration() {
assertEquals(iterationSourceCoLocationGroup, iterationSinkCoLocationGroup);
}

/**
* Test slot sharing group is enabled or disabled for iteration.
*/
@Test
public void testDisableSlotSharingForIteration() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Integer> source = env.fromElements(1, 2, 3).name("source");
IterativeStream<Integer> iteration = source.iterate(3000);
iteration.name("iteration").setParallelism(2);
DataStream<Integer> map = iteration.map(x -> x + 1).name("map").setParallelism(2);
DataStream<Integer> filter = map.filter((x) -> false).name("filter").setParallelism(2);
iteration.closeWith(filter).print();

List<Transformation<?>> transformations = new ArrayList<>();
transformations.add(source.getTransformation());
transformations.add(iteration.getTransformation());
transformations.add(map.getTransformation());
transformations.add(filter.getTransformation());
// when slot sharing group is disabled
// all job vertices except iteration vertex would have no slot sharing group
// iteration vertices would be set slot sharing group automatically
StreamGraphGenerator generator = new StreamGraphGenerator(transformations, env.getConfig(), env.getCheckpointConfig());
generator.setSlotSharingEnabled(false);

JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(generator.generate());

SlotSharingGroup iterationSourceSlotSharingGroup = null;
SlotSharingGroup iterationSinkSlotSharingGroup = null;

CoLocationGroup iterationSourceCoLocationGroup = null;
CoLocationGroup iterationSinkCoLocationGroup = null;

for (JobVertex jobVertex : jobGraph.getVertices()) {
if (jobVertex.getName().startsWith(StreamGraph.ITERATION_SOURCE_NAME_PREFIX)) {
iterationSourceSlotSharingGroup = jobVertex.getSlotSharingGroup();
iterationSourceCoLocationGroup = jobVertex.getCoLocationGroup();
} else if (jobVertex.getName().startsWith(StreamGraph.ITERATION_SINK_NAME_PREFIX)) {
iterationSinkSlotSharingGroup = jobVertex.getSlotSharingGroup();
iterationSinkCoLocationGroup = jobVertex.getCoLocationGroup();
} else {
assertNull(jobVertex.getSlotSharingGroup());
}
}

assertNotNull(iterationSourceSlotSharingGroup);
assertNotNull(iterationSinkSlotSharingGroup);
assertEquals(iterationSourceSlotSharingGroup, iterationSinkSlotSharingGroup);

assertNotNull(iterationSourceCoLocationGroup);
assertNotNull(iterationSinkCoLocationGroup);
assertEquals(iterationSourceCoLocationGroup, iterationSinkCoLocationGroup);
}

/**
* Test default schedule mode.
*/
Expand Down

0 comments on commit 503d5a2

Please sign in to comment.