Skip to content

Commit

Permalink
[FLINK-21448] Add randomization of state changelog config
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangyy91 authored and rkhachatryan committed Jul 6, 2021
1 parent 55dfea9 commit bad18ed
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,30 @@

package org.apache.flink.streaming.util;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
import org.apache.flink.test.util.MiniClusterPipelineExecutorServiceLoader;
import org.apache.flink.util.TestNameProvider;

import java.net.URL;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;

import static org.apache.flink.runtime.testutils.PseudoRandomValueSelector.randomize;

/** A {@link StreamExecutionEnvironment} that executes its jobs on {@link MiniCluster}. */
public class TestStreamEnvironment extends StreamExecutionEnvironment {
private static final String STATE_CHANGE_LOG_CONFIG_ON = "on";
private static final String STATE_CHANGE_LOG_CONFIG_UNSET = "unset";
private static final String STATE_CHANGE_LOG_CONFIG_RAND = "random";
private static final boolean RANDOMIZE_CHECKPOINTING_CONFIG =
Boolean.parseBoolean(System.getProperty("checkpointing.randomization", "false"));
private static final String STATE_CHANGE_LOG_CONFIG =
System.getProperty("checkpointing.changelog", STATE_CHANGE_LOG_CONFIG_UNSET).trim();

public TestStreamEnvironment(
MiniCluster miniCluster,
Expand Down Expand Up @@ -75,36 +81,29 @@ public static void setAsContext(
TestStreamEnvironment env =
new TestStreamEnvironment(
miniCluster, parallelism, jarFiles, classpaths);
randomize(conf);
if (RANDOMIZE_CHECKPOINTING_CONFIG) {
randomize(
conf, ExecutionCheckpointingOptions.ENABLE_UNALIGNED, true, false);
randomize(
conf,
ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT,
Duration.ofSeconds(0),
Duration.ofMillis(100),
Duration.ofSeconds(2));
}
if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) {
conf.set(CheckpointingOptions.ENABLE_STATE_CHANGE_LOG, true);
} else if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(
STATE_CHANGE_LOG_CONFIG_RAND)) {
randomize(conf, CheckpointingOptions.ENABLE_STATE_CHANGE_LOG, true, false);
}
env.configure(conf, env.getUserClassloader());
return env;
};

initializeContextEnvironment(factory);
}

/**
* Randomizes configuration on test case level even if mini cluster is used in a class rule.
*
* <p>Note that only unset properties are randomized.
*
* @param conf the configuration to randomize
*/
private static void randomize(Configuration conf) {
if (RANDOMIZE_CHECKPOINTING_CONFIG) {
final String testName = TestNameProvider.getCurrentTestName();
final PseudoRandomValueSelector valueSelector =
PseudoRandomValueSelector.create(testName != null ? testName : "unknown");
valueSelector.select(conf, ExecutionCheckpointingOptions.ENABLE_UNALIGNED, true, false);
valueSelector.select(
conf,
ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT,
Duration.ofSeconds(0),
Duration.ofMillis(100),
Duration.ofSeconds(2));
}
}

/**
* Sets the streaming context environment to a TestStreamEnvironment that runs its programs on
* the given cluster with the given default parallelism.
Expand Down
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1587,6 +1587,8 @@ under the License.
<forkNumber>0${surefire.forkNumber}</forkNumber>
<hadoop.version>${hadoop.version}</hadoop.version>
<checkpointing.randomization>true</checkpointing.randomization>
<!-- on, unset, or random -->
<checkpointing.changelog>unset</checkpointing.changelog>
<project.basedir>${project.basedir}</project.basedir>
<!--suppress MavenModelInspection -->
<test.randomization.seed>${test.randomization.seed}</test.randomization.seed>
Expand Down

0 comments on commit bad18ed

Please sign in to comment.