Skip to content

Commit

Permalink
[FLINK-21448] Randomize ITTests
Browse files Browse the repository at this point in the history
  • Loading branch information
curcur authored and rkhachatryan committed Jul 6, 2021
1 parent 310e0e9 commit 55dfea9
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,21 @@
* limitations under the License.
*/

package org.apache.flink.streaming.util;
package org.apache.flink.runtime.testutils;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.TestNameProvider;

import net.jcip.annotations.NotThreadSafe;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.concurrent.NotThreadSafe;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
Expand All @@ -53,7 +55,7 @@
*/
@Internal
@NotThreadSafe
class PseudoRandomValueSelector {
public class PseudoRandomValueSelector {
private static final Logger LOG = LoggerFactory.getLogger(PseudoRandomValueSelector.class);

private final Function<Integer, Integer> randomValueSupplier;
Expand Down Expand Up @@ -106,7 +108,7 @@ private static String getGlobalSeed() {
}

@VisibleForTesting
static Optional<String> getGitCommitId() {
public static Optional<String> getGitCommitId() {
try {
Process process = new ProcessBuilder("git", "rev-parse", "HEAD").start();
try (InputStream input = process.getInputStream()) {
Expand All @@ -121,4 +123,11 @@ static Optional<String> getGitCommitId() {
}
return Optional.empty();
}

public static <T> void randomize(Configuration conf, ConfigOption<T> option, T... t1) {
final String testName = TestNameProvider.getCurrentTestName();
final PseudoRandomValueSelector valueSelector =
PseudoRandomValueSelector.create(testName != null ? testName : "unknown");
valueSelector.select(conf, option, t1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,25 @@
* limitations under the License.
*/

package org.apache.flink.streaming.util;
package org.apache.flink.runtime.testutils;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import java.io.IOException;
import java.time.Duration;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.stream.IntStream;

import static org.apache.flink.configuration.CheckpointingOptions.SAVEPOINT_DIRECTORY;
import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.TOLERABLE_FAILURE_NUMBER;
import static org.apache.flink.configuration.JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB;
import static org.apache.flink.configuration.TaskManagerOptions.CPU_CORES;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
Expand All @@ -55,15 +53,14 @@ public class PseudoRandomValueSelectorTest extends TestLogger {
*/
@Test
public void testRandomizationOfValues() {
final Duration[] alternatives =
IntStream.range(0, 1000).boxed().map(Duration::ofMillis).toArray(Duration[]::new);
final Double[] alternatives =
IntStream.range(0, 1000).boxed().map(Double::valueOf).toArray(Double[]::new);

final PseudoRandomValueSelector valueSelector = PseudoRandomValueSelector.create("seed");

final Set<Duration> uniqueValues = new HashSet<>(1);
final Set<Double> uniqueValues = new HashSet<>(1);
for (int i = 0; i < 100; i++) {
final Duration selectedValue =
selectValue(valueSelector, CHECKPOINTING_INTERVAL, alternatives);
final Double selectedValue = selectValue(valueSelector, CPU_CORES, alternatives);
uniqueValues.add(selectedValue);
}
assertThat(uniqueValues.size(), greaterThan(1));
Expand All @@ -82,33 +79,33 @@ private <T> T selectValue(
/** Tests that the selector will return different values for different seeds. */
@Test
public void testRandomizationWithSeed() {
final Duration[] alternatives =
IntStream.range(0, 1000).boxed().map(Duration::ofMillis).toArray(Duration[]::new);
final Double[] alternatives =
IntStream.range(0, 1000).boxed().map(Double::valueOf).toArray(Double[]::new);

final Set<Duration> uniqueValues = new HashSet<>(1);
final Set<Double> uniqueValues = new HashSet<>(1);
for (int i = 0; i < 100; i++) {
final PseudoRandomValueSelector selector = PseudoRandomValueSelector.create("test" + i);
uniqueValues.add(selectValue(selector, CHECKPOINTING_INTERVAL, alternatives));
uniqueValues.add(selectValue(selector, CPU_CORES, alternatives));
}
assertThat(uniqueValues.size(), greaterThan(1));
}

/** Tests that the selector produces the same value for the same seed. */
@Test
public void testStableRandomization() {
final Duration[] intervals =
IntStream.range(0, 1000).boxed().map(Duration::ofMillis).toArray(Duration[]::new);
final Double[] doubles =
IntStream.range(0, 1000).boxed().map(Double::valueOf).toArray(Double[]::new);
final Integer[] numbers = IntStream.range(0, 1000).boxed().toArray(Integer[]::new);
final String[] strings =
IntStream.range(0, 1000).mapToObj(i -> "string" + i).toArray(String[]::new);

final Set<Tuple3<Duration, Integer, String>> uniqueValues = new HashSet<>(1);
final Set<Tuple3<Double, Integer, String>> uniqueValues = new HashSet<>(1);
for (int i = 0; i < 100; i++) {
final PseudoRandomValueSelector selector = PseudoRandomValueSelector.create("test");
uniqueValues.add(
new Tuple3<>(
selectValue(selector, CHECKPOINTING_INTERVAL, intervals),
selectValue(selector, TOLERABLE_FAILURE_NUMBER, numbers),
selectValue(selector, CPU_CORES, doubles),
selectValue(selector, JOB_MANAGER_HEAP_MEMORY_MB, numbers),
selectValue(selector, SAVEPOINT_DIRECTORY, strings)));
}
assertEquals(1, uniqueValues.size());
Expand Down

0 comments on commit 55dfea9

Please sign in to comment.