Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-1337] Infer state coders #2133

Closed
wants to merge 2 commits into from

Conversation

aviemzur
Copy link
Member

@aviemzur aviemzur commented Mar 1, 2017

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

  • Make sure the PR title is formatted like:
    [BEAM-<Jira issue #>] Description of pull request
  • Make sure tests pass via mvn clean verify. (Even better, enable
    Travis-CI on your fork and ensure the whole test matrix passes).
  • Replace <Jira issue #> in the title with the actual Jira issue
    number, if there is one.
  • If this contribution is large, please file an Apache
    Individual Contributor License Agreement.

@aviemzur
Copy link
Member Author

aviemzur commented Mar 1, 2017

R: @kennknowles

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.1%) to 69.049% when pulling e5d4539 on aviemzur:infer-state-coders into d66029c on apache:master.

@asfbot
Copy link

asfbot commented Mar 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/7999/
--none--

@kennknowles
Copy link
Member

Sorry for the delay - this is on my radar and something I'm quite excited about.

@@ -355,6 +379,12 @@ private boolean containedInUnderlying(StateNamespace namespace, StateTag<? super
public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
StateTag<? super K, MapState<KeyT, ValueT>> address,
Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
if (mapKeyCoder == null || mapValueCoder == null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that these parameters should not be @Nullable but instead the spec/tag should own this exception and throw it when it discovers that it has to provide these parameters but cannot.

Copy link
Member Author

@aviemzur aviemzur Mar 8, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the coders needed in all runners though?

My inclination is to say no, which is why I specifically throw exceptions in direct runner since it seems to try to force coder checks wherever encoding can happen (and this was the way to make this change testable).

Runners might handle state in a way which does not need encoding (For example, direct runner works without these coders at all, everything is in-mem. I added these exceptions to make it acknowledge the coders' very existence).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So actually I am going to have to say "yes". Just like every PCollection needs a coder, every pieces of state needs a coder. The portability goals of Beam include a runner written in Java running a pipeline written in Python, and vice-versa. Data in the underlying model (the Runner API / Fn API) is always just bytes with a coder (spec) to interpret it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I See. I'll move the exceptions from CopyOnAccessInMemoryStateInternals to StateSpecs.

@@ -121,8 +156,23 @@ private StateSpecs() {}
* Create a state spec that is optimized for adding values frequently, and occasionally retrieving
* all the values that have been added.
*/
public static <T> StateSpec<Object, BagState<T>> bag() {
return bag(null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do edit the classes and put @Nullable on the fields that are now nullable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

* Given {code coders} are inferred from type arguments defined for this class.
* @param coders Array of coders indexed by the type arguments order.
*/
void offerCoders(Coder[] coders);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is nice, I think. I want to spend a minute pondering it.

What is the contract if it is called more than once? What if some of the coders cannot be provided but other can? These are issues that are very similar to TypedPValue.finishSpecifying() and the coder registry's internal logics.

Copy link
Member Author

@aviemzur aviemzur Mar 8, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, as I saw it, coders that are already set take precedence, which is why implementations of this method check for coder references which are null and only set them. This is why I named the method offerCoders and not something like setCoders. I can improve the javadoc to explain this (Is there a better name for this method?).

If some coders cannot be provided and the coder is needed, the client which uses the coder will throw an exception notifying the user that a coder should have been set but was not. The user should then change their code to specify a coder explicitly using the StateSpec factory method which takes Coder arguments.
I'm not sure all clients will throw an exception which explains this (I made sure that the exceptions I added to direct runner explain why you are receiving this exception) but I think this holds true for many references to coders across the project.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds about right. We should document like you have described here. I guess in the end each StateSpec implementation can decide what to do on its own. And for my other question, I think document that some entries in the array might be null. My favorite approach is to provide a Map<TypeVariable, Coder> but that might be a pain here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that would be better. The thing is that the StateSpec implementations do not hold a reference to the type variable.

public void offerCoders(Coder[] coders) {
if (this.coder == null) {
if (coders[0] != null) {
//noinspection unchecked
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What system is this comment for? For javac you would use @SuppressWarnings("unchecked") and many tools grok that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, this works in IntelliJ IDEa, I'll change to an annotation on the method.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On a side note, findbugs doesn't seem to check for this,

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, we have a lot of things disabled that it would be nice to enable one module at a time until we can stop suppressing warnings. This one is actually pretty valuable to have enabled for code health IMO.

@@ -1815,6 +2051,69 @@ public void processElement(

@Test
@Category({RunnableOnService.class, UsesStatefulParDo.class})
public void testCombiningStateCoderInference() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice test coverage for this.

Since coders are often very hard to debug, I think we could use test coverage also for failure cases to make sure the error message is actionable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@aviemzur aviemzur force-pushed the infer-state-coders branch 2 times, most recently from 5efb262 to bfe1545 Compare March 8, 2017 20:06
@aviemzur
Copy link
Member Author

aviemzur commented Mar 8, 2017

@kennknowles addressed your comments in my latest commits.
PTAL

@asfbot
Copy link

asfbot commented Mar 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/8221/
--none--

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.2%) to 69.994% when pulling bfe1545 on aviemzur:infer-state-coders into 29d9bd3 on apache:master.

@asfbot
Copy link

asfbot commented Mar 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/8222/
--none--

try {
Type typeArgument = typeArguments[i];
if (typeArgument instanceof Class) {
coders[i] = coderRegistry.getDefaultCoder((Class) typeArgument);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think here you are going to want to pass a full TypeDescriptor, otherwise you can't infer a coder for List<Integer> for example. This is worth having a test for ValueState<List<String>> or some such.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

if (typeArgument instanceof Class) {
coders[i] = coderRegistry.getDefaultCoder((Class) typeArgument);
}
} catch (CannotProvideCoderException ignored) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment about why this is being ignored / what it means to ignore it.

@@ -739,6 +780,7 @@ public Unbound withSideInputs(
@Override
public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
validateWindowType(input, fn);
inferStateCodersIfNeeded(fn, input.getPipeline().getCoderRegistry());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are going to be some limitations here that you actually could address.

class ListProcessorDoFn<T> extends DoFn<List<T>, List<T>> {
   ... ValueState<T> ...
   ... MapState<String, T> ...
   ... etc ...
}

In these cases, you cannot infer a coder without using the coder from the input PCollection. But if you have the coder from the input, then it should work, in the same way we will infer an output coder for this DoFn.

@aviemzur
Copy link
Member Author

@kennknowles addressed your comments in my latest 2 commits.
PTAL

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.2%) to 69.958% when pulling 2decabd on aviemzur:infer-state-coders into 29d9bd3 on apache:master.

@asfbot
Copy link

asfbot commented Mar 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/8342/
--none--

@@ -930,6 +983,10 @@ public void populateDisplayData(Builder builder) {
public PCollectionTuple expand(PCollection<? extends InputT> input) {
// SplittableDoFn should be forbidden on the runner-side.
validateWindowType(input, fn);

// Use coder registry to determine coders for all StateSpec defined in the fn signature.
inferStateCodersIfNeeded(fn, input.getPipeline().getCoderRegistry(), input.getCoder());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kennknowles - Calling input.getCoder() here will throw an exception if the user did not set a coder or registered a default coder in coder registry.
I was thinking if I should do this inside of the inference logic instead and catch this exception. However, our previous discussion about forcing coders to be set regardless of runner makes me think we want this to fail these cases?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the time an input is passed to expand it will have a coder.

@@ -44,6 +45,11 @@
private StateSpecs() {}

/** Create a simple state spec for values of type {@code T}. */
public static <T> StateSpec<Object, ValueState<T>> value() {
return value(null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say that the contract on the value(Coder) factory method is that the coder is non-null (and it could have a checkArgument that enforces it). Here, you should just directly call the constructor with null instead.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

.apply(ParDo.of(fn)).setCoder(myIntegerCoder);

thrown.expect(PipelineExecutionException.class);
thrown.expectMessage("Unable to infer a coder for ValueState and no Coder was specified.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that you should get the message at construction time in ParDo.expand(). Throwing the exception in bind() is a little too late. Or, you can make sure that ParDo.expand() calls bind() with some dummy visitor just to enforce that all the coders are ready.

Alternatively, you could be very explicit and have some kind of validate() / verifyCodersPresent() method on StateSpec.

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.2%) to 69.966% when pulling de69835 on aviemzur:infer-state-coders into 5e1be9f on apache:master.

@asfbot
Copy link

asfbot commented Mar 23, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/8702/
--none--

@aviemzur
Copy link
Member Author

@kennknowles Rebased on master and addressed your comments in my latest 2 commits.
PTAL

Copy link
Member

@kennknowles kennknowles left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM other than just a couple little comments! Really nice for users, and really nice code. You'll need to rebase again, and it might be good to run a quick ValidatesRunner suite like Flink, and it has the most extensive state support anyhow.

} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} catch (Exception e) {
throw new Pipeline.PipelineExecutionException(e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PipelineExecutionException is intended for "execution time" which is distinct from "construction time". This exception should always be at "construction time". We don't have a special wrapper exception for this, so perhaps just RuntimeException is appropriate here as well. But I notice that StateSpec.finishSpecifying() doesn't throw checked exceptions, so where are they coming from?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, there are no checked exceptions. Implementations of StateSpec.finishSpecifying may throw IllegalStateException, but since we don't need to wrap them in Pipeline.PipelineExecutionException I can remove the second catch clause.

CoderRegistry coderRegistry,
Coder<InputT> inputCoder) {
Type stateType = stateDeclaration.stateType().getType();
if (stateType instanceof ParameterizedType) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

readability nit: I'd reverse the branches here to put the short one first, and just have a short circuit if not ParamterizedType return new Coder[0]. Also saves some indentation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@aviemzur
Copy link
Member Author

Rebased on top of master and made changes suggested in last review.

@asfbot
Copy link

asfbot commented Mar 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/2671/
--none--

@asfbot
Copy link

asfbot commented Mar 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PostCommit_Java_RunnableOnService_Flink/2100/
--none--

@kennknowles
Copy link
Member

LGTM! And runners are happy.

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.2%) to 70.003% when pulling b21e931 on aviemzur:infer-state-coders into 0582eb9 on apache:master.

@asfbot
Copy link

asfbot commented Mar 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/8864/
--none--

@aviemzur
Copy link
Member Author

@aljoscha looks Flink runner fails with these new tests in ParDoTest. Direct runner and Dataflow runner pass, so I'm wondering if it could be an issue with Flink runner. Could you PTAL?

@asfbot
Copy link

asfbot commented Mar 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/2688/
--none--

@Override
public MyInteger decode(InputStream inStream, Context context) throws CoderException,
IOException {
if (inStream.available() > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When this if is removed the test that fails on Flink with a NPE passed.

I think not having the if is valid but I'm not 100 % sure. What do you think, @kennknowles ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hadn't noticed this. Having the if is definitely incorrect, since this is checking whether or not it will block, which is irrelevant.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A simpler way to make this foolproof might be to just internally delegate to a VarIntCoder anyhow. It would also make it more obvious that the implementation of the coder doesn't matter.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will indeed be simpler to delegate to VarIntCoder. I'll make the change.

@aviemzur
Copy link
Member Author

aviemzur commented Apr 1, 2017

Run Flink ValidatesRunner

@aviemzur
Copy link
Member Author

aviemzur commented Apr 1, 2017

Run Dataflow ValidatesRunner

@asfbot
Copy link

asfbot commented Apr 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/2150/
--none--

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.06%) to 70.132% when pulling 6595e88 on aviemzur:infer-state-coders into 0582eb9 on apache:master.

@asfbot
Copy link

asfbot commented Apr 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/2710/
--none--

@aviemzur
Copy link
Member Author

aviemzur commented Apr 1, 2017

Retest this please

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.05%) to 70.145% when pulling 6595e88 on aviemzur:infer-state-coders into 0582eb9 on apache:master.

@asfbot
Copy link

asfbot commented Apr 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/9044/
--none--

@asfgit asfgit closed this in e31ca8b Apr 1, 2017
@aviemzur aviemzur deleted the infer-state-coders branch April 13, 2017 04:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants