Skip to content

Commit

Permalink
[FLINK-2924] [streaming] Improve compacting logic
Browse files Browse the repository at this point in the history
  • Loading branch information
gyfora committed Nov 24, 2015
1 parent c254bda commit 43b8e57
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ public class LazyDbKvState<K, V> implements KvState<K, V, DbStateBackend>, Check
private long nextTs;
private Map<Long, Long> completedCheckpoints = new HashMap<>();

private long lastCompactedTs;

// ------------------------------------------------------

/**
Expand All @@ -104,14 +106,15 @@ public class LazyDbKvState<K, V> implements KvState<K, V, DbStateBackend>, Check
*/
public LazyDbKvState(String kvStateId, boolean compact, ShardedConnection cons, DbBackendConfig conf,
TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) throws IOException {
this(kvStateId, compact, cons, conf, keySerializer, valueSerializer, defaultValue, 1);
this(kvStateId, compact, cons, conf, keySerializer, valueSerializer, defaultValue, 1, 0);
}

/**
* Initialize the {@link LazyDbKvState} from a snapshot.
*/
public LazyDbKvState(String kvStateId, boolean compact, ShardedConnection cons, final DbBackendConfig conf,
TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue, long nextTs)
TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue, long nextTs,
long lastCompactedTs)
throws IOException {

this.kvStateId = kvStateId;
Expand All @@ -130,6 +133,7 @@ public LazyDbKvState(String kvStateId, boolean compact, ShardedConnection cons,
this.sqlRetrySleep = conf.getSleepBetweenSqlRetries();

this.nextTs = nextTs;
this.lastCompactedTs = lastCompactedTs;

this.cache = new StateCache(conf.getKvCacheSize(), conf.getNumElementsToEvict());

Expand Down Expand Up @@ -192,7 +196,7 @@ public DbKvStateSnapshot<K, V> snapshot(long checkpointId, long timestamp) throw

nextTs = timestamp + 1;
completedCheckpoints.put(checkpointId, timestamp);
return new DbKvStateSnapshot<K, V>(kvStateId, timestamp);
return new DbKvStateSnapshot<K, V>(kvStateId, timestamp, lastCompactedTs);
}

/**
Expand Down Expand Up @@ -245,8 +249,9 @@ public void notifyCheckpointComplete(long checkpointId) {
if (compactEvery > 0 && compact && checkpointId % compactEvery == 0) {
try {
for (Connection c : connections.connections()) {
dbAdapter.compactKvStates(kvStateId, c, 0, ts);
dbAdapter.compactKvStates(kvStateId, c, lastCompactedTs, ts);
}
lastCompactedTs = ts;
if (LOG.isDebugEnabled()) {
LOG.debug("State succesfully compacted for {}.", kvStateId);
}
Expand Down Expand Up @@ -294,10 +299,10 @@ public boolean isCompacter() {
}

/**
* Snapshot that stores a specific checkpoint id and state id, and also
* Snapshot that stores a specific checkpoint timestamp and state id, and also
* rolls back the database to that point upon restore. The rollback is done
* by removing all state checkpoints that have ids between the checkpoint
* and recovery id.
* by removing all state checkpoints that have timestamps between the checkpoint
* and recovery timestamp.
*
*/
private static class DbKvStateSnapshot<K, V> implements KvStateSnapshot<K, V, DbStateBackend> {
Expand All @@ -306,10 +311,12 @@ private static class DbKvStateSnapshot<K, V> implements KvStateSnapshot<K, V, Db

private final String kvStateId;
private final long checkpointTimestamp;
private final long lastCompactedTimestamp;

public DbKvStateSnapshot(String kvStateId, long checkpointTimestamp) {
public DbKvStateSnapshot(String kvStateId, long checkpointTimestamp, long lastCompactedTs) {
this.checkpointTimestamp = checkpointTimestamp;
this.kvStateId = kvStateId;
this.lastCompactedTimestamp = lastCompactedTs;
}

@Override
Expand Down Expand Up @@ -346,7 +353,7 @@ public Void call() throws Exception {
// Restore the KvState
LazyDbKvState<K, V> restored = new LazyDbKvState<K, V>(kvStateId, cleanup,
stateBackend.getConnections(), stateBackend.getConfiguration(), keySerializer, valueSerializer,
defaultValue, recoveryTimestamp);
defaultValue, recoveryTimestamp, lastCompactedTimestamp);

if (LOG.isDebugEnabled()) {
LOG.debug("KV state({},{}) restored.", kvStateId, recoveryTimestamp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,15 @@ public String prepareKeyLookup(String stateId) throws SQLException {
return "SELECT v"
+ " FROM kvstate_" + stateId
+ " WHERE k = ?"
+ " AND timestamp <= ?"
+ " ORDER BY timestamp DESC LIMIT 1";
}

@Override
public byte[] lookupKey(String stateId, PreparedStatement lookupStatement, byte[] key, long lookupTs)
throws SQLException {
lookupStatement.setBytes(1, key);
lookupStatement.setLong(2, lookupTs);

ResultSet res = lookupStatement.executeQuery();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ public String prepareKeyLookup(String stateId) throws SQLException {
validateStateId(stateId);
return "SELECT v " + "FROM kvstate_" + stateId
+ " WHERE k = ? "
+ "ORDER BY timestamp DESC";
+ " AND timestamp <= ?"
+ " ORDER BY timestamp DESC";
}

@Override
Expand Down

0 comments on commit 43b8e57

Please sign in to comment.