Skip to content

Commit

Permalink
[streaming] Fixed streaming example jars packaging and termination
Browse files Browse the repository at this point in the history
Closes apache#816
  • Loading branch information
mbalassi committed Jun 10, 2015
1 parent e2304c4 commit 7f3108d
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 80 deletions.
14 changes: 13 additions & 1 deletion flink-staging/flink-streaming/flink-streaming-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ under the License.
</goals>
<configuration>
<artifactItems>
<!-- For WordCount example data -->
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java-examples</artifactId>
Expand All @@ -107,6 +108,16 @@ under the License.
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class</includes>
</artifactItem>
<!-- For JSON utilities -->
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-twitter</artifactId>
<version>${project.version}</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>org/apache/flink/streaming/connectors/json/*</includes>
</artifactItem>
</artifactItems>
</configuration>
</execution>
Expand Down Expand Up @@ -190,7 +201,8 @@ under the License.

<includes>
<include>org/apache/flink/streaming/examples/twitter/*.class</include>
<include>org/apache/flink/streaming/examples/twitter/util/*.class</include>
<include>org/apache/flink/streaming/examples/twitter/util/*.class</include>
<include>org/apache/flink/streaming/connectors/json/*.class</include>
</includes>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,24 +103,26 @@ public static void main(String[] args) throws Exception {
// *************************************************************************

/**
* Generate random integer pairs from the range from 0 to BOUND/2
* Generate BOUND number of random integer pairs from the range from 0 to BOUND/2
*/
private static class RandomFibonacciSource implements SourceFunction<Tuple2<Integer, Integer>> {
private static final long serialVersionUID = 1L;

private Random rnd = new Random();

private volatile boolean isRunning = true;
private int counter = 0;

@Override
public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {

while (isRunning) {
while (isRunning && counter < BOUND) {
int first = rnd.nextInt(BOUND / 2 - 1) + 1;
int second = rnd.nextInt(BOUND / 2 - 1) + 1;

ctx.collect(new Tuple2<Integer, Integer>(first, second));
Thread.sleep(500L);
counter++;
Thread.sleep(50L);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public static class GradeSource implements SourceFunction<Tuple2<String, Integer
private Random rand;
private Tuple2<String, Integer> outTuple;
private volatile boolean isRunning = true;
private int counter;

public GradeSource() {
rand = new Random();
Expand All @@ -112,10 +113,11 @@ public GradeSource() {

@Override
public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
while (isRunning) {
while (isRunning && counter < 100) {
outTuple.f0 = names[rand.nextInt(names.length)];
outTuple.f1 = rand.nextInt(GRADE_COUNT) + 1;
Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
counter++;
ctx.collect(outTuple);
}
}
Expand All @@ -135,6 +137,7 @@ public static class SalarySource extends RichSourceFunction<Tuple2<String, Integ
private transient Random rand;
private transient Tuple2<String, Integer> outTuple;
private volatile boolean isRunning;
private int counter;

public void open(Configuration parameters) throws Exception {
super.open(parameters);
Expand All @@ -146,10 +149,11 @@ public void open(Configuration parameters) throws Exception {

@Override
public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
while (isRunning) {
while (isRunning && counter < 100) {
outTuple.f0 = names[rand.nextInt(names.length)];
outTuple.f1 = rand.nextInt(SALARY_MAX) + 1;
Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
counter++;
ctx.collect(outTuple);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ public static void main(String[] args) throws Exception {
}

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setDegreeOfParallelism(1);
createSourceStreams(env);
trainingData = env.addSource(new FiniteTrainingDataSource());
newData = env.addSource(new FiniteNewDataSource());

// build new model on every second of new data
DataStream<Double[]> model = trainingData.window(Time.of(5000, new LinearTimestamp()))
Expand All @@ -86,34 +86,6 @@ public static void main(String[] args) throws Exception {
// USER FUNCTIONS
// *************************************************************************

/**
* Feeds new data for newData. By default it is implemented as constantly
* emitting the Integer 1 in a loop.
*/
public static class NewDataSource implements SourceFunction<Integer> {
private static final long serialVersionUID = 1L;
private static final int NEW_DATA_SLEEP_TIME = 1000;

private volatile boolean isRunning = true;

@Override
public void run(SourceContext<Integer> ctx) throws Exception {
while (isRunning) {
ctx.collect(getNewData());
}
}

private Integer getNewData() throws InterruptedException {
Thread.sleep(NEW_DATA_SLEEP_TIME);
return 1;
}

@Override
public void cancel() {
isRunning = true;
}
}

/**
* Feeds new data for newData. By default it is implemented as constantly
* emitting the Integer 1 in a loop.
Expand Down Expand Up @@ -142,36 +114,6 @@ private Integer getNewData() throws InterruptedException {
}
}

/**
* Feeds new training data for the partial model builder. By default it is
* implemented as constantly emitting the Integer 1 in a loop.
*/
public static class TrainingDataSource implements SourceFunction<Integer> {
private static final long serialVersionUID = 1L;
private static final int TRAINING_DATA_SLEEP_TIME = 10;

private volatile boolean isRunning = true;

@Override
public void run(SourceContext<Integer> collector) throws Exception {
while (isRunning) {
collector.collect(getTrainingData());
}

}

private Integer getTrainingData() throws InterruptedException {
Thread.sleep(TRAINING_DATA_SLEEP_TIME);
return 1;

}

@Override
public void cancel() {
isRunning = false;
}
}

/**
* Feeds new training data for the partial model builder. By default it is
* implemented as constantly emitting the Integer 1 in a loop.
Expand Down Expand Up @@ -292,13 +234,4 @@ private static boolean parseParameters(String[] args) {
return true;
}

public static void createSourceStreams(StreamExecutionEnvironment env) {
if (fileOutput) {
trainingData = env.addSource(new FiniteTrainingDataSource());
newData = env.addSource(new FiniteNewDataSource());
} else {
trainingData = env.addSource(new TrainingDataSource());
newData = env.addSource(new NewDataSource());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
* and a timestamp. The streaming example triggers the top speed of each car
* every x meters elapsed for the last y seconds.
*/
public class TopSpeedWindowingExample {
public class TopSpeedWindowing {

private static final int NUM_CAR_EVENTS = 100;

// *************************************************************************
// PROGRAM
Expand Down Expand Up @@ -94,6 +96,7 @@ private static class CarSource implements SourceFunction<Tuple4<Integer, Integer
private Random rand = new Random();

private volatile boolean isRunning = true;
private int counter;

private CarSource(int numOfCars) {
speeds = new Integer[numOfCars];
Expand All @@ -109,8 +112,8 @@ public static CarSource create(int cars) {
@Override
public void run(SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx) throws Exception {

while (isRunning) {
Thread.sleep(1000);
while (isRunning && counter < NUM_CAR_EVENTS) {
Thread.sleep(100);
for (int carId = 0; carId < speeds.length; carId++) {
if (rand.nextBoolean()) {
speeds[carId] = Math.min(100, speeds[carId] + 5);
Expand All @@ -121,6 +124,7 @@ public void run(SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx) throw
Tuple4<Integer, Integer, Double, Long> record = new Tuple4<Integer, Integer, Double, Long>(carId,
speeds[carId], distances[carId], System.currentTimeMillis());
ctx.collect(record);
counter++;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.flink.streaming.test.exampleJavaPrograms.windowing;

import org.apache.flink.streaming.examples.windowing.TopSpeedWindowingExample;
import org.apache.flink.streaming.examples.windowing.TopSpeedWindowing;
import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData;
import org.apache.flink.streaming.util.StreamingProgramTestBase;

Expand All @@ -39,7 +39,7 @@ protected void postSubmit() throws Exception {

@Override
protected void testProgram() throws Exception {
TopSpeedWindowingExample.main(new String[]{textPath, resultPath});
TopSpeedWindowing.main(new String[]{textPath, resultPath});

}
}

0 comments on commit 7f3108d

Please sign in to comment.