[BEAM-649] Analyse DAG to determine if RDD/DStream has to be cached or not #1739
Conversation
R: @amitsela
Refer to this link for build results (access rights to CI server needed):
I've added some comments and I will re-iterate after you push the changes, thanks!
}
}
// update cache candidates with node outputs
for (TaggedPValue output : node.getOutputs()) {
You shouldn't iterate over outputs, since a PCollection (RDD/DStream) needs to be cached only if it is used as an input to more than one transformation, so that it won't be evaluated again all the way through its lineage.
The cache-candidate PCollection should be looked for as the output of a transformation in the Evaluator, since we want to cache after it is first evaluated (so the evaluator that creates this RDD/DStream will know it should cache at the end of the evaluation).
// if the input or output of the node (aka transform) is already known in the cache
// candidates map, and it appears more than one time, then we enable caching
// considering node input for caching
for (TaggedPValue input : node.getInputs()) {
Here this should be removed since you're looking for the output.
@@ -104,6 +105,9 @@ public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationCont
}
unionRDD = context.getSparkContext().union(rdds);
}
if (cacheHint) { |
This comment is for all following:
if (cacheHint) {
rdd.cache();
}
and
if (cacheHint) {
dstream.cache();
}
I'd go for using the runner's Dataset so that we use the user-defined StorageLevel (batch). This will require slight changes, and will also get rid of the multiReads optimization in EvaluationContext, since this is a better cache optimization.
I'd also try something more fluent, like:
cacheHint ? rdd.cache() : rdd;
Though you'll probably hide it in Dataset anyway, so I'm not sure it will make much of a difference.
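The suggestion of hiding the cache decision inside the runner's Dataset can be sketched with a plain-Java stand-in (the class below is hypothetical, not the actual Beam Dataset): call sites pass the hint fluently, and the wrapper owns both the decision and the user-defined storage level.

```java
// Hypothetical stand-in for the runner's Dataset, showing the idea of
// moving the "if (cacheHint)" blocks out of every evaluator call site.
class DatasetSketch {

    private boolean cached = false;
    private final String storageLevel; // user-defined, e.g. "MEMORY_ONLY"

    DatasetSketch(String storageLevel) {
        this.storageLevel = storageLevel;
    }

    // Fluent: callers chain instead of branching. In the real runner this
    // would delegate to rdd.persist(...) with the configured StorageLevel.
    DatasetSketch cache(boolean cacheHint) {
        if (cacheHint) {
            cached = true;
        }
        return this;
    }

    boolean isCached() {
        return cached;
    }

    String getStorageLevel() {
        return storageLevel;
    }
}
```

With this shape, the per-evaluator `if (cacheHint) { rdd.cache(); }` blocks collapse into a single call such as `dataset.cache(cacheHint)`.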
Good point. I will update that way.
Are you going to leave the ifs? Inline them? Or move them into Dataset?
@@ -74,7 +77,8 @@ public JavaStreamingContext create() {
JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
ctxt = new EvaluationContext(jsc, pipeline, jssc);
pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt));
pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt,
    new HashMap<PCollection, Long>()));
This should be the map you populated in the pre-visit, no?
/**
 * Test BEAM-1206 (consequence of BEAM-649).
 */
public class WritingSinkTest {
As discussed, this is a good test specifically for BEAM-1206, but it doesn't directly test BEAM-649. I will try to come up with an idea for how to test this properly.
Thinking of it now, this should have been caught by ROS (RunnableOnService) tests for the Spark runner... Since Write is tested and probably has tests, I assume (and from a quick look I might be right) that we're missing an HDFSWriter ROS test.
You can open a separate ticket for this, I guess.
As for this test, we should probably keep it as long as there's no ROS test for it, but you should assert the result by reading the output file, right?
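The assertion style suggested above, checking what the sink actually wrote rather than only that the pipeline ran, can be sketched with plain java.nio (the class and file names here are illustrative, not from the PR):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Sketch of asserting a sink's result by reading the output file back.
class OutputFileAssertSketch {

    static List<String> readOutput(Path outputFile) throws IOException {
        return Files.readAllLines(outputFile);
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the sink: write what the pipeline would have written.
        Path out = Files.createTempFile("sink-output", ".txt");
        Files.write(out, List.of("foo", "bar"));

        // The test then compares file contents against the expected elements.
        List<String> lines = readOutput(out);
        if (!lines.equals(List.of("foo", "bar"))) {
            throw new AssertionError("unexpected sink output: " + lines);
        }
        System.out.println("output verified: " + lines);
    }
}
```

In the real test this read-back would replace an assertion that merely checks the pipeline completed.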
I updated the PR by populating cache candidates with …
Refer to this link for build results (access rights to CI server needed):
Rebased to integrate #1747 and add a specific test.
Refer to this link for build results (access rights to CI server needed): Build result: FAILURE. 1 Checkstyle violation in beam-runners-spark (org.apache.maven.plugin.MojoFailureException); after correcting, resume the build with mvn -rf :beam-runners-spark. Setting status of 822f675 to FAILURE.
See comments, thanks!
if (cacheCandidates.get(value) != null) {
  count = cacheCandidates.get(value) + 1;
}
if (value.getName().equals("Write/WriteBundles.out")) {
Probably can remove the hack now...
/**
 * Tests for translation of side inputs in the Spark Runner.
 */
public class SideInputTest {
Side inputs are tested by ROS, so why do we need this? (I might have been supportive of this at some point and don't remember why, so excuse me if so 😉)
Fixed checkstyle and removed the …
Refer to this link for build results (access rights to CI server needed):
@amitsela Let's chat about it tomorrow.
Refer to this link for build results (access rights to CI server needed):
I will do the changes in two steps: …
Sounds good.
Rebased and updated with the cacheHint dealt with in the …
Refer to this link for build results (access rights to CI server needed):
Rebasing to deal with conflicts.
Rebased and resolved conflicts. I just have to complete the …
Run Spark RunnableOnService
Refer to this link for build results (access rights to CI server needed): Failed Tests: 1 (beam_PostCommit_Java_RunnableOnService_Spark / org.apache.beam:beam-runners-spark: 1)
Run Spark RunnableOnService
Refer to this link for build results (access rights to CI server needed):
Refer to this link for build results (access rights to CI server needed): Failed Tests: 1 (beam_PostCommit_Java_RunnableOnService_Spark / org.apache.beam:beam-runners-spark: 1)
@amitsela I don't think …
Run Spark RunnableOnService
Refer to this link for build results (access rights to CI server needed): Failed Tests: 1 (beam_PostCommit_Java_RunnableOnService_Spark / org.apache.beam:beam-runners-spark: 1)
@jbonofre you're right, it has flaked a lot these past few days - Jenkins is terribly unstable anyway...
Squashed. Ready to do cosmetic improvements ;)
Run Spark RunnableOnService
I've added a bunch of nits about naming, codestyle, etc.
Feel free to merge the PR after addressing them.
Thanks!
/**
 * Options used in this pipeline runner.
 */
private final SparkPipelineOptions mOptions;

private SparkPipelineTranslator translator;
This can be final, and a local variable in run(), just before declaring final ExecutorService executorService = Executors.newSingleThreadExecutor();
translator = new StreamingTransformTranslator.Translator(
    new TransformTranslator.Translator());
updateCacheCandidates(pipeline, translator,
    contextFactory.getEvaluationContext());
no real need for this to be in a new line.
final JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions);
final EvaluationContext evaluationContext = new EvaluationContext(jsc, pipeline);
final EvaluationContext evaluationContext =
    new EvaluationContext(jsc, pipeline);
no need for new line here as well
@@ -254,6 +268,17 @@ private void detectTranslationMode(Pipeline pipeline) {
}

/**
 * Evaluator that update/populate the cache candidates.
 */
private void updateCacheCandidates(Pipeline pipeline,
Constructor parameters on a new line with 4-space indent - see for example here.
SparkPipelineTranslator translator,
EvaluationContext evaluationContext) {
CacheVisitor updater =
    new CacheVisitor(translator, evaluationContext);
No need for the newline. updater should be renamed.
JavaRDD<WindowedValue<T>> rdd =
    getSparkContext().parallelize(CoderHelpers.toByteArrays(elems, windowCoder))
        .map(CoderHelpers.fromByteFunction(windowCoder));
// create a BoundedDataset that would create a RDD on demand
This comment belongs in the else clause.
@@ -231,4 +252,8 @@ private String storageLevel() {
  return runtime.getPipelineOptions().as(SparkPipelineOptions.class).getStorageLevel();
}

public Map<PCollection, Long> getCacheCandidates() {
Pull this up to where all the public methods are, and add Javadoc.
Also, could you do me a favour and remove:
/**
* Retrieves an iterable of results associated with the PCollection passed in.
*
* @param pcollection Collection we wish to translate.
* @param <T> Type of elements contained in collection.
* @return Natively types result associated with collection.
*/
<T> Iterable<T> get(PCollection<T> pcollection) {
Iterable<WindowedValue<T>> windowedValues = getWindowedValues(pcollection);
return Iterables.transform(windowedValues, WindowingHelpers.<T>unwindowValueFunction());
}
it's unused and I always forget to remove it.
@Test
public void cacheCandidatesUpdaterTest() throws Exception {
  Pipeline pipeline = pipelineRule.createPipeline();
  PCollection pCollection = pipeline.apply(Create.of("foo", "bar"));
Please type the PCollection to avoid unnecessary warnings.
PCollection pCollection = pipeline.apply(Create.of("foo", "bar"));
// first read
pCollection.apply(Count.globally());
// second read
Comment: explain that the second apply would have to re-evaluate pCollection, or cache it to begin with.
JavaSparkContext jsc = SparkContextFactory.getSparkContext(pipelineRule.getOptions());
EvaluationContext ctxt = new EvaluationContext(jsc, pipeline);
SparkRunner.CacheVisitor updater =
Rename updater.
Refer to this link for build results (access rights to CI server needed): Failed Tests: 1 (beam_PostCommit_Java_RunnableOnService_Spark / org.apache.beam:beam-runners-spark: 1)
Refer to this link for build results (access rights to CI server needed): Build result: FAILURE. Coveralls report submission failed (MojoFailureException in coveralls-maven-plugin). Setting status of 843c44e to FAILURE.
Refer to this link for build results (access rights to CI server needed): Build result: FAILURE. Coveralls report submission failed (MojoFailureException in coveralls-maven-plugin). Setting status of af812e2 to FAILURE.
retest this please
Refer to this link for build results (access rights to CI server needed): Failed Tests: 1 (beam_PreCommit_Java_MavenInstall / org.apache.beam:beam-runners-spark: 1)
retest this please
Refer to this link for build results (access rights to CI server needed):
Be sure to do all of the following to help us incorporate your contribution quickly and easily:
- Make sure the pull request title is of the form [BEAM-<Jira issue #>] Description of pull request.
- Run mvn clean verify. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes.)
- Replace <Jira issue #> in the title with the actual Jira issue number, if there is one.
- If this is your first contribution, sign the Individual Contributor License Agreement.