[BEAM-1337] Infer state coders #2133
R: @kennknowles
Refer to this link for build results (access rights to CI server needed):
Sorry for the delay - this is on my radar and something I'm quite excited about.
@@ -355,6 +379,12 @@ private boolean containedInUnderlying(StateNamespace namespace, StateTag<? super
public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
    StateTag<? super K, MapState<KeyT, ValueT>> address,
    Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
  if (mapKeyCoder == null || mapValueCoder == null) {
I think that these parameters should not be `@Nullable`; instead, the spec/tag should own this exception and throw it when it discovers that it has to provide these parameters but cannot.
Are the coders needed in all runners though?
My inclination is to say no, which is why I specifically throw exceptions in direct runner since it seems to try to force coder checks wherever encoding can happen (and this was the way to make this change testable).
Runners might handle state in a way which does not need encoding (For example, direct runner works without these coders at all, everything is in-mem. I added these exceptions to make it acknowledge the coders' very existence).
So actually I am going to have to say "yes". Just like every PCollection needs a coder, every piece of state needs a coder. The portability goals of Beam include a runner written in Java running a pipeline written in Python, and vice versa. Data in the underlying model (the Runner API / Fn API) is always just bytes with a coder (spec) to interpret it.
Ah, I see. I'll move the exceptions from `CopyOnAccessInMemoryStateInternals` to `StateSpecs`.
@@ -121,8 +156,23 @@ private StateSpecs() {}
 * Create a state spec that is optimized for adding values frequently, and occasionally retrieving
 * all the values that have been added.
 */
public static <T> StateSpec<Object, BagState<T>> bag() {
  return bag(null);
Please do edit the classes and put `@Nullable` on the fields that are now nullable.
👍
 * Given {code coders} are inferred from type arguments defined for this class.
 * @param coders Array of coders indexed by the type arguments order.
 */
void offerCoders(Coder[] coders);
This is nice, I think. I want to spend a minute pondering it.
What is the contract if it is called more than once? What if some of the coders cannot be provided but others can? These are issues that are very similar to `TypedPValue.finishSpecifying()` and the coder registry's internal logic.
So, as I saw it, coders that are already set take precedence, which is why implementations of this method check for coder references which are `null` and only set those. This is why I named the method `offerCoders` and not something like `setCoders`. I can improve the javadoc to explain this (is there a better name for this method?).
If some coders cannot be provided and the coder is needed, the client which uses the coder will throw an exception notifying the user that a coder should have been set but was not. The user should then change their code to specify a coder explicitly using the `StateSpec` factory method which takes `Coder` arguments.
I'm not sure all clients will throw an exception which explains this (I made sure that the exceptions I added to direct runner explain why you are receiving this exception) but I think this holds true for many references to coders across the project.
That sounds about right. We should document it as you have described here. I guess in the end each `StateSpec` implementation can decide what to do on its own. And for my other question, I think we should document that some entries in the array might be null. My favorite approach is to provide a `Map<TypeVariable, Coder>` but that might be a pain here.
I agree that would be better. The thing is that the `StateSpec` implementations do not hold a reference to the type variable.
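A minimal sketch of the "offer" contract discussed in this thread, assuming a two-argument spec. `NamedCoder` and `SketchMapSpec` are hypothetical stand-ins, not Beam classes: explicitly set coders win, `null` entries in the offered array are skipped, and repeated calls never overwrite anything.

```java
/** Hypothetical stand-in for a coder; only its name matters in this sketch. */
final class NamedCoder {
  final String name;

  NamedCoder(String name) {
    this.name = name;
  }
}

/** Hypothetical spec with two type arguments, loosely modelled on a map state spec. */
final class SketchMapSpec {
  private NamedCoder keyCoder;   // null until set explicitly or inferred
  private NamedCoder valueCoder; // null until set explicitly or inferred

  SketchMapSpec(NamedCoder keyCoder, NamedCoder valueCoder) {
    this.keyCoder = keyCoder;
    this.valueCoder = valueCoder;
  }

  /** Accepts inferred coders, indexed by type-argument order; entries may be null. */
  void offerCoders(NamedCoder[] coders) {
    if (keyCoder == null && coders[0] != null) {
      keyCoder = coders[0];
    }
    if (valueCoder == null && coders[1] != null) {
      valueCoder = coders[1];
    }
  }

  String describe() {
    return nameOf(keyCoder) + "/" + nameOf(valueCoder);
  }

  private static String nameOf(NamedCoder coder) {
    return coder == null ? "unset" : coder.name;
  }
}

final class OfferCodersDemo {
  public static void main(String[] args) {
    SketchMapSpec spec = new SketchMapSpec(new NamedCoder("explicit-key"), null);
    // First offer: the key coder is already set, and no value coder could be inferred.
    spec.offerCoders(new NamedCoder[] {new NamedCoder("inferred-key"), null});
    System.out.println(spec.describe()); // explicit-key/unset
    // Second offer: only the still-missing value coder is filled in.
    spec.offerCoders(new NamedCoder[] {new NamedCoder("other-key"), new NamedCoder("inferred-value")});
    System.out.println(spec.describe()); // explicit-key/inferred-value
  }
}
```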
public void offerCoders(Coder[] coders) {
  if (this.coder == null) {
    if (coders[0] != null) {
      //noinspection unchecked
What system is this comment for? For javac you would use `@SuppressWarnings("unchecked")` and many tools grok that.
You're right, this works in IntelliJ IDEA. I'll change it to an annotation on the method.
On a side note, `findbugs` doesn't seem to check for this.
Yea, we have a lot of things disabled that it would be nice to enable one module at a time until we can stop suppressing warnings. This one is actually pretty valuable to have enabled for code health IMO.
@@ -1815,6 +2051,69 @@ public void processElement(

@Test
@Category({RunnableOnService.class, UsesStatefulParDo.class})
public void testCombiningStateCoderInference() {
Nice test coverage for this.
Since coders are often very hard to debug, I think we could use test coverage also for failure cases to make sure the error message is actionable.
👍
5efb262 to bfe1545
@kennknowles addressed your comments in my latest commits.
try {
  Type typeArgument = typeArguments[i];
  if (typeArgument instanceof Class) {
    coders[i] = coderRegistry.getDefaultCoder((Class) typeArgument);
I think here you are going to want to pass a full `TypeDescriptor`, otherwise you can't infer a coder for `List<Integer>`, for example. This is worth having a test for `ValueState<List<String>>` or some such.
👍
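To illustrate the point about `instanceof Class`, here is a small reflection sketch using only `java.lang.reflect`. `SketchState` and the field names are hypothetical. It shows that a nested type argument such as `List<String>` arrives as a `ParameterizedType`, so a check that only handles `Class` skips it silently.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

/** Hypothetical stand-in for a state type with one type argument. */
final class SketchState<T> {}

final class TypeArgumentDemo {
  // Fields exist only so their declared generic types can be inspected.
  SketchState<Integer> simple;
  SketchState<List<String>> nested;

  /** Returns the first type argument of the named field's declared type. */
  static Type firstTypeArgument(String fieldName) {
    try {
      Type declared = TypeArgumentDemo.class.getDeclaredField(fieldName).getGenericType();
      return ((ParameterizedType) declared).getActualTypeArguments()[0];
    } catch (NoSuchFieldException e) {
      throw new IllegalArgumentException(e);
    }
  }

  public static void main(String[] args) {
    Type simpleArg = firstTypeArgument("simple");
    Type nestedArg = firstTypeArgument("nested");
    System.out.println(simpleArg instanceof Class);             // true
    System.out.println(nestedArg instanceof Class);             // false
    System.out.println(nestedArg instanceof ParameterizedType); // true
  }
}
```

A lookup keyed on the full `Type` (which is roughly what a type descriptor carries) can still see the `String` inside the `List`; a lookup keyed on the raw `Class` cannot.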
  if (typeArgument instanceof Class) {
    coders[i] = coderRegistry.getDefaultCoder((Class) typeArgument);
  }
} catch (CannotProvideCoderException ignored) {
Add a comment about why this is being ignored / what it means to ignore it.
@@ -739,6 +780,7 @@ public Unbound withSideInputs(
@Override
public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
  validateWindowType(input, fn);
  inferStateCodersIfNeeded(fn, input.getPipeline().getCoderRegistry());
There are going to be some limitations here that you actually could address.
class ListProcessorDoFn<T> extends DoFn<List<T>, List<T>> {
  ... ValueState<T> ...
  ... MapState<String, T> ...
  ... etc ...
}
In these cases, you cannot infer a coder without using the coder from the input `PCollection`. But if you have the coder from the input, then it should work, in the same way we will infer an output coder for this `DoFn`.
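A rough sketch of why the input is needed, using plain reflection. All names are hypothetical, and coders are represented by strings for brevity. The state's type argument is a `TypeVariable`, so a registry keyed on concrete types has nothing to look up; a binding derived from the input (assumed here to be available as a map) resolves it.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a state type. */
final class SketchValueState<T> {}

/** Hypothetical generic function whose state is declared in terms of its own type variable. */
class SketchListFn<T> {
  SketchValueState<T> seen;
}

final class TypeVariableDemo {
  /** Returns the coder name bound to the state's type variable, or null if unknown. */
  static String resolve(Map<TypeVariable<?>, String> knownFromInput) {
    Type declared;
    try {
      declared = SketchListFn.class.getDeclaredField("seen").getGenericType();
    } catch (NoSuchFieldException e) {
      throw new IllegalStateException(e);
    }
    Type argument = ((ParameterizedType) declared).getActualTypeArguments()[0];
    if (argument instanceof TypeVariable) {
      // Only a binding supplied from outside (here: from the input) can resolve T.
      return knownFromInput.get(argument);
    }
    return null;
  }

  public static void main(String[] args) {
    Map<TypeVariable<?>, String> known = new HashMap<>();
    System.out.println(resolve(known)); // null: T is unbound
    known.put(SketchListFn.class.getTypeParameters()[0], "coder-from-input");
    System.out.println(resolve(known)); // coder-from-input
  }
}
```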
@kennknowles addressed your comments in my latest 2 commits.
@@ -930,6 +983,10 @@ public void populateDisplayData(Builder builder) {
public PCollectionTuple expand(PCollection<? extends InputT> input) {
  // SplittableDoFn should be forbidden on the runner-side.
  validateWindowType(input, fn);

  // Use coder registry to determine coders for all StateSpec defined in the fn signature.
  inferStateCodersIfNeeded(fn, input.getPipeline().getCoderRegistry(), input.getCoder());
@kennknowles - Calling `input.getCoder()` here will throw an exception if the user did not set a coder or register a default coder in the coder registry.
I was wondering whether I should do this inside the inference logic instead and catch this exception. However, our previous discussion about forcing coders to be set regardless of runner makes me think we want this to fail in these cases?
By the time an input is passed to `expand` it will have a coder.
@@ -44,6 +45,11 @@
private StateSpecs() {}

/** Create a simple state spec for values of type {@code T}. */
public static <T> StateSpec<Object, ValueState<T>> value() {
  return value(null);
I would say that the contract on the `value(Coder)` factory method is that the coder is non-null (and it could have a `checkArgument` that enforces it). Here, you should just directly call the constructor with `null` instead.
👍
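The factory contract suggested above could look roughly like the following. `SketchValueSpec` is a hypothetical class, the coder is typed as `Object` for brevity, and `Objects.requireNonNull` stands in for the `checkArgument` mentioned in the comment.

```java
import java.util.Objects;

/** Hypothetical value spec; the coder may be absent until inference runs. */
final class SketchValueSpec<T> {
  private final Object coder; // null means "to be inferred later"

  private SketchValueSpec(Object coder) {
    this.coder = coder;
  }

  /** No coder given: call the constructor with null directly, not the other factory. */
  static <T> SketchValueSpec<T> value() {
    return new SketchValueSpec<>(null);
  }

  /** Coder given: the public contract is that it is non-null. */
  static <T> SketchValueSpec<T> value(Object coder) {
    Objects.requireNonNull(coder, "coder must not be null; use value() to request inference");
    return new SketchValueSpec<>(coder);
  }

  boolean hasCoder() {
    return coder != null;
  }
}

final class ValueSpecDemo {
  public static void main(String[] args) {
    System.out.println(SketchValueSpec.value().hasCoder());           // false
    System.out.println(SketchValueSpec.value("some-coder").hasCoder()); // true
  }
}
```

Keeping `null` out of the public factory means a caller who passes `null` by accident fails immediately instead of silently opting in to inference.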
    .apply(ParDo.of(fn)).setCoder(myIntegerCoder);

thrown.expect(PipelineExecutionException.class);
thrown.expectMessage("Unable to infer a coder for ValueState and no Coder was specified.");
I think that you should get the message at construction time in `ParDo.expand()`. Throwing the exception in `bind()` is a little too late. Or, you can make sure that `ParDo.expand()` calls `bind()` with some dummy visitor just to enforce that all the coders are ready.
Alternatively, you could be very explicit and have some kind of `validate()` / `verifyCodersPresent()` method on `StateSpec`.
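One possible shape for the explicit check, as a sketch only. `verifyCodersPresent` here is a hypothetical static helper over a map from state id to coder name, not a method on the real `StateSpec`. The idea is that it runs once while the transform is being expanded, so a missing coder is reported before anything executes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ConstructionTimeCheck {
  /**
   * Fails fast if any declared state still lacks a coder after inference.
   * A null map value means "no coder was specified or inferred".
   */
  static void verifyCodersPresent(Map<String, String> coderByStateId) {
    List<String> missing = new ArrayList<>();
    for (Map.Entry<String, String> entry : coderByStateId.entrySet()) {
      if (entry.getValue() == null) {
        missing.add(entry.getKey());
      }
    }
    if (!missing.isEmpty()) {
      throw new IllegalStateException(
          "Unable to infer a coder for state " + missing + " and no Coder was specified.");
    }
  }

  public static void main(String[] args) {
    Map<String, String> declared = new LinkedHashMap<>();
    declared.put("count", "some-int-coder");
    declared.put("buffer", null); // inference failed and the user gave no coder
    try {
      verifyCodersPresent(declared);
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```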
2decabd to de69835
@kennknowles Rebased on master and addressed your comments in my latest 2 commits.
LGTM other than just a couple of little comments! Really nice for users, and really nice code. You'll need to rebase again, and it might be good to run a quick `ValidatesRunner` suite such as Flink's, since it has the most extensive state support anyhow.
} catch (IllegalAccessException e) {
  throw new RuntimeException(e);
} catch (Exception e) {
  throw new Pipeline.PipelineExecutionException(e);
`PipelineExecutionException` is intended for "execution time" which is distinct from "construction time". This exception should always be at "construction time". We don't have a special wrapper exception for this, so perhaps just `RuntimeException` is appropriate here as well. But I notice that `StateSpec.finishSpecifying()` doesn't throw checked exceptions, so where are they coming from?
You're right, there are no checked exceptions. Implementations of `StateSpec.finishSpecifying` may throw `IllegalStateException`, but since we don't need to wrap them in `Pipeline.PipelineExecutionException` I can remove the second `catch` clause.
    CoderRegistry coderRegistry,
    Coder<InputT> inputCoder) {
  Type stateType = stateDeclaration.stateType().getType();
  if (stateType instanceof ParameterizedType) {
Readability nit: I'd reverse the branches here to put the short one first, and just have a short circuit: if not `ParameterizedType`, return `new Coder[0]`. Also saves some indentation.
👍
de69835 to b21e931
Rebased on top of master and made the changes suggested in the last review.
LGTM! And runners are happy.
@aljoscha it looks like the Flink runner fails with these new tests in
@Override
public MyInteger decode(InputStream inStream, Context context) throws CoderException,
    IOException {
  if (inStream.available() > 0) {
When this `if` is removed, the test that fails on Flink with an NPE passes.
I think not having the `if` is valid but I'm not 100% sure. What do you think, @kennknowles?
I hadn't noticed this. Having the `if` is definitely incorrect, since this is checking whether or not it will block, which is irrelevant.
A simpler way to make this foolproof might be to just internally delegate to a `VarIntCoder` anyhow. It would also make it more obvious that the implementation of the coder doesn't matter.
It will indeed be simpler to delegate to `VarIntCoder`. I'll make the change.
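For the record, a self-contained sketch of why the `available()` guard misbehaves. It uses a fixed-width `int` encoding via `DataOutputStream` rather than `VarIntCoder`. What the original `decode` did when the guard failed is not shown in the diff, so returning `null` here is an assumption. `InputStream.available()` only estimates how many bytes can be read without blocking, and the base-class implementation returns 0 even when data is present.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

final class AvailableDemo {
  /** Wraps bytes in a stream that has data but inherits available() == 0 from InputStream. */
  static InputStream nonReporting(byte[] bytes) {
    ByteArrayInputStream delegate = new ByteArrayInputStream(bytes);
    return new InputStream() {
      @Override
      public int read() {
        return delegate.read();
      }
      // available() is deliberately not overridden.
    };
  }

  static byte[] encode(int value) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try {
      new DataOutputStream(out).writeInt(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out.toByteArray();
  }

  /** Guarded decode: wrongly treats "would block" as "no data" (null result is assumed). */
  static Integer decodeGuarded(InputStream in) {
    try {
      if (in.available() > 0) {
        return new DataInputStream(in).readInt();
      }
      return null;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Unguarded decode: just reads, blocking if it has to. */
  static int decode(InputStream in) {
    try {
      return new DataInputStream(in).readInt();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] bytes = encode(42);
    System.out.println(decodeGuarded(nonReporting(bytes))); // null
    System.out.println(decode(nonReporting(bytes)));        // 42
  }
}
```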
Run Flink ValidatesRunner
Run Dataflow ValidatesRunner
Refer to this link for build results (access rights to CI server needed):
Failed Tests: 2
beam_PreCommit_Java_MavenInstall/org.apache.beam:beam-examples-java: 1
beam_PreCommit_Java_MavenInstall/org.apache.beam:beam-runners-spark: 1
Retest this please