[FLINK-3852] [quickstart] Add skeleton StreamingJob
  - move Job to BatchJob
  - comment out transformers for the mainClass setting
  - tidy up SocketTextStreamWordCount
  - update docs

This closes apache#1982
markreddy authored and StephanEwen committed May 17, 2016
1 parent 6c0c0a5 commit 3080ea4
Showing 10 changed files with 198 additions and 34 deletions.
4 changes: 2 additions & 2 deletions docs/quickstart/java_api_quickstart.md
@@ -63,9 +63,9 @@ Use one of the following commands to __create a project__:

There will be a new directory in your working directory. If you've used the _curl_ approach, the directory is called `quickstart`. Otherwise, it has the name of your artifactId.

-The sample project is a __Maven project__, which contains two classes. _Job_ is a basic skeleton program and _WordCountJob_ a working example. Please note that the _main_ method of both classes allow you to start Flink in a development/testing mode.
+The sample project is a __Maven project__, which contains four classes. _StreamingJob_ and _BatchJob_ are basic skeleton programs, _SocketTextStreamWordCount_ is a working streaming example and _WordCountJob_ is a working batch example. Please note that the _main_ method of all classes allow you to start Flink in a development/testing mode.

-We recommend to __import this project into your IDE__ to develop and test it. If you use Eclipse, the [m2e plugin](http:https://www.eclipse.org/m2e/) allows to [import Maven projects](http:https://books.sonatype.com/m2eclipse-book/reference/creating-sect-importing-projects.html#fig-creating-import). Some Eclipse bundles include that plugin by default, others require you to install it manually. The IntelliJ IDE also supports Maven projects out of the box.
+We recommend you __import this project into your IDE__ to develop and test it. If you use Eclipse, the [m2e plugin](http:https://www.eclipse.org/m2e/) allows to [import Maven projects](http:https://books.sonatype.com/m2eclipse-book/reference/creating-sect-importing-projects.html#fig-creating-import). Some Eclipse bundles include that plugin by default, others require you to install it manually. The IntelliJ IDE also supports Maven projects out of the box.


A note to Mac OS X users: The default JVM heapsize for Java is too small for Flink. You have to manually increase it. Choose "Run Configurations" -> Arguments and write into the "VM Arguments" box: "-Xmx800m" in Eclipse.
4 changes: 2 additions & 2 deletions docs/quickstart/scala_api_quickstart.md
@@ -154,9 +154,9 @@ $ curl https://flink.apache.org/q/quickstart-scala.sh | bash

There will be a new directory in your working directory. If you've used the _curl_ approach, the directory is called `quickstart`. Otherwise, it has the name of your artifactId.

-The sample project is a __Maven project__, which contains two classes. _Job_ is a basic skeleton program and _WordCountJob_ a working example. Please note that the _main_ method of both classes allow you to start Flink in a development/testing mode.
+The sample project is a __Maven project__, which contains four classes. _StreamingJob_ and _BatchJob_ are basic skeleton programs, _SocketTextStreamWordCount_ is a working streaming example and _WordCountJob_ is a working batch example. Please note that the _main_ method of all classes allow you to start Flink in a development/testing mode.

-We recommend to __import this project into your IDE__. For Eclipse, you need the following plugins, which you can install from the provided Eclipse Update Sites:
+We recommend you __import this project into your IDE__. For Eclipse, you need the following plugins, which you can install from the provided Eclipse Update Sites:

* _Eclipse 4.x_
* [Scala IDE](http:https://download.scala-ide.org/sdk/e38/scala210/stable/site)
@@ -256,12 +256,15 @@ under the License.
</excludes>
</filter>
</filters>
+<!-- If you want to use ./bin/flink run <quickstart jar> uncomment the following lines.
+This will add a Main-Class entry to the manifest file -->
+<!--
<transformers>
<!-- add Main-Class to manifest file -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-<mainClass>${package}.Job</mainClass>
+<mainClass>${package}.StreamingJob</mainClass>
</transformer>
</transformers>
+-->
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
</execution>
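Once the commented block above is uncommented, the shade plugin's ManifestResourceTransformer writes a `Main-Class` attribute into the shaded jar's `META-INF/MANIFEST.MF`, which is what lets `./bin/flink run <quickstart jar>` (or `java -jar`) locate an entry point without an explicit `-c` flag. A minimal sketch of the manifest entry it produces, parsed back with the standard `java.util.jar` API (the package name is illustrative):

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

public class ManifestSketch {

    // Roughly the manifest text the transformer emits for a configured main class.
    static String manifestFor(String mainClass) {
        return "Manifest-Version: 1.0\r\n"
             + "Main-Class: " + mainClass + "\r\n\r\n";
    }

    public static void main(String[] args) throws Exception {
        Manifest m = new Manifest(new ByteArrayInputStream(
                manifestFor("org.myorg.quickstart.StreamingJob")
                        .getBytes(StandardCharsets.UTF_8)));
        Attributes attrs = m.getMainAttributes();
        // The launcher reads this attribute when the jar is run directly.
        System.out.println(attrs.getValue("Main-Class"));
    }
}
```

With the transformer left commented out, the shaded jar has no `Main-Class` entry, so the entry point must be named explicitly with `-c` on the CLI.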
@@ -21,9 +21,9 @@
import org.apache.flink.api.java.ExecutionEnvironment;

/**
-* Skeleton for a Flink Job.
+* Skeleton for a Flink Batch Job.
*
-* For a full example of a Flink Job, see the WordCountJob.java file in the
+* For a full example of a Flink Batch Job, see the WordCountJob.java file in the
* same package/directory or have a look at the website.
*
* You can also generate a .jar file that you can submit on your Flink
@@ -32,16 +32,20 @@
* mvn clean package
* in the projects root directory.
* You will find the jar in
-* target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
+* target/flink-quickstart-${version}.jar
+* From the CLI you can then run
+* ./bin/flink run -c ${package}.BatchJob target/flink-quickstart-${version}.jar
*
* For more information on the CLI see:
*
* http:https://flink.apache.org/docs/latest/apis/cli.html
*/
-public class Job {
+public class BatchJob {

public static void main(String[] args) throws Exception {
-// set up the execution environment
+// set up the batch execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();


/**
* Here, you can start creating your execution plan for Flink.
*
@@ -54,6 +58,7 @@ public static void main(String[] args) throws Exception {
* .flatMap()
* .join()
* .coGroup()
*
* and many more.
* Have a look at the programming guide for the Java API:
*
@@ -66,6 +71,6 @@ public static void main(String[] args) throws Exception {
*/

// execute program
-env.execute("Flink Java API Skeleton");
+env.execute("Flink Batch Java API Skeleton");
}
}
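The BatchJob skeleton's javadoc suggests building an execution plan from transformations like `.filter()` and `.flatMap()`. Before wiring such a chain into a Flink `DataSet`, the same shape can be prototyped locally with `java.util.stream` (plain Java only, no Flink dependency; class and method names are illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PlanSketch {

    // Mirrors the filter()/flatMap() chain the skeleton javadoc suggests,
    // but as a local java.util.stream pipeline instead of a Flink DataSet plan.
    static List<String> tokens(List<String> lines) {
        return lines.stream()
                .filter(line -> !line.isEmpty())                      // drop blank input lines
                .flatMap(line -> Arrays.stream(line.split("\\s+")))   // split each line into words
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(tokens(List.of("to be", "", "or not to be")));
    }
}
```

The key difference is that a Flink plan is lazy and distributed: nothing runs until `env.execute(...)`, whereas the local stream pipeline runs on collection.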
@@ -36,15 +36,15 @@
*
* <p>
* Usage:
-* <code>SocketTextStreamWordCount &lt;hostname&gt; &lt;port&gt; &lt;result path&gt;</code>
+* <code>SocketTextStreamWordCount &lt;hostname&gt; &lt;port&gt;</code>
* <br>
*
* <p>
* This example shows how to:
* <ul>
* <li>use StreamExecutionEnvironment.socketTextStream
-* <li>write a simple Flink program,
-* <li>write and use user-defined functions.
+* <li>write a simple Flink program
+* <li>write and use user-defined functions
* </ul>
*
* @see <a href="www.openbsd.org/cgi-bin/man.cgi?query=nc">netcat</a>
@@ -82,8 +82,9 @@ public static void main(String[] args) throws Exception {
counts.print();

// execute program
-env.execute("WordCount from SocketTextStream Example");
+env.execute("Java WordCount from SocketTextStream Example");
}

//
// User Functions
//
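The word count example above lowercases each socket line, splits it on non-word characters (`\W+`), drops empty tokens, and sums a count per word. That core transformation, extracted into plain Java for illustration (no Flink runtime; the class name is hypothetical):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountCore {

    // Same tokenization as the example: lowercase, split on non-word chars, skip empties.
    static Map<String, Integer> countWords(String line) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (!token.isEmpty()) {
                counts.merge(token, 1, Integer::sum); // analogous to keyBy(word) + sum(count)
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countWords("To be, or not to be!"));
    }
}
```

In the streaming job the same logic is spread across a `flatMap` (tokenize) and a keyed `sum` (count), and counts are emitted continuously rather than returned once.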
@@ -0,0 +1,73 @@
package ${package};

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


/**
* Skeleton for a Flink Streaming Job.
*
* For a full example of a Flink Streaming Job, see the SocketTextStreamWordCount.java
* file in the same package/directory or have a look at the website.
*
* You can also generate a .jar file that you can submit on your Flink
* cluster.
* Just type
* mvn clean package
* in the projects root directory.
* You will find the jar in
* target/flink-quickstart-${version}.jar
* From the CLI you can then run
* ./bin/flink run -c ${package}.StreamingJob target/flink-quickstart-${version}.jar
*
* For more information on the CLI see:
*
* http:https://flink.apache.org/docs/latest/apis/cli.html
*/
public class StreamingJob {

public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

/**
* Here, you can start creating your execution plan for Flink.
*
* Start with getting some data from the environment, like
* env.readTextFile(textPath);
*
* then, transform the resulting DataStream<String> using operations
* like
* .filter()
* .flatMap()
* .join()
* .coGroup()
*
* and many more.
* Have a look at the programming guide for the Java API:
*
* http:https://flink.apache.org/docs/latest/apis/streaming/index.html
*
*/

// execute program
env.execute("Flink Streaming Java API Skeleton");
}
}
@@ -259,12 +259,15 @@ under the License.
</excludes>
</filter>
</filters>
+<!-- If you want to use ./bin/flink run <quickstart jar> uncomment the following lines.
+This will add a Main-Class entry to the manifest file -->
+<!--
<transformers>
<!-- add Main-Class to manifest file -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-<mainClass>${package}.Job</mainClass>
+<mainClass>${package}.StreamingJob</mainClass>
</transformer>
</transformers>
+-->
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
</execution>
@@ -21,9 +21,9 @@ package ${package}
import org.apache.flink.api.scala._

/**
-* Skeleton for a Flink Job.
+* Skeleton for a Flink Batch Job.
*
-* For a full example of a Flink Job, see the WordCountJob.scala file in the
+* For a full example of a Flink Batch Job, see the WordCountJob.scala file in the
* same package/directory or have a look at the website.
*
* You can also generate a .jar file that you can submit on your Flink
@@ -32,22 +32,29 @@ import org.apache.flink.api.scala._
* mvn clean package
* }}}
* in the projects root directory. You will find the jar in
-* target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
+* target/flink-quickstart-${version}.jar
+* From the CLI you can then run
+* {{{
+* ./bin/flink run -c ${package}.BatchJob target/flink-quickstart-${version}.jar
+* }}}
*
* For more information on the CLI see:
*
* http:https://flink.apache.org/docs/latest/apis/cli.html
*/
-object Job {
+object BatchJob {
def main(args: Array[String]) {
-// set up the execution environment
+// set up the batch execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

/**
* Here, you can start creating your execution plan for Flink.
*
* Start with getting some data from the environment, like
* env.readTextFile(textPath);
* env.readTextFile(textPath);
*
* then, transform the resulting DataSet[String] using operations
-* like:
+* like
* .filter()
* .flatMap()
* .join()
@@ -64,8 +71,7 @@ object Job {
*
*/


// execute program
-env.execute("Flink Scala API Skeleton")
+env.execute("Flink Batch Scala API Skeleton")
}
}
}
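Both batch skeletons also list `.join()` and `.coGroup()` among the suggested operations; a join keys two datasets on a field and pairs up matching records, much like `DataSet.join(other).where(...).equalTo(...)`. A local hash-join sketch of the idea in plain Java (illustrative only, not the Flink API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class JoinSketch {

    // Hash-join two (key -> value) datasets on their key; emits one
    // combined record per key present on both sides.
    static List<String> join(Map<Integer, String> left, Map<Integer, String> right) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Integer, String> e : left.entrySet()) {
            String match = right.get(e.getKey()); // probe the right side by key
            if (match != null) {
                out.add(e.getValue() + "-" + match);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(join(Map.of(1, "a", 2, "b"), Map.of(2, "x")));
    }
}
```

`coGroup` differs in that it hands over the full groups from both sides for each key, including keys that only appear on one side.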
@@ -33,14 +33,14 @@ import org.apache.flink.streaming.api.scala._
*
* Usage:
* {{{
-* SocketTextStreamWordCount <hostname> <port> <output path>
+* SocketTextStreamWordCount <hostname> <port>
* }}}
*
* This example shows how to:
*
* - use StreamExecutionEnvironment.socketTextStream
-* - write a simple Flink Streaming program in scala.
-* - write and use user-defined functions.
+* - write a simple Flink Streaming program in scala
+* - write and use user-defined functions
*/
object SocketTextStreamWordCount {

@@ -55,7 +55,7 @@ object SocketTextStreamWordCount {

val env = StreamExecutionEnvironment.getExecutionEnvironment

-//Create streams for names and ages by mapping the inputs to the corresponding objects
+// create streams for names and ages by mapping the inputs to the corresponding objects
val text = env.socketTextStream(hostName, port)
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
@@ -64,6 +64,6 @@

counts print

-env.execute("Scala SocketTextStreamWordCount Example")
+env.execute("Scala WordCount from SocketTextStream Example")
}
}
@@ -0,0 +1,73 @@
package ${package}

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import org.apache.flink.api.scala._

/**
* Skeleton for a Flink Streaming Job.
*
* For a full example of a Flink Streaming Job, see the SocketTextStreamWordCount.java
* file in the same package/directory or have a look at the website.
*
* You can also generate a .jar file that you can submit on your Flink
* cluster. Just type
* {{{
* mvn clean package
* }}}
* in the projects root directory. You will find the jar in
* target/flink-quickstart-${version}.jar
* From the CLI you can then run
* {{{
* ./bin/flink run -c ${package}.StreamingJob target/flink-quickstart-${version}.jar
* }}}
*
* For more information on the CLI see:
*
* http:https://flink.apache.org/docs/latest/apis/cli.html
*/
object StreamingJob {
def main(args: Array[String]) {
// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

/**
* Here, you can start creating your execution plan for Flink.
*
* Start with getting some data from the environment, like
* env.readTextFile(textPath);
*
* then, transform the resulting DataStream[String] using operations
* like
* .filter()
* .flatMap()
* .join()
* .group()
*
* and many more.
* Have a look at the programming guide:
*
* http:https://flink.apache.org/docs/latest/apis/streaming/index.html
*
*/

// execute program
env.execute("Flink Streaming Scala API Skeleton")
}
}
