[refactor] Rename OperatorState to ValueState, add ListState #1347

Closed
wants to merge 3 commits into from

Conversation

aljoscha
Contributor

ValueState behaves exactly the same as OperatorState. ListState is a
stateful list to which elements can be added and for which the elements
that it contains can be obtained. To create a ValueState the user
passes a ValueStateIdentifier to
StreamingRuntimeContext.getPartitionedState() while they would pass a
ListStateIdentifier to the same function to retrieve a ListState.
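To make the shape of this concrete, here is a rough sketch of how one lookup method can hand out different kinds of state depending on the identifier type. Only the names ValueState, ListState, ValueStateIdentifier, ListStateIdentifier and getPartitionedState() come from the description above. Every method signature, the StateIdentifier interface and the SketchRuntimeContext class are assumptions made for illustration and are not the code in this PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: all signatures here are assumptions, not the PR's code.

interface ValueState<T> {
    T value();
    void update(T value);
}

interface ListState<T> {
    void add(T element);   // elements can be added ...
    Iterable<T> get();     // ... and the contained elements can be obtained
}

// Hypothetical common supertype: an identifier names a state and knows
// which kind of state object to create for it.
interface StateIdentifier<S> {
    String name();
    S createState();
}

final class ValueStateIdentifier<T> implements StateIdentifier<ValueState<T>> {
    private final String name;
    private final T defaultValue;

    ValueStateIdentifier(String name, T defaultValue) {
        this.name = name;
        this.defaultValue = defaultValue;
    }

    public String name() { return name; }

    public ValueState<T> createState() {
        return new ValueState<T>() {
            private T current = defaultValue;
            public T value() { return current; }
            public void update(T value) { current = value; }
        };
    }
}

final class ListStateIdentifier<T> implements StateIdentifier<ListState<T>> {
    private final String name;

    ListStateIdentifier(String name) { this.name = name; }

    public String name() { return name; }

    public ListState<T> createState() {
        return new ListState<T>() {
            private final List<T> elements = new ArrayList<>();
            public void add(T element) { elements.add(element); }
            public Iterable<T> get() { return new ArrayList<>(elements); }
        };
    }
}

// Hypothetical stand-in for StreamingRuntimeContext: the identifier's type
// decides which kind of state the caller gets back.
final class SketchRuntimeContext {
    private final Map<String, Object> states = new HashMap<>();

    @SuppressWarnings("unchecked")
    <S> S getPartitionedState(StateIdentifier<S> id) {
        return (S) states.computeIfAbsent(id.name(), n -> id.createState());
    }
}
```

With this sketch, passing a ValueStateIdentifier returns a ValueState and passing a ListStateIdentifier returns a ListState, so the system can tell from the identifier what kind of state it is dealing with.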

This change is necessary to give the system more information about the
nature of the operator state. We want this to be able to do incremental
snapshots. This would not be possible, for example, if the user had a
List as a state. Inside OperatorState this list would be opaque and
Flink could not create good incremental snapshots.

This also refactors the StateBackend. Before, the logic for partitioned
state was spread out over StreamingRuntimeContext,
AbstractStreamOperator and StateBackend. Now it is consolidated in
StateBackend.

@aljoscha
Contributor Author

This does not touch the documentation yet. I would adapt it once we reach consensus that we want these refactorings.

@aljoscha aljoscha force-pushed the state-enhance branch 3 times, most recently from 4cc64d6 to 08fd343 Compare November 11, 2015 18:42
@gyfora
Contributor

gyfora commented Nov 12, 2015

This contains a lot of changes that conflict with #1305. I think we should come to an agreement on that and merge it first.

@aljoscha
Contributor Author

@gyfora yes, I'm aware of that 😄

@gyfora
Contributor

gyfora commented Nov 12, 2015

I am wondering whether in this form the ListState is a useful abstraction to have.

It has a very loose definition of a list: it only supports adding values (so it is ever-growing), with no removal or any other List operation, which I think would be just as common.

Also I think other collection types such as Sets and Maps are similarly common and those are not covered by this.

Maybe a better approach would be to make a very generic collection state type that supports all the different operations which could then be used to implement ListState, SetState, MapState etc.

@aljoscha
Contributor Author

Yes, this is only the starting point. As it is now, this should make it very easy to add other types of state.

@gyfora
Contributor

gyfora commented Nov 12, 2015

I see the point of refactoring. What I am saying is that I wouldn't add this ListState implementation to the API because it's not very useful to have. As a user I would expect the normal list operations on the list state.

@aljoscha
Contributor Author

I limited the interface on purpose. I think it should be sufficient for most purposes, and having it in this limited form allows us to do incremental checkpoints very easily. If we only allow adding elements and clearing the whole state, you immediately know, for example, what you have to checkpoint when you do an incremental checkpoint. If we allow arbitrary modifications of the list state, we lose that or need to use workarounds. (I think for that case users would have to use a ValueState that contains a List, for example.)
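
As a rough illustration of that argument, here is a minimal sketch of an append-only list that can report exactly what changed since the last checkpoint. The class names AppendOnlyListState and Delta and the method snapshotIncrement() are hypothetical and exist only for this example. They are not part of the PR or of Flink.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: none of these names come from the PR or from Flink.

// What one incremental checkpoint has to store.
final class Delta<T> {
    final boolean cleared;  // true: drop everything from earlier checkpoints first
    final List<T> added;    // elements appended since the last checkpoint

    Delta(boolean cleared, List<T> added) {
        this.cleared = cleared;
        this.added = added;
    }
}

final class AppendOnlyListState<T> {
    private final List<T> elements = new ArrayList<>();
    private int checkpointedSize = 0;               // prefix already covered by checkpoints
    private boolean clearedSinceCheckpoint = false;

    void add(T element) {
        elements.add(element);
    }

    void clear() {
        elements.clear();
        checkpointedSize = 0;
        clearedSinceCheckpoint = true;
    }

    Iterable<T> get() {
        return Collections.unmodifiableList(elements);
    }

    // Because the only mutations are add and clear, the increment is just
    // the tail of the list plus a flag saying whether a clear happened.
    Delta<T> snapshotIncrement() {
        List<T> added =
                new ArrayList<>(elements.subList(checkpointedSize, elements.size()));
        Delta<T> delta = new Delta<>(clearedSinceCheckpoint, added);
        checkpointedSize = elements.size();
        clearedSinceCheckpoint = false;
        return delta;
    }
}
```

If arbitrary removals or in-place updates were allowed, the tail of the list would no longer describe the change, and the state would need to track individual modifications or fall back to writing the full list.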

@aljoscha aljoscha force-pushed the state-enhance branch 4 times, most recently from 39fd35f to 42bced2 Compare November 12, 2015 16:35
@aljoscha
Contributor Author

Closing for now, I have a bigger PR coming up

@aljoscha aljoscha closed this Jan 21, 2016
@aljoscha aljoscha deleted the state-enhance branch February 3, 2016 10:26
2 participants