Skip to content

Commit

Permalink
[FLINK-2924] [streaming] Execute compactions in background thread + k…
Browse files Browse the repository at this point in the history
…eep connections alive on empty snapshots
  • Loading branch information
gyfora committed Nov 22, 2015
1 parent dc1615a commit 92644f7
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,11 @@ void insertBatch(String stateId, DbBackendConfig conf, Connection con, PreparedS
*/
void compactKvStates(String kvStateId, Connection con, long lowerTs, long upperTs) throws SQLException;

/**
* Executes a simple operation on the given connection to refresh it, so that
* the connection is not dropped when no data is written for a longer time
* period. Usually something like "select 1".
*
* @param con
*            the database connection to keep alive
* @throws SQLException
*             if the keep-alive operation fails
*/
void keepAlive(Connection con) throws SQLException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
Expand Down Expand Up @@ -75,6 +77,8 @@ public class LazyDbKvState<K, V> implements KvState<K, V, DbStateBackend>, Check
private final int maxInsertBatchSize;
// We will do database compaction every so many checkpoints
private final int compactEvery;
// Executor for automatic compactions
private ExecutorService executor = null;

// Database properties
private final DbBackendConfig conf;
Expand All @@ -96,7 +100,7 @@ public class LazyDbKvState<K, V> implements KvState<K, V, DbStateBackend>, Check
private long nextTs;
private Map<Long, Long> completedCheckpoints = new HashMap<>();

private long lastCompactedTs;
private volatile long lastCompactedTs;

// ------------------------------------------------------

Expand All @@ -119,6 +123,10 @@ public LazyDbKvState(String kvStateId, boolean compact, ShardedConnection cons,

this.kvStateId = kvStateId;
this.compact = compact;
if (compact) {
// Compactions will run in a separate thread
executor = Executors.newSingleThreadExecutor();
}

this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
Expand Down Expand Up @@ -186,13 +194,28 @@ public DbKvStateSnapshot<K, V> snapshot(long checkpointId, long timestamp) throw
+ "this should not happen.");
}

