Skip to content

Commit

Permalink
[FLINK-5318] Make the RollingSink backwards compatible.
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u authored and aljoscha committed Jan 13, 2017
1 parent 7a2d3be commit bbca329
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -128,7 +129,8 @@
*/
@Deprecated
public class RollingSink<T> extends RichSinkFunction<T>
implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener {
implements InputTypeConfigurable, CheckpointedFunction,
CheckpointListener, CheckpointedRestoring<RollingSink.BucketState> {

private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -336,7 +338,12 @@ public void initializeState(FunctionInitializationContext context) throws Except
Preconditions.checkArgument(this.restoredBucketStates == null,
"The " + getClass().getSimpleName() + " has already been initialized.");

initFileSystem();
try {
initFileSystem();
} catch (IOException e) {
LOG.error("Error while creating FileSystem when initializing the state of the RollingSink.", e);
throw new RuntimeException("Error while creating FileSystem when initializing the state of the RollingSink.", e);
}

if (this.refTruncate == null) {
this.refTruncate = reflectTruncate(fs);
Expand Down Expand Up @@ -703,7 +710,7 @@ private void handleRestoredBucketState(BucketState bucketState) {
} else {
LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength);
Path validLengthFilePath = getValidLengthPathFor(partPath);
if (!fs.exists(validLengthFilePath)) {
if (!fs.exists(validLengthFilePath) && fs.exists(partPath)) {
FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
lengthFileOut.close();
Expand Down Expand Up @@ -752,6 +759,25 @@ private void handleRestoredBucketState(BucketState bucketState) {
}
}

// --------------------------------------------------------------------------------------------
// Backwards compatibility with Flink 1.1
// --------------------------------------------------------------------------------------------

@Override
public void restoreState(BucketState state) throws Exception {
LOG.info("{} (taskIdx={}) restored bucket state from an older Flink version: {}",
getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), state);

try {
initFileSystem();
} catch (IOException e) {
LOG.error("Error while creating FileSystem when restoring the state of the RollingSink.", e);
throw new RuntimeException("Error while creating FileSystem when restoring the state of the RollingSink.", e);
}

handleRestoredBucketState(state);
}

// --------------------------------------------------------------------------------------------
// Setters for User configuration values
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import java.util.Iterator;

/**
* Sink that emits its input elements to {@link org.apache.hadoop.fs.FileSystem} files within
* Sink that emits its input elements to {@link FileSystem} files within
* buckets. This is integrated with the checkpointing mechanism to provide exactly once semantics.
*
* <p>
Expand Down Expand Up @@ -124,9 +124,9 @@
* </li>
* <li>
* The part files are written using an instance of {@link Writer}. By default, a
* {@link org.apache.flink.streaming.connectors.fs.StringWriter} is used, which writes the result
* of {@code toString()} for every element, separated by newlines. You can configure the writer using the
* {@link #setWriter(Writer)}. For example, {@link org.apache.flink.streaming.connectors.fs.SequenceFileWriter}
* {@link StringWriter} is used, which writes the result of {@code toString()} for
* every element, separated by newlines. You can configure the writer using the
* {@link #setWriter(Writer)}. For example, {@link SequenceFileWriter}
* can be used to write Hadoop {@code SequenceFiles}.
* </li>
* </ol>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package org.apache.flink.streaming.connectors.fs.bucketing;

import org.apache.commons.io.FileUtils;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.fs.RollingSink;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.List;
import java.util.Map;

@Deprecated
public class RollingSinkMigrationTest {

@ClassRule
public static TemporaryFolder tempFolder = new TemporaryFolder();

private static final String PART_PREFIX = "part";
private static final String PENDING_SUFFIX = ".pending";
private static final String IN_PROGRESS_SUFFIX = ".in-progress";
private static final String VALID_LENGTH_SUFFIX = ".valid";

@Test
public void testMigration() throws Exception {

/*
* Code ran to get the snapshot:
*
* final File outDir = tempFolder.newFolder();
RollingSink<String> sink = new RollingSink<String>(outDir.getAbsolutePath())
.setWriter(new StringWriter<String>())
.setBatchSize(5)
.setPartPrefix(PART_PREFIX)
.setInProgressPrefix("")
.setPendingPrefix("")
.setValidLengthPrefix("")
.setInProgressSuffix(IN_PROGRESS_SUFFIX)
.setPendingSuffix(PENDING_SUFFIX)
.setValidLengthSuffix(VALID_LENGTH_SUFFIX);
OneInputStreamOperatorTestHarness<String, Object> testHarness1 =
new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
testHarness1.setup();
testHarness1.open();
testHarness1.processElement(new StreamRecord<>("test1", 0L));
testHarness1.processElement(new StreamRecord<>("test2", 0L));
checkFs(outDir, 1, 1, 0, 0);
testHarness1.processElement(new StreamRecord<>("test3", 0L));
testHarness1.processElement(new StreamRecord<>("test4", 0L));
testHarness1.processElement(new StreamRecord<>("test5", 0L));
checkFs(outDir, 1, 4, 0, 0);
StreamTaskState taskState = testHarness1.snapshot(0, 0);
testHarness1.snaphotToFile(taskState, "src/test/resources/rolling-sink-migration-test-flink1.1-snapshot");
testHarness1.close();
* */

final File outDir = tempFolder.newFolder();

RollingSink<String> sink = new ValidatingRollingSink<String>(outDir.getAbsolutePath())
.setWriter(new StringWriter<String>())
.setBatchSize(5)
.setPartPrefix(PART_PREFIX)
.setInProgressPrefix("")
.setPendingPrefix("")
.setValidLengthPrefix("")
.setInProgressSuffix(IN_PROGRESS_SUFFIX)
.setPendingSuffix(PENDING_SUFFIX)
.setValidLengthSuffix(VALID_LENGTH_SUFFIX);

OneInputStreamOperatorTestHarness<String, Object> testHarness1 = new OneInputStreamOperatorTestHarness<>(
new StreamSink<>(sink), 10, 1, 0);
testHarness1.setup();
testHarness1.initializeStateFromLegacyCheckpoint(getResourceFilename("rolling-sink-migration-test-flink1.1-snapshot"));
testHarness1.open();

testHarness1.processElement(new StreamRecord<>("test1", 0L));
testHarness1.processElement(new StreamRecord<>("test2", 0L));

checkFs(outDir, 1, 1, 0, 0);

testHarness1.close();
}

private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException {
int inProg = 0;
int pend = 0;
int compl = 0;
int val = 0;

for (File file: FileUtils.listFiles(outDir, null, true)) {
if (file.getAbsolutePath().endsWith("crc")) {
continue;
}
String path = file.getPath();
if (path.endsWith(IN_PROGRESS_SUFFIX)) {
inProg++;
} else if (path.endsWith(PENDING_SUFFIX)) {
pend++;
} else if (path.endsWith(VALID_LENGTH_SUFFIX)) {
val++;
} else if (path.contains(PART_PREFIX)) {
compl++;
}
}

Assert.assertEquals(inprogress, inProg);
Assert.assertEquals(pending, pend);
Assert.assertEquals(completed, compl);
Assert.assertEquals(valid, val);
}

private static String getResourceFilename(String filename) {
ClassLoader cl = RollingSinkMigrationTest.class.getClassLoader();
URL resource = cl.getResource(filename);
return resource.getFile();
}

static class ValidatingRollingSink<T> extends RollingSink<T> {

private static final long serialVersionUID = -4263974081712009141L;

ValidatingRollingSink(String basePath) {
super(basePath);
}

@Override
public void restoreState(BucketState state) throws Exception {

/**
* this validates that we read the state that was checkpointed by the previous version. We expect it to be:
* In-progress=/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-4
* validLength=6
* pendingForNextCheckpoint=[]
* pendingForPrevCheckpoints={0=[ /var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-0,
* /var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-1,
* /var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-2,
* /var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-3]}
* */

String current = state.currentFile;
long validLength = state.currentFileValidLength;

Assert.assertEquals("/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-4", current);
Assert.assertEquals(6, validLength);

List<String> pendingFiles = state.pendingFiles;
Assert.assertTrue(pendingFiles.isEmpty());

final Map<Long, List<String>> pendingFilesPerCheckpoint = state.pendingFilesPerCheckpoint;
Assert.assertEquals(1, pendingFilesPerCheckpoint.size());

for (Map.Entry<Long, List<String>> entry: pendingFilesPerCheckpoint.entrySet()) {
long checkpoint = entry.getKey();
List<String> files = entry.getValue();

Assert.assertEquals(0L, checkpoint);
Assert.assertEquals(4, files.size());

for (int i = 0; i < 4; i++) {
Assert.assertEquals(
"/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-" + i,
files.get(i));
}
}
super.restoreState(state);
}
}
}
Binary file not shown.

0 comments on commit bbca329

Please sign in to comment.