Skip to content

Commit

Permalink
[FLINK-1560] [streaming] IterateExampleITCase wip
Browse files Browse the repository at this point in the history
  • Loading branch information
szape committed Apr 1, 2015
1 parent 16abc4a commit 2e2a63f
Show file tree
Hide file tree
Showing 6 changed files with 254 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,32 +26,25 @@
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
* Example illustrating iterations in Flink streaming.
* Example illustrating iterations in Flink streaming. <p/> <p> The program sums up random numbers and counts additions
* it performs to reach a specific threshold in an iterative streaming fashion. </p>
* <p/>
* <p>
* The program sums up random numbers and counts additions it performs to reach
* a specific threshold in an iterative streaming fashion.
* </p>
* <p/>
* <p/>
* This example shows how to use:
* <ul>
* <li>streaming iterations,
* <li>buffer timeout to enhance latency,
* <li>directed outputs.
* </ul>
* This example shows how to use: <ul> <li>streaming iterations, <li>buffer timeout to enhance latency, <li>directed
* outputs. </ul>
*/
public class IterateExample {

private static final int BOUND = 100;

// *************************************************************************
// PROGRAM
// *************************************************************************
Expand All @@ -71,8 +64,8 @@ public static void main(String[] args) throws Exception {

// create input stream of integer pairs
DataStream<Tuple2<Integer, Integer>> inputStream;
if(fileInput) {
inputStream = env.readTextFile(inputPath).map(new FibonacciInputMap());
if (fileInput) {
inputStream = env.readTextFile(inputPath).setParallelism(1).map(new FibonacciInputMap()).setParallelism(1);
} else {
inputStream = env.addSource(new RandomFibonacciSource());
}
Expand All @@ -94,10 +87,10 @@ public static void main(String[] args) throws Exception {
// 'output' channel then get the input pairs that have the greatest iteration counter
// on a 1 second sliding window
DataStream<Tuple2<Tuple2<Integer, Integer>, Integer>> numbers = step.select("output")
.map(new OutputMap())
.window(Time.of(1L, TimeUnit.SECONDS))
.every(Time.of(500L, TimeUnit.MILLISECONDS))
.maxBy(1).flatten();
.map(new OutputMap());
// .window(Time.of(1000L, new IterateTimestamp()))
// .every(Time.of(500L, new IterateTimestamp()))
// .maxBy(1).flatten();

// emit results
if (fileOutput) {
Expand All @@ -124,9 +117,9 @@ private static class RandomFibonacciSource implements SourceFunction<Tuple2<Inte

@Override
public void run(Collector<Tuple2<Integer, Integer>> collector) throws Exception {
while(true) {
int first = rnd.nextInt(BOUND/2 - 1) + 1;
int second = rnd.nextInt(BOUND/2 - 1) + 1;
while (true) {
int first = rnd.nextInt(BOUND / 2 - 1) + 1;
int second = rnd.nextInt(BOUND / 2 - 1) + 1;

collector.collect(new Tuple2<Integer, Integer>(first, second));
Thread.sleep(100L);
Expand All @@ -148,17 +141,19 @@ private static class FibonacciInputMap implements MapFunction<String, Tuple2<Int
@Override
public Tuple2<Integer, Integer> map(String value) throws Exception {
Thread.sleep(100L);
String record = value.substring(1, value.length()-1);
String record = value.substring(1, value.length() - 1);
String[] splitted = record.split(",");
return new Tuple2<Integer, Integer>(Integer.parseInt(splitted[0]), Integer.parseInt(splitted[1]));
}
}

/**
* Map the inputs so that the next Fibonacci numbers can be calculated while preserving the original input tuple
* A counter is attached to the tuple and incremented in every iteration step
* Map the inputs so that the next Fibonacci numbers can be calculated while preserving the original input tuple A
* counter is attached to the tuple and incremented in every iteration step
*/
public static class InputMap implements MapFunction<Tuple2<Integer, Integer>, Tuple5<Integer, Integer, Integer, Integer, Integer>> {
public static class InputMap implements MapFunction<Tuple2<Integer, Integer>, Tuple5<Integer, Integer, Integer,
Integer, Integer>> {
private static final long serialVersionUID = 1L;

@Override
public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple2<Integer, Integer> value) throws
Expand All @@ -171,12 +166,15 @@ public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple2<Integer, I
* Iteration step function that calculates the next Fibonacci number
*/
public static class Step implements
MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple5<Integer, Integer, Integer, Integer, Integer>> {
MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple5<Integer, Integer, Integer,
Integer, Integer>> {
private static final long serialVersionUID = 1L;

@Override
public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws Exception {
return new Tuple5<Integer, Integer, Integer, Integer, Integer>(value.f0, value.f1, value.f3, value.f2 + value.f3, ++value.f4);
public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple5<Integer, Integer, Integer, Integer,
Integer> value) throws Exception {
return new Tuple5<Integer, Integer, Integer, Integer, Integer>(value.f0, value.f1, value.f3, value.f2 +
value.f3, ++value.f4);
}
}

Expand All @@ -194,20 +192,38 @@ public Iterable<String> select(Tuple5<Integer, Integer, Integer, Integer, Intege
} else {
output.add("output");
}
output.add("output");
//output.add("output");
return output;
}
}

/**
* Giving back the input pair and the counter
*/
public static class OutputMap implements MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple2<Tuple2<Integer, Integer>, Integer>> {
public static class OutputMap implements MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>,
Tuple2<Tuple2<Integer, Integer>, Integer>> {
private static final long serialVersionUID = 1L;

@Override
public Tuple2<Tuple2<Integer, Integer>, Integer> map(Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws
public Tuple2<Tuple2<Integer, Integer>, Integer> map(Tuple5<Integer, Integer, Integer, Integer, Integer>
value) throws
Exception {
return new Tuple2<Tuple2<Integer, Integer>, Integer>(new Tuple2<Integer, Integer>(value.f0, value.f1), value.f4);
return new Tuple2<Tuple2<Integer, Integer>, Integer>(new Tuple2<Integer, Integer>(value.f0, value.f1),
value.f4);
}
}

private static class IterateTimestamp implements Timestamp<Tuple2<Tuple2<Integer, Integer>, Integer>> {
private static final long serialVersionUID = 1L;

private long counter = 0;

@Override
public long getTimestamp(Tuple2<Tuple2<Integer, Integer>, Integer> value) {
counter++;
//System.out.println(counter);
// System.out.println(value.f1);
return counter * 150 + value.f1 * 5;
}
}

Expand All @@ -219,7 +235,6 @@ public Tuple2<Tuple2<Integer, Integer>, Integer> map(Tuple5<Integer, Integer, In
private static boolean fileOutput = false;
private static String inputPath;
private static String outputPath;
private static final int BOUND = 100;

private static boolean parseParameters(String[] args) {

Expand All @@ -228,7 +243,7 @@ private static boolean parseParameters(String[] args) {
if (args.length == 1) {
fileOutput = true;
outputPath = args[0];
} else if(args.length == 2) {
} else if (args.length == 2) {
fileInput = true;
inputPath = args[0];
fileOutput = true;
Expand All @@ -244,4 +259,5 @@ private static boolean parseParameters(String[] args) {
}
return true;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.examples.iteration.util;

public class IterateExampleData {
public static final String INPUT_PAIRS = "(1,40)\n" + "(29,38)\n" + "(11,15)\n" + "(17,39)\n" + "(24,41)\n" +
"(7,33)\n" + "(20,2)\n" + "(11,5)\n" + "(3,16)\n" + "(23,36)\n" + "(15,23)\n" + "(28,13)\n" + "(1,1)\n" +
"(10,6)\n" + "(21,5)\n" + "(14,36)\n" + "(17,15)\n" + "(7,9)";

public static final String RESULTS = "((1,40),3)\n" + "((24,41),2)\n" + "((3,16),5)\n" + "((1,1),10)\n" +
"((17,15),4)\n" + "((29,38),2)\n" + "((7,33),3)\n" + "((23,36),3)\n" + "((10,6),6)\n" + "((7,9),5)\n" +
"((11,15),4)\n" + "((20,2),5)\n" + "((15,23),4)\n" + "((21,5),5)\n" +
"((17,39),3)\n" + "((11,5),6)\n" + "((28,13),4)\n" + "((14,36),3)";

private IterateExampleData() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,37 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.streaming.api.windowing.policy.CentralActiveTrigger;
import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
import org.apache.flink.util.Collector;

public class SessionWindowing {

private static Timestamp<Tuple3<String, Long, Integer>> timestamp;

@SuppressWarnings("serial")
public static void main(String[] args) throws Exception {

if (!parseParameters(args)) {
return;
}

if(fileOutput) {
timestamp = new Timestamp<Tuple3<String, Long, Integer>>() {

private long counter = 0;

@Override
public long getTimestamp(Tuple3<String, Long, Integer> value) {
return (counter++)*3000L;
}
};
} else {
timestamp = new SystemTimestamp<Tuple3<String, Long, Integer>>();
}

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2);

final List<Tuple3<String, Long, Integer>> input = new ArrayList<Tuple3<String, Long, Integer>>();
Expand All @@ -57,7 +80,9 @@ public void run(Collector<Tuple3<String, Long, Integer>> collector)
// We sleep three seconds between every output so we
// can see whether we properly detect sessions
// before the next start for a specific id
Thread.sleep(3000);
if(!fileOutput) {
Thread.sleep(3000);
}
collector.collect(value);
System.out.println("Collected: " + value);
}
Expand All @@ -69,10 +94,16 @@ public void cancel() {
});

// We create sessions for each id with max timeout of 3 time units
source.groupBy(0)
DataStream<Tuple3<String, Long, Integer>> aggregated = source.groupBy(0)
.window(new SessionTriggerPolicy(3L),
new TumblingEvictionPolicy<Tuple3<String, Long, Integer>>()).sum(2)
.flatten().print();
.flatten();

if(fileOutput) {
aggregated.writeAsText(outputPath);
} else {
aggregated.print();
}

env.execute();
}
Expand Down Expand Up @@ -127,4 +158,27 @@ public SessionTriggerPolicy clone() {
}

}

// *************************************************************************
// UTIL METHODS
// *************************************************************************

private static boolean fileOutput = false;
private static String outputPath;

private static boolean parseParameters(String[] args) {

if (args.length > 0) {
// parse input arguments
if (args.length == 1) {
fileOutput = true;
outputPath = args[0];
} else {
System.err.println("Usage: SessionWindowing <result path>");
return false;
}
}
return true;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.examples.windowing.util;

public class SessionWindowingData {

// public static final String INPUT = "";
public static final String EXPECTED = "(a,1,1)\n" + "(c,6,1)\n" + "(c,11,1)\n" + "(b,5,3)\n" + "(a,10,1)";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.examples.test.iteration;

import org.apache.flink.streaming.examples.iteration.IterateExample;
import org.apache.flink.streaming.examples.iteration.util.IterateExampleData;
import org.apache.flink.streaming.util.StreamingProgramTestBase;

public class IterateExampleITCase extends StreamingProgramTestBase {


protected String inputPath;
protected String resultPath;

@Override
protected void preSubmit() throws Exception {
inputPath = createTempFile("fibonacciInput.txt", IterateExampleData.INPUT_PAIRS);
resultPath = getTempDirPath("result");
}

@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(IterateExampleData.RESULTS, resultPath);
}

@Override
protected void testProgram() throws Exception {
IterateExample.main(new String[]{inputPath, resultPath});
}
}
Loading

0 comments on commit 2e2a63f

Please sign in to comment.