Skip to content

Commit

Permalink
[FLINK-7567] [scala] Remove keepPartitioning parameter from DataStrea…
Browse files Browse the repository at this point in the history
…m.iterate()
  • Loading branch information
mlipkovich authored and aljoscha committed Sep 22, 2017
1 parent 42cc3a2 commit 345de77
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ public void addFeedbackEdge(StreamTransformation<T> transform) {
throw new UnsupportedOperationException(
"Parallelism of the feedback stream must match the parallelism of the original" +
" stream. Parallelism of original stream: " + this.getParallelism() +
"; parallelism of feedback stream: " + transform.getParallelism());
"; parallelism of feedback stream: " + transform.getParallelism() +
". Parallelism can be modified using DataStream#setParallelism() method");
}

feedbackEdges.add(transform);
Expand Down
4 changes: 4 additions & 0 deletions flink-streaming-scala/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,10 @@ under the License.
<excludes combine.children="append">
<!-- Exclude generated classes from api compatibility checks -->
<exclude>*\$\$anon\$*</exclude>

<!-- Ignore method which was created automatically by Scala for default value calculation.
Can be removed once https://github.com/siom79/japicmp/issues/176 will be fixed -->
<exclude>org.apache.flink.streaming.api.scala.DataStream#iterate\$default\$3()</exclude>
</excludes>
</parameter>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,23 +499,22 @@ class DataStream[T](stream: JavaStream[T]) {
* stepfunction: initialStream => (feedback, output)
*
* A common pattern is to use output splitting to create feedback and output DataStream.
* Please refer to the .split(...) method of the DataStream
* Please refer to the [[split]] method of the DataStream
*
* By default a DataStream with iteration will never terminate, but the user
* can use the maxWaitTime parameter to set a max waiting time for the iteration head.
* If no data received in the set time the stream terminates.
*
* By default the feedback partitioning is set to match the input, to override this set
* the keepPartitioning flag to true
*
* Parallelism of the feedback stream must match the parallelism of the original stream.
* Please refer to the [[setParallelism]] method for parallelism modification
*/
@PublicEvolving
def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R]),
maxWaitTimeMillis:Long = 0,
keepPartitioning: Boolean = false) : DataStream[R] = {
maxWaitTimeMillis:Long = 0) : DataStream[R] = {
val iterativeStream = stream.iterate(maxWaitTimeMillis)

val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))

iterativeStream.closeWith(feedback.javaStream)
output
}
Expand Down

0 comments on commit 345de77

Please sign in to comment.