Skip to content

Commit

Permalink
[FLINK-2924] [streaming] Updated javadocs and some cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
gyfora committed Oct 28, 2015
1 parent 98e9ac0 commit fa266ac
Show file tree
Hide file tree
Showing 9 changed files with 342 additions and 107 deletions.
14 changes: 7 additions & 7 deletions flink-contrib/flink-streaming-contrib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,22 +60,22 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derbyclient</artifactId>
<version>10.12.1.1</version>
<scope>test</scope>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derbynet</artifactId>
<version>10.12.1.1</version>
<scope>test</scope>
<scope>test</scope>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
/**
*
* Adapter for bridging inconsistencies between the different SQL
* implementations.
* implementations. The default implementation has been tested to work well with
* MySQL
*
*/
public class DbAdapter implements Serializable {
Expand All @@ -39,6 +40,11 @@ public class DbAdapter implements Serializable {
// Non-partitioned state checkpointing
// -----------------------------------------------------------------------------

/**
* Initialize tables for storing non-partitioned checkpoints for the given
* job id and database connection.
*
*/
public void createCheckpointsTable(String jobId, Connection con) throws SQLException {
try (Statement smt = con.createStatement()) {
smt.executeUpdate(
Expand All @@ -47,32 +53,75 @@ public void createCheckpointsTable(String jobId, Connection con) throws SQLExcep
+ "checkpointId bigint, "
+ "timestamp bigint, "
+ "handleId bigint,"
+ "serializedData blob,"
+ "checkpoint blob,"
+ "PRIMARY KEY (handleId)"
+ ")");
}

}

/**
* Checkpoints will be inserted in the database using prepared statements.
* This method should prepare and return the statement that will be used
* later to insert using the given connection.
*
*/
public PreparedStatement prepareCheckpointInsert(String jobId, Connection con) throws SQLException {
return con.prepareStatement(
"INSERT INTO checkpoints_" + jobId
+ " (checkpointId, timestamp, handleId, serializedData) VALUES (?,?,?,?)");
+ " (checkpointId, timestamp, handleId, checkpoint) VALUES (?,?,?,?)");
}

/**
* Set the {@link PreparedStatement} parameters for the statement returned
* by {@link #prepareCheckpointInsert(String, Connection)}.
*
* @param jobId
* Id of the current job.
* @param insertStatement
* Statement returned by
* {@link #prepareCheckpointInsert(String, Connection)}.
* @param checkpointId
* Global checkpoint id.
* @param timestamp
* Global checkpoint timestamp.
* @param handleId
* Unique id assigned to this state checkpoint (should be primary
* key).
* @param checkpoint
* The serialized checkpoint.
* @throws SQLException
*/
public void setCheckpointInsertParams(String jobId, PreparedStatement insertStatement, long checkpointId,
long timestamp,
long handleId, byte[] checkpoint) throws SQLException {
long timestamp, long handleId, byte[] checkpoint) throws SQLException {
insertStatement.setLong(1, checkpointId);
insertStatement.setLong(2, timestamp);
insertStatement.setLong(3, handleId);
insertStatement.setBytes(4, checkpoint);
}

public byte[] getCheckpoint(String jobId, Connection con, long checkpointId, long handleId) throws SQLException {
/**
* Retrieve the serialized checkpoint data from the database.
*
* @param jobId
* Id of the current job.
* @param con
* Database connection
* @param checkpointId
* Global checkpoint id.
* @param checkpointTs
* Global checkpoint timestamp.
* @param handleId
* Unique id assigned to this state checkpoint (should be primary
* key).
* @return The byte[] corresponding to the checkpoint or null if missing.
* @throws SQLException
*/
public byte[] getCheckpoint(String jobId, Connection con, long checkpointId, long checkpointTs, long handleId)
throws SQLException {
try (Statement smt = con.createStatement()) {
ResultSet rs = smt.executeQuery(
"SELECT serializedData FROM checkpoints_" + jobId
"SELECT checkpoint FROM checkpoints_" + jobId
+ " WHERE handleId = " + handleId);
if (rs.next()) {
return rs.getBytes(1);
Expand All @@ -82,14 +131,38 @@ public byte[] getCheckpoint(String jobId, Connection con, long checkpointId, lon
}
}

public void deleteCheckpoint(String jobId, Connection con, long checkpointId, long handleId) throws SQLException {
/**
* Remove the given checkpoint from the database.
*
* @param jobId
* Id of the current job.
* @param con
* Database connection
* @param checkpointId
* Global checkpoint id.
* @param checkpointTs
* Global checkpoint timestamp.
* @param handleId
* Unique id assigned to this state checkpoint (should be primary
* key).
* @throws SQLException
* If a database access error occurs while deleting the checkpoint.
*/
public void deleteCheckpoint(String jobId, Connection con, long checkpointId, long checkpointTs, long handleId)
throws SQLException {
try (Statement smt = con.createStatement()) {
smt.executeUpdate(
"DELETE FROM checkpoints_" + jobId
+ " WHERE handleId = " + handleId);
}
}

/**
* Remove all state stored for the given job id, for instance by dropping
* the entire table.
*
* @throws SQLException
*/
public void disposeAllStateForJob(String jobId, Connection con) throws SQLException {
try (Statement smt = con.createStatement()) {
smt.executeUpdate(
Expand All @@ -101,6 +174,11 @@ public void disposeAllStateForJob(String jobId, Connection con) throws SQLExcept
// Partitioned state checkpointing
// -----------------------------------------------------------------------------

/**
* Initialize the necessary tables for the given stateId. The state id
* consists of the JobId+OperatorId+StateName.
*
*/
public void createKVStateTable(String stateId, Connection con) throws SQLException {
validateStateId(stateId);
try (Statement smt = con.createStatement()) {
Expand All @@ -117,6 +195,50 @@ public void createKVStateTable(String stateId, Connection con) throws SQLExcepti
}
}

/**
 * Creates the statement that will be used to insert key-value pairs into
 * the {@code kvstate_<stateId>} table. The state id is validated before it
 * is concatenated into the SQL text.
 *
 * @param stateId
 *            Identifier of the kvstate; used as the table name suffix.
 * @param con
 *            Database connection the statement is prepared on.
 * @return Insert statement taking four parameters in this order:
 *         checkpointId, timestamp, k, v.
 * @throws SQLException
 *             If the statement cannot be prepared.
 */
public PreparedStatement prepareKVCheckpointInsert(String stateId, Connection con) throws SQLException {
    validateStateId(stateId);
    final String insertSql = "INSERT INTO kvstate_" + stateId + " (checkpointId, timestamp, k, v) VALUES (?,?,?,?)";
    return con.prepareStatement(insertSql);
}

/**
 * Binds the parameters of a single key-value insert. A {@code null} value
 * is bound as SQL NULL of type BLOB, which marks the removal of the key
 * (tombstone).
 *
 * @param stateId
 *            Unique identifier of the kvstate (usually the table name).
 *            Not read by this implementation.
 * @param insertStatement
 *            Statement prepared in
 *            {@link #prepareKVCheckpointInsert(String, Connection)}.
 * @param checkpointId
 *            Checkpoint id, bound to parameter 1.
 * @param timestamp
 *            Checkpoint timestamp, bound to parameter 2.
 * @param key
 *            Key bytes, bound to parameter 3.
 * @param value
 *            Value bytes bound to parameter 4, or {@code null} to store a
 *            tombstone.
 * @throws SQLException
 *             If binding a parameter fails.
 */
public void setKVCheckpointInsertParams(String stateId, PreparedStatement insertStatement, long checkpointId,
        long timestamp, byte[] key, byte[] value) throws SQLException {
    insertStatement.setLong(1, checkpointId);
    insertStatement.setLong(2, timestamp);
    insertStatement.setBytes(3, key);

    // Tombstone: a missing value is written as SQL NULL.
    if (value == null) {
        insertStatement.setNull(4, Types.BLOB);
        return;
    }
    insertStatement.setBytes(4, value);
}

/**
* Prepare the statement that will be used to lookup keys from the database.
* Keys and values are assumed to be byte arrays.
*
*/
public PreparedStatement prepareKeyLookup(String stateId, Connection con) throws SQLException {
validateStateId(stateId);
return con.prepareStatement("SELECT v"
Expand All @@ -127,6 +249,24 @@ public PreparedStatement prepareKeyLookup(String stateId, Connection con) throws
+ " ORDER BY checkpointId DESC LIMIT 1");
}

/**
* Retrieve the latest value from the database for a given key that has
* {@code checkpointId <= lookupId} and {@code checkpointTs <= lookupTs}.
*
* @param stateId
* Unique identifier of the kvstate (usually the table name).
* @param lookupStatement
* The statement returned by
* {@link #prepareKeyLookup(String, Connection)}.
* @param key
* The key to lookup.
* @param lookupId
* Latest checkpoint id to select.
* @param lookupTs
* Latest checkpoint ts to select.
* @return The latest valid value for the key.
* @throws SQLException
*/
public byte[] lookupKey(String stateId, PreparedStatement lookupStatement, byte[] key, long lookupId,
long lookupTs) throws SQLException {
lookupStatement.setBytes(1, key);
Expand All @@ -142,36 +282,27 @@ public byte[] lookupKey(String stateId, PreparedStatement lookupStatement, byte[
}
}

public PreparedStatement prepareKVCheckpointInsert(String stateId, Connection con) throws SQLException {
validateStateId(stateId);
return con.prepareStatement(
"INSERT INTO kvstate_" + stateId + " (checkpointId, timestamp, k, v) VALUES (?,?,?,?)");
}

public void setKVCheckpointInsertParams(String stateId, PreparedStatement insertStatement, long checkpointId,
long timestamp,
byte[] key, byte[] value) throws SQLException {
insertStatement.setLong(1, checkpointId);
insertStatement.setLong(2, timestamp);
insertStatement.setBytes(3, key);
if (value != null) {
insertStatement.setBytes(4, value);
} else {
insertStatement.setNull(4, Types.BLOB);
}
}

public void cleanupFailedCheckpoints(String stateId, Connection con, long lookupId, long lookupTs)
/**
* Remove partially failed snapshots using the latest id and a global
* recovery timestamp. All records with a higher id but lower timestamp
* should be deleted from the database.
*
*/
public void cleanupFailedCheckpoints(String stateId, Connection con, long checkpointId, long reoveryTs)
throws SQLException {
validateStateId(stateId);
try (Statement smt = con.createStatement()) {
smt.executeUpdate("DELETE FROM kvstate_" + stateId
+ " WHERE checkpointId > " + lookupId
+ " AND timestamp < " + lookupTs);
+ " WHERE checkpointId > " + checkpointId
+ " AND timestamp < " + reoveryTs);
}
}

public static void validateStateId(String name) {
/**
* Tries to avoid SQL injection with weird state names.
*
*/
protected static void validateStateId(String name) {
if (!name.matches("[a-zA-Z0-9_]+")) {
throw new RuntimeException("State name contains invalid characters.");
}
Expand Down
Loading

0 comments on commit fa266ac

Please sign in to comment.