
Commit

Revert "[FLINK-30166][Connector/FileSystem] Refactor tests that use t…
Browse files Browse the repository at this point in the history
…he deprecated StreamingFileSink instead of FileSink" (apache#21628)

* Revert "[FLINK-30166][ORC] Refactor deprecated StreamingFileSink usage with target FileSink"

This reverts commit 1ac202d.

* Revert "[FLINK-30166][Hadoop Compress] Refactor deprecated StreamingFileSink usage with target FileSink"

This reverts commit cd54dfe.

* Revert "[FLINK-30166][Hadoop Sequence Format] Refactor deprecated StreamingFileSink usage with target FileSink"

This reverts commit 1a7a83b.

* Revert "[FLINK-30166][SQL E2E Test] Refactor deprecated StreamingFileSink usage with target FileSink"

This reverts commit 88c450b.

* Revert "[FLINK-30166][Parquet] Refactor deprecated StreamingFileSink usage with target FileSink"

This reverts commit 3ef68ae.

* Revert "[FLINK-30166][Tests] Remove StreamingFileSink as option for tests"

This reverts commit 440a275.

* Revert "[FLINK-30166][Tests] Remove no longer necessary test"

This reverts commit 21c44c0.

* Revert "[FLINK-30166][AVRO] Refactor deprecated StreamingFileSink usage with target FileSink"

This reverts commit 24a133c.
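
For orientation, the reverted commits all undid the same kind of change: tests that wire up the deprecated StreamingFileSink via addSink(...) had been rewritten to use the unified FileSink via sinkTo(...). The following is a hedged, illustrative sketch of that API difference only — the class, method, and variable names are invented for this example and do not appear in any of the reverted files:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

class SinkApiComparison {

    static void wireSinks(String outputPath) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> stream = env.fromSequence(0, 100);

        // Deprecated API that this revert restores in the tests:
        // StreamingFileSink is a SinkFunction and is attached via addSink(...).
        StreamingFileSink<Long> legacySink =
                StreamingFileSink.forRowFormat(
                                new Path(outputPath), new SimpleStringEncoder<Long>())
                        .withRollingPolicy(OnCheckpointRollingPolicy.build())
                        .build();
        stream.addSink(legacySink);

        // Unified API the reverted commits had switched the tests to:
        // FileSink implements the unified Sink interface and is attached via sinkTo(...).
        FileSink<Long> unifiedSink =
                FileSink.forRowFormat(
                                new Path(outputPath), new SimpleStringEncoder<Long>())
                        .withRollingPolicy(OnCheckpointRollingPolicy.build())
                        .build();
        stream.sinkTo(unifiedSink);
    }
}

In a real job only one of the two sinks would be attached to the stream; both are shown side by side here purely to illustrate the API pair that the reverted refactoring swapped.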
XComp committed Jan 9, 2023
1 parent d37fca8 commit 974f884
Showing 13 changed files with 372 additions and 81 deletions.
@@ -10,3 +10,9 @@ org.apache.flink.connector.file.sink.StreamingExecutionFileSinkITCase does not s
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.connector.file.sink.writer.FileSinkMigrationITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
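
These two entries extend the archunit exclusion list for MiniCluster-based ITCases: the restored FileSinkMigrationITCase (shown below) creates its MiniCluster manually and only registers a SharedObjectsExtension, so it satisfies none of the listed predicates. For reference, a minimal sketch of a test class that would satisfy the second predicate (a class outside org.apache.flink.runtime.* with a static final MiniClusterExtension field annotated with @RegisterExtension) is given here; this sketch is not part of the diff, and exact package names and builder methods may differ between Flink versions:

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.MiniClusterExtension;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class ExampleMiniClusterITCase {

    // A static, final MiniClusterExtension field registered as a JUnit 5 extension:
    // this is the field shape the second archunit predicate above looks for.
    @RegisterExtension
    static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
            new MiniClusterExtension(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(4)
                            .build());

    @Test
    void jobsRunAgainstTheSharedMiniCluster() {
        // Test bodies would obtain an execution environment and run jobs
        // against the MiniCluster managed by the extension.
    }
}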
@@ -0,0 +1,291 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.file.sink.writer;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.testutils.junit.SharedObjectsExtension;
import org.apache.flink.testutils.junit.SharedReference;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
import static org.assertj.core.api.Assertions.assertThat;

/**
* Tests migrating from {@link StreamingFileSink} to {@link FileSink}. It triggers a savepoint for
* the {@link StreamingFileSink} job and restores the {@link FileSink} job from that savepoint.
*/
class FileSinkMigrationITCase {

@RegisterExtension
private final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();

private static final String SOURCE_UID = "source";

private static final String SINK_UID = "sink";

private static final int NUM_SOURCES = 4;

private static final int NUM_SINKS = 3;

private static final int NUM_RECORDS = 10000;

private static final int NUM_BUCKETS = 4;

private SharedReference<CountDownLatch> finalCheckpointLatch;

@BeforeEach
void setup() {
// We wait for two successful checkpoints in sources before shutting down. This ensures that
// the sink can commit its data.
// We need to keep a "static" latch here because all sources need to be kept running
// while we're waiting for the required number of checkpoints. Otherwise, we would lock up
// because we can only do checkpoints while all operators are running.
finalCheckpointLatch = sharedObjects.add(new CountDownLatch(NUM_SOURCES * 2));
}

@Test
void test() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SharedReference<Collection<Long>> list = sharedObjects.add(new ArrayList<>());
int n = 10000;
env.setParallelism(100);
env.fromSequence(0, n).map(i -> list.applySync(l -> l.add(i)));
env.execute();
assertThat(list.get()).hasSize(n + 1);
assertThat(LongStream.rangeClosed(0, n).boxed().collect(Collectors.toList()))
.isEqualTo(list.get().stream().sorted().collect(Collectors.toList()));
}

@Test
void testMigration(
@TempDir java.nio.file.Path tmpOutputDir, @TempDir java.nio.file.Path tmpSavepointDir)
throws Exception {
String outputPath = tmpOutputDir.toString();
String savepointBasePath = tmpSavepointDir.toString();

final MiniClusterConfiguration cfg =
new MiniClusterConfiguration.Builder()
.withRandomPorts()
.setNumTaskManagers(1)
.setNumSlotsPerTaskManager(4)
.build();

JobGraph streamingFileSinkJobGraph = createStreamingFileSinkJobGraph(outputPath);
String savepointPath =
executeAndTakeSavepoint(cfg, streamingFileSinkJobGraph, savepointBasePath);

JobGraph fileSinkJobGraph = createFileSinkJobGraph(outputPath);
loadSavepointAndExecute(cfg, fileSinkJobGraph, savepointPath);

IntegerFileSinkTestDataUtils.checkIntegerSequenceSinkOutput(
outputPath, NUM_RECORDS, NUM_BUCKETS, NUM_SOURCES);
}

private JobGraph createStreamingFileSinkJobGraph(String outputPath) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);

StreamingFileSink<Integer> sink =
StreamingFileSink.forRowFormat(
new Path(outputPath), new IntegerFileSinkTestDataUtils.IntEncoder())
.withBucketAssigner(
new IntegerFileSinkTestDataUtils.ModuloBucketAssigner(NUM_BUCKETS))
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.build();

env.addSource(new StatefulSource(true, finalCheckpointLatch))
.uid(SOURCE_UID)
.setParallelism(NUM_SOURCES)
.addSink(sink)
.setParallelism(NUM_SINKS)
.uid(SINK_UID);
return env.getStreamGraph().getJobGraph();
}

private JobGraph createFileSinkJobGraph(String outputPath) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);

FileSink<Integer> sink =
FileSink.forRowFormat(
new Path(outputPath), new IntegerFileSinkTestDataUtils.IntEncoder())
.withBucketAssigner(
new IntegerFileSinkTestDataUtils.ModuloBucketAssigner(NUM_BUCKETS))
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.build();

env.addSource(new StatefulSource(false, finalCheckpointLatch))
.uid(SOURCE_UID)
.setParallelism(NUM_SOURCES)
.sinkTo(sink)
.setParallelism(NUM_SINKS)
.uid(SINK_UID);
return env.getStreamGraph().getJobGraph();
}

private String executeAndTakeSavepoint(
MiniClusterConfiguration cfg, JobGraph jobGraph, String savepointBasePath)
throws Exception {
try (MiniCluster miniCluster = new MiniCluster(cfg)) {
miniCluster.start();
CompletableFuture<JobSubmissionResult> jobSubmissionResultFuture =
miniCluster.submitJob(jobGraph);
JobID jobId = jobSubmissionResultFuture.get().getJobID();

waitForAllTaskRunning(miniCluster, jobId, false);

CompletableFuture<String> savepointResultFuture =
miniCluster.triggerSavepoint(
jobId, savepointBasePath, true, SavepointFormatType.CANONICAL);
return savepointResultFuture.get();
}
}

private void loadSavepointAndExecute(
MiniClusterConfiguration cfg, JobGraph jobGraph, String savepointPath)
throws Exception {
jobGraph.setSavepointRestoreSettings(
SavepointRestoreSettings.forPath(savepointPath, false));

try (MiniCluster miniCluster = new MiniCluster(cfg)) {
miniCluster.start();
miniCluster.executeJobBlocking(jobGraph);
}
}

private static class StatefulSource extends RichParallelSourceFunction<Integer>
implements CheckpointedFunction, CheckpointListener {

private final boolean takingSavepointMode;

private SharedReference<CountDownLatch> finalCheckpointLatch;

private ListState<Integer> nextValueState;

private int nextValue;

private volatile boolean snapshottedAfterAllRecordsOutput;

private volatile boolean isWaitingCheckpointComplete;

private volatile boolean isCanceled;

public StatefulSource(
boolean takingSavepointMode, SharedReference<CountDownLatch> finalCheckpointLatch) {
this.takingSavepointMode = takingSavepointMode;
this.finalCheckpointLatch = finalCheckpointLatch;
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
nextValueState =
context.getOperatorStateStore()
.getListState(new ListStateDescriptor<>("nextValue", Integer.class));

if (nextValueState.get() != null && nextValueState.get().iterator().hasNext()) {
nextValue = nextValueState.get().iterator().next();
}
}

@Override
public void run(SourceContext<Integer> ctx) throws Exception {
if (takingSavepointMode) {
sendRecordsUntil(NUM_RECORDS / 3, 0, ctx);
sendRecordsUntil(NUM_RECORDS / 2, 100, ctx);

while (true) {
Thread.sleep(5000);
}
} else {
sendRecordsUntil(NUM_RECORDS, 0, ctx);

// Wait for the last checkpoint to commit all the pending records.
isWaitingCheckpointComplete = true;
finalCheckpointLatch.get().await();
}
}

private void sendRecordsUntil(
int targetNumber, int sleepInMillis, SourceContext<Integer> ctx)
throws InterruptedException {
while (!isCanceled && nextValue < targetNumber) {
synchronized (ctx.getCheckpointLock()) {
ctx.collect(nextValue++);
}

if (sleepInMillis > 0) {
Thread.sleep(sleepInMillis);
}
}
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
nextValueState.update(Collections.singletonList(nextValue));

if (isWaitingCheckpointComplete) {
snapshottedAfterAllRecordsOutput = true;
}
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
if (isWaitingCheckpointComplete && snapshottedAfterAllRecordsOutput) {
finalCheckpointLatch.get().countDown();
}
}

@Override
public void cancel() {
isCanceled = true;
}
}
}
@@ -34,6 +34,7 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -42,7 +43,7 @@
import java.util.concurrent.TimeUnit;

/**
* Test program for the {@link FileSink}.
* Test program for the {@link StreamingFileSink} and {@link FileSink}.
*
* <p>Uses a source that steadily emits a deterministic set of records over 60 seconds, after which
* it idles and waits for job cancellation. Every record has a unique index that is written to the
@@ -71,7 +72,21 @@ public static void main(final String[] args) throws Exception {
// generate data, shuffle, sink
DataStream<Tuple2<Integer, Integer>> source = env.addSource(new Generator(10, 10, 60));

if (sinkToTest.equalsIgnoreCase("FileSink")) {
if (sinkToTest.equalsIgnoreCase("StreamingFileSink")) {
final StreamingFileSink<Tuple2<Integer, Integer>> sink =
StreamingFileSink.forRowFormat(
new Path(outputPath),
(Encoder<Tuple2<Integer, Integer>>)
(element, stream) -> {
PrintStream out = new PrintStream(stream);
out.println(element.f1);
})
.withBucketAssigner(new KeyBucketAssigner())
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.build();

source.keyBy(0).addSink(sink);
} else if (sinkToTest.equalsIgnoreCase("FileSink")) {
FileSink<Tuple2<Integer, Integer>> sink =
FileSink.forRowFormat(
new Path(outputPath),
@@ -88,7 +103,7 @@ public static void main(final String[] args) throws Exception {
throw new UnsupportedOperationException("Unsupported sink type: " + sinkToTest);
}

env.execute("FileSinkProgram");
env.execute("StreamingFileSinkProgram");
}

/** Use first field for buckets. */
8 changes: 1 addition & 7 deletions flink-end-to-end-tests/flink-stream-sql-test/pom.xml
@@ -41,13 +41,7 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</dependencies>

<build>
<plugins>