[FLINK-11618][state] Refactor operator state repartition mechanism
This closes apache#7711.
Myasuka authored and StefanRRichter committed Feb 22, 2019
1 parent 3bf06b9 commit bdb7760
Showing 21 changed files with 879 additions and 369 deletions.
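The common thread across the test changes below: rescaling tests no longer feed the full merged snapshot to every restored subtask. Instead, each restored subtask first asks `AbstractStreamOperatorTestHarness.repartitionOperatorState(...)` for its own share of the operator state and restores only that. A minimal sketch of the pattern, assuming the harness utilities shown in the diff (the parallelism values, the `merged` snapshot, and the harness factory are illustrative placeholders, not part of the change):

```java
// Merge the snapshots taken at the old parallelism into a single OperatorSubtaskState.
OperatorSubtaskState merged = AbstractStreamOperatorTestHarness.repackageState(
    oldHarness0.snapshot(0, 0),
    oldHarness1.snapshot(0, 0));

// Restore at the new parallelism: each subtask receives only its repartitioned share.
for (int subtaskIndex = 0; subtaskIndex < newParallelism; subtaskIndex++) {
    OperatorSubtaskState initState = AbstractStreamOperatorTestHarness.repartitionOperatorState(
        merged, maxParallelism, oldParallelism, newParallelism, subtaskIndex);

    OneInputStreamOperatorTestHarness<String, Object> harness =
        createRescalingTestSink(outDir, newParallelism, subtaskIndex);
    harness.setup();
    harness.initializeState(initState); // previously: harness.initializeState(merged)
    harness.open();
}
```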
@@ -101,6 +101,8 @@ public class RollingSinkITCase extends TestLogger {

protected static File dataDir;

private final int maxParallelism = 10;

@BeforeClass
public static void setup() throws Exception {

@@ -747,9 +749,15 @@ public void testScalingDown() throws Exception {
// state of the previous testHarness3 and testHarness1 while testHarness5
// will take that of the previous testHarness1

OperatorSubtaskState initState1 = AbstractStreamOperatorTestHarness.repartitionOperatorState(
mergedSnapshot, maxParallelism, 3, 2, 0);

OperatorSubtaskState initState2 = AbstractStreamOperatorTestHarness.repartitionOperatorState(
mergedSnapshot, maxParallelism, 3, 2, 1);

OneInputStreamOperatorTestHarness<String, Object> testHarness4 = createRescalingTestSink(outDir, 2, 0);
testHarness4.setup();
testHarness4.initializeState(mergedSnapshot);
testHarness4.initializeState(initState1);
testHarness4.open();

// we do not have a length file for part-2-0 because bucket part-2-0
@@ -758,7 +766,7 @@

OneInputStreamOperatorTestHarness<String, Object> testHarness5 = createRescalingTestSink(outDir, 2, 1);
testHarness5.setup();
testHarness5.initializeState(mergedSnapshot);
testHarness5.initializeState(initState2);
testHarness5.open();

checkLocalFs(outDir, 0, 0, 8, 3);
@@ -793,23 +801,32 @@ public void testScalingUp() throws Exception {
testHarness1.snapshot(0, 0)
);

OperatorSubtaskState initState1 = AbstractStreamOperatorTestHarness.repartitionOperatorState(
mergedSnapshot, maxParallelism, 2, 3, 0);

OperatorSubtaskState initState2 = AbstractStreamOperatorTestHarness.repartitionOperatorState(
mergedSnapshot, maxParallelism, 2, 3, 1);

OperatorSubtaskState initState3 = AbstractStreamOperatorTestHarness.repartitionOperatorState(
mergedSnapshot, maxParallelism, 2, 3, 2);

testHarness1 = createRescalingTestSink(outDir, 3, 0);
testHarness1.setup();
testHarness1.initializeState(mergedSnapshot);
testHarness1.initializeState(initState1);
testHarness1.open();

checkLocalFs(outDir, 1, 1, 3, 1);

testHarness2 = createRescalingTestSink(outDir, 3, 1);
testHarness2.setup();
testHarness2.initializeState(mergedSnapshot);
testHarness2.initializeState(initState2);
testHarness2.open();

checkLocalFs(outDir, 0, 0, 5, 2);

OneInputStreamOperatorTestHarness<String, Object> testHarness3 = createRescalingTestSink(outDir, 3, 2);
testHarness3.setup();
testHarness3.initializeState(mergedSnapshot);
testHarness3.initializeState(initState3);
testHarness3.open();

checkLocalFs(outDir, 0, 0, 5, 2);
@@ -851,7 +868,7 @@ private OneInputStreamOperatorTestHarness<String, Object> createRescalingTestSin

private <T> OneInputStreamOperatorTestHarness<T, Object> createTestSink(
RollingSink<T> sink, int totalParallelism, int taskIdx) throws Exception {
return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), 10, totalParallelism, taskIdx);
return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), maxParallelism, totalParallelism, taskIdx);
}

private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
@@ -88,6 +88,8 @@ public class BucketingSinkTest extends TestLogger {
private static org.apache.hadoop.fs.FileSystem dfs;
private static String hdfsURI;

private final int maxParallelism = 10;

private OneInputStreamOperatorTestHarness<String, Object> createRescalingTestSink(
File outDir, int totalParallelism, int taskIdx, long inactivityInterval) throws Exception {

@@ -137,7 +139,7 @@ public Path getBucketPath(Clock clock, Path basePath, String element) {

private <T> OneInputStreamOperatorTestHarness<T, Object> createTestSink(
BucketingSink<T> sink, int totalParallelism, int taskIdx) throws Exception {
return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), 10, totalParallelism, taskIdx);
return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), maxParallelism, totalParallelism, taskIdx);
}

private OneInputStreamOperatorTestHarness<String, Object> createRescalingTestSinkWithRollover(
@@ -329,18 +331,24 @@ public void testSameParallelismWithShufflingStates() throws Exception {
testHarness2.processElement(new StreamRecord<>("test3", 0L));
checkLocalFs(outDir, 3, 0, 0, 0);

OperatorSubtaskState initState1 = AbstractStreamOperatorTestHarness.repartitionOperatorState(
mergedSnapshot, maxParallelism, 2, 2, 0);

testHarness1 = createRescalingTestSink(outDir, 2, 0, 100);
testHarness1.setup();
testHarness1.initializeState(mergedSnapshot);
testHarness1.initializeState(initState1);
testHarness1.open();

// the one in-progress will be the one assigned to the next instance,
// the other is the test3 which is just not cleaned up
checkLocalFs(outDir, 2, 0, 1, 1);

OperatorSubtaskState initState2 = AbstractStreamOperatorTestHarness.repartitionOperatorState(
mergedSnapshot, maxParallelism, 2, 2, 1);

testHarness2 = createRescalingTestSink(outDir, 2, 1, 100);
testHarness2.setup();
testHarness2.initializeState(mergedSnapshot);
testHarness2.initializeState(initState2);
testHarness2.open();

checkLocalFs(outDir, 1, 0, 2, 2);
@@ -385,16 +393,22 @@ public void testScalingDown() throws Exception {
testHarness2.snapshot(0, 0)
);

OperatorSubtaskState initState1 = AbstractStreamOperatorTestHarness.repartitionOperatorState(
mergedSnapshot, maxParallelism, 3, 2, 0);

testHarness1 = createRescalingTestSink(outDir, 2, 0, 100);
testHarness1.setup();
testHarness1.initializeState(mergedSnapshot);
testHarness1.initializeState(initState1);
testHarness1.open();

checkLocalFs(outDir, 1, 0, 3, 3);

OperatorSubtaskState initState2 = AbstractStreamOperatorTestHarness.repartitionOperatorState(
mergedSnapshot, maxParallelism, 3, 2, 1);

testHarness2 = createRescalingTestSink(outDir, 2, 1, 100);
testHarness2.setup();
testHarness2.initializeState(mergedSnapshot);
testHarness2.initializeState(initState2);
testHarness2.open();

checkLocalFs(outDir, 0, 0, 4, 4);
@@ -429,23 +443,32 @@ public void testScalingUp() throws Exception {
testHarness1.snapshot(0, 0)
);

OperatorSubtaskState initState1 = AbstractStreamOperatorTestHarness.repartitionOperatorState(
mergedSnapshot, maxParallelism, 2, 3, 0);

testHarness1 = createRescalingTestSink(outDir, 3, 0, 100);
testHarness1.setup();
testHarness1.initializeState(mergedSnapshot);
testHarness1.initializeState(initState1);
testHarness1.open();

checkLocalFs(outDir, 2, 0, 3, 3);

OperatorSubtaskState initState2 = AbstractStreamOperatorTestHarness.repartitionOperatorState(
mergedSnapshot, maxParallelism, 2, 3, 1);

testHarness2 = createRescalingTestSink(outDir, 3, 1, 100);
testHarness2.setup();
testHarness2.initializeState(mergedSnapshot);
testHarness2.initializeState(initState2);
testHarness2.open();

checkLocalFs(outDir, 0, 0, 5, 5);

OperatorSubtaskState initState3 = AbstractStreamOperatorTestHarness.repartitionOperatorState(
mergedSnapshot, maxParallelism, 2, 3, 2);

OneInputStreamOperatorTestHarness<String, Object> testHarness3 = createRescalingTestSink(outDir, 3, 2, 100);
testHarness3.setup();
testHarness3.initializeState(mergedSnapshot);
testHarness3.initializeState(initState3);
testHarness3.open();

checkLocalFs(outDir, 0, 0, 5, 5);
@@ -23,10 +23,9 @@
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
@@ -383,23 +382,26 @@ public void testScaleUpAfterScalingDown() throws Exception {
final int parallelism3 = 3;
final int maxParallelism = Math.max(parallelism1, Math.max(parallelism2, parallelism3));

List<OperatorStateHandle> operatorSubtaskState = repartitionAndExecute(
OperatorSubtaskState operatorSubtaskState = repartitionAndExecute(
topic,
Collections.emptyList(),
new OperatorSubtaskState(),
parallelism1,
parallelism1,
maxParallelism,
IntStream.range(0, parallelism1).boxed().iterator());

operatorSubtaskState = repartitionAndExecute(
topic,
operatorSubtaskState,
parallelism1,
parallelism2,
maxParallelism,
IntStream.range(parallelism1, parallelism1 + parallelism2).boxed().iterator());

operatorSubtaskState = repartitionAndExecute(
topic,
operatorSubtaskState,
parallelism2,
parallelism3,
maxParallelism,
IntStream.range(parallelism1 + parallelism2, parallelism1 + parallelism2 + parallelism3).boxed().iterator());
@@ -408,9 +410,10 @@ public void testScaleUpAfterScalingDown() throws Exception {
// not allow us to read all committed messages from the topic. Thus we initialize operators from
// OperatorSubtaskState once more, but without any new data. This should terminate all ongoing transactions.

operatorSubtaskState = repartitionAndExecute(
repartitionAndExecute(
topic,
operatorSubtaskState,
parallelism3,
1,
maxParallelism,
Collections.emptyIterator());
@@ -423,36 +426,36 @@ public void testScaleUpAfterScalingDown() throws Exception {
deleteTestTopic(topic);
}

private List<OperatorStateHandle> repartitionAndExecute(
private OperatorSubtaskState repartitionAndExecute(
String topic,
List<OperatorStateHandle> inputStates,
int parallelism,
OperatorSubtaskState inputStates,
int oldParallelism,
int newParallelism,
int maxParallelism,
Iterator<Integer> inputData) throws Exception {

List<OperatorStateHandle> outputStates = new ArrayList<>();
List<OperatorSubtaskState> outputStates = new ArrayList<>();
List<OneInputStreamOperatorTestHarness<Integer, Object>> testHarnesses = new ArrayList<>();

for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) {
for (int subtaskIndex = 0; subtaskIndex < newParallelism; subtaskIndex++) {
OperatorSubtaskState initState = AbstractStreamOperatorTestHarness.repartitionOperatorState(
inputStates, maxParallelism, oldParallelism, newParallelism, subtaskIndex);

OneInputStreamOperatorTestHarness<Integer, Object> testHarness =
createTestHarness(topic, maxParallelism, parallelism, subtaskIndex, EXACTLY_ONCE);
createTestHarness(topic, maxParallelism, newParallelism, subtaskIndex, EXACTLY_ONCE);
testHarnesses.add(testHarness);

testHarness.setup();

testHarness.initializeState(new OperatorSubtaskState(
new StateObjectCollection<>(inputStates),
StateObjectCollection.empty(),
StateObjectCollection.empty(),
StateObjectCollection.empty()));
testHarness.initializeState(initState);
testHarness.open();

if (inputData.hasNext()) {
int nextValue = inputData.next();
testHarness.processElement(nextValue, 0);
OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);

outputStates.addAll(snapshot.getManagedOperatorState());
outputStates.add(snapshot);
checkState(snapshot.getRawOperatorState().isEmpty(), "Unexpected raw operator state");
checkState(snapshot.getManagedKeyedState().isEmpty(), "Unexpected managed keyed state");
checkState(snapshot.getRawKeyedState().isEmpty(), "Unexpected raw keyed state");
@@ -468,7 +471,8 @@ private List<OperatorStateHandle> repartitionAndExecute(
testHarness.close();
}

return outputStates;
return AbstractStreamOperatorTestHarness.repackageState(
outputStates.toArray(new OperatorSubtaskState[outputStates.size()]));
}

@Test
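The refactored `repartitionAndExecute(...)` above also shows the producing side of the round-trip: each rescaling round collects one snapshot per subtask and merges them back into a single `OperatorSubtaskState` with `repackageState`, so the next round can repartition that state again at a different parallelism. A condensed sketch of one such round, assuming the same harness utilities (the `runRound` name and the harness factory are illustrative, not part of the change):

```java
// One rescaling round: restore `input` at `newParallelism`, snapshot every subtask,
// and merge the snapshots so a following round can repartition them again.
private OperatorSubtaskState runRound(
        OperatorSubtaskState input,      // pass new OperatorSubtaskState() for the first round
        int oldParallelism,
        int newParallelism,
        int maxParallelism) throws Exception {

    List<OperatorSubtaskState> snapshots = new ArrayList<>();

    for (int subtaskIndex = 0; subtaskIndex < newParallelism; subtaskIndex++) {
        OperatorSubtaskState initState = AbstractStreamOperatorTestHarness.repartitionOperatorState(
            input, maxParallelism, oldParallelism, newParallelism, subtaskIndex);

        OneInputStreamOperatorTestHarness<Integer, Object> harness =
            createTestHarness(maxParallelism, newParallelism, subtaskIndex); // illustrative factory
        harness.setup();
        harness.initializeState(initState);
        harness.open();

        snapshots.add(harness.snapshot(0, 0));
        harness.close();
    }

    return AbstractStreamOperatorTestHarness.repackageState(
        snapshots.toArray(new OperatorSubtaskState[snapshots.size()]));
}
```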
@@ -98,6 +98,8 @@
*/
public class FlinkKafkaConsumerBaseTest extends TestLogger {

private static final int maxParallelism = Short.MAX_VALUE / 2;

/**
* Tests that not both types of timestamp extractors / watermark generators can be used.
*/
@@ -638,6 +640,9 @@ private void testRescaling(
new AbstractStreamOperatorTestHarness[restoredParallelism];

for (int i = 0; i < restoredParallelism; i++) {
OperatorSubtaskState initState = AbstractStreamOperatorTestHarness.repartitionOperatorState(
mergedState, maxParallelism, initialParallelism, restoredParallelism, i);

TestPartitionDiscoverer partitionDiscoverer = new TestPartitionDiscoverer(
new KafkaTopicsDescriptor(Collections.singletonList("test-topic"), null),
i,
@@ -649,7 +654,7 @@
restoredTestHarnesses[i] = createTestHarness(restoredConsumers[i], restoredParallelism, i);

// initializeState() is always called, null signals that we didn't restore
restoredTestHarnesses[i].initializeState(mergedState);
restoredTestHarnesses[i].initializeState(initState);
restoredTestHarnesses[i].open();
}

@@ -677,7 +682,7 @@ private static <T> AbstractStreamOperatorTestHarness<T> createTestHarness(

AbstractStreamOperatorTestHarness<T> testHarness =
new AbstractStreamOperatorTestHarness<>(
new StreamSource<>(source), Short.MAX_VALUE / 2, numSubtasks, subtaskIndex);
new StreamSource<>(source), maxParallelism, numSubtasks, subtaskIndex);

testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);

(Diffs for the remaining changed files are not loaded in this view.)
