Skip to content

Commit

Permalink
[FLINK-2924] [streaming] Make KV snapshot/lookup logic independent fr…
Browse files Browse the repository at this point in the history
…om checkpoint id
  • Loading branch information
gyfora committed Nov 3, 2015
1 parent 14650c2 commit 98dee30
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,10 @@ public void createKVStateTable(String stateId, Connection con) throws SQLExcepti
smt.executeUpdate(
"CREATE TABLE IF NOT EXISTS kvstate_" + stateId
+ " ("
+ "checkpointId bigint, "
+ "timestamp bigint, "
+ "k varbinary(256), "
+ "v blob, "
+ "PRIMARY KEY (k, checkpointId, timestamp), "
+ "KEY cleanup (checkpointId, timestamp)"
+ "PRIMARY KEY (k, timestamp) "
+ ")");
}
}
Expand All @@ -203,7 +201,7 @@ public void createKVStateTable(String stateId, Connection con) throws SQLExcepti
public PreparedStatement prepareKVCheckpointInsert(String stateId, Connection con) throws SQLException {
validateStateId(stateId);
return con.prepareStatement(
"INSERT INTO kvstate_" + stateId + " (checkpointId, timestamp, k, v) VALUES (?,?,?,?)");
"INSERT INTO kvstate_" + stateId + " (timestamp, k, v) VALUES (?,?,?)");
}

/**
Expand All @@ -215,22 +213,19 @@ public PreparedStatement prepareKVCheckpointInsert(String stateId, Connection co
* @param insertStatement
* Statement prepared in
* {@link #prepareKVCheckpointInsert(String, Connection)}
* @param checkpointId
* @param timestamp
* @param key
* @param value
* @throws SQLException
*/
public void setKVCheckpointInsertParams(String stateId, PreparedStatement insertStatement, long checkpointId,
long timestamp,
public void setKVCheckpointInsertParams(String stateId, PreparedStatement insertStatement, long timestamp,
byte[] key, byte[] value) throws SQLException {
insertStatement.setLong(1, checkpointId);
insertStatement.setLong(2, timestamp);
insertStatement.setBytes(3, key);
insertStatement.setLong(1, timestamp);
insertStatement.setBytes(2, key);
if (value != null) {
insertStatement.setBytes(4, value);
insertStatement.setBytes(3, value);
} else {
insertStatement.setNull(4, Types.BLOB);
insertStatement.setNull(3, Types.BLOB);
}
}

Expand All @@ -244,34 +239,30 @@ public PreparedStatement prepareKeyLookup(String stateId, Connection con) throws
return con.prepareStatement("SELECT v"
+ " FROM kvstate_" + stateId
+ " WHERE k = ?"
+ " AND checkpointId <= ?"
+ " AND timestamp <= ?"
+ " ORDER BY checkpointId DESC LIMIT 1");
+ " ORDER BY timestamp DESC LIMIT 1");
}

