Skip to content

Commit

Permalink
[FLINK-14382][examples] Adding multiple --input support to WordCount
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyang0918 authored and pnowojski committed Nov 15, 2019
1 parent 468415b commit 304736d
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

/**
* Implements the "WordCount" program that computes a simple word occurrence histogram
Expand All @@ -51,7 +52,7 @@ public class WordCount {

public static void main(String[] args) throws Exception {

final ParameterTool params = ParameterTool.fromArgs(args);
final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Expand All @@ -60,10 +61,17 @@ public static void main(String[] args) throws Exception {
env.getConfig().setGlobalJobParameters(params);

// get input data
DataSet<String> text;
DataSet<String> text = null;
if (params.has("input")) {
// read the text file from given input path
text = env.readTextFile(params.get("input"));
// union all the inputs from text files
for (String input : params.getMultiParameterRequired("input")) {
if (text == null) {
text = env.readTextFile(input);
} else {
text = text.union(env.readTextFile(input));
}
}
Preconditions.checkNotNull(text, "Input DataSet should not be null.");
} else {
// get default test text data
System.out.println("Executing WordCount example with default input data set.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

/**
* Implements the "WordCount" program that computes a simple word occurrence
Expand Down Expand Up @@ -51,7 +52,7 @@ public class WordCount {
public static void main(String[] args) throws Exception {

// Checking input parameters
final ParameterTool params = ParameterTool.fromArgs(args);
final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Expand All @@ -60,10 +61,17 @@ public static void main(String[] args) throws Exception {
env.getConfig().setGlobalJobParameters(params);

// get input data
DataStream<String> text;
DataStream<String> text = null;
if (params.has("input")) {
// read the text file from given input path
text = env.readTextFile(params.get("input"));
// union all the inputs from text files
for (String input : params.getMultiParameterRequired("input")) {
if (text == null) {
text = env.readTextFile(input);
} else {
text = text.union(env.readTextFile(input));
}
}
Preconditions.checkNotNull(text, "Input DataStream should not be null.");
} else {
System.out.println("Executing WordCount example with default input data set.");
System.out.println("Use --input to specify file input.");
Expand Down

0 comments on commit 304736d

Please sign in to comment.