diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/PseudoRandomValueSelector.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/PseudoRandomValueSelector.java similarity index 89% rename from flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/PseudoRandomValueSelector.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/testutils/PseudoRandomValueSelector.java index c765d68833bb3..6944de7eac512 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/PseudoRandomValueSelector.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/PseudoRandomValueSelector.java @@ -15,19 +15,21 @@ * limitations under the License. */ -package org.apache.flink.streaming.util; +package org.apache.flink.runtime.testutils; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.util.TestNameProvider; -import net.jcip.annotations.NotThreadSafe; import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.concurrent.NotThreadSafe; + import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; @@ -53,7 +55,7 @@ */ @Internal @NotThreadSafe -class PseudoRandomValueSelector { +public class PseudoRandomValueSelector { private static final Logger LOG = LoggerFactory.getLogger(PseudoRandomValueSelector.class); private final Function randomValueSupplier; @@ -106,7 +108,7 @@ private static String getGlobalSeed() { } @VisibleForTesting - static Optional getGitCommitId() { + public static Optional getGitCommitId() { try { Process process = new ProcessBuilder("git", "rev-parse", "HEAD").start(); try (InputStream input = process.getInputStream()) { @@ -121,4 +123,11 @@ static Optional getGitCommitId() { } return Optional.empty(); } + + public static void randomize(Configuration conf, ConfigOption option, T... t1) { + final String testName = TestNameProvider.getCurrentTestName(); + final PseudoRandomValueSelector valueSelector = + PseudoRandomValueSelector.create(testName != null ? testName : "unknown"); + valueSelector.select(conf, option, t1); + } } diff --git a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/streaming/util/PseudoRandomValueSelectorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/PseudoRandomValueSelectorTest.java similarity index 77% rename from flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/streaming/util/PseudoRandomValueSelectorTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/testutils/PseudoRandomValueSelectorTest.java index 515372d910b0c..d89cb6b0d414c 100644 --- a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/streaming/util/PseudoRandomValueSelectorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/PseudoRandomValueSelectorTest.java @@ -15,27 +15,25 @@ * limitations under the License. */ -package org.apache.flink.streaming.util; +package org.apache.flink.runtime.testutils; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.testutils.ZooKeeperTestUtils; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.util.TestLogger; import org.junit.Test; import java.io.IOException; -import java.time.Duration; import java.util.HashSet; import java.util.Optional; import java.util.Set; import java.util.stream.IntStream; import static org.apache.flink.configuration.CheckpointingOptions.SAVEPOINT_DIRECTORY; -import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL; -import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.TOLERABLE_FAILURE_NUMBER; +import static org.apache.flink.configuration.JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB; +import static org.apache.flink.configuration.TaskManagerOptions.CPU_CORES; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; @@ -55,15 +53,14 @@ public class PseudoRandomValueSelectorTest extends TestLogger { */ @Test public void testRandomizationOfValues() { - final Duration[] alternatives = - IntStream.range(0, 1000).boxed().map(Duration::ofMillis).toArray(Duration[]::new); + final Double[] alternatives = + IntStream.range(0, 1000).boxed().map(Double::valueOf).toArray(Double[]::new); final PseudoRandomValueSelector valueSelector = PseudoRandomValueSelector.create("seed"); - final Set uniqueValues = new HashSet<>(1); + final Set uniqueValues = new HashSet<>(1); for (int i = 0; i < 100; i++) { - final Duration selectedValue = - selectValue(valueSelector, CHECKPOINTING_INTERVAL, alternatives); + final Double selectedValue = selectValue(valueSelector, CPU_CORES, alternatives); uniqueValues.add(selectedValue); } assertThat(uniqueValues.size(), greaterThan(1)); @@ -82,13 +79,13 @@ private T selectValue( /** Tests that the selector will return different values for different seeds. */ @Test public void testRandomizationWithSeed() { - final Duration[] alternatives = - IntStream.range(0, 1000).boxed().map(Duration::ofMillis).toArray(Duration[]::new); + final Double[] alternatives = + IntStream.range(0, 1000).boxed().map(Double::valueOf).toArray(Double[]::new); - final Set uniqueValues = new HashSet<>(1); + final Set uniqueValues = new HashSet<>(1); for (int i = 0; i < 100; i++) { final PseudoRandomValueSelector selector = PseudoRandomValueSelector.create("test" + i); - uniqueValues.add(selectValue(selector, CHECKPOINTING_INTERVAL, alternatives)); + uniqueValues.add(selectValue(selector, CPU_CORES, alternatives)); } assertThat(uniqueValues.size(), greaterThan(1)); } @@ -96,19 +93,19 @@ public void testRandomizationWithSeed() { /** Tests that the selector produces the same value for the same seed. */ @Test public void testStableRandomization() { - final Duration[] intervals = - IntStream.range(0, 1000).boxed().map(Duration::ofMillis).toArray(Duration[]::new); + final Double[] doubles = + IntStream.range(0, 1000).boxed().map(Double::valueOf).toArray(Double[]::new); final Integer[] numbers = IntStream.range(0, 1000).boxed().toArray(Integer[]::new); final String[] strings = IntStream.range(0, 1000).mapToObj(i -> "string" + i).toArray(String[]::new); - final Set> uniqueValues = new HashSet<>(1); + final Set> uniqueValues = new HashSet<>(1); for (int i = 0; i < 100; i++) { final PseudoRandomValueSelector selector = PseudoRandomValueSelector.create("test"); uniqueValues.add( new Tuple3<>( - selectValue(selector, CHECKPOINTING_INTERVAL, intervals), - selectValue(selector, TOLERABLE_FAILURE_NUMBER, numbers), + selectValue(selector, CPU_CORES, doubles), + selectValue(selector, JOB_MANAGER_HEAP_MEMORY_MB, numbers), selectValue(selector, SAVEPOINT_DIRECTORY, strings))); } assertEquals(1, uniqueValues.size());