
Out-of-core state backend for JDBC databases #1305

Merged: 7 commits merged into apache:master on Nov 24, 2015

Conversation

@gyfora (Contributor) commented Oct 27, 2015

Out-of-core JDBC key-value state

This PR introduces a new state backend (DbStateBackend) for storing partitioned and non-partitioned checkpoints in JDBC supported databases.

The main feature of this backend is the KvState implementation (LazyDbKvState) which provides consistent out-of-core storage of the key-value states for streaming tasks while only keeping a predefined number of states in memory as a cache. The KvState also supports database sharding and automatic compactions.

Core changes

In order to implement all the necessary functionality, some changes had to be applied to the current state interfaces:

  1. StateBackend.initializeForCurrentJob(...) now takes the Environment of the instantiating task as a parameter, which allows the backend to access more information about the task
  2. On recovery the restoreState methods of various runtime components now receive a global recovery timestamp (conceptually similar to the checkpoint timestamp)

(2) is used to bring the storage to a consistent state after restoring, to avoid reading partially successful checkpoints; I will describe this in detail later.

DbStateBackend

Non-partitioned states

This implementation of the StateBackend stores the serialized checkpoints in database tables as Blobs.
For every JobID a new database table is created to store the non-partitioned states:

  • Table name: checkpoints_JobID
  • Columns: checkpointId bigint, timestamp bigint, handleId bigint, checkpoint blob
  • Primary key: handleId

A new handleId is generated for each non-partitioned state checkpoint, which is later used to retrieve and clean it up (this part is trivial).
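
To make the layout concrete, here is a minimal JDBC sketch of writing one such checkpoint as a blob; the table and column names follow the layout above, but the actual SQL used by the DbAdapter implementations may differ.

import java.sql.Connection;
import java.sql.PreparedStatement;

// Sketch only; the real DbAdapter statements in this PR may look different.
void insertNonPartitionedCheckpoint(Connection con, String jobId, long checkpointId,
        long timestamp, long handleId, byte[] serializedCheckpoint) throws Exception {
    try (PreparedStatement stmt = con.prepareStatement(
            "INSERT INTO checkpoints_" + jobId
            + " (checkpointId, timestamp, handleId, checkpoint) VALUES (?, ?, ?, ?)")) {
        stmt.setLong(1, checkpointId);
        stmt.setLong(2, timestamp);
        stmt.setLong(3, handleId);   // primary key, later used to retrieve or discard the state
        stmt.setBytes(4, serializedCheckpoint);
        stmt.executeUpdate();
    }
}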

The DbStateBackend also supports plugging in an arbitrary state backend implementation for storing the non-partitioned states, in which case only the kv states are checkpointed to the configured database.
For example, new DbStateBackend(config, new MemoryStateBackend()) will store non-partitioned states in memory. I think this will be the more common usage pattern, as the database storage is most interesting for the key-value states.
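
For illustration, that composition might be set up roughly like this; the DbBackendConfig constructor arguments and the connection URL are made-up placeholders, only the DbStateBackend/MemoryStateBackend combination is taken from this PR:

// Hypothetical connection parameters.
DbBackendConfig config = new DbBackendConfig(
        "flink", "secret", "jdbc:mysql://state-db:3306/flinkstate");

// Key-value states go to the database, non-partitioned checkpoints stay in memory.
DbStateBackend backend = new DbStateBackend(config, new MemoryStateBackend());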

Key-Value states

The LazyDbKvState implements a kv-state backend that only keeps a limited number of kv states in memory and stores the rest in the database.
For each key-value state in the job we create a new table to store the states.

Table layout:

  • Table name: StateName_OperatorId_JobId
  • Columns: timestamp bigint, k varbinary(256), v blob
  • Primary key: (k, timestamp)

As the columns suggest the database will hold multiple versions of a key-value pair versioned by their timestamp.

If a key is not in the KvState cache, it is retrieved from the database by selecting v with the largest timestamp that is smaller than the last written timestamp.
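
As a rough JDBC sketch of such a cache-miss lookup (MySQL-flavoured SQL; the actual DbAdapter/MySqlAdapter query may be phrased differently):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

// Returns the newest value of a key that is not newer than the lookup timestamp,
// or null if the key has never been written to the database.
byte[] lookupFromDb(Connection con, String table, byte[] key, long lookupTimestamp) throws Exception {
    try (PreparedStatement stmt = con.prepareStatement(
            "SELECT v FROM " + table                  // StateName_OperatorId_JobId
            + " WHERE k = ? AND timestamp <= ?"
            + " ORDER BY timestamp DESC LIMIT 1")) {
        stmt.setBytes(1, key);
        stmt.setLong(2, lookupTimestamp);             // derived from the last written timestamp
        try (ResultSet rs = stmt.executeQuery()) {
            return rs.next() ? rs.getBytes(1) : null;
        }
    }
}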

Whenever a LazyDbKvState takes a snapshot, it saves all its modified state (the cache keeps track of what has been modified since the last snapshot) in the database with the checkpoint timestamp and updates its last written ts. The returned KvStateSnapshot basically only needs to contain the stateId (jobid + opid + name) and the checkpoint timestamp (plus some additional information for better compaction), so this is stored in-memory.

When the cache reaches its size limit, a predefined number of least recently accessed states are checkpointed to the database (only if they were modified) with timestamp = lastTimestamp+1. Subsequent "spills" will overwrite each other as they have the same timestamp.
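
Because (k, timestamp) is the primary key, overwriting a previous spill at the same timestamp presumably needs upsert semantics. A MySQL-flavoured sketch of such an eviction write (not the PR's actual MySqlAdapter code):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;
import java.util.Map;

// Spills evicted, modified cache entries at timestamp = lastTimestamp + 1; the
// ON DUPLICATE KEY UPDATE clause lets later spills overwrite earlier ones.
void spillEvicted(Connection con, String table, long spillTimestamp,
        List<Map.Entry<byte[], byte[]>> evictedModified) throws Exception {
    try (PreparedStatement stmt = con.prepareStatement(
            "INSERT INTO " + table + " (timestamp, k, v) VALUES (?, ?, ?)"
            + " ON DUPLICATE KEY UPDATE v = VALUES(v)")) {
        for (Map.Entry<byte[], byte[]> entry : evictedModified) {
            stmt.setLong(1, spillTimestamp);
            stmt.setBytes(2, entry.getKey());
            stmt.setBytes(3, entry.getValue());
            stmt.addBatch();
        }
        stmt.executeBatch();   // the real backend bounds this by the configured max batch insert size
    }
}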

Restoring the LazyDbKvState from the snapshot

As I mentioned, the snapshot contains stateId and checkpoint timestamp. We will restore the LazyDbKvState from this information after cleaning up the database of the failed snapshots.

We remove the failed records based on their timestamp: we remove every state update that has a ts larger than the checkpoint ts and smaller than the recovery ts. This can be done as long as the recovery timestamp is larger than the last checkpoint timestamp produced by the coordinator. If the checkpoint coordinator survives, this is trivially true, and it is still a reasonable assumption in case of master failure.
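
A sketch of that cleanup in JDBC (the statement and table name are illustrative, not the exact code of the PR):

import java.sql.Connection;
import java.sql.PreparedStatement;

// Removes updates written by checkpoints that were not globally completed:
// everything newer than the restored checkpoint but older than the recovery timestamp.
void removeFailedCheckpoints(Connection con, String table,
        long restoredCheckpointTs, long recoveryTs) throws Exception {
    try (PreparedStatement stmt = con.prepareStatement(
            "DELETE FROM " + table + " WHERE timestamp > ? AND timestamp < ?")) {
        stmt.setLong(1, restoredCheckpointTs);
        stmt.setLong(2, recoveryTs);
        stmt.executeUpdate();
    }
}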

State compaction

By default no state compaction happens. The user can enable automatic compaction every so many successful checkpoints by calling dbConfig.setKvStateCompactionFrequency(int compactEvery). Compaction removes all k,v pairs for which there is a more recent valid checkpoint.
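
As a usage sketch (the frequency value is just an example):

// Compact after every 10 successful checkpoints; by default no compaction happens.
dbConfig.setKvStateCompactionFrequency(10);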

This should be made tunable if we want to continue a job from a previous checkpoint.

DbAdapter

As almost all databases have slightly different SQL semantics, it is very hard to write universal SQL code that executes on all of them in the same way. To bridge API differences, the DbAdapter interface can be implemented; it defines how each database operation is executed on the connection (how to insert, look up, or clean up states, etc.).

This PR contains two adapter implementations: MySqlAdapter and DerbyAdapter (used for tests).

Configuration

The DbStateBackend parameters can be configured by passing a DbBackendConfig object, which holds all necessary information about the database connection, the number of shards, the maximum cache size for the tasks, the maximum batch insert size, etc.

Sharding

Database sharding is supported: the user can provide more than one database URL to the DbBackendConfig. By default each key is written to the shard indexed by keyHash % numShards. It is also possible to define a custom partitioning strategy by passing a Partitioner instance to the DbBackendConfig.setPartitioner(...) method.
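
A rough configuration sketch; the way multiple URLs are passed and the Partitioner key type are assumptions, only setPartitioner(...) and the default keyHash % numShards behaviour come from the description above:

import java.util.Arrays;
import java.util.List;

// Hypothetical shard URLs.
List<String> shardUrls = Arrays.asList(
        "jdbc:mysql://shard-1:3306/flinkstate",
        "jdbc:mysql://shard-2:3306/flinkstate");

DbBackendConfig dbConfig = new DbBackendConfig("flink", "secret", shardUrls);

// Optional: a custom shard assignment instead of the default keyHash % numShards.
dbConfig.setPartitioner(new Partitioner<Integer>() {   // key type chosen only for illustration
    @Override
    public int partition(Integer key, int numShards) {
        return (key.hashCode() & Integer.MAX_VALUE) % numShards;   // non-negative shard index
    }
});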

To handle sharding, each task maintains exactly one connection to each of the shards, and keys are partitioned and inserted accordingly.

Tests

The backend's functionality is tested using testing logic similar to the preexisting backends: a unit test for the backend and an IT case running a checkpointed job. All tests use Apache Derby as the underlying database.

I also ran some tests using MySQL. I haven't found any obvious issues; I could store/access 100m+ kv states with about 256 MB of TM heap. Insert/lookup query execution time seems reasonable, with inserts taking under 100 microseconds and lookups on the order of 1-200 microseconds (this depends a lot on the job, but I would assume it stays around this with compaction).

We also ran some cluster experiments with about 20 database shards and a couple hundred million keys.

Possible improvements

Use bloom filters to detect new keys

Currently new keys are looked up in the database, which adds an unnecessary round-trip for each new key seen, considerably slowing down the processing of these elements. A way to tackle this is to keep a Bloom filter at each KvState instance that holds all keys that have been written to the database so far.
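
A sketch of that idea using Guava's BloomFilter (not what the PR implements; the sizing numbers are placeholders):

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

// Placeholder sizing: 100 million expected keys at a ~1% false-positive rate.
BloomFilter<byte[]> writtenKeys =
        BloomFilter.create(Funnels.byteArrayFunnel(), 100_000_000, 0.01);

boolean mustLookUpInDb(byte[] serializedKey) {
    // false => the key was definitely never written, so the database round-trip can be skipped;
    // true  => the key may have been written (or is a false positive), so do the lookup.
    return writtenKeys.mightContain(serializedKey);
}

void onKeyWrittenToDb(byte[] serializedKey) {
    writtenKeys.put(serializedKey);
}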

The limitation of this approach is that a Bloom filter may be relatively big (a couple hundred MBs) for a large number of keys (and it cannot be resized later), which might slow down checkpointing from the beginning. Additionally, the Bloom filters would probably need to be stored using the FsStateBackend.

gyfora force-pushed the master branch 8 times, most recently from 8098513 to fa266ac on October 28, 2015
@aljoscha (Contributor)

Wow, a lot of stuff. I will look into it once the release is out. 😃

gyfora force-pushed the master branch 11 times, most recently from f8985d3 to e5db7f5 on November 1, 2015
@StephanEwen (Contributor)

Good stuff! Will need a day more to look through this, but this is a cool way of doing stateful stream computation :-)

@uce (Contributor) commented Nov 2, 2015

Thanks for the great write up!

@StephanEwen (Contributor)

Cool stuff, really! This is very much in line with what I had in mind for a SQL backend.

Let me check if I understood everything correctly (and see where my understanding is wrong), because I think we should be able to make an "exactly once" version of this based on that mechanism. I am basically rephrasing what you describe in a different model.

Basic Mode

What this is effectively doing is a batched and asynchronous version of distributed 2-phase commit transactions. The phases look basically like this:

  • Adding data: Pipe all modifications into the database, but do not commit the transaction. They are tagged with the timestamp of the upcoming checkpoint (or any coordinated, increasing version counter). This can happen in the background thread, as long as the in-operator cache holds all edits that are not in the database yet.
  • Pre-commit: This is when the checkpoint is triggered. All pending edits are written into the database and then the transaction is committed. The state handle only includes the timestamp used on the elements. In the classical 2-phase transactions, after a task acks the pre-commit, it has to be able to recover to that state, which is given here. The checkpoint is not immediately valid for recovery though, which means that recovery has to have either a filter, or issue a query that deletes all records with timestamps larger than the version given during recovery. After the pre-commit, the timestamp is locally incremented and work can continue.
  • Full commit: This happens implicitly when the checkpoint coordinator marks the checkpoint as complete.
  • Recovery: The timestamp (or version counter) of the last successful checkpoint is restored, the deletion of records that were committed (but where the checkpoint did not succeed as a whole) happens, then records are lazily fetched.

So far, this should give exactly once guarantees, or am I overlooking something?

Compacting

Whenever the "checkpoint complete" notification comes (or every so many changes) you trigger a clean-up query in the background. Given that the SQL database has a not completely terrible query planner, this SQL statement would be okay efficient (single semi join).

DELETE FROM "table name" t1
WHERE EXISTS
  (SELECT *
     FROM "table name" t2
    WHERE t2.handle_id = t1.handle_id
      AND t2.timestamp > t1.timestamp    -- a newer version exists for the same handle
      AND t2.timestamp <= GLOBAL_VERSION -- and the newer version is globally committed
  )

The good thing is that by virtue of having the incrementing global versions, we can set the isolation level for the query to "read uncommitted", which means that it will not lock anything and thus not compete with any other ongoing modifications.
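
In JDBC terms that would be something like the following (whether the compaction connection is actually configured this way in the PR is not shown here):

// Read-uncommitted is safe here because the timestamp <= GLOBAL_VERSION predicate
// already restricts the delete to globally committed versions.
connection.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);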

@aljoscha (Contributor) commented Nov 3, 2015

Just some remarks:

  • DummyEnvironment seems unnecessary, we already have StreamMockEnvironment. I think it could be reused.
  • In the first version you had both the timestamp and checkpoint and recovery/key lookup took both into account. The recent version uses just the timestamp for lookup. Both introduce the new restore timestamp in the restore methods.
  • The cleanup of failed checkpoints took into account the checkpoint and the recovery timestamp, but I think the recovery timestamp was always redundant since the condition in the SQL statement would always hold.

=> I think the timestamp is not needed. Can't everything be implemented by just using the (monotonically rising) checkpoint IDs?

Also, this is unrelated but maybe @StephanEwen or @gyfora know: Why do we have the allOrNothingState in CheckpointCoordinator.restoreLatestCheckpointedState?

@gyfora (Contributor, Author) commented Nov 3, 2015

@aljoscha

  1. I was initially using the MockEnvironments, but I added the DummyEnvironment for several reasons: I wanted control over the JobId and the number of subtasks, for which I would have had to change the MockEnvironment. Also, I wanted to avoid having to clean up the MemoryManager and other resources, as I really don't need them.
  2. I don't really understand what you mean here; the recovery timestamp is only used for cleanup on restore.
  3. Imagine a scenario where two tasks are restoring. One restores quickly and starts writing new timestamps. If we call cleanup on the other task, it will remove the new states if we don't bound the cleanup by the recovery timestamp. This can happen easily.

I don't know about the allOrNothingState :/

@aljoscha (Contributor) commented Nov 3, 2015

  1. Ah, I meant the lookupTimestamp. In an earlier version you used both the checkpointId and lookupTimestamp to perform key lookups.
  2. I see, in this implementation of state the timestamp has basically assumed the role of the checkpointId and the checkpointId is (I think) completely ignored. Correct? Couldn't we then change the semantics of the checkpointId to work like the timestamps (they are somewhat logical, not physical timestamps anyways)?

@gyfora (Contributor, Author) commented Nov 3, 2015

You are right about the checkpointIds being ignored. I don't see why we need to bother with changing the ID semantics; the timestamps also serve perfectly well as checkpoint IDs (in fact we could drop the checkpoint ID everywhere in Flink and just keep the timestamp).

The good thing about timestamps is that they are not consecutive: if the first checkpoint has ts = 100 and the second has ts = 5100, then we can write the intermediate updates with ts 101, 102, ... while maintaining uniqueness and monotonicity. This is something that we exploit here, and that is why I am using timestamps instead of IDs.

@gyfora (Contributor, Author) commented Nov 3, 2015

@StephanEwen
Thanks for the comments. You are right, the main idea is exactly as you described.
The reason why exactly-once is violated in some corner cases is that it can happen that the pre-commit phase of the previous checkpoint is still failing during recovery.

If we assume that the previous job is completely killed off, with no writing to the database whatsoever after that, then we can properly clean up during recovery.

This unfortunately does not seem to hold if you set the retry wait time very low (like 0 ms in the snapshot). What this means is that the failed job is still writing the failed snapshot to the database after you have recovered and cleaned up.

As for the compaction, I came up with something very similar, but here is the funny thing and my problem: the query you wrote runs properly on Derby but is invalid on MySQL (you cannot create a subquery on the same table you are modifying). In MySQL you need to use an inner join, but that will not work in Derby :P
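
For reference, a hedged sketch of a MySQL-compatible variant using a multi-table DELETE with a self-join instead of the correlated subquery (the table and column names follow the kv-state layout above, not the exact code of the compaction prototype):

// MySQL refuses a subquery on the table being modified, but allows deleting from a
// table that is self-joined in the same statement. The '?' is bound to the timestamp
// of the last globally completed checkpoint.
static String mySqlCompactionStatement(String table) {
    return "DELETE t1 FROM " + table + " t1"
         + " JOIN " + table + " t2"
         + "   ON t2.k = t1.k"
         + "  AND t2.timestamp > t1.timestamp"   // a newer version of the same key exists
         + "  AND t2.timestamp <= ?";            // and that newer version is globally committed
}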

In any case I have made a prototype of this on: https://github.com/gyfora/flink/tree/compaction
The user can define the frequency of compaction (compact every so many checkpoints). It also makes sure that compaction and cleanup are only executed on one subtask to avoid double work.

@uce (Contributor) commented Nov 12, 2015

I totally understand your point, but I think it's OK that changes of this scope take longer to review and get in (my HA PR took over a month or so to get in). At the end of the day, it matters more that we get this right (because it covers a very important use case) than getting it in a few days earlier.

@gyfora (Contributor, Author) commented Nov 15, 2015

I have updated the description, and ran some more cluster tests without any issues.

It would be good if you all could do a second round of reviews, please.

@aljoscha (Contributor)

I'm looking at it again.

@StephanEwen (Contributor)

Looking through this again...

}

@Override
public boolean equals(Object obj) {
Contributor:

Overriding equals() but not hashCode(). Not sure what the usage of this object is, but dropping the equals method may make sense (do you ever compare configs for equality?).

Contributor (Author):

That must have been a mistake; I generated it with Eclipse :/ I only used it for a test, but it doesn't make too much sense, so I can remove it.

@StephanEwen (Contributor)

I have a final comment inline. Otherwise, I think this is good to merge.

@StephanEwen (Contributor)

Had an offline chat with @gyfora with the following outcome:

  • A deterministic state identifier is needed
  • Small change to pass that identifier as a single ID String, initially constructed internally from state name + operator ID (as in this implementation)
  • That way, the streaming runtime can change handling of state names and operator IDs without breaking state backend implementations

With these changes, looks good to merge.

+1 from my side

}, dbConfig.getMaxNumberOfSqlRetries(), dbConfig.getSleepBetweenSqlRetries());
} catch (IOException e) {
// We don't want to fail the job here, but log the error.
if (Log.isDebugEnabled()) {
Contributor:

I think you accidentally used Jetty's logging here (see import org.eclipse.jetty.util.log.Log)

Contributor (Author):

Good catch, thanks Robert :)

Contributor:

In general: can we add a checkstyle import check for this? We have a Guava check to disallow importing from our shaded namespace already. @robert: if you think that it is feasible, please open an issue for it.

Contributor:

We could add a checkstyle rule for that, but I would like to solve the problem in a different way: I recently opened a JIRA for checking whether a Flink module is only using dependencies it has explicitly declared (forbidding reliance on transitive dependencies). With that check, we would also identify cases like this one.

gyfora force-pushed the master branch 2 times, most recently from d17beb1 to dc1615a on November 21, 2015
@gyfora (Contributor, Author) commented Nov 22, 2015

@StephanEwen, @rmetzger:
I addressed the comments regarding the logs and the state id.

I also added some final improvements:

  • Compaction is now executed in a background thread using a SingleThreadedExecutor
  • At empty checkpoints a keepalive call is executed against the connections to avoid connection drops (see the sketch below)
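
A minimal sketch of such a keepalive (the actual call used by the backend is not shown here; any cheap round-trip prevents idle-timeout drops):

// At an empty checkpoint, just verify the connection is still alive.
if (!connection.isValid(10)) {   // timeout in seconds
    // re-open the connection to this shard here
}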

These changes are in the last 2 commits, so if you guys +1 these last modifications I will merge it. I guess the compaction part is the most interesting here.

thanks

@gyfora (Contributor, Author) commented Nov 23, 2015

If there are no objections, I would like to merge this :)

@aljoscha (Contributor)

I think you can go ahead. It's in contrib and you guys are battle-testing it anyways... 😉

asfgit merged commit db2a964 into apache:master on Nov 24, 2015

// ------------------------------------------------------

private Environment env;
Contributor:

env is not serializable

