Skip to content

Commit

Permalink
[FLINK-3523] [Storm-Compatibility] Added SplitStreamMapper to program…
Browse files Browse the repository at this point in the history
… to get rid of SplitStreamType wrapper

 - additionally reworked split ITCases
   * unified to single test
   * make Windows compatible (using tmp files)
   * added test case for embedded splitting

This closes apache#1844
  • Loading branch information
mjsax committed Apr 6, 2016
1 parent 593463b commit 02ef68f
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,27 +51,37 @@ public class SpoutSplitExample {

public static void main(final String[] args) throws Exception {

boolean useFile = SpoutSplitExample.parseParameters(args);

// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String[] rawOutputs = new String[] { RandomSpout.EVEN_STREAM, RandomSpout.ODD_STREAM };

final DataStream<SplitStreamType<Integer>> numbers = env.addSource(
new SpoutWrapper<SplitStreamType<Integer>>(new RandomSpout(true, 0),
rawOutputs), TypeExtractor.getForObject(new SplitStreamType<Integer>()));
new SpoutWrapper<SplitStreamType<Integer>>(new RandomSpout(true, seed), rawOutputs,
1000), TypeExtractor.getForObject(new SplitStreamType<Integer>()));

SplitStream<SplitStreamType<Integer>> splitStream = numbers
.split(new StormStreamSelector<Integer>());

DataStream<SplitStreamType<Integer>> evenStream = splitStream.select(RandomSpout.EVEN_STREAM);
DataStream<SplitStreamType<Integer>> oddStream = splitStream.select(RandomSpout.ODD_STREAM);

evenStream.map(new SplitStreamMapper<Integer>()).returns(Integer.class).map(new Enrich("even")).print();
oddStream.transform("oddBolt",
TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
new BoltWrapper<SplitStreamType<Integer>, Tuple2<String, Integer>>(
new VerifyAndEnrichBolt(false)))
.print();
DataStream<Tuple2<String, Integer>> evenResult = evenStream
.map(new SplitStreamMapper<Integer>()).returns(Integer.class).map(new Enrich(true));
DataStream<Tuple2<String, Integer>> oddResult = oddStream.map(
new SplitStreamMapper<Integer>()).transform("oddBolt",
TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
new BoltWrapper<Integer, Tuple2<String, Integer>>(new VerifyAndEnrichBolt(false)));

if (useFile) {
evenResult.writeAsText(outputPath + "/even");
oddResult.writeAsText(outputPath + "/odd");
} else {
evenResult.print();
oddResult.print();
}

// execute program
env.execute("Spout split stream example");
Expand All @@ -84,19 +94,57 @@ public static void main(final String[] args) throws Exception {
/**
* Same as {@link VerifyAndEnrichBolt}.
*/
private final static class Enrich implements MapFunction<Integer, Tuple2<String, Integer>> {
public final static class Enrich implements MapFunction<Integer, Tuple2<String, Integer>> {
private static final long serialVersionUID = 5213888269197438892L;
private final Tuple2<String, Integer> out;
private final boolean isEven;

public static boolean errorOccured = false;

public Enrich(String token) {
this.out = new Tuple2<String, Integer>(token, 0);
public Enrich(boolean isEven) {
this.isEven = isEven;
if (isEven) {
this.out = new Tuple2<String, Integer>("even", 0);
} else {
this.out = new Tuple2<String, Integer>("odd", 0);
}
}

@Override
public Tuple2<String, Integer> map(Integer value) throws Exception {
if ((value.intValue() % 2 == 0) != this.isEven) {
errorOccured = true;
}
this.out.setField(value, 1);
return this.out;
}
}

// *************************************************************************
// UTIL METHODS
// *************************************************************************

private static long seed = System.currentTimeMillis();
private static String outputPath = null;

static boolean parseParameters(final String[] args) {

if (args.length > 0) {
// parse input arguments
if (args.length == 2) {
seed = Long.parseLong(args[0]);
outputPath = args[1];
return true;
} else {
throw new IllegalArgumentException(
"Usage: SplitStreamBoltLocal <seed> <result path>");
}
} else {
System.out.println("Executing SplitBoltTopology example with random data");
System.out.println(" Usage: SplitStreamBoltLocal <seed> <result path>");
}

return false;
}

}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.storm.split;

import java.io.File;
import java.io.IOException;

import org.apache.flink.storm.split.SpoutSplitExample.Enrich;
import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class SplitITCase extends StreamingMultipleProgramsTestBase {

private String output;

@Before
public void prepare() throws IOException {
output = getTempFilePath("dummy").split(":")[1];
}

@After
public void cleanUp() throws IOException {
deleteRecursively(new File(output));
}

@Test
public void testEmbeddedSpout() throws Exception {
SpoutSplitExample.main(new String[] { "0", output });
Assert.assertFalse(VerifyAndEnrichBolt.errorOccured);
Assert.assertFalse(Enrich.errorOccured);
}

@Test
public void testSpoutSplitTopology() throws Exception {
SplitStreamSpoutLocal.main(new String[] { "0", output });
Assert.assertFalse(VerifyAndEnrichBolt.errorOccured);
}

@Test
public void testBoltSplitTopology() throws Exception {
SplitStreamBoltLocal.main(new String[] { "0", output });
Assert.assertFalse(VerifyAndEnrichBolt.errorOccured);
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@

package org.apache.flink.streaming.util;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.test.util.TestBaseUtils;

import org.junit.AfterClass;
import org.junit.BeforeClass;

Expand Down Expand Up @@ -51,7 +52,7 @@
*
* </pre>
*/
public class StreamingMultipleProgramsTestBase extends TestBaseUtils {
public class StreamingMultipleProgramsTestBase extends AbstractTestBase {

// ------------------------------------------------------------------------
// The mini cluster that is shared across tests
Expand All @@ -60,7 +61,10 @@ public class StreamingMultipleProgramsTestBase extends TestBaseUtils {
protected static final int DEFAULT_PARALLELISM = 4;

protected static ForkableFlinkMiniCluster cluster;


public StreamingMultipleProgramsTestBase() {
super(new Configuration());
}

// ------------------------------------------------------------------------
// Cluster setup & teardown
Expand Down

0 comments on commit 02ef68f

Please sign in to comment.