
[FLINK-9489] Checkpoint timers as part of managed keyed state instead of raw keyed state #6333

Closed

Conversation

@StefanRRichter (Contributor) commented Jul 14, 2018

What is the purpose of the change

This PR integrates priority queue state (timers) with the snapshotting of Flink's state backends and also already includes backwards compatibility (FLINK-9490). The core idea is to have a common abstraction for how state is registered in the state backend and how snapshots operate on such state (StateSnapshotRestore, RegisteredStateMetaInfoBase). With this, the new state integrates more or less seamlessly with the existing snapshot logic. The notable exception is the current lack of integration of the RocksDB state backend with heap-based priority queue state. That combination can still use the old snapshot code without causing any regression, via a temporary path (see AbstractStreamOperator.snapshotState(...)). As a result, after this PR Flink supports asynchronous snapshots for (heap kv / heap queue), (rocks kv / rocks queue) (full and incremental), and (rocks kv / heap queue) (only full), and still uses synchronous snapshots for (rocks kv / heap queue) (only incremental).
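The core idea described above can be sketched with toy types: both key/value state and priority-queue (timer) state implement one common snapshot interface, so the backend's snapshot loop no longer has to distinguish between them. The interface name mirrors the `StateSnapshotRestore` abstraction mentioned in the description, but the bodies below are illustrative stand-ins, not Flink's actual code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Common handle through which the backend snapshots any registered state. */
interface StateSnapshotRestore {
    /** Returns an immutable dump of the state's current contents. */
    List<String> stateSnapshot();
}

/** Toy key/value state registered under a name. */
class ToyKvState implements StateSnapshotRestore {
    final List<String> entries = new ArrayList<>();
    public List<String> stateSnapshot() { return new ArrayList<>(entries); }
}

/** Toy priority-queue (timer) state: exposes the very same snapshot interface. */
class ToyQueueState implements StateSnapshotRestore {
    final List<String> timers = new ArrayList<>();
    public List<String> stateSnapshot() { return new ArrayList<>(timers); }
}

/** Backend keeps one registry; the snapshot loop is agnostic to the state kind. */
class ToyBackend {
    final Map<String, StateSnapshotRestore> registeredStates = new HashMap<>();

    Map<String, List<String>> snapshotAll() {
        Map<String, List<String>> result = new HashMap<>();
        for (Map.Entry<String, StateSnapshotRestore> e : registeredStates.entrySet()) {
            result.put(e.getKey(), e.getValue().stateSnapshot());
        }
        return result;
    }
}
```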

DISCLAIMER: This work was created in a bit of a rush to make it into the 1.6 release. It still has some known rough edges, and there could be some bugs left to fix up in the test phase. Here is a list of things that already come to mind:

  • Integrate heap-based timers with incremental RocksDB snapshots, then kick out some code.
  • Check proper integration with serializer upgrade story (!!)
  • After that, we can also remove the key-partitioning in the set structure from HeapPriorityQueueSet.
  • Improve integration of the batch wrapper.
  • Improve general state registration logic in the backends, there is potential to remove duplicated code, and generally still improve the integration of the queue state a bit.
  • Improve performance of RocksDB based timers, e.g. a byte[] based cache, seeking directly to the next potential timer instead of to the key-group start, bulkPoll.
  • Improve some class/interface/method names.
  • Defensive checks against attempts to register a different state type under an existing name.
  • Add tests, e.g. bulkPoll etc.

Verifying this change

This change is already covered by existing tests, but I would add some more eventually. You can activate RocksDB based timers by using the RocksDB backend and setting RocksDBBackendOptions.PRIORITY_QUEUE_STATE_TYPE to ROCKS.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (yes)
  • The runtime per-record code paths (performance sensitive): (yes)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (JavaDocs only for now)

@StefanRRichter (Contributor, Author):

CC @tillrohrmann


/**
* General algorithm to read key-grouped state that was written from a {@link PartitioningResult}
* @param <T>
Contributor:

description for T is missing.

if (o1.equals(o2)) {
return 0;
}
// // we catch this case before moving to more expensive tie breaks.
Contributor:

For what reason do we need this comment?

Contributor:

I think this is some commented out code which should be removed.
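The excerpt being discussed returns 0 early when the two elements are equal, before falling back to more expensive tie-breaking. A minimal standalone sketch of that pattern; the `Timer` type and its fields are hypothetical stand-ins, not the PR's actual classes:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.Objects;

// Hypothetical timer element: ordered by timestamp, ties broken by a
// (potentially expensive) comparison of the serialized key bytes.
final class Timer {
    final long timestamp;
    final byte[] serializedKey;

    Timer(long timestamp, byte[] serializedKey) {
        this.timestamp = timestamp;
        this.serializedKey = serializedKey;
    }

    @Override public boolean equals(Object o) {
        return o instanceof Timer
            && ((Timer) o).timestamp == timestamp
            && Arrays.equals(((Timer) o).serializedKey, serializedKey);
    }

    @Override public int hashCode() {
        return Objects.hash(timestamp, Arrays.hashCode(serializedKey));
    }
}

final class TimerComparator implements Comparator<Timer> {
    @Override public int compare(Timer o1, Timer o2) {
        // Cheap equality check first: we catch this case before moving to the
        // more expensive tie breaks below.
        if (o1.equals(o2)) {
            return 0;
        }
        int byTime = Long.compare(o1.timestamp, o2.timestamp);
        if (byTime != 0) {
            return byTime;
        }
        // Expensive tie break: lexicographic comparison of the serialized key.
        byte[] a = o1.serializedKey;
        byte[] b = o2.serializedKey;
        for (int i = 0; i < Math.min(a.length, b.length); i++) {
            int c = Byte.compare(a[i], b[i]);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(a.length, b.length);
    }
}
```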

@@ -305,6 +351,6 @@ private void checkRefillCacheFromStore() {
* after usage.
*/
@Nonnull
CloseableIterator<E> orderedIterator();
CloseableIterator<E> orderedIterator();;
Contributor:

a duplicated ;

if (precomputedSnapshot == null) {
precomputedSnapshot = precomputeSnapshot();
}
return precomputedSnapshot;
Contributor:

What if the serializers are not all immutable? Should we add an immutable flag field, and only return the precomputed snapshot when it is true?

Contributor:

As an easy fix, we could remove the precomputedSnapshot field and keep it as it was before, where the snapshot was computed on every snapshot call.
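The pattern under discussion lazily caches the snapshot in a field. A standalone sketch of that memoization (the meta-info type and its string snapshot are toy stand-ins); as the reviewer notes above, caching like this is only safe when everything reachable from the snapshot is immutable:

```java
final class RegisteredStateMetaInfo {
    private final String name;

    // Lazily computed; safe to cache only if the snapshot (and everything it
    // references, e.g. serializers) is immutable -- the reviewer's concern.
    private String precomputedSnapshot; // toy stand-in for StateMetaInfoSnapshot

    RegisteredStateMetaInfo(String name) {
        this.name = name;
    }

    String snapshot() {
        if (precomputedSnapshot == null) {
            precomputedSnapshot = precomputeSnapshot();
        }
        return precomputedSnapshot;
    }

    private String precomputeSnapshot() {
        // Stand-in for the real (more expensive) snapshot construction.
        return "snapshot(" + name + ")";
    }
}
```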

return new HeapPriorityQueueStateSnapshot<>(
queueDump,
keyExtractorFunction,
metaInfo,
Contributor:

We only dump the queued elements here; should we also take a snapshot of the metaInfo, in case parts of it are not immutable?

@@ -446,8 +485,10 @@ public String toString() {
@Override
public int numStateEntries() {
int sum = 0;
for (StateTable<K, ?, ?> stateTable : stateTables.values()) {
sum += stateTable.size();
for (StateSnapshotRestore stateTable : registeredStates.values()) {
Contributor:

nit: the name stateTable is a bit confusing, since it is now the registered state (which might not be a StateTable)...


/**
*
* @param <T>
Contributor:

Description for T is missing

@tillrohrmann (Contributor) left a comment:

First half of minor comments. Will continue reviewing the second half.

public Object extractKeyFromElement(@Nonnull Keyed<?> element) {
return element.getKey();
}
};
Contributor:

Could we move this extractor into its own KeyedKeyExtractorFunction singleton?
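The suggestion above, moving the anonymous extractor into its own singleton, is commonly done with an enum in Java. A sketch with toy `Keyed` and `KeyExtractorFunction` interfaces standing in for Flink's:

```java
// Toy stand-ins for the Flink interfaces referenced in the review comment.
interface Keyed<K> {
    K getKey();
}

interface KeyExtractorFunction<T> {
    Object extractKeyFromElement(T element);
}

// Enum singleton: one shared, stateless extractor instance instead of an
// anonymous class allocated at each use site.
enum KeyedKeyExtractorFunction implements KeyExtractorFunction<Keyed<?>> {
    INSTANCE;

    @Override
    public Object extractKeyFromElement(Keyed<?> element) {
        return element.getKey();
    }
}
```

The enum form gives serialization-safe singleton semantics for free, which is why it is a common choice for stateless function objects like this.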

@@ -264,6 +265,42 @@ public void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId)
}
}

public static <T> StateSnapshotKeyGroupReader createKeyGroupPartitionReader(
@Nonnull ElementReaderFunction<T> readerFunction,
@Nonnull KeyGroupElementsConsumer<T> elementConsumer) {
Contributor:

Indenting these parameters one more level would help to distinguish the body from the parameter list.

import javax.annotation.Nonnull;

/**
*
Contributor:

JavaDocs missing

final TypeSerializer<V> valueSerializer) {
/** The precomputed immutable snapshot of this state */
@Nullable
private StateMetaInfoSnapshot precomputedSnapshot;
Contributor:

nit: Maybe rename to precomputedStateMetaInfoSnapshot

if (precomputedSnapshot == null) {
precomputedSnapshot = precomputeSnapshot();
}
return precomputedSnapshot;
Contributor:

As an easy fix, we could remove the precomputedSnapshot field and keep it as it was before, where the snapshot was computed on every snapshot call.

if (o1.equals(o2)) {
return 0;
}
// // we catch this case before moving to more expensive tie breaks.
Contributor:

I think this is some commented out code which should be removed.

}
}
} catch (Exception e) {
throw new FlinkRuntimeException("Exception while bulk polling store.", e);
Contributor:

I would prefer throwing a checked exception here.

Contributor (Author):

Why would you prefer it? I think there is no better way for the caller to handle problems in this call than failing the job (RocksDB problems)?

Contributor:

Because it makes it more explicit that there are things which can go wrong. With checked exceptions you still have the chance to let the program fail, but without them the caller needs to know that there are unchecked exceptions in order to do any recovery operation.

Moreover, I'm not sure whether we should decide at this level how recovery is or is not done. For example, maybe the caller can fetch the latest checkpoint data again and replay all in-between elements in order to recompute the state. This is something the priority queue should not need to bother about.
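The alternative the reviewer argues for, declaring a checked exception so callers must decide how to recover, looks like this in reduced form. The exception type, class, and failure mode below are hypothetical, not the PR's actual code:

```java
// Hypothetical checked exception surfacing store-access failures to the caller.
class StateAccessException extends Exception {
    StateAccessException(String message, Throwable cause) {
        super(message, cause);
    }
}

class CachingQueueStore {
    private final boolean storeBroken;

    CachingQueueStore(boolean storeBroken) {
        this.storeBroken = storeBroken;
    }

    // Checked signature: the caller is forced to handle or propagate, instead
    // of having to know about an undeclared RuntimeException.
    String bulkPoll() throws StateAccessException {
        try {
            if (storeBroken) {
                throw new java.io.IOException("backing store unavailable");
            }
            return "element";
        } catch (Exception e) {
            throw new StateAccessException("Exception while bulk polling store.", e);
        }
    }
}
```

A caller that just wants to fail the job can still rethrow; a caller that wants to recover (e.g. by re-reading checkpoint data, as suggested above) now has an explicit hook to do so.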

stateTables.put(restoredMetaInfo.getName(), stateTable);
snapshotRestore = snapshotStrategy.newStateTable(registeredKeyedBackendStateMetaInfo);
registeredStates.put(restoredMetaInfo.getName(), snapshotRestore);
} else {
Contributor:

Maybe check that restoredMetaInfo.getBackendStateType() == PRIORITY_QUEUE.

for (StateSnapshotRestore stateTable : registeredStates.values()) {
if (stateTable instanceof StateTable) {
sum += ((StateTable<?, ?, ?>) stateTable).size();
}
Contributor:

Why don't the timers count toward the total number of state entries?

Contributor (Author):

This method is only used for some tests, and to be on the safe side it is probably expected to count only the keyed state and not the timers.
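The counting logic under discussion sums only the entries that are key/value state tables and deliberately skips timer queues, via an instanceof filter over the common registry. A standalone sketch with toy types (names are illustrative, not Flink's):

```java
import java.util.HashMap;
import java.util.Map;

// Common marker for anything in the registry, mirroring StateSnapshotRestore.
interface SnapshotRestoreHandle {}

final class ToyStateTable implements SnapshotRestoreHandle {
    final int size;
    ToyStateTable(int size) { this.size = size; }
}

final class ToyTimerQueue implements SnapshotRestoreHandle {
    final int size;
    ToyTimerQueue(int size) { this.size = size; }
}

final class Registry {
    final Map<String, SnapshotRestoreHandle> registeredStates = new HashMap<>();

    // Counts only key/value state tables; timer queues are skipped, matching
    // the test-only semantics discussed above.
    int numStateEntries() {
        int sum = 0;
        for (SnapshotRestoreHandle handle : registeredStates.values()) {
            if (handle instanceof ToyStateTable) {
                sum += ((ToyStateTable) handle).size;
            }
        }
        return sum;
    }
}
```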

@@ -60,36 +61,35 @@

/** Result of partitioning the snapshot by key-group. */
@Nullable
private KeyGroupPartitionedSnapshot partitionedSnapshot;
private StateKeyGroupWriter partitionedSnapshot;
Contributor:

nit: rename field

tillrohrmann pushed a commit to tillrohrmann/flink that referenced this pull request Jul 16, 2018
… of raw keyed state

Optimization for relaxed bulk polls

Deactivate optimization for now because it still contains a bug

This closes apache#6333.
@tillrohrmann (Contributor) left a comment:

The changes look good to me. Thanks a lot for your work @StefanRRichter!

One thing we should add as a follow-up is an end-to-end test which verifies that timers are now scalable. Moreover, I think we should also support configuring the state backend as we create it, similar to incremental checkpointing. Otherwise it won't be possible to run jobs with different timer service implementations on the same cluster.

Merging this PR once Travis gives green light.

@asfgit asfgit closed this in dbddf00 Jul 16, 2018
sampathBhat pushed a commit to sampathBhat/flink that referenced this pull request Jul 26, 2018
… of raw keyed state

Optimization for relaxed bulk polls

Deactivate optimization for now because it still contains a bug

This closes apache#6333.