-
Notifications
You must be signed in to change notification settings - Fork 13.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-9489] Checkpoint timers as part of managed keyed state instead of raw keyed state #6333
[FLINK-9489] Checkpoint timers as part of managed keyed state instead of raw keyed state #6333
Conversation
|
||
/** | ||
* General algorithm to read key-grouped state that was written from a {@link PartitioningResult} | ||
* @param <T> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
description for T
is missing.
if (o1.equals(o2)) { | ||
return 0; | ||
} | ||
// // we catch this case before moving to more expensive tie breaks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For what reason we need to comment this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is some commented out code which should be removed.
@@ -305,6 +351,6 @@ private void checkRefillCacheFromStore() { | |||
* after usage. | |||
*/ | |||
@Nonnull | |||
CloseableIterator<E> orderedIterator(); | |||
CloseableIterator<E> orderedIterator();; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a duplicated ;
if (precomputedSnapshot == null) { | ||
precomputedSnapshot = precomputeSnapshot(); | ||
} | ||
return precomputedSnapshot; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if the serializers are not all immutable? Should we need a immutable
field for it? Only when it is true we return the precomputeSnapshot
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As an easy fix, we could remove the precomputedSnapshot
field and keep it like it was before that the snapshot was computed with every snapshot
call.
return new HeapPriorityQueueStateSnapshot<>( | ||
queueDump, | ||
keyExtractorFunction, | ||
metaInfo, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We only dump the queued elements here, should we also need to take a snapshot of the metaInfo in case something of it are might not immutable?
@@ -446,8 +485,10 @@ public String toString() { | |||
@Override | |||
public int numStateEntries() { | |||
int sum = 0; | |||
for (StateTable<K, ?, ?> stateTable : stateTables.values()) { | |||
sum += stateTable.size(); | |||
for (StateSnapshotRestore stateTable : registeredStates.values()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: the name stateTable
is a bit confusion, since it is the RegisteredState
(which might not be StateTable
) now...
|
||
/** | ||
* | ||
* @param <T> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Description for T
is missing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First half of minor comments. Will continue reviewing the second half.
public Object extractKeyFromElement(@Nonnull Keyed<?> element) { | ||
return element.getKey(); | ||
} | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we move this extractor into its own KeyedKeyExtractorFunction
singleton?
@@ -264,6 +265,42 @@ public void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) | |||
} | |||
} | |||
|
|||
public static <T> StateSnapshotKeyGroupReader createKeyGroupPartitionReader( | |||
@Nonnull ElementReaderFunction<T> readerFunction, | |||
@Nonnull KeyGroupElementsConsumer<T> elementConsumer) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indenting these parameter one more level would help to distinguish the body from the parameter list.
import javax.annotation.Nonnull; | ||
|
||
/** | ||
* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
JavaDocs missing
final TypeSerializer<V> valueSerializer) { | ||
/** The precomputed immutable snapshot of this state */ | ||
@Nullable | ||
private StateMetaInfoSnapshot precomputedSnapshot; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Maybe rename to precomputedStateMetaInfoSnapshot
if (precomputedSnapshot == null) { | ||
precomputedSnapshot = precomputeSnapshot(); | ||
} | ||
return precomputedSnapshot; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As an easy fix, we could remove the precomputedSnapshot
field and keep it like it was before that the snapshot was computed with every snapshot
call.
if (o1.equals(o2)) { | ||
return 0; | ||
} | ||
// // we catch this case before moving to more expensive tie breaks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is some commented out code which should be removed.
} | ||
} | ||
} catch (Exception e) { | ||
throw new FlinkRuntimeException("Exception while bulk polling store.", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer throwing a checked exception here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why would you prefer it? I think there is no better way that caller can handle problems in this call than failing the job (rocksdb problems)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because it makes it more explicit that there are things which can go wrong. With checked exceptions you still have the chance to let the program fail. But without them, the caller needs to know that there are unchecked exception in order to do any recovery operation.
Moreover, I'm not sure whether we should manifest on this level how recovery is done or not done. For example, maybe the caller can fetch the latest checkpoint data again and replay all in-between elements in order to recompute the state. This is something which the priority queue should not need to bother about.
stateTables.put(restoredMetaInfo.getName(), stateTable); | ||
snapshotRestore = snapshotStrategy.newStateTable(registeredKeyedBackendStateMetaInfo); | ||
registeredStates.put(restoredMetaInfo.getName(), snapshotRestore); | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe check that (restoredMetaInfo.getBackendStateType() == PRIORITY_QUEUE
for (StateSnapshotRestore stateTable : registeredStates.values()) { | ||
if (stateTable instanceof StateTable) { | ||
sum += ((StateTable<?, ?, ?>) stateTable).size(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does the timers don't count for the total number of state entries?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is only used for some tests, and to be on the safe side it probably only expected to count the keyed state and not some timers.
@@ -60,36 +61,35 @@ | |||
|
|||
/** Result of partitioning the snapshot by key-group. */ | |||
@Nullable | |||
private KeyGroupPartitionedSnapshot partitionedSnapshot; | |||
private StateKeyGroupWriter partitionedSnapshot; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: rename field
… of raw keyed state Optimization for relaxed bulk polls Deactivate optimization for now because it still contains a bug This closes apache#6333.
… of raw keyed state Optimization for relaxed bulk polls Deactivate optimization for now because it still contains a bug This closes apache#6333.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The changes look good to me. Thanks a lot for your work @StefanRRichter!
One thing we should add as a follow up is an end-to-end test which verifies that timers are now scalable. Moreover, I think we should also support that we can configure the state backend as we create it. Similar to the incremental checkpointing. Otherwise it won't be possible to run jobs with different timer service implementations on the same cluster.
Merging this PR once Travis gives green light.
… of raw keyed state Optimization for relaxed bulk polls Deactivate optimization for now because it still contains a bug This closes apache#6333.
What is the purpose of the change
This PR integrates priority queue state (timers) with the snapshotting of Flink's state backend ans also already includes backwards compatibility (FLINK-9490). Core idea is to have a common abstraction for how state is registered in the state backend and how snapshots operator on such state (
StateSnapshotRestore
,RegisteredStateMetaInfoBase
). With this, the new state integrates more or less seemless with existing snapshot logic. The notable exception is a current lack of integration of RocksDB state backend with heap-based priority queue state. This can currently still use the old snapshot code without causing any regression using a temporary path (seeAbstractStreamOperator.snapshotState(...)
. As a result, after this PR Flink supports asynchronous snapshots for (heap kv / heap queue), (rocks kv / rocks queue) (full and incremental), (rocks kv / heap queue) (only full) and still uses synchronous snapshots for (rocks kv / heap queue) (only incremental).DISCLAIMER: This work was created in a bit of a rush to make it into the 1.6 release and still has some known rough edges and could have some bugs left that we could fix up in the test phase. Here is a list of some things that already come to my mind:
HeapPriorityQueueSet
.Verifying this change
This change is already covered by existing tests, but I would add some more eventually. You can activate RocksDB based timers by using the RocksDB backend and setting
RockDBBackendOptions.PRIORITY_QUEUE_STATE_TYPE
toROCKS
.Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation