diff --git a/flink-contrib/flink-streaming-contrib/pom.xml b/flink-contrib/flink-streaming-contrib/pom.xml
index 88b839d6c65671..22b11b2a455003 100644
--- a/flink-contrib/flink-streaming-contrib/pom.xml
+++ b/flink-contrib/flink-streaming-contrib/pom.xml
@@ -60,22 +60,22 @@ under the License.
test-jar
test
-
- com.google.guava
- guava
- ${guava.version}
-
+
+ com.google.guava
+ guava
+ ${guava.version}
+
org.apache.derby
derbyclient
10.12.1.1
- test
+ test
org.apache.derby
derbynet
10.12.1.1
- test
+ test
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java
index fd0290991e84c8..2a860aeb388a23 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java
@@ -28,7 +28,8 @@
/**
*
* Adapter for bridging inconsistencies between the different SQL
- * implementations.
+ * implementations. The default implementation has been tested to work well with
+ * MySQL.
*
*/
public class DbAdapter implements Serializable {
@@ -39,6 +40,11 @@ public class DbAdapter implements Serializable {
// Non-partitioned state checkpointing
// -----------------------------------------------------------------------------
+ /**
+ * Initialize tables for storing non-partitioned checkpoints for the given
+ * job id and database connection.
+ *
+ */
public void createCheckpointsTable(String jobId, Connection con) throws SQLException {
try (Statement smt = con.createStatement()) {
smt.executeUpdate(
@@ -47,32 +53,75 @@ public void createCheckpointsTable(String jobId, Connection con) throws SQLExcep
+ "checkpointId bigint, "
+ "timestamp bigint, "
+ "handleId bigint,"
- + "serializedData blob,"
+ + "checkpoint blob,"
+ "PRIMARY KEY (handleId)"
+ ")");
}
}
+ /**
+ * Checkpoints will be inserted in the database using prepared statements.
+ * This method should prepare and return the statement that will be used
+ * later to insert using the given connection.
+ *
+ */
public PreparedStatement prepareCheckpointInsert(String jobId, Connection con) throws SQLException {
return con.prepareStatement(
"INSERT INTO checkpoints_" + jobId
- + " (checkpointId, timestamp, handleId, serializedData) VALUES (?,?,?,?)");
+ + " (checkpointId, timestamp, handleId, checkpoint) VALUES (?,?,?,?)");
}
+ /**
+ * Set the {@link PreparedStatement} parameters for the statement returned
+ * by {@link #prepareCheckpointInsert(String, Connection)}.
+ *
+ * @param jobId
+ * Id of the current job.
+ * @param insertStatement
+ * Statement returned by
+ * {@link #prepareCheckpointInsert(String, Connection)}.
+ * @param checkpointId
+ * Global checkpoint id.
+ * @param timestamp
+ * Global checkpoint timestamp.
+ * @param handleId
+ * Unique id assigned to this state checkpoint (should be primary
+ * key).
+ * @param checkpoint
+ * The serialized checkpoint.
+ * @throws SQLException
+ */
public void setCheckpointInsertParams(String jobId, PreparedStatement insertStatement, long checkpointId,
- long timestamp,
- long handleId, byte[] checkpoint) throws SQLException {
+ long timestamp, long handleId, byte[] checkpoint) throws SQLException {
insertStatement.setLong(1, checkpointId);
insertStatement.setLong(2, timestamp);
insertStatement.setLong(3, handleId);
insertStatement.setBytes(4, checkpoint);
}
- public byte[] getCheckpoint(String jobId, Connection con, long checkpointId, long handleId) throws SQLException {
+ /**
+ * Retrieve the serialized checkpoint data from the database.
+ *
+ * @param jobId
+ * Id of the current job.
+ * @param con
+ * Database connection
+ * @param checkpointId
+ * Global checkpoint id.
+ * @param checkpointTs
+ * Global checkpoint timestamp.
+ * @param handleId
+ * Unique id assigned to this state checkpoint (should be primary
+ * key).
+ * @return The byte[] corresponding to the checkpoint or null if missing.
+ * @throws SQLException
+ */
+ public byte[] getCheckpoint(String jobId, Connection con, long checkpointId, long checkpointTs, long handleId)
+ throws SQLException {
try (Statement smt = con.createStatement()) {
ResultSet rs = smt.executeQuery(
- "SELECT serializedData FROM checkpoints_" + jobId
+ "SELECT checkpoint FROM checkpoints_" + jobId
+ " WHERE handleId = " + handleId);
if (rs.next()) {
return rs.getBytes(1);
@@ -82,7 +131,25 @@ public byte[] getCheckpoint(String jobId, Connection con, long checkpointId, lon
}
}
- public void deleteCheckpoint(String jobId, Connection con, long checkpointId, long handleId) throws SQLException {
+ /**
+ * Remove the given checkpoint from the database.
+ *
+ * @param jobId
+ * Id of the current job.
+ * @param con
+ * Database connection
+ * @param checkpointId
+ * Global checkpoint id.
+ * @param checkpointTs
+ * Global checkpoint timestamp.
+ * @param handleId
+ * Unique id assigned to this state checkpoint (should be primary
+ * key).
+ *
+ * @throws SQLException
+ */
+ public void deleteCheckpoint(String jobId, Connection con, long checkpointId, long checkpointTs, long handleId)
+ throws SQLException {
try (Statement smt = con.createStatement()) {
smt.executeUpdate(
"DELETE FROM checkpoints_" + jobId
@@ -90,6 +157,12 @@ public void deleteCheckpoint(String jobId, Connection con, long checkpointId, lo
}
}
+ /**
+ * Remove all states for the given JobId, by for instance dropping the
+ * entire table.
+ *
+ * @throws SQLException
+ */
public void disposeAllStateForJob(String jobId, Connection con) throws SQLException {
try (Statement smt = con.createStatement()) {
smt.executeUpdate(
@@ -101,6 +174,11 @@ public void disposeAllStateForJob(String jobId, Connection con) throws SQLExcept
// Partitioned state checkpointing
// -----------------------------------------------------------------------------
+ /**
+ * Initialize the necessary tables for the given stateId. The state id
+ * consists of the JobId+OperatorId+StateName.
+ *
+ */
public void createKVStateTable(String stateId, Connection con) throws SQLException {
validateStateId(stateId);
try (Statement smt = con.createStatement()) {
@@ -117,6 +195,50 @@ public void createKVStateTable(String stateId, Connection con) throws SQLExcepti
}
}
+ /**
+ * Prepare the statement that will be used to insert key-value pairs in
+ * the database.
+ *
+ */
+ public PreparedStatement prepareKVCheckpointInsert(String stateId, Connection con) throws SQLException {
+ validateStateId(stateId);
+ return con.prepareStatement(
+ "INSERT INTO kvstate_" + stateId + " (checkpointId, timestamp, k, v) VALUES (?,?,?,?)");
+ }
+
+ /**
+ * Insert new key-value pair in the database. The method should be able to
+ * receive null values which mark the removal of a key (tombstone).
+ *
+ * @param stateId
+ * Unique identifier of the kvstate (usually the table name).
+ * @param insertStatement
+ * Statement prepared in
+ * {@link #prepareKVCheckpointInsert(String, Connection)}
+ * @param checkpointId
+ * @param timestamp
+ * @param key
+ * @param value
+ * @throws SQLException
+ */
+ public void setKVCheckpointInsertParams(String stateId, PreparedStatement insertStatement, long checkpointId,
+ long timestamp,
+ byte[] key, byte[] value) throws SQLException {
+ insertStatement.setLong(1, checkpointId);
+ insertStatement.setLong(2, timestamp);
+ insertStatement.setBytes(3, key);
+ if (value != null) {
+ insertStatement.setBytes(4, value);
+ } else {
+ insertStatement.setNull(4, Types.BLOB);
+ }
+ }
+
+ /**
+ * Prepare the statement that will be used to lookup keys from the database.
+ * Keys and values are assumed to be byte arrays.
+ *
+ */
public PreparedStatement prepareKeyLookup(String stateId, Connection con) throws SQLException {
validateStateId(stateId);
return con.prepareStatement("SELECT v"
@@ -127,6 +249,24 @@ public PreparedStatement prepareKeyLookup(String stateId, Connection con) throws
+ " ORDER BY checkpointId DESC LIMIT 1");
}
+ /**
+ * Retrieve the latest value from the database for a given key that has
+ * {@code checkpointId <= lookupId} and {@code checkpointTs <= lookupTs}.
+ *
+ * @param stateId
+ * Unique identifier of the kvstate (usually the table name).
+ * @param lookupStatement
+ * The statement returned by
+ * {@link #prepareKeyLookup(String, Connection)}.
+ * @param key
+ * The key to lookup.
+ * @param lookupId
+ * Latest checkpoint id to select.
+ * @param lookupTs
+ * Latest checkpoint ts to select.
+ * @return The latest valid value for the key.
+ * @throws SQLException
+ */
public byte[] lookupKey(String stateId, PreparedStatement lookupStatement, byte[] key, long lookupId,
long lookupTs) throws SQLException {
lookupStatement.setBytes(1, key);
@@ -142,36 +282,27 @@ public byte[] lookupKey(String stateId, PreparedStatement lookupStatement, byte[
}
}
- public PreparedStatement prepareKVCheckpointInsert(String stateId, Connection con) throws SQLException {
- validateStateId(stateId);
- return con.prepareStatement(
- "INSERT INTO kvstate_" + stateId + " (checkpointId, timestamp, k, v) VALUES (?,?,?,?)");
- }
-
- public void setKVCheckpointInsertParams(String stateId, PreparedStatement insertStatement, long checkpointId,
- long timestamp,
- byte[] key, byte[] value) throws SQLException {
- insertStatement.setLong(1, checkpointId);
- insertStatement.setLong(2, timestamp);
- insertStatement.setBytes(3, key);
- if (value != null) {
- insertStatement.setBytes(4, value);
- } else {
- insertStatement.setNull(4, Types.BLOB);
- }
- }
-
- public void cleanupFailedCheckpoints(String stateId, Connection con, long lookupId, long lookupTs)
+ /**
+ * Remove partially failed snapshots using the latest id and a global
+ * recovery timestamp. All records with a higher id but lower timestamp
+ * should be deleted from the database.
+ *
+ */
+ public void cleanupFailedCheckpoints(String stateId, Connection con, long checkpointId, long reoveryTs)
throws SQLException {
validateStateId(stateId);
try (Statement smt = con.createStatement()) {
smt.executeUpdate("DELETE FROM kvstate_" + stateId
- + " WHERE checkpointId > " + lookupId
- + " AND timestamp < " + lookupTs);
+ + " WHERE checkpointId > " + checkpointId
+ + " AND timestamp < " + reoveryTs);
}
}
- public static void validateStateId(String name) {
+ /**
+ * Tries to avoid SQL injection with weird state names.
+ *
+ */
+ protected static void validateStateId(String name) {
if (!name.matches("[a-zA-Z0-9_]+")) {
throw new RuntimeException("State name contains invalid characters.");
}
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbBackendConfig.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbBackendConfig.java
index 8d73c5d7210f3b..fbccc863d8c1f9 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbBackendConfig.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbBackendConfig.java
@@ -35,19 +35,18 @@ public class DbBackendConfig implements Serializable {
private static final long serialVersionUID = 1L;
+ // Database connection properties
private final String userName;
private final String userPassword;
-
private final List shardUrls;
- private String JDBCDriver = null;
-
+ // JDBC Driver + DbAdapter information
private Class extends DbAdapter> dbAdapterClass = DbAdapter.class;
+ private String JDBCDriver = null;
+ // KvState properties
private int kvStateCacheSize = 10000;
-
private int maxKvInsertBatchSize = 1000;
-
private float maxKvEvictFraction = 0.1f;
/**
@@ -87,26 +86,48 @@ public DbBackendConfig(String dbUserName, String dbUserPassword, String dbUrl) {
this(dbUserName, dbUserPassword, Lists.newArrayList(dbUrl));
}
+ /**
+ * The username used to connect to the database at the given urls.
+ */
public String getUserName() {
return userName;
}
+ /**
+ * The password used to connect to the database at the given url and
+ * username.
+ */
public String getUserPassword() {
return userPassword;
}
+ /**
+ * Number of database shards defined.
+ */
public int getNumberOfShards() {
return shardUrls.size();
}
+ /**
+ * Database shard urls as provided in the constructor.
+ *
+ */
public List getShardUrls() {
return shardUrls;
}
+ /**
+ * The url of the first shard.
+ *
+ */
public String getUrl() {
return getShardUrl(0);
}
+ /**
+ * The url of a specific shard.
+ *
+ */
public String getShardUrl(int shardIndex) {
validateShardIndex(shardIndex);
return shardUrls.get(shardIndex);
@@ -284,6 +305,7 @@ public int hashCode() {
result = prime * result + ((JDBCDriver == null) ? 0 : JDBCDriver.hashCode());
result = prime * result + ((dbAdapterClass == null) ? 0 : dbAdapterClass.hashCode());
result = prime * result + kvStateCacheSize;
+ result = prime * result + Float.floatToIntBits(maxKvEvictFraction);
result = prime * result + maxKvInsertBatchSize;
result = prime * result + ((shardUrls == null) ? 0 : shardUrls.hashCode());
result = prime * result + ((userName == null) ? 0 : userName.hashCode());
@@ -320,6 +342,9 @@ public boolean equals(Object obj) {
if (kvStateCacheSize != other.kvStateCacheSize) {
return false;
}
+ if (Float.floatToIntBits(maxKvEvictFraction) != Float.floatToIntBits(other.maxKvEvictFraction)) {
+ return false;
+ }
if (maxKvInsertBatchSize != other.maxKvInsertBatchSize) {
return false;
}
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
index 33005a78941660..cb3d41a48b7f63 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
@@ -58,25 +58,24 @@
*/
public class DbStateBackend extends StateBackend {
+ private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(DbStateBackend.class);
- private static final long serialVersionUID = 1L;
+ private Random rnd;
// ------------------------------------------------------
private Environment env;
- private Random rnd;
// ------------------------------------------------------
private final DbBackendConfig dbConfig;
private final DbAdapter dbAdapter;
- private static final int NUM_RETRIES = 5;
-
+ private Connection con;
private int shardIndex = 0;
- private Connection con;
+ private static final int NUM_RETRIES = 5;
private PreparedStatement insertStatement;
// ------------------------------------------------------
@@ -133,6 +132,8 @@ public DbBackendConfig getConfiguration() {
public StateHandle checkpointStateSerializable(final S state, final long checkpointID,
final long timestamp) throws Exception {
+ // If we set a different backend for non-partitioned checkpoints we use
+ // that, otherwise we write to the database.
if (nonPartitionedStateBackend == null) {
return retry(new Callable>() {
public DbStateHandle call() throws Exception {
@@ -146,7 +147,7 @@ public DbStateHandle call() throws Exception {
insertStatement.executeUpdate();
- return new DbStateHandle(env.getJobID().toString(), checkpointID, handleId,
+ return new DbStateHandle(env.getJobID().toString(), checkpointID, timestamp, handleId,
dbConfig.createConfigForShard(shardIndex));
}
}, NUM_RETRIES);
@@ -159,8 +160,8 @@ public DbStateHandle call() throws Exception {
public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp)
throws Exception {
if (nonPartitionedStateBackend == null) {
- // We don't implement this functionality as we cannot directly write
- // a stream to the database anyways.
+ // We don't implement this functionality for the DbStateBackend as
+ // we cannot directly write a stream to the database anyways.
throw new UnsupportedOperationException("Use ceckpointStateSerializable instead.");
} else {
return nonPartitionedStateBackend.createCheckpointStateOutputStream(checkpointID, timestamp);
@@ -170,9 +171,13 @@ public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkp
@Override
public LazyDbKvState createKvState(int operatorId, String stateName,
TypeSerializer keySerializer, TypeSerializer valueSerializer, V defaultValue) throws IOException {
- return new LazyDbKvState(env.getJobID() + "_" + operatorId + "_" + stateName,
- getConnection(), getConfiguration(), keySerializer,
- valueSerializer, defaultValue);
+ return new LazyDbKvState(
+ env.getJobID() + "_" + operatorId + "_" + stateName,
+ getConnection(),
+ getConfiguration(),
+ keySerializer,
+ valueSerializer,
+ defaultValue);
}
@Override
@@ -187,7 +192,8 @@ public void initializeForJob(final Environment env) throws Exception {
con = dbConfig.createConnection(shardIndex);
// We want the most light-weight transaction isolation level as we don't
// have conflicting reads/writes. We just want to be able to roll back
- // batch inserts for kv snapshots.
+ // batch inserts for k-v snapshots. This requirement might be removed in
+ // the future.
con.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
// If we have a different backend for non-partitioned states we
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
index 3865822297056d..c6eaaf4b1aefad 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
@@ -30,36 +30,38 @@
/**
* State handle implementation for storing checkpoints as byte arrays in
- * databases.
+ * databases using the {@link DbAdapter} defined in the {@link DbBackendConfig}.
*
*/
public class DbStateHandle implements Serializable, StateHandle {
private static final long serialVersionUID = 1L;
- private final long checkpointId;
- private final long handleId;
+ private static final int NUM_RETRIES = 5;
private final String jobId;
-
private final DbBackendConfig dbConfig;
- private DbAdapter adapter;
- public DbStateHandle(String jobId, long checkpointId, long handleId, DbBackendConfig dbConfig) {
+ private final long checkpointId;
+ private final long checkpointTs;
+
+ private final long handleId;
+
+ public DbStateHandle(String jobId, long checkpointId, long checkpointTs, long handleId, DbBackendConfig dbConfig) {
this.checkpointId = checkpointId;
this.handleId = handleId;
this.jobId = jobId;
this.dbConfig = dbConfig;
- this.adapter = dbConfig.getDbAdapter();
+ this.checkpointTs = checkpointTs;
}
protected byte[] getBytes() throws IOException {
return retry(new Callable() {
public byte[] call() throws Exception {
try (Connection con = dbConfig.createConnection()) {
- return adapter.getCheckpoint(jobId, con, checkpointId, handleId);
+ return dbConfig.getDbAdapter().getCheckpoint(jobId, con, checkpointId, checkpointTs, handleId);
}
}
- }, 5);
+ }, NUM_RETRIES);
}
@Override
@@ -68,13 +70,16 @@ public void discardState() {
retry(new Callable() {
public Boolean call() throws Exception {
try (Connection con = dbConfig.createConnection()) {
- adapter.deleteCheckpoint(jobId, con, checkpointId, handleId);
+ dbConfig.getDbAdapter().deleteCheckpoint(jobId, con, checkpointId, checkpointTs, handleId);
}
return true;
}
- }, 2);
+ }, NUM_RETRIES);
} catch (IOException e) {
- Log.warn("Could not discard state.");
+ // We don't want to fail the job here, but log the error.
+ if (Log.isDebugEnabled()) {
+ Log.debug("Could not discard state.");
+ }
}
}
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java
index d370a5d67eea78..de12bb3e456e65 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java
@@ -34,6 +34,7 @@
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.state.KvStateSnapshot;
import org.apache.flink.util.InstantiationUtil;
+import org.eclipse.jetty.util.log.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,9 +70,11 @@ public class LazyDbKvState implements KvState {
// Max number of key-value pairs inserted in one batch to the database
private final int maxInsertBatchSize;
+ // Database properties
private final Connection con;
private final DbAdapter dbAdapter;
+ // Statements for key-lookups and inserts as prepared by the dbAdapter
private PreparedStatement selectStatement;
private PreparedStatement insertStatement;
@@ -81,17 +84,26 @@ public class LazyDbKvState implements KvState {
private final StateCache cache;
// Checkpoint ID and timestamp of the last successful checkpoint for loading
- // missing values to the cache
+ // missing values to the cache. The lookup timestamp may be incremented
+ // between snapshots as we evict modified values from the cache.
private long lookupId;
private long lookupTs;
// ------------------------------------------------------
+ /**
+ * Constructor to initialize the {@link LazyDbKvState} the first time the
+ * job starts.
+ */
public LazyDbKvState(String kvStateId, Connection con, DbBackendConfig conf, TypeSerializer keySerializer,
TypeSerializer valueSerializer, V defaultValue) throws IOException {
this(kvStateId, con, conf, keySerializer, valueSerializer, defaultValue, -1, -1);
}
+ /**
+ * Initialize the {@link LazyDbKvState} from a snapshot given a lookup id
+ * and timestamp.
+ */
public LazyDbKvState(String kvStateId, Connection con, final DbBackendConfig conf, TypeSerializer keySerializer,
TypeSerializer valueSerializer, V defaultValue, long lookupId, long lookupTs) throws IOException {
@@ -101,15 +113,20 @@ public LazyDbKvState(String kvStateId, Connection con, final DbBackendConfig con
this.valueSerializer = valueSerializer;
this.defaultValue = defaultValue;
+ this.maxInsertBatchSize = conf.getMaxKvInsertBatchSize();
+ this.con = con;
+ this.dbAdapter = conf.getDbAdapter();
+
this.lookupId = lookupId;
this.lookupTs = lookupTs;
this.cache = new StateCache(conf.getKvCacheSize(), conf.getNumElementsToEvict());
- this.maxInsertBatchSize = conf.getMaxKvInsertBatchSize();
- this.con = con;
- this.dbAdapter = conf.getDbAdapter();
initDB(this.con);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Lazy database kv-state ({}) successfully initialized", kvStateId);
+ }
}
@Override
@@ -143,9 +160,14 @@ public DbKvStateSnapshot shapshot(long checkpointId, long timestamp) throw
int rowsInBatchStatement = 0;
- // We write the cached entries to the database
+ // We write the cached and modified entries to the database then clear
+ // the map of modified entries
for (Entry> state : cache.modified.entrySet()) {
- rowsInBatchStatement = batchInsert(checkpointId, timestamp, state.getKey(), state.getValue().orNull(),
+ rowsInBatchStatement = batchInsert(
+ checkpointId,
+ timestamp,
+ state.getKey(),
+ state.getValue().orNull(),
rowsInBatchStatement);
}
cache.modified.clear();
@@ -156,7 +178,7 @@ public DbKvStateSnapshot shapshot(long checkpointId, long timestamp) throw
}
// Update the lookup id and timestamp so future cache loads will return
- // the checkpointed values
+ // consistent values
lookupId = checkpointId;
lookupTs = timestamp;
@@ -164,9 +186,8 @@ public DbKvStateSnapshot shapshot(long checkpointId, long timestamp) throw
}
/**
- * Returns the number of elements currently stored in the task (cache + cold
- * states). Note that the number of elements in the database is not counted
- * here.
+ * Returns the number of elements currently stored in the task's cache. Note
+ * that the number of elements in the database is not counted here.
*/
@Override
public int size() {
@@ -199,10 +220,6 @@ public Boolean call() throws Exception {
}
}, NUM_RETRIES);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Lazy database kv-state ({}) successfully initialized", kvStateId);
- }
}
/**
@@ -211,13 +228,16 @@ public Boolean call() throws Exception {
* "flush" the batch contents passing null as key.
*/
private int batchInsert(final long checkpointId, final long timestamp, final K key, final V value,
- final int batchCount)
- throws IOException {
+ final int batchCount) throws IOException {
return retry(new Callable() {
public Integer call() throws Exception {
if (key != null) {
- dbAdapter.setKVCheckpointInsertParams(kvStateId, insertStatement, checkpointId, timestamp,
+ dbAdapter.setKVCheckpointInsertParams(
+ kvStateId,
+ insertStatement,
+ checkpointId,
+ timestamp,
InstantiationUtil.serializeToByteArray(keySerializer, key),
value != null ? InstantiationUtil.serializeToByteArray(valueSerializer, value) : null);
insertStatement.addBatch();
@@ -230,14 +250,22 @@ public Integer call() throws Exception {
con.setAutoCommit(false);
insertStatement.executeBatch();
con.commit();
+
+ // If the commit was successful we clear the batch and turn
+ // autocommit back on for the connection
insertStatement.clearBatch();
con.setAutoCommit(true);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Written {} records to the Db for state {}.", batchCount, kvStateId);
+ }
return 0;
} else {
return batchCount + 1;
}
}
}, new Callable() {
+ // When the retrier gets an exception during the batch insert it
+ // should roll back the transaction before trying it again
public Boolean call() throws Exception {
con.rollback();
return true;
@@ -280,7 +308,6 @@ private static class DbKvStateSnapshot implements KvStateSnapshot restoreState(final DbStateBackend stateBackend,
return retry(new Callable>() {
public LazyDbKvState call() throws Exception {
- stateBackend.getConfiguration().getDbAdapter().cleanupFailedCheckpoints(kvStateId,
+ // First clean up the partially failed snapshots using the
+ // recovery timestamp, then reinitialize the KvState
+
+ stateBackend.getConfiguration().getDbAdapter()
+ .cleanupFailedCheckpoints(
+ kvStateId,
+ stateBackend.getConnection(),
+ lookupId,
+ recoveryTimestamp);
+
+ LazyDbKvState restored = new LazyDbKvState(
+ kvStateId,
stateBackend.getConnection(),
- lookupId, recoveryTimestamp);
+ stateBackend.getConfiguration(),
+ keySerializer,
+ valueSerializer,
+ defaultValue,
+ lookupId,
+ lookupTs);
if (LOG.isDebugEnabled()) {
LOG.debug("KV state({},{}) restored.", lookupId, lookupTs);
}
- return new LazyDbKvState(kvStateId, stateBackend.getConnection(),
- stateBackend.getConfiguration(), keySerializer, valueSerializer,
- defaultValue, lookupId, lookupTs);
+
+ return restored;
}
}, NUM_RETRIES);
@@ -364,13 +406,15 @@ public Optional get(Object key) {
value = Optional.fromNullable(getFromDatabaseOrNull((K) key));
put((K) key, value);
}
+ // We currently mark elements that were retrieved also as modified
+ // in case the user applies some mutation without update
modified.put((K) key, value);
return value;
}
@Override
protected boolean removeEldestEntry(Entry> eldest) {
- // We remove manually
+ // We remove elements manually if the cache becomes full
return false;
}
@@ -398,32 +442,53 @@ public V call() throws Exception {
}
/**
- * If the cache is full the min(insertBatchSize, evictionSize) least
- * recently accessed elements will be removed from the cache and written
- * to the database if necessary.
+ * If the cache is full we remove the evictionSize least recently
+ * accessed elements and write them to the database if they were
+ * modified since the last checkpoint.
*/
private void evictIfFull() {
if (size() > cacheSize) {
try {
int numEvicted = 0;
int rowsInBatch = 0;
+ boolean writtenToDb = false;
+
Iterator>> entryIterator = entrySet().iterator();
- while (numEvicted < evictionSize && rowsInBatch < maxInsertBatchSize && entryIterator.hasNext()) {
+ while (numEvicted++ < evictionSize && entryIterator.hasNext()) {
+
Entry> next = entryIterator.next();
// We only need to write to the database if modified
if (modified.remove(next.getKey()) != null) {
- rowsInBatch = batchInsert(lookupId, lookupTs + 1, next.getKey(), next.getValue().orNull(),
+ // We insert elements with the last checkpoint id
+ // but an incremented timestamp to ensure good
+ // lookups. This won't interfere with the actual
+ // checkpoint timestamps.
+ rowsInBatch = batchInsert(
+ lookupId,
+ lookupTs + 1,
+ next.getKey(),
+ next.getValue().orNull(),
numEvicted);
+ if (rowsInBatch == 0) {
+ writtenToDb = true;
+ }
}
entryIterator.remove();
- numEvicted++;
}
+ // Flush the batch if it has rows
if (rowsInBatch > 0) {
batchInsert(0, 0, null, null, 0);
+ writtenToDb = true;
}
- lookupTs = lookupTs + 1;
+
+ // If we have written new values to the database we need to
+ // increment our lookup timestamp
+ if (writtenToDb) {
+ lookupTs = lookupTs + 1;
+ }
+
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
index 30bcdee2994de0..849b690e86ae4b 100644
--- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
@@ -73,10 +73,13 @@ public static void startDerbyServer() throws UnknownHostException, Exception {
@AfterClass
public static void stopDerbyServer() throws Exception {
- server.shutdown();
- FileUtils.deleteDirectory(new File(tempDir.getAbsolutePath() + "/flinkDB1"));
- FileUtils.deleteDirectory(new File(tempDir.getAbsolutePath() + "/flinkDB2"));
- FileUtils.forceDelete(new File("derby.log"));
+ try {
+ server.shutdown();
+ FileUtils.deleteDirectory(new File(tempDir.getAbsolutePath() + "/flinkDB1"));
+ FileUtils.deleteDirectory(new File(tempDir.getAbsolutePath() + "/flinkDB2"));
+ FileUtils.forceDelete(new File("derby.log"));
+ } catch (Exception ignore) {
+ }
}
@Test
@@ -257,7 +260,7 @@ public void testCaching() throws Exception {
DbBackendConfig conf = DbStateBackendTest.conf.createConfigForShard(0);
conf.setKvCacheSize(3);
conf.setMaxKvInsertBatchSize(2);
-
+
// We evict 2 elements when the cache is full
conf.setMaxKvCacheEvictFraction(0.6f);
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java
index 4ba1ffeca97ede..d3ae2a69ffd6cb 100644
--- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java
@@ -45,7 +45,7 @@ public void createCheckpointsTable(String jobId, Connection con) throws SQLExcep
+ "checkpointId bigint, "
+ "timestamp bigint, "
+ "handleId bigint,"
- + "serializedData blob,"
+ + "checkpoint blob,"
+ "PRIMARY KEY (handleId)"
+ ")");
} catch (SQLException se) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 70d62b4fd9ea50..7b2c2d4b3e8e60 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -107,9 +107,9 @@ public void testSetState() {
coord.restoreLatestCheckpointedState(map, true, false);
// verify that each stateful vertex got the state
- verify(statefulExec1, times(1)).setInitialState(serializedState, Mockito.anyLong());
- verify(statefulExec2, times(1)).setInitialState(serializedState, Mockito.anyLong());
- verify(statefulExec3, times(1)).setInitialState(serializedState, Mockito.anyLong());
+ verify(statefulExec1, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.anyLong());
+ verify(statefulExec2, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.anyLong());
+ verify(statefulExec3, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.anyLong());
verify(statelessExec1, times(0)).setInitialState(Mockito.>>any(), Mockito.anyLong());
verify(statelessExec2, times(0)).setInitialState(Mockito.>>any(), Mockito.anyLong());
}