/**
* Retrieve the latest value from the database for a given key that has
* checkpointId <= lookupId and checkpointTs <= lookupTs.
*
* @param stateId
* Unique identifier of the kvstate (usually the table name).
* @param lookupStatement
* The statement returned by
* {@link #prepareKeyLookup(String, Connection)}.
* @param key
* The key to lookup.
* @param lookupId
* Latest checkpoint id to select.
* @param lookupTs
* Latest checkpoint ts to select.
* @return The latest valid value for the key.
* @throws SQLException
*/
public byte[] lookupKey(String stateId, PreparedStatement lookupStatement, byte[] key, long lookupId,
long lookupTs) throws SQLException {
* Retrieve the latest value from the database for a given key and
* timestamp.
*
* @param stateId
* Unique identifier of the kvstate (usually the table name).
* @param lookupStatement
* The statement returned by
* {@link #prepareKeyLookup(String, Connection)}.
* @param key
* The key to lookup.
* @param lookupTimestamp
* Latest timestamp we want to retrieve
* @return The latest valid value for the key.
* @throws SQLException
*/
public byte[] lookupKey(String stateId, PreparedStatement lookupStatement, byte[] key, long lookupTimestamp)
throws SQLException {
lookupStatement.setBytes(1, key);
lookupStatement.setLong(2, lookupId);
lookupStatement.setLong(3, lookupTs);
lookupStatement.setLong(2, lookupTimestamp);

ResultSet res = lookupStatement.executeQuery();

Expand All @@ -283,18 +274,17 @@ public byte[] lookupKey(String stateId, PreparedStatement lookupStatement, byte[
}

/**
* Remove partially failed snapshots using the latest id and a global
* recovery timestamp. All records with a higher id but lower timestamp
* should be deleted from the database.
* Remove failed states by removing everything with timestamp between the
* checkpoint and recovery times.
*
*/
public void cleanupFailedCheckpoints(String stateId, Connection con, long checkpointId, long reoveryTs)
throws SQLException {
public void cleanupFailedCheckpoints(String stateId, Connection con, long checkpointTimestamp,
long recoveryTimestamp) throws SQLException {
validateStateId(stateId);
try (Statement smt = con.createStatement()) {
smt.executeUpdate("DELETE FROM kvstate_" + stateId
+ " WHERE checkpointId > " + checkpointId
+ " AND timestamp < " + reoveryTs);
+ " WHERE timestamp > " + checkpointTimestamp
+ " AND timestamp < " + recoveryTimestamp);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@
/**
*
* Lazily fetched {@link KvState} using a SQL backend. Key-value pairs are
* cached on heap and are lazily retrieved on access. Key's that are dropped
* from the cache will remain on heap until they are evicted to the backend upon
* the next successful checkpoint.
* cached on heap and are lazily retrieved on access.
*
*/
public class LazyDbKvState<K, V> implements KvState<K, V, DbStateBackend> {
Expand Down Expand Up @@ -83,11 +81,8 @@ public class LazyDbKvState<K, V> implements KvState<K, V, DbStateBackend> {
// LRU cache for the key-value states backed by the database
private final StateCache cache;

// Checkpoint ID and timestamp of the last successful checkpoint for loading
// missing values to the cache. The lookup timestamp may be incremented
// between snapshots as we evict modified values from the cache.
private long lookupId;
private long lookupTs;
// Timestamp of the last written state
private long lastTimestamp;

// ------------------------------------------------------

Expand All @@ -97,15 +92,14 @@ public class LazyDbKvState<K, V> implements KvState<K, V, DbStateBackend> {
*/
public LazyDbKvState(String kvStateId, Connection con, DbBackendConfig conf, TypeSerializer<K> keySerializer,
TypeSerializer<V> valueSerializer, V defaultValue) throws IOException {
this(kvStateId, con, conf, keySerializer, valueSerializer, defaultValue, -1, -1);
this(kvStateId, con, conf, keySerializer, valueSerializer, defaultValue, -1);
}

/**
* Initialize the {@link LazyDbKvState} from a snapshot given a lookup id
* and timestamp.
* Initialize the {@link LazyDbKvState} from a snapshot.
*/
public LazyDbKvState(String kvStateId, Connection con, final DbBackendConfig conf, TypeSerializer<K> keySerializer,
TypeSerializer<V> valueSerializer, V defaultValue, long lookupId, long lookupTs) throws IOException {
TypeSerializer<V> valueSerializer, V defaultValue, long lookupTs) throws IOException {

this.kvStateId = kvStateId;

Expand All @@ -117,8 +111,7 @@ public LazyDbKvState(String kvStateId, Connection con, final DbBackendConfig con
this.con = con;
this.dbAdapter = conf.getDbAdapter();

this.lookupId = lookupId;
this.lookupTs = lookupTs;
this.lastTimestamp = lookupTs;

this.cache = new StateCache(conf.getKvCacheSize(), conf.getNumElementsToEvict());

Expand Down Expand Up @@ -163,32 +156,31 @@ public V value() throws IOException {
@Override
public DbKvStateSnapshot<K, V> shapshot(long checkpointId, long timestamp) throws IOException {

if (timestamp < lastTimestamp) {
// This would violate our timing assumptions and break key lookups
throw new RuntimeException("Checkpoint has lower timestamp than the last written timestamp.");
}

// We need to keep track of this to know when to execute the insert
int rowsInBatchStatement = 0;

// We insert the cached and modified entries to the database then clear
// the map of modified entries
for (Entry<K, Optional<V>> state : cache.modified.entrySet()) {
rowsInBatchStatement = batchInsert(
checkpointId,
timestamp,
state.getKey(),
state.getValue().orNull(),
rowsInBatchStatement = batchInsert(timestamp, state.getKey(), state.getValue().orNull(),
rowsInBatchStatement);
}
cache.modified.clear();

// We signal the end of the batch to flush the remaining inserts
if (rowsInBatchStatement > 0) {
batchInsert(0, 0, null, null, 0);
batchInsert(0, null, null, 0);
}

// Update the lookup id and timestamp so future cache loads will return
// the last written values
lookupId = checkpointId;
lookupTs = timestamp;
// Update the last timestamp
lastTimestamp = timestamp;

return new DbKvStateSnapshot<K, V>(kvStateId, checkpointId, timestamp);
return new DbKvStateSnapshot<K, V>(kvStateId, timestamp);
}

/**
Expand Down Expand Up @@ -235,19 +227,14 @@ public Void call() throws Exception {
*
* @return The current number of inserts in the batch statement
*/
private int batchInsert(final long checkpointId, final long timestamp, final K key, final V value,
final int batchCount) throws IOException {
private int batchInsert(final long timestamp, final K key, final V value, final int batchCount) throws IOException {

return retry(new Callable<Integer>() {
public Integer call() throws Exception {
if (key != null) {
// We set the insert statement parameters then add it to the
// current batch of inserts
dbAdapter.setKVCheckpointInsertParams(
kvStateId,
insertStatement,
checkpointId,
timestamp,
dbAdapter.setKVCheckpointInsertParams(kvStateId, insertStatement, timestamp,
InstantiationUtil.serializeToByteArray(keySerializer, key),
value != null ? InstantiationUtil.serializeToByteArray(valueSerializer, value) : null);
insertStatement.addBatch();
Expand Down Expand Up @@ -319,31 +306,33 @@ public Map<K, Optional<V>> getModified() {
}

/**
*
* Snapshot that stores a specific lookup checkpoint id and timestamp, and
* Snapshot that stores a specific checkpoint timestamp and state id, and
* also rolls back the database to that point upon restore. The rollback is
* done by removing all state checkpoints that have larger id than the
* lookup id and smaller timestamp than the recovery timestamp.
* done by removing all state checkpoints that have timestamps between the
* checkpoint and recovery timestamp.
*
*/
private static class DbKvStateSnapshot<K, V> implements KvStateSnapshot<K, V, DbStateBackend> {

private static final long serialVersionUID = 1L;

private final String kvStateId;
private final long lookupId;
private final long lookupTs;
private final long checkpointTimestamp;

public DbKvStateSnapshot(String kvStateId, long lookupId, long lookupTs) {
this.lookupId = lookupId;
this.lookupTs = lookupTs;
public DbKvStateSnapshot(String kvStateId, long checkpointTimestamp) {
this.checkpointTimestamp = checkpointTimestamp;
this.kvStateId = kvStateId;
}

@Override
public LazyDbKvState<K, V> restoreState(final DbStateBackend stateBackend,
final TypeSerializer<K> keySerializer, final TypeSerializer<V> valueSerializer,
final V defaultValue, ClassLoader classLoader, final long recoveryTimestamp) throws IOException {
final TypeSerializer<K> keySerializer, final TypeSerializer<V> valueSerializer, final V defaultValue,
ClassLoader classLoader, final long recoveryTimestamp) throws IOException {

// Validate our timing assumptions
if (recoveryTimestamp < checkpointTimestamp) {
throw new RuntimeException("Recovery timestamp must be greater than the last checkpoint timestamp");
}

// First we clean up the states written by partially failed
// snapshots (if any)
Expand All @@ -352,32 +341,19 @@ public LazyDbKvState<K, V> restoreState(final DbStateBackend stateBackend,
// subtasks) but this should not cause any problems as we
// are cleaning up based on the recovery timestamp.
public Void call() throws Exception {
stateBackend
.getConfiguration()
.getDbAdapter()
.cleanupFailedCheckpoints(
kvStateId,
stateBackend.getConnection(),
lookupId,
recoveryTimestamp);
stateBackend.getConfiguration().getDbAdapter().cleanupFailedCheckpoints(kvStateId,
stateBackend.getConnection(), checkpointTimestamp, recoveryTimestamp);

return null;
}
}, NUM_RETRIES);

// Restore the KvState
LazyDbKvState<K, V> restored = new LazyDbKvState<K, V>(
kvStateId,
stateBackend.getConnection(),
stateBackend.getConfiguration(),
keySerializer,
valueSerializer,
defaultValue,
lookupId,
lookupTs);
LazyDbKvState<K, V> restored = new LazyDbKvState<K, V>(kvStateId, stateBackend.getConnection(),
stateBackend.getConfiguration(), keySerializer, valueSerializer, defaultValue, checkpointTimestamp);

if (LOG.isDebugEnabled()) {
LOG.debug("KV state({},{}) restored.", lookupId, lookupTs);
LOG.debug("KV state({},{}) restored.", kvStateId, checkpointTimestamp);
}

return restored;
Expand Down Expand Up @@ -449,8 +425,8 @@ protected boolean removeEldestEntry(Entry<K, Optional<V>> eldest) {
* Fetch the current value from the database if exists or return null.
*
* @param key
* @return The value corresponding to the lookupId and lookupTs from the
* database if exists or null.
* @return The value corresponding to the key and the last timestamp
* from the database if exists or null.
*/
private V getFromDatabaseOrNull(final K key) {
try {
Expand All @@ -459,7 +435,7 @@ public V call() throws Exception {
// We lookup using the adapter and serialize/deserialize
// with the TypeSerializers
byte[] serializedVal = dbAdapter.lookupKey(kvStateId, selectStatement,
InstantiationUtil.serializeToByteArray(keySerializer, key), lookupId, lookupTs);
InstantiationUtil.serializeToByteArray(keySerializer, key), lastTimestamp);

return serializedVal != null
? InstantiationUtil.deserializeFromByteArray(valueSerializer, serializedVal) : null;
Expand Down Expand Up @@ -494,15 +470,11 @@ private void evictIfFull() {

// We only need to write to the database if modified
if (modified.remove(next.getKey()) != null) {
// We insert elements with the last checkpoint id
// but an incremented timestamp to ensure good
// lookups. This won't interfere with the actual
// checkpoint timestamps.
rowsInBatch = batchInsert(
lookupId,
lookupTs + 1,
next.getKey(),
next.getValue().orNull(),
// We insert elements with timestamp + 1 to ensure
// good lookups. This won't interfere with the
// actual checkpoint timestamps as the next one
// should be much higher.
rowsInBatch = batchInsert(lastTimestamp + 1, next.getKey(), next.getValue().orNull(),
numEvicted);
if (rowsInBatch == 0) {
writtenToDb = true;
Expand All @@ -513,15 +485,14 @@ private void evictIfFull() {
}
// Flush batch if has rows
if (rowsInBatch > 0) {
batchInsert(0, 0, null, null, 0);
batchInsert(0, null, null, 0);
writtenToDb = true;
}

// If we have written new values to the database we need to
// increment our lookup timestamp so we will read them back
// next time they are requested
// set the lastTimestamp accordingly
if (writtenToDb) {
lookupTs = lookupTs + 1;
lastTimestamp = lastTimestamp + 1;
}

} catch (IOException e) {
Expand Down
Loading

0 comments on commit 98dee30

Please sign in to comment.