Skip to content

Commit

Permalink
[streaming] Updated deprecated iterative functionality and docs
Browse files Browse the repository at this point in the history
  • Loading branch information
gyfora committed Dec 17, 2014
1 parent 88b8b9d commit fbd0060
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 86 deletions.
18 changes: 8 additions & 10 deletions docs/streaming_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ Every output will be emitted to the selected outputs exactly once, even if you a
### Iterations
The Flink Streaming API supports implementing iterative stream processing dataflows similarly to the core Flink API. Iterative streaming programs also implement a step function and embed it into an `IterativeDataStream`.
Unlike in the core API the user does not define the maximum number of iterations, but at the tail of each iteration the output is both streamed forward to the next operator and also streamed back to the iteration head. The user controls the output of the iteration tail using [output splitting](#output-splitting).
Unlike in the core API the user does not define the maximum number of iterations, but at the tail of each iteration part of the output is streamed forward to the next operator and part is streamed back to the iteration head. The user controls the output of the iteration tail using [output splitting](#output-splitting).
To start an iterative part of the program the user defines the iteration starting point:
~~~java
Expand All @@ -517,20 +517,18 @@ The operator applied on the iteration starting point is the head of the iteratio
DataStream<Integer> head = iteration.map(new IterationHead());
~~~
To close an iteration and define the iteration tail, the user calls `.closeWith(tail)` method of the `IterativeDataStream`:
To close an iteration and define the iteration tail, the user calls `.closeWith(iterationTail)` method of the `IterativeDataStream`.
~~~java
DataStream<Integer> tail = head.map(new IterationTail());
iteration.closeWith(tail);
~~~
Or to use with output splitting:
A common pattern is to use output splitting:
~~~java
SplitDataStream<Integer> tail = head.map(new IterationTail()).split(outputSelector);
iteration.closeWith(tail.select("iterate"));
SplitDataStream<..> tailOperator = head.map(new IterationTail()).split(outputSelector);
iteration.closeWith(tailOperator.select("iterate"));
~~~
Because iterative streaming programs do not have a set number of iteratons for each data element, the streaming program has no information on the end of its input. From this it follows that iterative streaming programs run until the user manually stops the program. While this is acceptable under normal circumstances a method is provided to allow iterative programs to shut down automatically if no input received by the iteration head for a predefined number of milliseconds.
In these case all output directed to the “iterate” edge would be fed back to the iteration head.
Because iterative streaming programs do not have a set number of iterations for each data element, the streaming program has no information on the end of its input. From this it follows that iterative streaming programs run until the user manually stops the program. While this is acceptable under normal circumstances a method is provided to allow iterative programs to shut down automatically if no input received by the iteration head for a predefined number of milliseconds.
To use this function the user needs to call, the `iteration.setMaxWaitTime(millis)` to control the max wait time.
### Rich functions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ public class JobGraphBuilder {
private Map<String, byte[]> serializedFunctions;
private Map<String, byte[]> outputSelectors;
private Map<String, Class<? extends AbstractInvokable>> vertexClasses;
private Map<String, String> iterationIds;
private Map<String, String> iterationIDtoHeadName;
private Map<String, String> iterationIDtoTailName;
private Map<String, Integer> iterationIds;
private Map<Integer, String> iterationIDtoHeadName;
private Map<Integer, String> iterationIDtoTailName;
private Map<String, Integer> iterationTailCount;
private Map<String, Long> iterationWaitTime;
private Map<String, Map<String, OperatorState<?>>> operatorStates;
Expand Down Expand Up @@ -109,9 +109,9 @@ public JobGraphBuilder() {
serializedFunctions = new HashMap<String, byte[]>();
outputSelectors = new HashMap<String, byte[]>();
vertexClasses = new HashMap<String, Class<? extends AbstractInvokable>>();
iterationIds = new HashMap<String, String>();
iterationIDtoHeadName = new HashMap<String, String>();
iterationIDtoTailName = new HashMap<String, String>();
iterationIds = new HashMap<String, Integer>();
iterationIDtoHeadName = new HashMap<Integer, String>();
iterationIDtoTailName = new HashMap<Integer, String>();
iterationTailCount = new HashMap<String, Integer>();
iterationWaitTime = new HashMap<String, Long>();
operatorStates = new HashMap<String, Map<String, OperatorState<?>>>();
Expand Down Expand Up @@ -205,7 +205,7 @@ public <IN, OUT> void addSourceVertex(String vertexName, SourceFunction<OUT> fun
* @param waitTime
* Max wait time for next record
*/
public void addIterationHead(String vertexName, String iterationHead, String iterationID,
public void addIterationHead(String vertexName, String iterationHead, Integer iterationID,
int parallelism, long waitTime) {

addVertex(vertexName, StreamIterationHead.class, null, null, null, parallelism);
Expand Down Expand Up @@ -242,7 +242,7 @@ public void addIterationHead(String vertexName, String iterationHead, String ite
* @param waitTime
* Max waiting time for next record
*/
public void addIterationTail(String vertexName, String iterationTail, String iterationID,
public void addIterationTail(String vertexName, String iterationTail, Integer iterationID,
int parallelism, long waitTime) {

if (bufferTimeout.get(iterationTail) == 0) {
Expand Down Expand Up @@ -558,7 +558,7 @@ private void setSlotSharing() {
vertex.setSlotSharingGroup(shareGroup);
}

for (String iterID : new HashSet<String>(iterationIds.values())) {
for (Integer iterID : new HashSet<Integer>(iterationIds.values())) {
CoLocationGroup ccg = new CoLocationGroup();
AbstractJobVertex tail = streamVertices.get(iterationIDtoTailName.get(iterID));
AbstractJobVertex head = streamVertices.get(iterationIDtoHeadName.get(iterID));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,12 @@ public <T> OutputSelector<T> getOutputSelector(ClassLoader cl) {
}
}

public void setIterationId(String iterationId) {
config.setString(ITERATION_ID, iterationId);
public void setIterationId(Integer iterationId) {
config.setInteger(ITERATION_ID, iterationId);
}

public String getIterationId() {
return config.getString(ITERATION_ID, "iteration-0");
public Integer getIterationId() {
return config.getInteger(ITERATION_ID, 0);
}

public void setIterationWaitTime(long time) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,24 +345,24 @@ public DataStream<OUT> distribute() {
}

/**
* Initiates an iterative part of the program that executes multiple times
* and feeds back data streams. The iterative part needs to be closed by
* calling {@link IterativeDataStream#closeWith(DataStream)}. The
* transformation of this IterativeDataStream will be the iteration head.
* The data stream given to the {@code closeWith(DataStream)} method is the
* data stream that will be fed back and used as the input for the iteration
* head. Unlike in batch processing by default the output of the iteration
* stream is directed to both to the iteration head and the next component.
* To direct tuples to the iteration head or the output specifically one can
* use the {@code split(OutputSelector)} on the iteration tail while
* referencing the iteration head as 'iterate'.
* Initiates an iterative part of the program that feeds back data streams.
* The iterative part needs to be closed by calling
* {@link IterativeDataStream#closeWith(DataStream)}. The transformation of
* this IterativeDataStream will be the iteration head. The data stream
* given to the {@link IterativeDataStream#closeWith(DataStream)} method is
* the data stream that will be fed back and used as the input for the
* iteration head. A common usage pattern for streaming iterations is to use
* output splitting to send a part of the closing data stream to the head.
* Refer to {@link SingleOutputStreamOperator#split(OutputSelector)} for
* more information.
* <p>
* The iteration edge will be partitioned the same way as the first input of
* the iteration head.
* <p>
* By default a DataStream with iteration will never terminate, but the user
* can use the {@link IterativeDataStream#setMaxWaitTime} call to set a max
* waiting time for the iteration.
* waiting time for the iteration head. If no data received in the set time,
* the stream terminates.
*
* @return The iterative data stream created.
*/
Expand Down Expand Up @@ -1118,7 +1118,7 @@ private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream, String path,
return returnStream;
}

protected <R> DataStream<OUT> addIterationSource(String iterationID, long waitTime) {
protected <R> DataStream<OUT> addIterationSource(Integer iterationID, long waitTime) {

DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource", null);

Expand Down Expand Up @@ -1162,8 +1162,7 @@ protected <R> DataStream<OUT> addIterationSource(String iterationID, long waitTi

if (inputStream instanceof IterativeDataStream) {
IterativeDataStream<OUT> iterativeStream = (IterativeDataStream<OUT>) inputStream;
returnStream.addIterationSource(iterativeStream.iterationID.toString(),
iterativeStream.waitTime);
returnStream.addIterationSource(iterativeStream.iterationID, iterativeStream.waitTime);
}

return returnStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@

package org.apache.flink.streaming.api.datastream;

import java.util.Arrays;
import java.util.List;

import org.apache.flink.streaming.partitioner.DistributePartitioner;

/**
* The iterative data stream represents the start of an iteration in a
* {@link DataStream}.
Expand Down Expand Up @@ -52,49 +47,27 @@ protected IterativeDataStream(DataStream<IN> dataStream, Integer iterationID, lo

/**
* Closes the iteration. This method defines the end of the iterative
* program part. By default the DataStream represented by the parameter will
* be fed back to the iteration head, however the user can explicitly select
* which tuples should be iterated by {@code directTo(OutputSelector)}.
* Tuples directed to 'iterate' will be fed back to the iteration head.
* program part that will be fed back to the start of the iteration. </br>
* </br>A common usage pattern for streaming iterations is to use output
* splitting to send a part of the closing data stream to the head. Refer to
* {@link SingleOutputStreamOperator#split(OutputSelector)} for more
* information.
*
* @param iterationResult
* The data stream that can be fed back to the next iteration.
*
*/
public DataStream<IN> closeWith(DataStream<IN> iterationResult) {
return closeWith(iterationResult, "iterate");
}

/**
* Closes the iteration. This method defines the end of the iterative
* program part. By default the DataStream represented by the parameter will
* be fed back to the iteration head, however the user can explicitly select
* which tuples should be iterated by {@code directTo(OutputSelector)}.
* Tuples directed to 'iterate' will be fed back to the iteration head.
*
* @param iterationTail
* The data stream that can be fed back to the next iteration.
* @param iterationName
* Name of the iteration edge (backward edge to iteration head)
* when used with directed emits
* @param iterationResult
* The data stream that is fed back to the next iteration head.
* @return Returns the stream that was fed back to the iteration. In most
* cases no further transformation are applied on this stream.
*
*/
public <R> DataStream<IN> closeWith(DataStream<IN> iterationTail, String iterationName) {
DataStream<R> returnStream = new DataStreamSink<R>(environment, "iterationSink", null);
public DataStream<IN> closeWith(DataStream<IN> iterationTail) {
DataStream<IN> iterationSink = new DataStreamSink<IN>(environment, "iterationSink", null);

jobGraphBuilder.addIterationTail(returnStream.getId(), iterationTail.getId(),
iterationID.toString(), iterationTail.getParallelism(), waitTime);
jobGraphBuilder.addIterationTail(iterationSink.getId(), iterationTail.getId(), iterationID,
iterationTail.getParallelism(), waitTime);

jobGraphBuilder.setIterationSourceSettings(iterationID.toString(), iterationTail.getId());

List<String> name = Arrays.asList(new String[] { iterationName });

for (DataStream<IN> stream : iterationTail.mergedStreams) {
String inputID = stream.getId();
jobGraphBuilder.setEdge(inputID, returnStream.getId(), new DistributePartitioner<IN>(
true), 0, name, false);
}

connectGraph(iterationTail, iterationSink.getId(), 0);
return iterationTail;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class StreamIterationHead<OUT extends Tuple> extends StreamVertex<OUT,OUT
private OutputHandler<OUT> outputHandler;

private static int numSources;
private String iterationId;
private Integer iterationId;
@SuppressWarnings("rawtypes")
private BlockingQueue<StreamRecord> dataChannel;
private long iterationWaitTime;
Expand All @@ -58,7 +58,7 @@ public void setInputsOutputs() {
shouldWait = iterationWaitTime > 0;

try {
BlockingQueueBroker.instance().handIn(iterationId, dataChannel);
BlockingQueueBroker.instance().handIn(iterationId.toString(), dataChannel);
} catch (Exception e) {

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamIterationTail<IN extends Tuple> extends StreamVertex<IN,IN> {
public class StreamIterationTail<IN extends Tuple> extends StreamVertex<IN, IN> {

private static final Logger LOG = LoggerFactory.getLogger(StreamIterationTail.class);

private InputHandler<IN> inputHandler;

private String iterationId;
private Integer iterationId;
@SuppressWarnings("rawtypes")
private BlockingQueue<StreamRecord> dataChannel;
private long iterationWaitTime;
Expand All @@ -50,7 +50,7 @@ public void setInputsOutputs() {
iterationId = configuration.getIterationId();
iterationWaitTime = configuration.getIterationWaitTime();
shouldWait = iterationWaitTime > 0;
dataChannel = BlockingQueueBroker.instance().get(iterationId);
dataChannel = BlockingQueueBroker.instance().get(iterationId.toString());
} catch (Exception e) {
throw new StreamVertexException(String.format(
"Cannot register inputs of StreamIterationSink %s", iterationId), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public static void main(String[] args) throws Exception {
// obtain execution environment and set setBufferTimeout(0) to enable
// continuous flushing of the output buffers (lowest latency)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
.setBufferTimeout(0);
.setBufferTimeout(1);

// create an iterative data stream from the input
IterativeDataStream<Tuple2<Double, Integer>> it = env.fromCollection(input).shuffle()
Expand All @@ -80,7 +80,7 @@ public static void main(String[] args) throws Exception {
// apply the step function to add new random value to the tuple and to
// increment the counter and split the output with the output selector
SplitDataStream<Tuple2<Double, Integer>> step = it.map(new Step()).shuffle()
.setBufferTimeout(1).split(new MySelector());
.split(new MySelector());

// close the iteration by selecting the tuples that were directed to the
// 'iterate' channel in the output selector
Expand Down

0 comments on commit fbd0060

Please sign in to comment.