// We insert the modified elements to the database with the current
// timestamp then clear the modified states
for (Entry<K, Optional<V>> state : cache.modified.entrySet()) {
batchInsert.add(state, timestamp);
// If there are any modified states we perform the inserts
if (!cache.modified.isEmpty()) {
// We insert the modified elements to the database with the current
// timestamp then clear the modified states
for (Entry<K, Optional<V>> state : cache.modified.entrySet()) {
batchInsert.add(state, timestamp);
}
batchInsert.flush(timestamp);
cache.modified.clear();
} else if (compact) {
// Otherwise we call the keep alive method to avoid dropped
// connections (only call this on the compactor instance)
for (final Connection c : connections.connections()) {
SQLRetrier.retry(new Callable<Void>() {
@Override
public Void call() throws Exception {
dbAdapter.keepAlive(c);
return null;
}
}, numSqlRetries, sqlRetrySleep);
}
}
batchInsert.flush(timestamp);
cache.modified.clear();

nextTs = timestamp + 1;
completedCheckpoints.put(checkpointId, timestamp);
Expand Down Expand Up @@ -240,23 +263,14 @@ public Void call() throws Exception {

@Override
public void notifyCheckpointComplete(long checkpointId) {
Long ts = completedCheckpoints.remove(checkpointId);
final Long ts = completedCheckpoints.remove(checkpointId);
if (ts == null) {
LOG.warn("Complete notification for missing checkpoint: " + checkpointId);
ts = 0L;
}
// If compaction is turned on we compact on the first subtask
if (compactEvery > 0 && compact && checkpointId % compactEvery == 0) {
try {
for (Connection c : connections.connections()) {
dbAdapter.compactKvStates(kvStateId, c, lastCompactedTs, ts);
}
lastCompactedTs = ts;
if (LOG.isDebugEnabled()) {
LOG.debug("State succesfully compacted for {}.", kvStateId);
}
} catch (SQLException e) {
LOG.warn("State compaction failed due: {}", e);
} else {
// If compaction is turned on we compact on the compactor subtask
// asynchronously in the background
if (compactEvery > 0 && compact && checkpointId % compactEvery == 0) {
executor.execute(new Compactor(ts));
}
}
}
Expand All @@ -275,6 +289,10 @@ public void dispose() {
} catch (SQLException e) {
// There is not much to do about this
}

if (executor != null) {
executor.shutdown();
}
}

/**
Expand All @@ -294,15 +312,25 @@ public Map<K, Optional<V>> getModified() {
return cache.modified;
}

public boolean isCompacter() {
/**
* Returns the compact flag this state was created with, i.e. whether this
* instance is the one that performs compactions. Used for testing purposes.
*/
public boolean isCompactor() {
return compact;
}

/**
* Snapshot that stores a specific checkpoint timestamp and state id, and also
* rolls back the database to that point upon restore. The rollback is done
* by removing all state checkpoints that have timestamps between the checkpoint
* and recovery timestamp.
* Used for testing purposes
*/
public ExecutorService getExecutor() {
// May be null: the executor is only created in the constructor when the
// compact flag is set. Exposed for testing purposes.
return executor;
}

/**
* Snapshot that stores a specific checkpoint timestamp and state id, and
* also rolls back the database to that point upon restore. The rollback is
* done by removing all state checkpoints that have timestamps between the
* checkpoint and recovery timestamp.
*
*/
private static class DbKvStateSnapshot<K, V> implements KvStateSnapshot<K, V, DbStateBackend> {
Expand Down Expand Up @@ -557,4 +585,40 @@ public void flush(long timestamp) throws IOException {

}
}

/**
 * Background task that compacts the key-value state between the last
 * compacted timestamp and the given upper bound. It runs the compaction on
 * every connection of a newly created sharded connection and, on success,
 * advances the last compacted timestamp to the upper bound. Failures are
 * logged and otherwise ignored, leaving the last compacted timestamp
 * unchanged.
 */
private class Compactor implements Runnable {

	// Upper timestamp bound of the range to compact
	private final long upperBound;

	/**
	 * @param upperBound
	 *            upper timestamp bound passed to the compaction
	 */
	public Compactor(long upperBound) {
		this.upperBound = upperBound;
	}

	@Override
	public void run() {
		// We create new database connections to make sure we don't
		// interfere with the checkpointing (connections are not thread
		// safe)
		try (ShardedConnection sc = conf.createShardedConnection()) {
			// Read the lower bound once so every connection and the log
			// message below use the same value
			final long lowerBound = lastCompactedTs;
			for (final Connection c : sc.connections()) {
				SQLRetrier.retry(new Callable<Void>() {
					@Override
					public Void call() throws Exception {
						dbAdapter.compactKvStates(kvStateId, c, lowerBound, upperBound);
						return null;
					}
				}, numSqlRetries, sqlRetrySleep);
			}
			if (LOG.isInfoEnabled()) {
				LOG.info("State succesfully compacted for {} between {} and {}.", kvStateId,
						lowerBound,
						upperBound);
			}
			lastCompactedTs = upperBound;
		} catch (SQLException | IOException e) {
			// Use the (String, Throwable) overload: passing the exception as
			// the argument of a "{}" placeholder either leaves the
			// placeholder unfilled or loses the stack trace
			LOG.warn("State compaction failed for " + kvStateId, e);
		}
	}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,5 +234,12 @@ private void setKvInsertParams(String stateId, PreparedStatement insertStatement
insertStatement.setNull(4, Types.BLOB);
}
}

/**
 * Refreshes the given connection by running a trivial "SELECT 1" query on
 * it. The statement used for the query is closed before returning.
 */
@Override
public void keepAlive(Connection con) throws SQLException {
	try (Statement pingStatement = con.createStatement()) {
		pingStatement.executeQuery("SELECT 1");
	}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@
import java.net.UnknownHostException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.io.FileUtils;
import org.apache.derby.drda.NetworkServerControl;
Expand All @@ -51,7 +52,6 @@
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.shaded.com.google.common.collect.Lists;
import org.apache.flink.util.InstantiationUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
Expand Down Expand Up @@ -105,10 +105,12 @@ public void testSetupAndSerialization() throws Exception {
backend.initializeForJob(env);

assertNotNull(backend.getConnections());
assertTrue(isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toShortString()));
assertTrue(
isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toShortString()));

backend.disposeAllStateForCurrentJob();
assertFalse(isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toShortString()));
assertFalse(
isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toShortString()));
backend.close();

