Skip to content

Commit

Permalink
This closes apache#44
Browse files Browse the repository at this point in the history
  • Loading branch information
kennknowles committed Mar 24, 2016
2 parents 1c21aa2 + c451568 commit c1de175
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,15 @@ public CheckEnabled getStableUniqueNames() {
/**
* No-op setter for the stable-unique-names check level.
*
* @param enabled the requested check level; it is not stored
*/
@Override
public void setStableUniqueNames(CheckEnabled enabled) {
// Empty body: the supplied value is discarded.
}

/**
* Reports the temporary location of this options object.
*
* @return always {@code null}; this implementation holds no temp location
*/
@Override
public String getTempLocation() {
// NOTE(review): presumably callers of this stub tolerate a null temp location - confirm.
return null;
}

/**
* No-op setter for the temporary location.
*
* @param tempLocation the requested temporary location; it is not stored
*/
@Override
public void setTempLocation(String tempLocation) {
// Empty body: the supplied value is discarded, so getTempLocation() keeps returning null.
}
};
}
return options;
Expand Down Expand Up @@ -628,4 +637,4 @@ public void restoreState(StreamTaskState taskState, long recoveryTimestamp) thro
// restore the timerInternals.
this.timerInternals.restoreTimerInternals(reader, inputKvCoder, windowCoder);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ public interface DataflowPipelineOptions extends
GcsOptions, StreamingOptions, CloudDebuggerOptions, DataflowWorkerLoggingOptions,
DataflowProfilingOptions {

static final String DATAFLOW_STORAGE_LOCATION = "Dataflow Storage Location";

@Description("Project id. Required when running a Dataflow in the cloud. "
+ "See https://cloud.google.com/storage/docs/projects for further details.")
@Override
Expand All @@ -45,37 +43,19 @@ public interface DataflowPipelineOptions extends
@Override
void setProject(String value);

/**
* GCS path for temporary files, e.g. gs://bucket/object
*
* <p>Must be a valid Cloud Storage URL, beginning with the prefix "gs://"
*
* <p>At least one of {@link #getTempLocation()} or {@link #getStagingLocation()} must be set. If
* {@link #getTempLocation()} is not set, then the Dataflow pipeline defaults to using
* {@link #getStagingLocation()}.
*/
@Description("GCS path for temporary files, eg \"gs:https://bucket/object\". "
+ "Must be a valid Cloud Storage URL, beginning with the prefix \"gs:https://\". "
+ "At least one of tempLocation or stagingLocation must be set. If tempLocation is unset, "
+ "defaults to using stagingLocation.")
@Validation.Required(groups = {DATAFLOW_STORAGE_LOCATION})
String getTempLocation();
void setTempLocation(String value);

/**
* GCS path for staging local files, e.g. gs://bucket/object
*
* <p>Must be a valid Cloud Storage URL, beginning with the prefix "gs://"
*
* <p>At least one of {@link #getTempLocation()} or {@link #getStagingLocation()} must be set. If
* {@link #getTempLocation()} is not set, then the Dataflow pipeline defaults to using
* {@link #getStagingLocation()}.
* <p>At least one of {@link PipelineOptions#getTempLocation()} or {@link #getStagingLocation()}
* must be set. If {@link #getStagingLocation()} is not set, then the Dataflow
* pipeline defaults to using {@link PipelineOptions#getTempLocation()}.
*/
@Description("GCS path for staging local files, e.g. \"gs:https://bucket/object\". "
+ "Must be a valid Cloud Storage URL, beginning with the prefix \"gs:https://\". "
+ "At least one of stagingLocation or tempLocation must be set. If stagingLocation is unset, "
+ "defaults to using tempLocation.")
@Validation.Required(groups = {DATAFLOW_STORAGE_LOCATION})
String getStagingLocation();
void setStagingLocation(String value);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,4 +246,18 @@ public static enum CheckEnabled {
@Default.Enum("WARNING")
CheckEnabled getStableUniqueNames();
void setStableUniqueNames(CheckEnabled enabled);

/**
* Returns the pipeline-level default location for storing temporary files.
*
* <p>The value can be a path on any file system.
*
* <p>Other {@link PipelineOptions} sub-interfaces can use {@link #getTempLocation()} as a
* default for their own location settings.
*
* <p>If it is unset, a {@link PipelineRunner} can override it.
*
* @return the configured temporary location
*/
@Description("A pipeline level default location for storing temporary files.")
String getTempLocation();
/**
* Setter paired with {@link #getTempLocation()}.
*
* @param value the location to use for temporary files
*/
void setTempLocation(String value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ public static DataflowPipelineRunner fromOptions(PipelineOptions options) {
}

PathValidator validator = dataflowOptions.getPathValidator();
Preconditions.checkArgument(!(Strings.isNullOrEmpty(dataflowOptions.getTempLocation())
&& Strings.isNullOrEmpty(dataflowOptions.getStagingLocation())),
"Missing required value: at least one of tempLocation or stagingLocation must be set.");

if (dataflowOptions.getStagingLocation() != null) {
validator.validateOutputFilePrefixSupported(dataflowOptions.getStagingLocation());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -649,10 +649,8 @@ public void testNoStagingLocationAndNoTempLocationFails() {
options.setProject("foo-project");

thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Missing required value for group");
thrown.expectMessage(DataflowPipelineOptions.DATAFLOW_STORAGE_LOCATION);
thrown.expectMessage("getStagingLocation");
thrown.expectMessage("getTempLocation");
thrown.expectMessage(
"Missing required value: at least one of tempLocation or stagingLocation must be set.");

DataflowPipelineRunner.fromOptions(options);
}
Expand Down

0 comments on commit c1de175

Please sign in to comment.