Commit

[FLINK-2924] [streaming] Execute compactions in background thread + keep connections alive on empty snapshots

Closes apache#1305
gyfora committed Nov 24, 2015
1 parent cd8be0b commit db2a964
Showing 13 changed files with 188 additions and 151 deletions.
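
At a glance, the commit does two things: when a snapshot contains no modified state it fires a cheap keep-alive query instead of an insert batch, and it moves compaction off the checkpointing thread onto a single-threaded executor. The following is a minimal, self-contained sketch of that pattern; the class and method bodies are simplified stand-ins, not the actual LazyDbKvState code shown in the diff below.

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Simplified sketch of the pattern introduced by this commit; all names are illustrative.
class BackgroundCompactionSketch {

    private final ExecutorService compactionExecutor = Executors.newSingleThreadExecutor();

    /** On snapshot: either write the modified entries or just keep the connection warm. */
    void snapshot(Map<String, byte[]> modified, Connection con, long timestamp) throws SQLException {
        if (!modified.isEmpty()) {
            // batch-insert the modified entries tagged with the snapshot timestamp ...
        } else {
            // nothing to write: run a trivial query so the server does not drop the idle connection
            try (Statement stmt = con.createStatement()) {
                stmt.executeQuery("SELECT 1");
            }
        }
    }

    /** On checkpoint-complete: compact asynchronously, off the checkpointing thread. */
    void notifyCheckpointComplete(final long upperBound) {
        compactionExecutor.execute(new Runnable() {
            @Override
            public void run() {
                // open a dedicated connection here and compact state rows up to
                // 'upperBound' (the checkpointing connections are not thread safe)
            }
        });
    }
}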
@@ -178,4 +178,11 @@ void insertBatch(String stateId, DbBackendConfig conf, Connection con, PreparedS
*/
void compactKvStates(String kvStateId, Connection con, long lowerTs, long upperTs) throws SQLException;

/**
* Execute a simple operation to refresh the current database connection in
* case no data is written for a longer time period. Usually something like
* "select 1"
*/
void keepAlive(Connection con) throws SQLException;

}
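
Delegating keepAlive to the adapter makes sense because the cheapest no-op statement is dialect specific; the adapter implementation at the end of this diff issues SELECT 1. As a purely hypothetical illustration (this class is not part of the commit), a Derby-style adapter would need a different statement, since Derby rejects SELECT without a FROM clause:

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical dialect-specific keep-alive, shown only for illustration.
public class DerbyKeepAliveSketch {

    public void keepAlive(Connection con) throws SQLException {
        try (Statement stmt = con.createStatement()) {
            // Derby's idiomatic no-op query; "SELECT 1" would fail without a FROM clause
            stmt.executeQuery("VALUES 1");
        }
    }
}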
@@ -327,80 +327,4 @@ public ShardedConnection createShardedConnection() throws SQLException {
return new ShardedConnection(shardUrls, userName, userPassword, shardPartitioner);
}
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (!(obj instanceof DbBackendConfig)) {
return false;
}
DbBackendConfig other = (DbBackendConfig) obj;
if (JDBCDriver == null) {
if (other.JDBCDriver != null) {
return false;
}
} else if (!JDBCDriver.equals(other.JDBCDriver)) {
return false;
}
if (dbAdapter == null) {
if (other.dbAdapter != null) {
return false;
}
} else if (!dbAdapter.getClass().equals(other.dbAdapter.getClass())) {
return false;
}
if (kvStateCacheSize != other.kvStateCacheSize) {
return false;
}
if (kvStateCompactionFreq != other.kvStateCompactionFreq) {
return false;
}
if (Float.floatToIntBits(maxKvEvictFraction) != Float.floatToIntBits(other.maxKvEvictFraction)) {
return false;
}
if (maxKvInsertBatchSize != other.maxKvInsertBatchSize) {
return false;
}
if (maxNumberOfSqlRetries != other.maxNumberOfSqlRetries) {
return false;
}
if (shardPartitioner == null) {
if (other.shardPartitioner != null) {
return false;
}
} else if (!shardPartitioner.getClass().equals(other.shardPartitioner.getClass())) {
return false;
}
if (shardUrls == null) {
if (other.shardUrls != null) {
return false;
}
} else if (!shardUrls.equals(other.shardUrls)) {
return false;
}
if (sleepBetweenSqlRetries != other.sleepBetweenSqlRetries) {
return false;
}
if (userName == null) {
if (other.userName != null) {
return false;
}
} else if (!userName.equals(other.userName)) {
return false;
}
if (userPassword == null) {
if (other.userPassword != null) {
return false;
}
} else if (!userPassword.equals(other.userPassword)) {
return false;
}
return true;
}

}
@@ -177,10 +177,10 @@ public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkp
}

@Override
public <K, V> LazyDbKvState<K, V> createKvState(int operatorId, String stateName,
public <K, V> LazyDbKvState<K, V> createKvState(String stateId, String stateName,
TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) throws IOException {
return new LazyDbKvState<K, V>(
stateName + "_" + operatorId + "_" + env.getJobID().toShortString(),
stateId + "_" + env.getJobID().toShortString(),
env.getIndexInSubtaskGroup() == 0,
getConnections(),
getConfiguration(),
@@ -25,7 +25,8 @@

import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.util.InstantiationUtil;
import org.eclipse.jetty.util.log.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* State handle implementation for storing checkpoints as byte arrays in
@@ -35,6 +36,7 @@
public class DbStateHandle<S> implements Serializable, StateHandle<S> {

private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(DbStateHandle.class);

private final String jobId;
private final DbBackendConfig dbConfig;
@@ -75,8 +77,8 @@ public Boolean call() throws Exception {
}, dbConfig.getMaxNumberOfSqlRetries(), dbConfig.getSleepBetweenSqlRetries());
} catch (IOException e) {
// We don't want to fail the job here, but log the error.
if (Log.isDebugEnabled()) {
Log.debug("Could not discard state.");
if (LOG.isDebugEnabled()) {
LOG.debug("Could not discard state.");
}
}
}
@@ -30,6 +30,8 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -75,6 +77,8 @@ public class LazyDbKvState<K, V> implements KvState<K, V, DbStateBackend>, Check
private final int maxInsertBatchSize;
// We will do database compaction every so many checkpoints
private final int compactEvery;
// Executor for automatic compactions
private ExecutorService executor = null;

// Database properties
private final DbBackendConfig conf;
@@ -96,7 +100,7 @@ public class LazyDbKvState<K, V> implements KvState<K, V, DbStateBackend>, Check
private long nextTs;
private Map<Long, Long> completedCheckpoints = new HashMap<>();

private long lastCompactedTs;
private volatile long lastCompactedTs;

// ------------------------------------------------------

@@ -119,6 +123,10 @@ public LazyDbKvState(String kvStateId, boolean compact, ShardedConnection cons,

this.kvStateId = kvStateId;
this.compact = compact;
if (compact) {
// Compactions will run in a separate thread
executor = Executors.newSingleThreadExecutor();
}

this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
@@ -186,13 +194,28 @@ public DbKvStateSnapshot<K, V> snapshot(long checkpointId, long timestamp) throw
+ "this should not happen.");
}

// We insert the modified elements to the database with the current
// timestamp then clear the modified states
for (Entry<K, Optional<V>> state : cache.modified.entrySet()) {
batchInsert.add(state, timestamp);
// If there are any modified states we perform the inserts
if (!cache.modified.isEmpty()) {
// We insert the modified elements to the database with the current
// timestamp then clear the modified states
for (Entry<K, Optional<V>> state : cache.modified.entrySet()) {
batchInsert.add(state, timestamp);
}
batchInsert.flush(timestamp);
cache.modified.clear();
} else if (compact) {
// Otherwise we call the keep alive method to avoid dropped
// connections (only call this on the compactor instance)
for (final Connection c : connections.connections()) {
SQLRetrier.retry(new Callable<Void>() {
@Override
public Void call() throws Exception {
dbAdapter.keepAlive(c);
return null;
}
}, numSqlRetries, sqlRetrySleep);
}
}
batchInsert.flush(timestamp);
cache.modified.clear();

nextTs = timestamp + 1;
completedCheckpoints.put(checkpointId, timestamp);
@@ -240,23 +263,14 @@ public Void call() throws Exception {

@Override
public void notifyCheckpointComplete(long checkpointId) {
Long ts = completedCheckpoints.remove(checkpointId);
final Long ts = completedCheckpoints.remove(checkpointId);
if (ts == null) {
LOG.warn("Complete notification for missing checkpoint: " + checkpointId);
ts = 0L;
}
// If compaction is turned on we compact on the first subtask
if (compactEvery > 0 && compact && checkpointId % compactEvery == 0) {
try {
for (Connection c : connections.connections()) {
dbAdapter.compactKvStates(kvStateId, c, lastCompactedTs, ts);
}
lastCompactedTs = ts;
if (LOG.isDebugEnabled()) {
LOG.debug("State succesfully compacted for {}.", kvStateId);
}
} catch (SQLException e) {
LOG.warn("State compaction failed due: {}", e);
} else {
// If compaction is turned on we compact on the compactor subtask
// asynchronously in the background
if (compactEvery > 0 && compact && checkpointId % compactEvery == 0) {
executor.execute(new Compactor(ts));
}
}
}
@@ -275,6 +289,10 @@ public void dispose() {
} catch (SQLException e) {
// There is not much to do about this
}

if (executor != null) {
executor.shutdown();
}
}

/**
@@ -294,15 +312,25 @@ public Map<K, Optional<V>> getModified() {
return cache.modified;
}

public boolean isCompacter() {
/**
* Used for testing purposes
*/
public boolean isCompactor() {
return compact;
}

/**
* Snapshot that stores a specific checkpoint timestamp and state id, and also
* rolls back the database to that point upon restore. The rollback is done
* by removing all state checkpoints that have timestamps between the checkpoint
* and recovery timestamp.
* Used for testing purposes
*/
public ExecutorService getExecutor() {
return executor;
}

/**
* Snapshot that stores a specific checkpoint timestamp and state id, and
* also rolls back the database to that point upon restore. The rollback is
* done by removing all state checkpoints that have timestamps between the
* checkpoint and recovery timestamp.
*
*/
private static class DbKvStateSnapshot<K, V> implements KvStateSnapshot<K, V, DbStateBackend> {
@@ -557,4 +585,40 @@ public void flush(long timestamp) throws IOException {

}
}

private class Compactor implements Runnable {

private long upperBound;

public Compactor(long upperBound) {
this.upperBound = upperBound;
}

@Override
public void run() {
// We create new database connections to make sure we don't
// interfere with the checkpointing (connections are not thread
// safe)
try (ShardedConnection sc = conf.createShardedConnection()) {
for (final Connection c : sc.connections()) {
SQLRetrier.retry(new Callable<Void>() {
@Override
public Void call() throws Exception {
dbAdapter.compactKvStates(kvStateId, c, lastCompactedTs, upperBound);
return null;
}
}, numSqlRetries, sqlRetrySleep);
}
if (LOG.isInfoEnabled()) {
LOG.info("State succesfully compacted for {} between {} and {}.", kvStateId,
lastCompactedTs,
upperBound);
}
lastCompactedTs = upperBound;
} catch (SQLException | IOException e) {
LOG.warn("State compaction failed due: {}", e);
}
}

}
}
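
Because compaction now happens asynchronously, callers cannot assume the state tables are already compacted when notifyCheckpointComplete returns, which is presumably why the executor is exposed through getExecutor() for tests. Below is a minimal sketch of how a test might wait for pending compactions, relying on the fact that the single-threaded executor runs tasks in submission order; the helper itself is an assumption, not part of this commit.

import java.util.concurrent.ExecutorService;

// Test-side helper sketch, not part of the commit.
class CompactionAwaitSketch {

    /** Blocks until every compaction submitted before this call has finished. */
    static void awaitPendingCompactions(ExecutorService compactionExecutor) throws Exception {
        compactionExecutor.submit(new Runnable() {
            @Override
            public void run() {
                // no-op marker task: the executor is single-threaded and FIFO,
                // so reaching this point means earlier Compactor runs are done
            }
        }).get();
    }
}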
@@ -234,5 +234,12 @@ private void setKvInsertParams(String stateId, PreparedStatement insertStatement
insertStatement.setNull(4, Types.BLOB);
}
}

@Override
public void keepAlive(Connection con) throws SQLException {
try(Statement smt = con.createStatement()) {
smt.executeQuery("SELECT 1");
}
}

}