[FLINK-6731] [tests] Activate strict checkstyle for flink-tests
This closes apache#4295
greghogan committed Jul 12, 2017
1 parent 480ccfb commit 9bd491e
Showing 242 changed files with 4,181 additions and 3,564 deletions.
@@ -35,7 +35,7 @@
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
 import org.apache.flink.table.calcite.CalciteConfig;
 import org.apache.flink.table.calcite.CalciteConfigBuilder;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets;
 import org.apache.flink.types.Row;
 
 import org.apache.calcite.tools.RuleSets;
@@ -28,7 +28,7 @@
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
 
@@ -31,7 +31,7 @@
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets;
 import org.apache.flink.types.Row;
 
 import org.junit.Test;
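
The three hunks above are Table API tests whose imports change because this commit renames the test utility package org.apache.flink.test.javaApiOperators to org.apache.flink.test.operators.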
35 changes: 35 additions & 0 deletions flink-tests/pom.xml
@@ -338,6 +338,41 @@ under the License.
 			</executions>
 		</plugin>
 
+		<plugin>
+			<groupId>org.apache.maven.plugins</groupId>
+			<artifactId>maven-checkstyle-plugin</artifactId>
+			<version>2.17</version>
+			<dependencies>
+				<dependency>
+					<groupId>com.puppycrawl.tools</groupId>
+					<artifactId>checkstyle</artifactId>
+					<version>6.19</version>
+				</dependency>
+			</dependencies>
+			<configuration>
+				<configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
+				<suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
+				<includeTestSourceDirectory>true</includeTestSourceDirectory>
+				<logViolationsToConsole>true</logViolationsToConsole>
+				<failOnViolation>true</failOnViolation>
+			</configuration>
+			<executions>
+				<!--
+				Execute checkstyle after compilation but before tests.
+				This ensures that any parsing or type checking errors are from
+				javac, so they look as expected. Beyond that, we want to
+				fail as early as possible.
+				-->
+				<execution>
+					<phase>test-compile</phase>
+					<goals>
+						<goal>check</goal>
+					</goals>
+				</execution>
+			</executions>
+		</plugin>
+
 		<!-- Scala Code Style, most of the configuration done via plugin management -->
 		<plugin>
 			<groupId>org.scalastyle</groupId>
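
The new execution binds the check goal to the test-compile phase, so checkstyle runs after compilation and before any tests, and failOnViolation stops the build at the first violation. As a rough sketch of how this is typically exercised locally (illustrative commands assuming a standard multi-module Maven checkout; they are not part of this commit):

    # Build up to test-compile, which triggers the bound checkstyle execution:
    mvn test-compile -pl flink-tests
    # Or invoke the plugin goal directly:
    mvn checkstyle:check -pl flink-tests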
flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
@@ -24,26 +24,25 @@
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-
 import static org.junit.Assert.fail;
 
 /**
- * Tests cases where Accumulator are
+ * Tests cases where accumulators:
  * a) throw errors during runtime
- * b) is not compatible with existing accumulator
+ * b) are not compatible with existing accumulator.
  */
 public class AccumulatorErrorITCase extends TestLogger {
 
@@ -91,7 +90,6 @@ public void testFaultyAccumulator() throws Exception {
 		}
 	}
 
-
 	@Test
 	public void testInvalidTypeAccumulator() throws Exception {
 
flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.test.accumulators;
 
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -38,16 +34,20 @@
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.Collector;
-import org.junit.Assert;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.junit.Assert;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
 /**
  * Test for the basic functionality of accumulators. We cannot test all different
  * kinds of plans here (iterative, etc.).
  *
- * TODO Test conflict when different UDFs write to accumulator with same name
- *
+ * <p>TODO Test conflict when different UDFs write to accumulator with same name
  * but with different type. The conflict will occur in JobManager while merging.
  */
 @SuppressWarnings("serial")
@@ -60,31 +60,31 @@ public class AccumulatorITCase extends JavaProgramTestBase {
 	private String resultPath;
 
 	private JobExecutionResult result;
 
 	@Override
 	protected void preSubmit() throws Exception {
 		dataPath = createTempFile("datapoints.txt", INPUT);
 		resultPath = getTempFilePath("result");
 	}
 
 	@Override
 	protected void postSubmit() throws Exception {
 		compareResultsByLinesInMemory(EXPECTED, resultPath);
 
 		// Test accumulator results
 		System.out.println("Accumulator results:");
 		JobExecutionResult res = this.result;
 		System.out.println(AccumulatorHelper.getResultsFormatted(res.getAllAccumulatorResults()));
 
-		Assert.assertEquals(Integer.valueOf(3), (Integer) res.getAccumulatorResult("num-lines"));
+		Assert.assertEquals(Integer.valueOf(3), res.getAccumulatorResult("num-lines"));
 
-		Assert.assertEquals(Double.valueOf(getParallelism()), res.getAccumulatorResult("open-close-counter"));
-
+		Assert.assertEquals(Double.valueOf(getParallelism()), (Double)res.getAccumulatorResult("open-close-counter"));
+
 		// Test histogram (words per line distribution)
 		Map<Integer, Integer> dist = Maps.newHashMap();
 		dist.put(1, 1); dist.put(2, 1); dist.put(3, 1);
 		Assert.assertEquals(dist, res.getAccumulatorResult("words-per-line"));
 
 		// Test distinct words (custom accumulator)
 		Set<StringValue> distinctWords = Sets.newHashSet();
 		distinctWords.add(new StringValue("one"));
@@ -96,18 +96,18 @@ protected void postSubmit() throws Exception {
 	@Override
 	protected void testProgram() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<String> input = env.readTextFile(dataPath);
 
+		DataSet<String> input = env.readTextFile(dataPath);
 
 		input.flatMap(new TokenizeLine())
 			.groupBy(0)
 			.reduceGroup(new CountWords())
 			.writeAsCsv(resultPath, "\n", " ");
 
 		this.result = env.execute();
 	}
-	public static class TokenizeLine extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
 
+	private static class TokenizeLine extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
 
 		// Needs to be instantiated later since the runtime context is not yet
 		// initialized at this place
@@ -120,7 +120,7 @@ public static class TokenizeLine extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
 
 	@Override
 	public void open(Configuration parameters) {
-
+
 		// Add counters using convenience functions
 		this.cntNumLines = getRuntimeContext().getIntCounter("num-lines");
 		this.wordsPerLineDistribution = getRuntimeContext().getHistogram("words-per-line");
@@ -157,20 +157,20 @@ public void open(Configuration parameters) {
 		// Test counter used in open() and closed()
 		this.openCloseCounter.add(0.5);
 	}
 
 	@Override
 	public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
 		this.cntNumLines.add(1);
 		int wordsPerLine = 0;
 
 		for (String token : value.toLowerCase().split("\\W+")) {
 			distinctWords.add(new StringValue(token));
 			out.collect(new Tuple2<>(token, 1));
-			++ wordsPerLine;
+			++wordsPerLine;
 		}
 		wordsPerLineDistribution.add(wordsPerLine);
 	}
 
 	@Override
 	public void close() throws Exception {
 		// Test counter used in open and close only
@@ -179,47 +179,45 @@ public void close() throws Exception {
 		}
 	}
 
-
-	public static class CountWords
+	private static class CountWords
 		extends RichGroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
-		implements GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
-	{
-
+		implements GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
+
 		private IntCounter reduceCalls;
 		private IntCounter combineCalls;
 
 		@Override
 		public void open(Configuration parameters) {
 			this.reduceCalls = getRuntimeContext().getIntCounter("reduce-calls");
 			this.combineCalls = getRuntimeContext().getIntCounter("combine-calls");
 		}
 
 		@Override
 		public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
 			reduceCalls.add(1);
 			reduceInternal(values, out);
 		}
 
 		@Override
 		public void combine(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
 			combineCalls.add(1);
 			reduceInternal(values, out);
 		}
 
 		private void reduceInternal(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
 			int sum = 0;
 			String key = null;
 
 			for (Tuple2<String, Integer> e : values) {
 				key = e.f0;
 				sum += e.f1;
 			}
 			out.collect(new Tuple2<>(key, sum));
 		}
 	}
 
 	/**
-	 * Custom accumulator
+	 * Custom accumulator.
 	 */
 	public static class SetAccumulator<T> implements Accumulator<T, HashSet<T>> {
 
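
The body of SetAccumulator is collapsed in this view. For orientation, a minimal set-collecting accumulator written against Flink's Accumulator<V, R> contract (add, getLocalValue, resetLocal, merge, clone) would look roughly like the sketch below; this is an illustration, not the exact code from the file:

	// Illustrative sketch of a set-collecting accumulator; the real
	// implementation in this file is collapsed above.
	public static class SetAccumulator<T> implements Accumulator<T, HashSet<T>> {

		private static final long serialVersionUID = 1L;

		private final HashSet<T> set = new HashSet<>();

		@Override
		public void add(T value) {
			set.add(value);
		}

		@Override
		public HashSet<T> getLocalValue() {
			return set;
		}

		@Override
		public void resetLocal() {
			set.clear();
		}

		@Override
		public void merge(Accumulator<T, HashSet<T>> other) {
			// Called when partial results from different subtasks are combined.
			set.addAll(other.getLocalValue());
		}

		@Override
		public SetAccumulator<T> clone() {
			SetAccumulator<T> copy = new SetAccumulator<>();
			copy.set.addAll(set);
			return copy;
		}
	}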
flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.accumulators;
 
 import org.apache.flink.api.common.accumulators.IntCounter;
@@ -27,13 +26,17 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
+
 import org.junit.Assert;
 
-public class AccumulatorIterativeITCase extends JavaProgramTestBase {
+/**
+ * Test accumulator within iteration.
+ */
+public class AccumulatorIterativeITCase extends JavaProgramTestBase {
 	private static final int NUM_ITERATIONS = 3;
 	private static final int NUM_SUBTASKS = 1;
 	private static final String ACC_NAME = "test";
 
 	@Override
 	protected boolean skipCollectionExecution() {
 		return true;
@@ -43,20 +46,20 @@
 	protected void testProgram() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(NUM_SUBTASKS);
 
 		IterativeDataSet<Integer> iteration = env.fromElements(1, 2, 3).iterate(NUM_ITERATIONS);
 
 		iteration.closeWith(iteration.reduceGroup(new SumReducer())).output(new DiscardingOutputFormat<Integer>());
 
 		Assert.assertEquals(NUM_ITERATIONS * 6, (int) env.execute().getAccumulatorResult(ACC_NAME));
 	}
 
 	static final class SumReducer extends RichGroupReduceFunction<Integer, Integer> {
 
 		private static final long serialVersionUID = 1L;
 
 		private IntCounter testCounter = new IntCounter();
 
 		@Override
 		public void open(Configuration config) throws Exception {
 			getRuntimeContext().addAccumulator(ACC_NAME, this.testCounter);