
[BEAM-649] Analyse DAG to determine if RDD/DStream has to be cached or not #1739

Closed
wants to merge 2 commits

Conversation

jbonofre
Member

@jbonofre jbonofre commented Jan 5, 2017

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

  • Make sure the PR title is formatted like:
    [BEAM-<Jira issue #>] Description of pull request
  • Make sure tests pass via mvn clean verify. (Even better, enable
    Travis-CI on your fork and ensure the whole test matrix passes).
  • Replace <Jira issue #> in the title with the actual Jira issue
    number, if there is one.
  • If this contribution is large, please file an Apache
    Individual Contributor License Agreement.

@jbonofre
Member Author

jbonofre commented Jan 5, 2017

R: @amitsela

@asfbot

asfbot commented Jan 5, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6427/

Member

@amitsela amitsela left a comment


I've added some comments and I will re-iterate after you push the changes, thanks!

}
}
// update cache candidates with node outputs
for (TaggedPValue output : node.getOutputs()) {
Member


You shouldn't iterate over outputs: a PCollection (RDD/DStream) needs to be cached only if it is used as an input to more than one transformation, so that it won't be re-evaluated all the way through its lineage.
The cache-candidate PCollection should be looked up as the output of a transformation in the Evaluator, since we want to cache right after it is first evaluated (so the evaluator that creates this RDD/DStream will know it should cache at the end of the evaluation).
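The counting logic the reviewer describes can be sketched with plain collections. This is a hypothetical, stdlib-only stand-in: `CacheCandidateCounter` and the string ids are illustrative, not the actual Beam classes, which key the map by `PCollection`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: count how many transforms consume each value in a
// pre-visit, so the evaluator can cache a value right after it is first
// produced. String ids stand in for PCollections.
public class CacheCandidateCounter {
  private final Map<String, Long> cacheCandidates = new HashMap<>();

  // Called once per transform, with the ids of the values it reads.
  public void recordInputs(List<String> inputIds) {
    for (String id : inputIds) {
      cacheCandidates.merge(id, 1L, Long::sum);
    }
  }

  // A value is worth caching only if more than one transform reads it;
  // a single reader re-evaluates the lineage exactly once anyway.
  public boolean shouldCache(String outputId) {
    return cacheCandidates.getOrDefault(outputId, 0L) > 1;
  }
}
```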

// if the input or output of the node (aka transform) is already known in the cache
// candidates map, and it appears more than one time, then we enable caching
// considering node input for caching
for (TaggedPValue input : node.getInputs()) {
Member


Here this should be removed since you're looking for the output.

@@ -104,6 +105,9 @@ public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationCont
}
unionRDD = context.getSparkContext().union(rdds);
}
if (cacheHint) {
Member


This comment is for all following:

if (cacheHint) {
  rdd.cache();
}

and

if (cacheHint) {
  dstream.cache();
}

I'd go for using the runner's Dataset so that we use the user-defined StorageLevel (batch). This will require slight changes, and also get rid of multiReads optimization in EvaluationContext since this is a better cache optimization.

I'd also try something more fluent like:

cacheHint ? rdd.cache() : rdd;

Though you'll probably hide it in Dataset anyway, so I'm not sure it will make much difference.
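The fluent form the reviewer suggests might look like the sketch below. `FakeRdd` is a hypothetical stand-in for a Spark RDD that just records whether it was persisted, and the storage-level string is illustrative; the real change would route through the runner's Dataset and the user-configured StorageLevel.

```java
// Illustrative only: apply caching in one place, at the user-configured
// storage level, instead of scattering if (cacheHint) blocks across
// the evaluators.
public class CachingDataset {
  /** Minimal stand-in for a Spark RDD; records the level it was persisted at. */
  public static class FakeRdd {
    public String persistedLevel = null;

    public FakeRdd persist(String storageLevel) {
      persistedLevel = storageLevel;
      return this;
    }
  }

  // The fluent form: cache only when hinted, otherwise pass through.
  public static FakeRdd maybeCache(FakeRdd rdd, boolean cacheHint, String level) {
    return cacheHint ? rdd.persist(level) : rdd;
  }
}
```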

Member Author


Good point. I will update that way.

Member

@amitsela amitsela Jan 11, 2017


Are you going to leave the ifs, inline them, or move them into Dataset?

@@ -74,7 +77,8 @@ public JavaStreamingContext create() {
JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
ctxt = new EvaluationContext(jsc, pipeline, jssc);
pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt));
pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt,
new HashMap<PCollection, Long>()));
Member


This should be the map you populated in the pre-visit, no?

/**
* Test BEAM-1206 (consequence of BEAM-649).
*/
public class WritingSinkTest {
Member


As discussed, this is a good test specifically for BEAM-1206 but it doesn't directly test BEAM-649, I will try and come up with an idea for how to test this properly.

Member


Thinking of it now, this should have been caught by the ROS tests for the Spark runner... Since Write is tested and probably has tests, I assume (and from a quick look I might be right) that we're missing an HDFSWriter ROS test.

You can open a separate ticket for this I guess.

As for this test, we should probably keep it as long as there's no ROS test for it, but you should assert the result by reading the output file, right?

@jbonofre
Member Author

jbonofre commented Jan 6, 2017

I updated the PR by populating the cache candidates with PTransform inputs and checking on the outputs. However, I had to add a hack for WriteBundles.

@asfbot

asfbot commented Jan 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6440/

@jbonofre
Member Author

Rebased to integrate #1747 and added a specific test.

@asfbot

asfbot commented Jan 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6512/

Build result: FAILURE

[log truncated] org.apache.maven.plugin.MojoFailureException: You have 1 Checkstyle violation. After correcting the problems, the build can be resumed with: mvn -rf :beam-runners-spark

Member

@amitsela amitsela left a comment


See comments, thanks!

if (cacheCandidates.get(value) != null) {
count = cacheCandidates.get(value) + 1;
}
if (value.getName().equals("Write/WriteBundles.out")) {
Member


Probably can remove the hack now...



/**
* Tests for translation of side inputs in the Spark Runner.
*/
public class SideInputTest {
Member


Side inputs are tested by ROS, why do we need this? (I might have been supportive of this at some point and don't remember why, so excuse me if so 😉 )

@jbonofre
Member Author

Fixed checkstyle and removed the WriteBundles hack (thanks to #1747).

@asfbot

asfbot commented Jan 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6513/

@jbonofre
Member Author

@amitsela Let's chat about it tomorrow.

@asfbot

asfbot commented Jan 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6533/

@jbonofre
Member Author

I will do the changes in two steps:

  1. Remove the if from the evaluators and move the cacheHint test into BorrowDataset & BorrowDStream.
  2. Change the tests to provide one test really focused on auto-caching (depending on the DAG).
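Step 1 above can be sketched with a toy registry. `DatasetRegistry`, `markCacheCandidate`, and `borrowDataset` are hypothetical names modelling the idea that caching happens once, at borrow time, rather than via an if-block in every evaluator.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: the registry caches a dataset the first time it is
// borrowed if it was flagged as a cache candidate during the pre-visit.
public class DatasetRegistry {
  private final Set<String> cacheCandidates = new HashSet<>();
  private final Set<String> cached = new HashSet<>();
  public int cacheCalls = 0;

  public void markCacheCandidate(String id) {
    cacheCandidates.add(id);
  }

  // Borrow a dataset; trigger caching at most once, on first borrow.
  public void borrowDataset(String id) {
    if (cacheCandidates.contains(id) && cached.add(id)) {
      cacheCalls++; // stands in for rdd.persist(storageLevel)
    }
  }
}
```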

@amitsela
Member

Sounds good.

@jbonofre
Member Author

Rebased and updated with the cacheHint handled in borrowDataset. I started to implement a new test with a pipeline visitor; it's not complete yet.

@coveralls

Coverage Status

Changes unknown when pulling 44c2199 on jbonofre:BEAM-649 into apache:master.

@asfbot

asfbot commented Jan 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6883/


@asfbot

asfbot commented Jan 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6882/


@asfbot

asfbot commented Jan 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6885/

@jbonofre
Member Author

Rebasing to deal with conflicts.

@jbonofre
Member Author

Rebased and resolved conflicts. I just have to complete the CacheTest.

@coveralls

Coverage Status

Coverage decreased (-0.007%) to 69.31% when pulling 194eb31 on jbonofre:BEAM-649 into 0806183 on apache:master.

@jbonofre
Member Author

Run Spark RunnableOnService

@jbonofre
Member Author

Run Spark RunnableOnService

@coveralls

Coverage Status

Coverage decreased (-0.007%) to 69.895% when pulling b0d14a8 on jbonofre:BEAM-649 into e1dc7a8 on apache:master.

@asfbot

asfbot commented Mar 22, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/8650/

@jbonofre
Member Author

@amitsela I don't think the ResumeFromCheckpointStreamingTest failure is related to my change (at least it doesn't occur on my machine). WDYT?

@amitsela
Member

Run Spark RunnableOnService

@amitsela
Member

@jbonofre you're right, it has flaked a lot these past few days. Jenkins is terribly unstable anyway...

@jbonofre
Member Author

Squashed. Ready to do cosmetic improvements ;)

@jbonofre
Member Author

Run Spark RunnableOnService

Member

@amitsela amitsela left a comment


I've added a bunch of nits about naming, codestyle, etc.
Feel free to merge the PR after addressing them.
Thanks!

/**
* Options used in this pipeline runner.
*/
private final SparkPipelineOptions mOptions;

private SparkPipelineTranslator translator;
Member


this can be final and a local variable in run() just before declaring final ExecutorService executorService = Executors.newSingleThreadExecutor();

translator = new StreamingTransformTranslator.Translator(
new TransformTranslator.Translator());
updateCacheCandidates(pipeline, translator,
contextFactory.getEvaluationContext());
Member


No real need for this to be on a new line.

final JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions);
final EvaluationContext evaluationContext = new EvaluationContext(jsc, pipeline);
final EvaluationContext evaluationContext =
new EvaluationContext(jsc, pipeline);
Member


No need for a new line here either.

@@ -254,6 +268,17 @@ private void detectTranslationMode(Pipeline pipeline) {
}

/**
* Evaluator that updates/populates the cache candidates.
*/
private void updateCacheCandidates(Pipeline pipeline,
Member


Constructor parameters on a new line, with a 4-space indent; see for example here.

SparkPipelineTranslator translator,
EvaluationContext evaluationContext) {
CacheVisitor updater =
new CacheVisitor(translator, evaluationContext);
Member


No need for a newline; updater should also be renamed.

JavaRDD<WindowedValue<T>> rdd =
getSparkContext().parallelize(CoderHelpers.toByteArrays(elems, windowCoder))
.map(CoderHelpers.fromByteFunction(windowCoder));
// create a BoundedDataset that would create a RDD on demand
Member


this comment belongs in the else clause.

@@ -231,4 +252,8 @@ private String storageLevel() {
return runtime.getPipelineOptions().as(SparkPipelineOptions.class).getStorageLevel();
}

public Map<PCollection, Long> getCacheCandidates() {
Member


pull this up to where all public methods are, and add Javadoc.
Also, could you do me a favour and remove:

/**
   * Retrieves an iterable of results associated with the PCollection passed in.
   *
   * @param pcollection Collection we wish to translate.
   * @param <T>         Type of elements contained in collection.
   * @return Natively types result associated with collection.
   */
  <T> Iterable<T> get(PCollection<T> pcollection) {
    Iterable<WindowedValue<T>> windowedValues = getWindowedValues(pcollection);
    return Iterables.transform(windowedValues, WindowingHelpers.<T>unwindowValueFunction());
  }

it's unused and I always forget to remove it.

@Test
public void cacheCandidatesUpdaterTest() throws Exception {
Pipeline pipeline = pipelineRule.createPipeline();
PCollection pCollection = pipeline.apply(Create.of("foo", "bar"));
Member


please type the PCollection to avoid unnecessary warnings.
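The raw-type nit can be illustrated with plain `java.util.List`; the same fix applies to declaring `PCollection<String>` instead of the raw `PCollection` in the test. The class name `TypedDeclarations` is just for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Using a raw generic type triggers unchecked/rawtypes warnings;
// parameterizing the declaration makes the compiler happy.
public class TypedDeclarations {
  @SuppressWarnings({"rawtypes", "unchecked"})
  public static List rawList() {
    List l = new ArrayList(); // raw type: compiler would warn without the suppression
    l.add("foo");
    return l;
  }

  public static List<String> typedList() {
    List<String> l = new ArrayList<>(); // parameterized: no warning
    l.add("foo");
    return l;
  }
}
```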

PCollection pCollection = pipeline.apply(Create.of("foo", "bar"));
// first read
pCollection.apply(Count.globally());
// second read
Member


Comment: explain that the second apply would have to re-evaluate pCollection, or cache it to begin with.


JavaSparkContext jsc = SparkContextFactory.getSparkContext(pipelineRule.getOptions());
EvaluationContext ctxt = new EvaluationContext(jsc, pipeline);
SparkRunner.CacheVisitor updater =
Member


rename updater.

@asfbot

asfbot commented Mar 23, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/8706/

Build result: FAILURE

[log truncated] Coveralls report submission failed (HTTP error in CoverallsClient.submit during the coveralls-maven-plugin report step); not related to the test run itself.

@coveralls

Coverage Status

Coverage remained the same at 70.149% when pulling 843c44e on jbonofre:BEAM-649 into 5e1be9f on apache:master.

@coveralls

Coverage Status

Coverage increased (+0.006%) to 70.156% when pulling af812e2 on jbonofre:BEAM-649 into 5e1be9f on apache:master.

@asfbot

asfbot commented Mar 23, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/8715/

Build result: FAILURE

[log truncated] Coveralls report submission failed again (same HTTP error in CoverallsClient.submit).

@jbonofre
Member Author

retest this please

@coveralls

Coverage Status

Coverage increased (+0.003%) to 70.152% when pulling af812e2 on jbonofre:BEAM-649 into 5e1be9f on apache:master.

@asfbot

asfbot commented Mar 23, 2017

@jbonofre
Member Author

retest this please


@asfbot

asfbot commented Mar 23, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/8722/

@asfgit asfgit closed this in 82b7b86 Mar 23, 2017
@jbonofre jbonofre deleted the BEAM-649 branch March 23, 2017 16:30