diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java index 98eb2d4741a68..429d00abbdb5e 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java @@ -905,30 +905,30 @@ public RollingSink setAsyncTimeout(long timeout) { * This is used for keeping track of the current in-progress files and files that we mark * for moving from pending to final location after we get a checkpoint-complete notification. */ - static final class BucketState implements Serializable { + public static final class BucketState implements Serializable { private static final long serialVersionUID = 1L; /** * The file that was in-progress when the last checkpoint occurred. */ - String currentFile; + public String currentFile; /** * The valid length of the in-progress file at the time of the last checkpoint. */ - long currentFileValidLength = -1; + public long currentFileValidLength = -1; /** * Pending files that accumulated since the last checkpoint. */ - List pendingFiles = new ArrayList<>(); + public List pendingFiles = new ArrayList<>(); /** * When doing a checkpoint we move the pending files since the last checkpoint to this map * with the id of the checkpoint. When we get the checkpoint-complete notification we move * pending files of completed checkpoints to their final location. */ - final Map> pendingFilesPerCheckpoint = new HashMap<>(); + public final Map> pendingFilesPerCheckpoint = new HashMap<>(); @Override public String toString() { diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java index e8bff2110adef..7dbcda75c69cb 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java @@ -29,9 +29,11 @@ import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.connectors.fs.Clock; +import org.apache.flink.streaming.connectors.fs.RollingSink; import org.apache.flink.streaming.connectors.fs.SequenceFileWriter; import org.apache.flink.streaming.connectors.fs.StringWriter; import org.apache.flink.streaming.connectors.fs.Writer; @@ -149,7 +151,8 @@ */ public class BucketingSink extends RichSinkFunction - implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener, ProcessingTimeCallback { + implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener, + CheckpointedRestoring, ProcessingTimeCallback { private static final long serialVersionUID = 1L; @@ -344,7 +347,12 @@ public void setInputType(TypeInformation type, ExecutionConfig executionConfi public void initializeState(FunctionInitializationContext context) throws Exception { Preconditions.checkArgument(this.restoredBucketStates == null, "The operator has already been initialized."); - initFileSystem(); + try { + initFileSystem(); + } catch (IOException e) { + LOG.error("Error while creating FileSystem when initializing the state of the BucketingSink.", e); + throw new RuntimeException("Error while creating FileSystem when initializing the state of the BucketingSink.", e); + } if (this.refTruncate == null) { this.refTruncate = reflectTruncate(fs); @@ -706,139 +714,183 @@ private void handleRestoredBucketState(State restoredState) { // (we re-start from the last **successful** checkpoint) bucketState.pendingFiles.clear(); - if (bucketState.currentFile != null) { + handlePendingInProgressFile(bucketState.currentFile, bucketState.currentFileValidLength); - // We were writing to a file when the last checkpoint occurred. This file can either - // be still in-progress or became a pending file at some point after the checkpoint. - // Either way, we have to truncate it back to a valid state (or write a .valid-length - // file that specifies up to which length it is valid) and rename it to the final name - // before starting a new bucket file. + // Now that we've restored the bucket to a valid state, reset the current file info + bucketState.currentFile = null; + bucketState.currentFileValidLength = -1; + bucketState.isWriterOpen = false; - Path partPath = new Path(bucketState.currentFile); - try { - Path partPendingPath = getPendingPathFor(partPath); - Path partInProgressPath = getInProgressPathFor(partPath); - - if (fs.exists(partPendingPath)) { - LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", partPath); - // has been moved to pending in the mean time, rename to final location - fs.rename(partPendingPath, partPath); - } else if (fs.exists(partInProgressPath)) { - LOG.debug("In-progress file {} is still in-progress, moving to final location.", partPath); - // it was still in progress, rename to final path - fs.rename(partInProgressPath, partPath); - } else if (fs.exists(partPath)) { - LOG.debug("In-Progress file {} was already moved to final location {}.", bucketState.currentFile, partPath); - } else { - LOG.debug("In-Progress file {} was neither moved to pending nor is still in progress. Possibly, " + - "it was moved to final location by a previous snapshot restore", bucketState.currentFile); - } + handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint); - // We use reflection to get the .truncate() method, this - // is only available starting with Hadoop 2.7 - if (this.refTruncate == null) { - this.refTruncate = reflectTruncate(fs); - } + synchronized (bucketState.pendingFilesPerCheckpoint) { + bucketState.pendingFilesPerCheckpoint.clear(); + } + } + } + + private void handleRestoredRollingSinkState(RollingSink.BucketState restoredState) { + restoredState.pendingFiles.clear(); - // truncate it or write a ".valid-length" file to specify up to which point it is valid - if (refTruncate != null) { - LOG.debug("Truncating {} to valid length {}", partPath, bucketState.currentFileValidLength); - // some-one else might still hold the lease from a previous try, we are - // recovering, after all ... - if (fs instanceof DistributedFileSystem) { - DistributedFileSystem dfs = (DistributedFileSystem) fs; - LOG.debug("Trying to recover file lease {}", partPath); - dfs.recoverLease(partPath); - boolean isclosed = dfs.isFileClosed(partPath); - StopWatch sw = new StopWatch(); - sw.start(); - while (!isclosed) { - if (sw.getTime() > asyncTimeout) { - break; - } - try { - Thread.sleep(500); - } catch (InterruptedException e1) { - // ignore it - } - isclosed = dfs.isFileClosed(partPath); + handlePendingInProgressFile(restoredState.currentFile, restoredState.currentFileValidLength); + + // Now that we've restored the bucket to a valid state, reset the current file info + restoredState.currentFile = null; + restoredState.currentFileValidLength = -1; + + handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint); + + synchronized (restoredState.pendingFilesPerCheckpoint) { + restoredState.pendingFilesPerCheckpoint.clear(); + } + } + + private void handlePendingInProgressFile(String file, long validLength) { + if (file != null) { + + // We were writing to a file when the last checkpoint occurred. This file can either + // be still in-progress or became a pending file at some point after the checkpoint. + // Either way, we have to truncate it back to a valid state (or write a .valid-length + // file that specifies up to which length it is valid) and rename it to the final name + // before starting a new bucket file. + + Path partPath = new Path(file); + try { + Path partPendingPath = getPendingPathFor(partPath); + Path partInProgressPath = getInProgressPathFor(partPath); + + if (fs.exists(partPendingPath)) { + LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", partPath); + // has been moved to pending in the mean time, rename to final location + fs.rename(partPendingPath, partPath); + } else if (fs.exists(partInProgressPath)) { + LOG.debug("In-progress file {} is still in-progress, moving to final location.", partPath); + // it was still in progress, rename to final path + fs.rename(partInProgressPath, partPath); + } else if (fs.exists(partPath)) { + LOG.debug("In-Progress file {} was already moved to final location {}.", file, partPath); + } else { + LOG.debug("In-Progress file {} was neither moved to pending nor is still in progress. Possibly, " + + "it was moved to final location by a previous snapshot restore", file); + } + + // We use reflection to get the .truncate() method, this + // is only available starting with Hadoop 2.7 + if (this.refTruncate == null) { + this.refTruncate = reflectTruncate(fs); + } + + // truncate it or write a ".valid-length" file to specify up to which point it is valid + if (refTruncate != null) { + LOG.debug("Truncating {} to valid length {}", partPath, validLength); + // some-one else might still hold the lease from a previous try, we are + // recovering, after all ... + if (fs instanceof DistributedFileSystem) { + DistributedFileSystem dfs = (DistributedFileSystem) fs; + LOG.debug("Trying to recover file lease {}", partPath); + dfs.recoverLease(partPath); + boolean isclosed = dfs.isFileClosed(partPath); + StopWatch sw = new StopWatch(); + sw.start(); + while (!isclosed) { + if (sw.getTime() > asyncTimeout) { + break; + } + try { + Thread.sleep(500); + } catch (InterruptedException e1) { + // ignore it } + isclosed = dfs.isFileClosed(partPath); } - Boolean truncated = (Boolean) refTruncate.invoke(fs, partPath, bucketState.currentFileValidLength); - if (!truncated) { - LOG.debug("Truncate did not immediately complete for {}, waiting...", partPath); - - // we must wait for the asynchronous truncate operation to complete - StopWatch sw = new StopWatch(); - sw.start(); - long newLen = fs.getFileStatus(partPath).getLen(); - while (newLen != bucketState.currentFileValidLength) { - if (sw.getTime() > asyncTimeout) { - break; - } - try { - Thread.sleep(500); - } catch (InterruptedException e1) { - // ignore it - } - newLen = fs.getFileStatus(partPath).getLen(); + } + Boolean truncated = (Boolean) refTruncate.invoke(fs, partPath, validLength); + if (!truncated) { + LOG.debug("Truncate did not immediately complete for {}, waiting...", partPath); + + // we must wait for the asynchronous truncate operation to complete + StopWatch sw = new StopWatch(); + sw.start(); + long newLen = fs.getFileStatus(partPath).getLen(); + while (newLen != validLength) { + if (sw.getTime() > asyncTimeout) { + break; } - if (newLen != bucketState.currentFileValidLength) { - throw new RuntimeException("Truncate did not truncate to right length. Should be " + bucketState.currentFileValidLength + " is " + newLen + "."); + try { + Thread.sleep(500); + } catch (InterruptedException e1) { + // ignore it } + newLen = fs.getFileStatus(partPath).getLen(); } - } else { - LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength); - Path validLengthFilePath = getValidLengthPathFor(partPath); - if (!fs.exists(validLengthFilePath)) { - FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath); - lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength)); - lengthFileOut.close(); + if (newLen != validLength) { + throw new RuntimeException("Truncate did not truncate to right length. Should be " + validLength + " is " + newLen + "."); } } - - // Now that we've restored the bucket to a valid state, reset the current file info - bucketState.currentFile = null; - bucketState.currentFileValidLength = -1; - bucketState.isWriterOpen = false; - } catch (IOException e) { - LOG.error("Error while restoring BucketingSink state.", e); - throw new RuntimeException("Error while restoring BucketingSink state.", e); - } catch (InvocationTargetException | IllegalAccessException e) { - LOG.error("Could not invoke truncate.", e); - throw new RuntimeException("Could not invoke truncate.", e); + } else { + LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, validLength); + Path validLengthFilePath = getValidLengthPathFor(partPath); + if (!fs.exists(validLengthFilePath) && fs.exists(partPath)) { + FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath); + lengthFileOut.writeUTF(Long.toString(validLength)); + lengthFileOut.close(); + } } + + } catch (IOException e) { + LOG.error("Error while restoring BucketingSink state.", e); + throw new RuntimeException("Error while restoring BucketingSink state.", e); + } catch (InvocationTargetException | IllegalAccessException e) { + LOG.error("Could not invoke truncate.", e); + throw new RuntimeException("Could not invoke truncate.", e); } + } + } - // Move files that are confirmed by a checkpoint but did not get moved to final location - // because the checkpoint notification did not happen before a failure + private void handlePendingFilesForPreviousCheckpoints(Map> pendingFilesPerCheckpoint) { + // Move files that are confirmed by a checkpoint but did not get moved to final location + // because the checkpoint notification did not happen before a failure - LOG.debug("Moving pending files to final location on restore."); + LOG.debug("Moving pending files to final location on restore."); - Set pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet(); - for (Long pastCheckpointId : pastCheckpointIds) { - // All the pending files are buckets that have been completed but are waiting to be renamed - // to their final name - for (String filename : bucketState.pendingFilesPerCheckpoint.get(pastCheckpointId)) { - Path finalPath = new Path(filename); - Path pendingPath = getPendingPathFor(finalPath); + Set pastCheckpointIds = pendingFilesPerCheckpoint.keySet(); + for (Long pastCheckpointId : pastCheckpointIds) { + // All the pending files are buckets that have been completed but are waiting to be renamed + // to their final name + for (String filename : pendingFilesPerCheckpoint.get(pastCheckpointId)) { + Path finalPath = new Path(filename); + Path pendingPath = getPendingPathFor(finalPath); - try { - if (fs.exists(pendingPath)) { - LOG.debug("Restoring BucketingSink State: Moving pending file {} to final location after complete checkpoint {}.", pendingPath, pastCheckpointId); - fs.rename(pendingPath, finalPath); - } - } catch (IOException e) { - LOG.error("Restoring BucketingSink State: Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, e); - throw new RuntimeException("Error while renaming pending file " + pendingPath + " to final path " + finalPath, e); + try { + if (fs.exists(pendingPath)) { + LOG.debug("Restoring BucketingSink State: Moving pending file {} to final location after complete checkpoint {}.", pendingPath, pastCheckpointId); + fs.rename(pendingPath, finalPath); } + } catch (IOException e) { + LOG.error("Restoring BucketingSink State: Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, e); + throw new RuntimeException("Error while renaming pending file " + pendingPath + " to final path " + finalPath, e); } } + } + } - synchronized (bucketState.pendingFilesPerCheckpoint) { - bucketState.pendingFilesPerCheckpoint.clear(); - } + // -------------------------------------------------------------------------------------------- + // Backwards compatibility with Flink 1.1 + // -------------------------------------------------------------------------------------------- + + @Override + public void restoreState(RollingSink.BucketState state) throws Exception { + LOG.info("{} (taskIdx={}) restored bucket state from the RollingSink an older Flink version: {}", + getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), state); + + try { + initFileSystem(); + } catch (IOException e) { + LOG.error("Error while creating FileSystem when restoring the state of the BucketingSink.", e); + throw new RuntimeException("Error while creating FileSystem when restoring the state of the BucketingSink.", e); } + + handleRestoredRollingSinkState(state); } // -------------------------------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java index 0c5e16b80684b..3355fae1f8493 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.flink.streaming.connectors.fs.bucketing; import org.apache.commons.io.FileUtils; diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java new file mode 100644 index 0000000000000..257b1571a6943 --- /dev/null +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.fs.bucketing; + +import org.apache.commons.io.FileUtils; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.connectors.fs.RollingSink; +import org.apache.flink.streaming.connectors.fs.StringWriter; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.util.List; +import java.util.Map; + +public class RollingToBucketingMigrationTest { + + @ClassRule + public static TemporaryFolder tempFolder = new TemporaryFolder(); + + private static final String PART_PREFIX = "part"; + private static final String PENDING_SUFFIX = ".pending"; + private static final String IN_PROGRESS_SUFFIX = ".in-progress"; + private static final String VALID_LENGTH_SUFFIX = ".valid"; + + @Test + public void testMigration() throws Exception { + final File outDir = tempFolder.newFolder(); + + BucketingSink sink = new ValidatingBucketingSink(outDir.getAbsolutePath()) + .setWriter(new StringWriter()) + .setBatchSize(5) + .setPartPrefix(PART_PREFIX) + .setInProgressPrefix("") + .setPendingPrefix("") + .setValidLengthPrefix("") + .setInProgressSuffix(IN_PROGRESS_SUFFIX) + .setPendingSuffix(PENDING_SUFFIX) + .setValidLengthSuffix(VALID_LENGTH_SUFFIX); + + OneInputStreamOperatorTestHarness testHarness1 = new OneInputStreamOperatorTestHarness<>( + new StreamSink<>(sink), 10, 1, 0); + testHarness1.setup(); + testHarness1.initializeStateFromLegacyCheckpoint(getResourceFilename("rolling-sink-migration-test-flink1.1-snapshot")); + testHarness1.open(); + + testHarness1.processElement(new StreamRecord<>("test1", 0L)); + testHarness1.processElement(new StreamRecord<>("test2", 0L)); + + checkFs(outDir, 1, 1, 0, 0); + + testHarness1.close(); + } + + private static String getResourceFilename(String filename) { + ClassLoader cl = RollingToBucketingMigrationTest.class.getClassLoader(); + URL resource = cl.getResource(filename); + return resource.getFile(); + } + + private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException { + int inProg = 0; + int pend = 0; + int compl = 0; + int val = 0; + + for (File file: FileUtils.listFiles(outDir, null, true)) { + if (file.getAbsolutePath().endsWith("crc")) { + continue; + } + String path = file.getPath(); + if (path.endsWith(IN_PROGRESS_SUFFIX)) { + inProg++; + } else if (path.endsWith(PENDING_SUFFIX)) { + pend++; + } else if (path.endsWith(VALID_LENGTH_SUFFIX)) { + val++; + } else if (path.contains(PART_PREFIX)) { + compl++; + } + } + + Assert.assertEquals(inprogress, inProg); + Assert.assertEquals(pending, pend); + Assert.assertEquals(completed, compl); + Assert.assertEquals(valid, val); + } + + static class ValidatingBucketingSink extends BucketingSink { + + private static final long serialVersionUID = -4263974081712009141L; + + ValidatingBucketingSink(String basePath) { + super(basePath); + } + + @Override + public void restoreState(RollingSink.BucketState state) throws Exception { + + /** + * this validates that we read the state that was checkpointed by the previous version. We expect it to be: + * In-progress=/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-4 + * validLength=6 + * pendingForNextCheckpoint=[] + * pendingForPrevCheckpoints={0=[ /var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-0, + * /var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-1, + * /var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-2, + * /var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-3]} + * */ + + String current = state.currentFile; + long validLength = state.currentFileValidLength; + + Assert.assertEquals("/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-4", current); + Assert.assertEquals(6, validLength); + + List pendingFiles = state.pendingFiles; + Assert.assertTrue(pendingFiles.isEmpty()); + + final Map> pendingFilesPerCheckpoint = state.pendingFilesPerCheckpoint; + Assert.assertEquals(1, pendingFilesPerCheckpoint.size()); + + for (Map.Entry> entry: pendingFilesPerCheckpoint.entrySet()) { + long checkpoint = entry.getKey(); + List files = entry.getValue(); + + Assert.assertEquals(0L, checkpoint); + Assert.assertEquals(4, files.size()); + + for (int i = 0; i < 4; i++) { + Assert.assertEquals( + "/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-" + i, + files.get(i)); + } + } + + super.restoreState(state); + } + } +}