Out-of-core state backend for JDBC databases #1305
Conversation
Wow, a lot of stuff. I will look into it once the release is out. 😃
Good stuff! Will need a day more to look through this, but this is a cool way of doing stateful stream computation :-)
Thanks for the great write-up!
Cool stuff, really! This is very much in line with what I had in mind for a SQL backend. Let me check whether I understood everything correctly (and see where my understanding is wrong), because I think we should be able to make an "exactly once" version based on that mechanism. I am basically rephrasing what you describe in a different model.
Basic Mode
What this is effectively doing is a batched and asynchronous version of distributed two-phase-commit transactions. The phases look basically like this:
So far, this should give exactly-once guarantees, or am I overlooking something?
Compacting
Whenever the "checkpoint complete" notification comes (or every so many changes), you trigger a clean-up query in the background. Given that the SQL database has a not completely terrible query planner, this SQL statement should be reasonably efficient (a single semi-join).
The good thing is that, by virtue of having the incrementing global versions, we can set the isolation level for the clean-up query to "read uncommitted", which means it will not lock anything and thus will not compete with other ongoing modifications.
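The two-phase scheme described in the comment above can be sketched roughly as follows. This is a minimal illustration only: SQLite stands in for the JDBC database, and all table, column, and function names are invented for the sketch, not taken from the PR.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE kv (k TEXT, ts INTEGER, v TEXT, PRIMARY KEY (k, ts))")

last_complete_ts = 0  # highest checkpoint ts known to be globally committed


def write_snapshot(updates, checkpoint_ts):
    # Phase 1: persist the modified entries, tagged with the checkpoint ts.
    con.executemany("INSERT INTO kv VALUES (?, ?, ?)",
                    [(k, checkpoint_ts, v) for k, v in updates.items()])
    con.commit()


def notify_checkpoint_complete(checkpoint_ts):
    # Phase 2: the coordinator's notification makes this version visible.
    global last_complete_ts
    last_complete_ts = checkpoint_ts


def lookup(k):
    # Readers only see versions belonging to completed checkpoints.
    row = con.execute(
        "SELECT v FROM kv WHERE k = ? AND ts <= ? ORDER BY ts DESC LIMIT 1",
        (k, last_complete_ts)).fetchone()
    return row[0] if row else None


write_snapshot({"a": "1"}, 100)
notify_checkpoint_complete(100)
write_snapshot({"a": "2"}, 200)  # checkpoint 200 never completes
print(lookup("a"))               # -> 1 : the uncommitted version stays invisible
```

Rows written by a checkpoint that never completes are simply never read, which is what makes the asynchronous batch writes safe under this model.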
Just some remarks:
=> I think the timestamp is not needed. Couldn't everything be implemented by just using the (monotonically increasing) checkpoint IDs? Also, this is unrelated, but maybe @StephanEwen or @gyfora know: why do we have the
I don't know about the allOrNothingState :/
You are right about the checkpoint IDs being ignored. I don't see why we need to bother with changing the ID semantics; the timestamps also serve perfectly as checkpoint IDs (in fact we could drop the checkpoint ID everywhere in Flink and just keep the timestamp). The good thing about timestamps is that they are not consecutive: if the first checkpoint has ts = 100 and the second has ts = 5100, then we can write the intermediate updates with ts 101, 102, ... while maintaining uniqueness and monotonicity. This is something we use here, and that is why I am using timestamps instead of IDs.
@StephanEwen If we assume that the previous job is completely killed off, with no writing to the database whatsoever after that, then we can properly clean up during recovery. Unfortunately this does not seem to hold if you set the retry wait time very low (like 0 ms in the snapshot). What this means is that the failed job is still writing the failed snapshot to the database after you have recovered and cleaned up. As for the compaction, I came up with something very similar, but here is the funny thing and my problem: the query you wrote runs properly on Derby but is invalid on MySQL (you cannot create a subquery on the same table you are modifying). In MySQL you need to use an inner join, but that will not work in Derby :P In any case, I have made a prototype of this at: https://github.com/gyfora/flink/tree/compaction
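The dialect clash described here can be sketched with the two query shapes side by side. SQLite is used below only because, like Derby, it accepts the correlated-subquery form; the schema and the exact SQL are illustrative, not the PR's actual compaction statements.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE kv (k TEXT, ts INTEGER, v TEXT)")
con.executemany("INSERT INTO kv VALUES (?, ?, ?)",
                [("a", 1, "old"), ("a", 2, "new"), ("b", 1, "only")])

# Derby-style compaction: a correlated subquery on the table being modified.
# MySQL rejects this shape (error 1093: "You can't specify target table ...
# for update in FROM clause"); there it must be rewritten as a self-join.
derby_style = """
    DELETE FROM kv
    WHERE ts < (SELECT MAX(t2.ts) FROM kv t2
                WHERE t2.k = kv.k AND t2.ts <= ?)
"""

# MySQL-style equivalent using a derived table (shown for comparison only;
# not executed here, since SQLite does not support multi-table DELETE):
mysql_style = """
    DELETE kv FROM kv
    JOIN (SELECT k, MAX(ts) AS mts FROM kv WHERE ts <= 2 GROUP BY k) latest
      ON kv.k = latest.k AND kv.ts < latest.mts
"""

con.execute(derby_style, (2,))  # compact up to checkpoint ts = 2
con.commit()
print(con.execute("SELECT k, ts, v FROM kv ORDER BY k").fetchall())
# -> [('a', 2, 'new'), ('b', 1, 'only')]
```

Both statements keep only the newest version of each key up to the given checkpoint timestamp; having two query shapes for the same operation is exactly what motivates the per-database adapter described later in the PR.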
I totally understand your point, but I think it's OK that changes of this scope take longer to review and get in (my HA PR took over a month or so to get in). At the end of the day, it matters more that we get this right (because it covers a very important use case) than getting it in a few days earlier.
I have updated the description and ran some more cluster tests without any issues. It would be good if you all could do a second round of reviews, please.
I'm looking at it again.
Looking through this again...
}

@Override
public boolean equals(Object obj) {
Overriding equals() but not hashCode(). Not sure what the usage of this object is, but dropping the equals method may make sense (do you ever compare configs for equality?).
That must have been a mistake; I generated it with Eclipse :/ I only used it for a test, but it doesn't make much sense, so I can remove it.
I have a final comment inline. Otherwise, I think this is good to merge.
Had an offline chat with @gyfora with the following outcome:
With these changes, looks good to merge. +1 from my side
}, dbConfig.getMaxNumberOfSqlRetries(), dbConfig.getSleepBetweenSqlRetries());
} catch (IOException e) {
    // We don't want to fail the job here, but log the error.
    if (Log.isDebugEnabled()) {
I think you accidentally used Jetty's logging here (see the import org.eclipse.jetty.util.log.Log).
Good catch, thanks Robert :)
In general: can we add a checkstyle import check for this? We have a Guava check to disallow importing from our shaded namespace already. @robert: if you think it is feasible, please open an issue for it.
We could add a checkstyle rule for that, but I would like to solve the problem in a different way: I recently opened a JIRA issue for checking whether a Flink module uses only dependencies it has explicitly declared (forbidding reliance on transitive dependencies). With that check, we would also identify cases like this one.
@StephanEwen, @rmetzger: I also added a final improvement: compaction is now executed in a background thread using a SingleThreadedExecutor. These changes are in the last 2 commits, so if you guys +1 these last modifications I will merge it. I guess the compaction part is the most interesting here. Thanks!
If there are no objections, I would like to merge this :)
I think you can go ahead. It's in contrib and you guys are battle-testing it anyways... 😉
// ------------------------------------------------------

private Environment env;
env is not serializable.
Out-of-core JDBC key-value state
This PR introduces a new state backend (DbStateBackend) for storing partitioned and non-partitioned checkpoints in JDBC-supported databases.
The main feature of this backend is the KvState implementation (LazyDbKvState), which provides consistent out-of-core storage of the key-value states for streaming tasks while keeping only a predefined number of states in memory as a cache. The KvState also supports database sharding and automatic compaction.
Core changes
In order to implement all the necessary functionality some changes had to be applied to the current state interfaces:
1. StateBackend.initializeForCurrentJob(...) now takes the Environment of the instantiating task as a parameter, which allows the backend to access more information about the task.
2. The restoreState methods of the various runtime components now receive a global recovery timestamp (conceptually similar to the checkpoint timestamp).
(2) is used to bring the storage to a consistent state after restoring, to avoid reading partially successful checkpoints; I will describe this in detail later.
DbStateBackend
Non-partitioned states
This implementation of the StateBackend stores the serialized checkpoints in database tables as Blobs.
For every JobID a new database table is created to store the non-partitioned states:
A new handleId is generated for each non-partitioned state checkpoint, which is later used to retrieve and clean it up. (This part is trivial.)
The DbStateBackend also supports plugging in arbitrary state backend implementation for storing the non-partitioned states, in which case only kv states will be checkpointed to the configured database.
new DbStateBackend(config, new MemoryStateBackend()) will store non-partitioned states in memory. I think this will be the more common usage pattern, as the database state storage is most interesting for the key-value states.
Key-Value states
The LazyDbKvState implements a kv-state backend that keeps only a limited number of kv states in memory and stores the rest in the database.
For each key-value state in the job we create a new table to store the states.
Table layout:
As the columns suggest, the database holds multiple versions of each key-value pair, versioned by timestamp.
If a key is not in the KvState cache, it is retrieved from the database by selecting v with the largest timestamp that is smaller than the last written timestamp.
Whenever a LazyDbKvState takes a snapshot, it saves all its modified state (the cache keeps track of what has been modified since the last snapshot) in the database with the checkpoint timestamp and updates its last written ts. The returned KvStateSnapshot basically only needs to contain the stateId(jobid+opid+name) and the checkpoint timestamp (and some additional information for better compaction) so this is stored in-memory.
When the cache reaches its size limit, a predefined number of least recently accessed states are checkpointed to the database (only if they were modified) with timestamp = lastTimestamp+1. Subsequent "spills" will overwrite each other as they have the same timestamp.
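The versioning and eviction behavior described above can be illustrated with a toy table. SQLite stands in for the JDBC database here; the schema and names are illustrative, not the PR's actual layout.

```python
import sqlite3

con = sqlite3.connect(":memory:")
# Multiple versions of each key, distinguished by timestamp.
con.execute(
    "CREATE TABLE kvstate (k TEXT, ts INTEGER, v TEXT, PRIMARY KEY (k, ts))")


def put(k, v, ts):
    # INSERT OR REPLACE mimics the spill semantics: a later eviction with
    # the same ts (lastTimestamp + 1) overwrites the earlier spill.
    con.execute("INSERT OR REPLACE INTO kvstate VALUES (?, ?, ?)", (k, ts, v))


def lookup(k, last_written_ts):
    # The largest version that is not newer than the last written timestamp.
    row = con.execute(
        "SELECT v FROM kvstate WHERE k = ? AND ts <= ? "
        "ORDER BY ts DESC LIMIT 1", (k, last_written_ts)).fetchone()
    return row[0] if row else None


put("user1", "v@cp1", 100)    # written by the checkpoint at ts = 100
put("user1", "spill-a", 101)  # cache eviction: ts = lastTimestamp + 1
put("user1", "spill-b", 101)  # a later eviction overwrites the first spill
print(lookup("user1", 101))   # -> spill-b
print(lookup("user1", 100))   # -> v@cp1
```

Because spills reuse lastTimestamp + 1, repeated evictions between two checkpoints never accumulate extra rows, while the checkpoint versions themselves stay untouched.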
Restoring the LazyDbKvState from the snapshot
As I mentioned, the snapshot contains stateId and checkpoint timestamp. We will restore the LazyDbKvState from this information after cleaning up the database of the failed snapshots.
We remove the failed records based on their timestamp: we remove every state update that has a ts larger than the checkpoint ts and smaller than the recovery ts. This can be done as long as the recovery timestamp is larger than the last checkpoint timestamp produced by the coordinator. If the checkpoint coordinator survives, then this is trivially true, and it is still a reasonable assumption in case of a master failure.
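The cleanup window can be sketched as a single DELETE. As before, SQLite stands in for the JDBC store, and the table name and timestamps are illustrative.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE kvstate (k TEXT, ts INTEGER, v TEXT)")
con.executemany("INSERT INTO kvstate VALUES (?, ?, ?)", [
    ("a", 100, "checkpointed"),  # part of the snapshot we restore from
    ("a", 150, "failed-write"),  # written by the failed attempt
])

checkpoint_ts = 100  # ts stored in the KvStateSnapshot we restore from
recovery_ts = 200    # global recovery timestamp passed to restoreState

# Drop every update from the failed epoch: newer than the restored
# checkpoint but older than the recovery timestamp.
con.execute("DELETE FROM kvstate WHERE ts > ? AND ts < ?",
            (checkpoint_ts, recovery_ts))
con.commit()
print(con.execute("SELECT k, ts, v FROM kvstate").fetchall())
# -> [('a', 100, 'checkpointed')]
```

The upper bound is what requires the recovery timestamp to exceed every timestamp the old job could have written; without it, the DELETE could discard updates of the recovered job itself.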
State compaction
By default no state compaction happens. The user can enable automatic compaction at every so many successful checkpoints by calling dbConfig.setKvStateCompactionFrequency(int compactEvery). Compaction removes all k,v pairs for which there is a more recent valid checkpoint.
This should be made tunable if we want to continue a job from a previous checkpoint.
DbAdapter
As almost all databases have slightly different SQL semantics, it is very hard to write universal SQL code that executes the same way on all of them. To bridge API differences, the DbAdapter interface can be implemented, which defines how each database operation is executed on the connection (how to insert, look up, or clean up states, etc.).
In this PR there are 2 adapter implementations: MySqlAdapter and DerbyAdapter (for tests).
Configuration
The DbStateBackend parameters can be configured by passing a DbBackendConfiguration object, which holds all necessary information about the database connection, the number of shards, the maximum cache size for the tasks, the maximum batch insert size, etc.
Sharding
Database sharding is supported, and the user can provide more than one database URL in the DbBackendConfig. By default, each key is written to the shard indexed by keyHash % numShards. It is also possible to define a custom partitioning strategy by passing a Partitioner instance to the DbBackendConfig.setPartitioner(...) method.
To handle sharding, each task maintains exactly one connection to each of the shards, and keys are partitioned and inserted accordingly.
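A sketch of the default shard selection. The non-negativity mask is an assumption about how a Java-style hashCode (which can be negative) would be handled, and the shard URLs are invented for the example; neither is taken from the PR.

```python
# Default sharding: key hash modulo the number of configured shard URLs.
shard_urls = ["jdbc:mysql://db0/flink", "jdbc:mysql://db1/flink"]  # illustrative


def shard_for(key_hash: int, num_shards: int) -> int:
    # Mask to 31 bits so a negative Java-style hash still yields a valid index.
    return (key_hash & 0x7FFFFFFF) % num_shards


print(shard_for(42, len(shard_urls)))  # -> 0
print(shard_for(-7, len(shard_urls)))  # -> 1 : negative hashes map safely
```

A custom Partitioner would simply replace this modulo rule, for instance to co-locate related key ranges on the same shard.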
Tests
The backend's functionality is tested using testing logic similar to the preexisting backends: unit tests for the backend and an IT case running a checkpointed job. All the tests use Apache Derby as the underlying database.
I also ran some tests using MySQL. I haven't found any obvious issues; I could store/access 100M+ kv states with about 256 MB of TM heap. Insert/lookup query execution times seem reasonable, with inserts taking under 100 microseconds and lookups on the order of 1-200 microseconds (this depends a lot on the job, but I would assume it stays around this with compaction).
We also ran some cluster experiments with about 20 database shards and a couple hundred million keys.
Possible improvements
Use bloom filters to detect new keys
Currently, new keys are looked up in the database, which adds an unnecessary round-trip for each new key seen, considerably slowing down the processing of these elements. One way to tackle this is to keep a bloom filter at each KvState instance, holding all keys that have been written to the database so far.
The limitation of this approach is that a bloom filter may be relatively big (a couple hundred MBs) for a large number of keys (and it cannot be resized later), which might slow down checkpointing from the beginning. Additionally, the bloom filters would probably need to be stored using the FsStateBackend.
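A minimal sketch of the bloom-filter idea. The size, hash count, and hashing scheme below are illustrative; a production filter would be sized from the expected key count and target false-positive rate, and this is not code from the PR.

```python
import hashlib


class BloomFilter:
    """Tiny bloom filter: a set of bit positions per key, no deletions."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits // 8)

    def _positions(self, key: bytes):
        # Derive independent positions by salting the hash with an index.
        for i in range(self.num_hashes):
            h = hashlib.sha256(bytes([i]) + key).digest()
            yield int.from_bytes(h[:8], "big") % self.num_bits

    def add(self, key: bytes):
        for p in self._positions(key):
            self.bits[p // 8] |= 1 << (p % 8)

    def might_contain(self, key: bytes) -> bool:
        return all(self.bits[p // 8] & (1 << (p % 8))
                   for p in self._positions(key))


bf = BloomFilter()
bf.add(b"seen-key")
print(bf.might_contain(b"seen-key"))   # -> True: must check the database
print(bf.might_contain(b"brand-new"))  # almost certainly False: skip the round-trip
```

A "False" answer is definitive (the key was never written), so the database lookup for a genuinely new key can be skipped; a "True" answer may be a false positive and still requires the lookup.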