Skip to content

Commit

Permalink
[BEAM-7689] Make the temporary directory unique for FileBasedSink
Browse files (browse the repository at this point in the history)
  • Loading branch information
ihji committed Jul 9, 2019
1 parent 82cfdb8 commit b72d848
Showing 1 changed file with 4 additions and 16 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -40,7 +40,6 @@
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
Expand Down Expand Up @@ -82,9 +81,6 @@
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -510,7 +506,7 @@ protected static ResourceId buildTemporaryFilename(ResourceId tempDirectory, Str
*
* <p>Default is a uniquely named subdirectory of the provided tempDirectory, e.g. if
* tempDirectory is /path/to/foo/, the temporary directory will be
* /path/to/foo/temp-beam-foo-$date.
* /path/to/foo/.temp-beam-$uuid.
*
* @param sink the FileBasedSink that will be used to configure this write operation.
*/
Expand All @@ -522,20 +518,12 @@ public WriteOperation(FileBasedSink<?, DestinationT, OutputT> sink) {

private static class TemporaryDirectoryBuilder
implements SerializableFunction<ResourceId, ResourceId> {
private static final AtomicLong TEMP_COUNT = new AtomicLong(0);
private static final DateTimeFormatter TEMPDIR_TIMESTAMP =
DateTimeFormat.forPattern("yyyy-MM-dd_HH-mm-ss");
// The intent of the code is to have a consistent value of tempDirectory across
// all workers, which wouldn't happen if now() was called inline.
private final String timestamp = Instant.now().toString(TEMPDIR_TIMESTAMP);
// Multiple different sinks may be used in the same output directory; use tempId to create a
// separate temp directory for each.
private final Long tempId = TEMP_COUNT.getAndIncrement();
private final UUID tempUUID = UUID.randomUUID();

@Override
public ResourceId apply(ResourceId tempDirectory) {
// Temp directory has a timestamp and a unique ID
String tempDirName = String.format(TEMP_DIRECTORY_PREFIX + "-%s-%s", timestamp, tempId);
// Temp directory has a random UUID postfix (BEAM-7689)
String tempDirName = String.format(TEMP_DIRECTORY_PREFIX + "-%s", tempUUID);
return tempDirectory
.getCurrentDirectory()
.resolve(tempDirName, StandardResolveOptions.RESOLVE_DIRECTORY);
Expand Down

0 comments on commit b72d848

Please sign in to comment.