Skip to content

Commit

Permalink
[FLINK-2136] [streaming] Added operator tests to DataStream
Browse files Browse the repository at this point in the history
Closes apache#771
  • Loading branch information
Gábor Hermann authored and mbalassi committed Jun 6, 2015
1 parent 59a2297 commit a363308
Show file tree
Hide file tree
Showing 12 changed files with 407 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ public IterativeDataStream<OUT> iterate(long maxWaitTimeMillis) {
*/
public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<OUT, R> folder) {
TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType(),
Utils.getCallLocationName(), false);
Utils.getCallLocationName(), true);

return transform("Fold", outType, new StreamFold<OUT, R>(clean(folder), initialValue,
outType));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private void visit(JSONArray jsonArray, List<Integer> toVisit,
node.put(PREDECESSORS, inputs);

for (StreamEdge inEdge : vertex.getInEdges()) {
int inputID = inEdge.getSourceID();
int inputID = inEdge.getSourceId();

Integer mappedID = (edgeRemapings.keySet().contains(inputID)) ? edgeRemapings
.get(inputID) : inputID;
Expand All @@ -85,7 +85,7 @@ private void visit(JSONArray jsonArray, List<Integer> toVisit,
} else {
Integer iterationHead = -1;
for (StreamEdge inEdge : vertex.getInEdges()) {
int operator = inEdge.getSourceID();
int operator = inEdge.getSourceId();

if (streamGraph.vertexIDtoLoop.containsKey(operator)) {
iterationHead = operator;
Expand Down Expand Up @@ -127,7 +127,7 @@ private void visitIteration(JSONArray jsonArray, List<Integer> toVisit, int head
obj.put(PREDECESSORS, inEdges);

for (StreamEdge inEdge : vertex.getInEdges()) {
int inputID = inEdge.getSourceID();
int inputID = inEdge.getSourceId();

if (edgeRemapings.keySet().contains(inputID)) {
decorateEdge(inEdges, vertexID, inputID, inputID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ public String toString() {
builder.append("\nOutput names: " + getNonChainedOutputs(cl));
builder.append("\nPartitioning:");
for (StreamEdge output : getNonChainedOutputs(cl)) {
int outputname = output.getTargetID();
int outputname = output.getTargetId();
builder.append("\n\t" + outputname + ": " + output.getPartitioner());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ public StreamNode getTargetVertex() {
return targetVertex;
}

public int getSourceID() {
return sourceVertex.getID();
public int getSourceId() {
return sourceVertex.getId();
}

public int getTargetID() {
return targetVertex.getID();
public int getTargetId() {
return targetVertex.getId();
}

public int getTypeNumber() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public void addIterationTail(Integer sinkID, Integer iterationTail, Integer iter
getStreamNode(sinkID).setOperatorName("IterationTail-" + iterationTail);

iteration.getSource().setParallelism(iteration.getSink().getParallelism());
setBufferTimeout(iteration.getSource().getID(), getStreamNode(iterationTail)
setBufferTimeout(iteration.getSource().getId(), getStreamNode(iterationTail)
.getBufferTimeout());

if (LOG.isDebugEnabled()) {
Expand All @@ -257,8 +257,8 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID,

StreamEdge edge = new StreamEdge(getStreamNode(upStreamVertexID),
getStreamNode(downStreamVertexID), typeNumber, outputNames, partitionerObject);
getStreamNode(edge.getSourceID()).addOutEdge(edge);
getStreamNode(edge.getTargetID()).addInEdge(edge);
getStreamNode(edge.getSourceId()).addOutEdge(edge);
getStreamNode(edge.getTargetId()).addInEdge(edge);
}

public <T> void addOutputSelector(Integer vertexID, OutputSelector<T> outputSelector) {
Expand Down Expand Up @@ -335,7 +335,7 @@ public StreamEdge getStreamEdge(int sourceId, int targetId) {
while (outIterator.hasNext()) {
StreamEdge edge = outIterator.next();

if (edge.getTargetID() == targetId) {
if (edge.getTargetId() == targetId) {
return edge;
}
}
Expand All @@ -354,7 +354,7 @@ public Collection<StreamNode> getStreamNodes() {
public Set<Tuple2<Integer, StreamOperator<?>>> getOperators() {
Set<Tuple2<Integer, StreamOperator<?>>> operatorSet = new HashSet<Tuple2<Integer, StreamOperator<?>>>();
for (StreamNode vertex : streamNodes.values()) {
operatorSet.add(new Tuple2<Integer, StreamOperator<?>>(vertex.getID(), vertex
operatorSet.add(new Tuple2<Integer, StreamOperator<?>>(vertex.getId(), vertex
.getOperator()));
}
return operatorSet;
Expand Down Expand Up @@ -389,7 +389,7 @@ protected void removeVertex(StreamNode toRemove) {
for (StreamEdge edge : edgesToRemove) {
removeEdge(edge);
}
streamNodes.remove(toRemove.getID());
streamNodes.remove(toRemove.getId());
}

/**
Expand Down Expand Up @@ -462,7 +462,7 @@ public static enum ResourceStrategy {
* Object for representing loops in streaming programs.
*
*/
protected static class StreamLoop {
public static class StreamLoop {

private Integer loopID;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class StreamNode implements Serializable {

transient private StreamExecutionEnvironment env;

private Integer ID;
private Integer id;
private Integer parallelism = null;
private Long bufferTimeout = null;
private String operatorName;
Expand All @@ -62,11 +62,11 @@ public class StreamNode implements Serializable {

private InputFormat<?, ?> inputFormat;

public StreamNode(StreamExecutionEnvironment env, Integer ID, StreamOperator<?> operator,
public StreamNode(StreamExecutionEnvironment env, Integer id, StreamOperator<?> operator,
String operatorName, List<OutputSelector<?>> outputSelector,
Class<? extends AbstractInvokable> jobVertexClass) {
this.env = env;
this.ID = ID;
this.id = id;
this.operatorName = operatorName;
this.operator = operator;
this.outputSelectors = outputSelector;
Expand All @@ -75,16 +75,16 @@ public StreamNode(StreamExecutionEnvironment env, Integer ID, StreamOperator<?>
}

public void addInEdge(StreamEdge inEdge) {
if (inEdge.getTargetID() != getID()) {
throw new IllegalArgumentException("Destination ID doesn't match the StreamNode ID");
if (inEdge.getTargetId() != getId()) {
throw new IllegalArgumentException("Destination id doesn't match the StreamNode id");
} else {
inEdges.add(inEdge);
}
}

public void addOutEdge(StreamEdge outEdge) {
if (outEdge.getSourceID() != getID()) {
throw new IllegalArgumentException("Source ID doesn't match the StreamNode ID");
if (outEdge.getSourceId() != getId()) {
throw new IllegalArgumentException("Source id doesn't match the StreamNode id");
} else {
outEdges.add(outEdge);
}
Expand All @@ -102,7 +102,7 @@ public List<Integer> getOutEdgeIndices() {
List<Integer> outEdgeIndices = new ArrayList<Integer>();

for (StreamEdge edge : outEdges) {
outEdgeIndices.add(edge.getTargetID());
outEdgeIndices.add(edge.getTargetId());
}

return outEdgeIndices;
Expand All @@ -112,14 +112,14 @@ public List<Integer> getInEdgeIndices() {
List<Integer> inEdgeIndices = new ArrayList<Integer>();

for (StreamEdge edge : inEdges) {
inEdgeIndices.add(edge.getSourceID());
inEdgeIndices.add(edge.getSourceId());
}

return inEdgeIndices;
}

public Integer getID() {
return ID;
public Integer getId() {
return id;
}

public int getParallelism() {
Expand Down Expand Up @@ -216,7 +216,7 @@ public void isolateSlot() {

@Override
public String toString() {
return operatorName + ID;
return operatorName + id;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private void setPhysicalEdges() {
Map<Integer, List<StreamEdge>> physicalInEdgesInOrder = new HashMap<Integer, List<StreamEdge>>();

for (StreamEdge edge : physicalEdgesInOrder) {
int target = edge.getTargetID();
int target = edge.getTargetId();

List<StreamEdge> inEdges = physicalInEdgesInOrder.get(target);

Expand Down Expand Up @@ -154,12 +154,12 @@ private List<StreamEdge> createChain(Integer startNode, Integer current) {
}

for (StreamEdge chainable : chainableOutputs) {
transitiveOutEdges.addAll(createChain(startNode, chainable.getTargetID()));
transitiveOutEdges.addAll(createChain(startNode, chainable.getTargetId()));
}

for (StreamEdge nonChainable : nonChainableOutputs) {
transitiveOutEdges.add(nonChainable);
createChain(nonChainable.getTargetID(), nonChainable.getTargetID());
createChain(nonChainable.getTargetId(), nonChainable.getTargetId());
}

chainedNames.put(current, createChainedName(current, chainableOutputs));
Expand Down Expand Up @@ -203,14 +203,14 @@ private String createChainedName(Integer vertexID, List<StreamEdge> chainedOutpu
if (chainedOutputs.size() > 1) {
List<String> outputChainedNames = new ArrayList<String>();
for (StreamEdge chainable : chainedOutputs) {
outputChainedNames.add(chainedNames.get(chainable.getTargetID()));
outputChainedNames.add(chainedNames.get(chainable.getTargetId()));
}
String returnOperatorName = operatorName + " -> ("
+ StringUtils.join(outputChainedNames, ", ") + ")";
return returnOperatorName;
} else if (chainedOutputs.size() == 1) {
String returnOperatorName = operatorName + " -> "
+ chainedNames.get(chainedOutputs.get(0).getTargetID());
+ chainedNames.get(chainedOutputs.get(0).getTargetId());
return returnOperatorName;
} else {
return operatorName;
Expand Down Expand Up @@ -281,8 +281,8 @@ private void setVertexConfig(Integer vertexID, StreamConfig config,
allOutputs.addAll(nonChainableOutputs);

for (StreamEdge output : allOutputs) {
config.setSelectedNames(output.getTargetID(),
streamGraph.getStreamEdge(vertexID, output.getTargetID()).getSelectedNames());
config.setSelectedNames(output.getTargetId(),
streamGraph.getStreamEdge(vertexID, output.getTargetId()).getSelectedNames());
}

vertexConfigs.put(vertexID, config);
Expand All @@ -292,7 +292,7 @@ private void connect(Integer headOfChain, StreamEdge edge) {

physicalEdgesInOrder.add(edge);

Integer downStreamvertexID = edge.getTargetID();
Integer downStreamvertexID = edge.getTargetId();

AbstractJobVertex headVertex = jobVertices.get(headOfChain);
AbstractJobVertex downStreamVertex = jobVertices.get(downStreamvertexID);
Expand Down Expand Up @@ -358,8 +358,8 @@ private void setSlotSharing() {

for (StreamLoop loop : streamGraph.getStreamLoops()) {
CoLocationGroup ccg = new CoLocationGroup();
AbstractJobVertex tail = jobVertices.get(loop.getSink().getID());
AbstractJobVertex head = jobVertices.get(loop.getSource().getID());
AbstractJobVertex tail = jobVertices.get(loop.getSink().getId());
AbstractJobVertex head = jobVertices.get(loop.getSource().getId());
ccg.addVertex(head);
ccg.addVertex(tail);
tail.updateCoLocationGroup(ccg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ private static void removeMergeBeforeFlatten(StreamGraph streamGraph) {
}
}

for (Integer flattenerID : flatteners) {
for (Integer flattenerId : flatteners) {
// Flatteners should have exactly one input
StreamNode input = streamGraph.getStreamNode(flattenerID).getInEdges().get(0)
StreamNode input = streamGraph.getStreamNode(flattenerId).getInEdges().get(0)
.getSourceVertex();

// Check whether the flatten is applied after a merge
Expand All @@ -64,18 +64,18 @@ private static void removeMergeBeforeFlatten(StreamGraph streamGraph) {
StreamNode mergeInput = input.getInEdges().get(0).getSourceVertex();

// We connect the merge input to the flattener directly
streamGraph.addEdge(mergeInput.getID(), flattenerID,
streamGraph.addEdge(mergeInput.getId(), flattenerId,
new RebalancePartitioner(true), 0, new ArrayList<String>());

// If the merger is only connected to the flattener we delete it
// completely, otherwise we only remove the edge
if (input.getOutEdges().size() > 1) {
streamGraph.removeEdge(streamGraph.getStreamEdge(input.getID(), flattenerID));
streamGraph.removeEdge(streamGraph.getStreamEdge(input.getId(), flattenerId));
} else {
streamGraph.removeVertex(input);
}

streamGraph.setParallelism(flattenerID, mergeInput.getParallelism());
streamGraph.setParallelism(flattenerId, mergeInput.getParallelism());
}
}

Expand Down Expand Up @@ -137,14 +137,14 @@ private static void setDiscretizerReuse(StreamGraph streamGraph) {
if (matchList.size() > 1) {
StreamNode first = matchList.get(0);
for (int i = 1; i < matchList.size(); i++) {
replaceDiscretizer(streamGraph, matchList.get(i).getID(), first.getID());
replaceDiscretizer(streamGraph, matchList.get(i).getId(), first.getId());
}
}
}
}

private static void replaceDiscretizer(StreamGraph streamGraph, Integer toReplaceID,
Integer replaceWithID) {
Integer replaceWithId) {
// Convert to array to create a copy
List<StreamEdge> outEdges = new ArrayList<StreamEdge>(streamGraph
.getStreamNode(toReplaceID).getOutEdges());
Expand All @@ -155,7 +155,7 @@ private static void replaceDiscretizer(StreamGraph streamGraph, Integer toReplac
for (int i = 0; i < numOutputs; i++) {
StreamEdge outEdge = outEdges.get(i);

streamGraph.addEdge(replaceWithID, outEdge.getTargetID(), outEdge.getPartitioner(), 0,
streamGraph.addEdge(replaceWithId, outEdge.getTargetId(), outEdge.getPartitioner(), 0,
new ArrayList<String>());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ public OutputHandler(StreamTask<OUT, ?> vertex) {
for (StreamEdge outEdge : outEdgesInOrder) {
StreamOutput<?> streamOutput = createStreamOutput(
outEdge,
outEdge.getTargetID(),
chainedConfigs.get(outEdge.getSourceID()),
outEdge.getTargetId(),
chainedConfigs.get(outEdge.getSourceId()),
outEdgesInOrder.indexOf(outEdge));
outputMap.put(outEdge, streamOutput);
}
Expand Down Expand Up @@ -134,7 +134,7 @@ private <X> Output<X> createChainedCollector(StreamConfig chainedTaskConfig) {

// Create collectors for the chained outputs
for (StreamEdge outputEdge : chainedTaskConfig.getChainedOutputs(cl)) {
Integer output = outputEdge.getTargetID();
Integer output = outputEdge.getTargetId();

Collector<?> outCollector = createChainedCollector(chainedConfigs.get(output));

Expand Down
Loading

0 comments on commit a363308

Please sign in to comment.