assertTrue(backend.getConnections().getFirst().isClosed());
Expand Down Expand Up @@ -195,19 +197,9 @@ public void testKeyValueState() throws Exception {
kv.setCurrentKey(3);
kv.update("u3");

assertTrue(containsKey(backend.getConnections().getFirst(), tableName, 1, 100));

kv.notifyCheckpointComplete(682375462378L);

// draw another snapshot
KvStateSnapshot<Integer, String, DbStateBackend> snapshot2 = kv.snapshot(682375462379L,
200);
assertTrue(containsKey(backend.getConnections().getFirst(), tableName, 1, 100));
assertTrue(containsKey(backend.getConnections().getFirst(), tableName, 1, 200));
kv.notifyCheckpointComplete(682375462379L);
// Compaction should be performed
assertFalse(containsKey(backend.getConnections().getFirst(), tableName, 1, 100));
assertTrue(containsKey(backend.getConnections().getFirst(), tableName, 1, 200));

// validate the original state
assertEquals(3, kv.size());
Expand Down Expand Up @@ -237,9 +229,11 @@ public void testKeyValueState() throws Exception {
}

@Test
public void testCleanupTasks() throws Exception {
public void testCompaction() throws Exception {
DbBackendConfig conf = new DbBackendConfig("flink", "flink", url1);
conf.setDbAdapter(new DerbyAdapter());
MockAdapter adapter = new MockAdapter();
conf.setKvStateCompactionFrequency(2);
conf.setDbAdapter(adapter);

DbStateBackend backend1 = new DbStateBackend(conf);
DbStateBackend backend2 = new DbStateBackend(conf);
Expand All @@ -249,9 +243,40 @@ public void testCleanupTasks() throws Exception {
backend2.initializeForJob(new DummyEnvironment("test", 3, 1));
backend3.initializeForJob(new DummyEnvironment("test", 3, 2));

assertTrue(backend1.createKvState("a_1", "a", null, null, null).isCompacter());
assertFalse(backend2.createKvState("a_1", "a", null, null, null).isCompacter());
assertFalse(backend3.createKvState("a_1", "a", null, null, null).isCompacter());
LazyDbKvState<?, ?> s1 = backend1.createKvState("a_1", "a", null, null, null);
LazyDbKvState<?, ?> s2 = backend2.createKvState("a_1", "a", null, null, null);
LazyDbKvState<?, ?> s3 = backend3.createKvState("a_1", "a", null, null, null);

assertTrue(s1.isCompactor());
assertFalse(s2.isCompactor());
assertFalse(s3.isCompactor());
assertNotNull(s1.getExecutor());
assertNull(s2.getExecutor());
assertNull(s3.getExecutor());

s1.snapshot(1, 100);
s1.notifyCheckpointComplete(1);
s1.snapshot(2, 200);
s1.snapshot(3, 300);
s1.notifyCheckpointComplete(2);
s1.notifyCheckpointComplete(3);
s1.snapshot(4, 400);
s1.snapshot(5, 500);
s1.notifyCheckpointComplete(4);
s1.notifyCheckpointComplete(5);

s1.dispose();
s2.dispose();
s3.dispose();

// Wait until the compaction completes
s1.getExecutor().awaitTermination(5, TimeUnit.SECONDS);
assertEquals(2, adapter.numCompcations.get());
assertEquals(5, adapter.keptAlive);

backend1.close();
backend2.close();
backend3.close();
}

@Test
Expand Down Expand Up @@ -422,16 +447,6 @@ private static boolean isTableEmpty(Connection con, String tableName) throws SQL
}
}

private static boolean containsKey(Connection con, String tableName, int key, long ts)
throws SQLException, IOException {
try (PreparedStatement smt = con
.prepareStatement("select * from " + tableName + " where k=? and timestamp=?")) {
smt.setBytes(1, InstantiationUtil.serializeToByteArray(IntSerializer.INSTANCE, key));
smt.setLong(2, ts);
return smt.executeQuery().next();
}
}

private static String localFileUri(File path) {
return path.toURI().toString();
}
Expand All @@ -443,4 +458,21 @@ private static void deleteDirectorySilently(File dir) {
}
}

/**
 * Test adapter that only counts how often compaction and keep-alive are
 * requested; neither override delegates to the parent adapter.
 */
private static class MockAdapter extends DerbyAdapter {

	private static final long serialVersionUID = 1L;

	// Number of compactKvStates invocations (atomic counter)
	public AtomicInteger numCompcations = new AtomicInteger();
	// Number of keepAlive invocations
	public int keptAlive;

	@Override
	public void compactKvStates(String kvStateId, Connection con, long lowerTs, long upperTs) throws SQLException {
		numCompcations.getAndIncrement();
	}

	@Override
	public void keepAlive(Connection con) throws SQLException {
		keptAlive += 1;
	}
}

}

0 comments on commit 92644f7

Please sign in to comment.