Skip to content

Commit

Permalink
[FLINK-2924] [streaming] Use short job id for table names
Browse files Browse the repository at this point in the history
  • Loading branch information
gyfora committed Nov 24, 2015
1 parent 43b8e57 commit cd8be0b
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@

import org.apache.flink.api.java.tuple.Tuple2;

/**
* Adapter interface for executing different checkpointing related operations on
* the underlying database.
*
*/
public interface DbAdapter extends Serializable {

/**
Expand Down Expand Up @@ -168,9 +173,9 @@ void insertBatch(String stateId, DbBackendConfig conf, Connection con, PreparedS
long checkpointTimestamp, List<Tuple2<byte[], byte[]>> toInsert) throws IOException;

/**
* Compact the states between two checkpoint timestamp by only keeping the most
* recent.
* Compact the states between two checkpoint timestamp by only keeping the
* most recent.
*/
void compactKvStates(String kvStateId, Connection con, long lowerTs, long upperTs) throws SQLException;

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,15 @@ public DbStateHandle<S> call() throws Exception {
// We create a unique long id for each handle, but we also
// store the checkpoint id and timestamp for bookkeeping
long handleId = rnd.nextLong();
String jobIdShort = env.getJobID().toShortString();

dbAdapter.setCheckpointInsertParams(env.getJobID().toString(), insertStatement,
dbAdapter.setCheckpointInsertParams(jobIdShort, insertStatement,
checkpointID, timestamp, handleId,
InstantiationUtil.serializeObject(state));

insertStatement.executeUpdate();

return new DbStateHandle<S>(env.getJobID().toString(), checkpointID, timestamp, handleId,
return new DbStateHandle<S>(jobIdShort, checkpointID, timestamp, handleId,
dbConfig);
}
}, numSqlRetries, sqlRetrySleep);
Expand All @@ -179,7 +180,7 @@ public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkp
public <K, V> LazyDbKvState<K, V> createKvState(int operatorId, String stateName,
TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) throws IOException {
return new LazyDbKvState<K, V>(
env.getJobID() + "_" + operatorId + "_" + stateName,
stateName + "_" + operatorId + "_" + env.getJobID().toShortString(),
env.getIndexInSubtaskGroup() == 0,
getConnections(),
getConfiguration(),
Expand All @@ -194,7 +195,7 @@ public void initializeForJob(final Environment env) throws Exception {
this.env = env;

connections = dbConfig.createShardedConnection();

// We want the most light-weight transaction isolation level as we don't
// have conflicting reads/writes. We just want to be able to roll back
// batch inserts for k-v snapshots. This requirement might be removed in
Expand All @@ -203,13 +204,15 @@ public void initializeForJob(final Environment env) throws Exception {

// If we have a different backend for non-partitioned states we
// initialize that, otherwise create tables for storing the checkpoints.
//
// Currently all non-partitioned states are written to the first database shard
//
// Currently all non-partitioned states are written to the first
// database shard
if (nonPartitionedStateBackend == null) {
insertStatement = retry(new Callable<PreparedStatement>() {
public PreparedStatement call() throws SQLException {
dbAdapter.createCheckpointsTable(env.getJobID().toString(), getConnections().getFirst());
return dbAdapter.prepareCheckpointInsert(env.getJobID().toString(), getConnections().getFirst());
dbAdapter.createCheckpointsTable(env.getJobID().toShortString(), getConnections().getFirst());
return dbAdapter.prepareCheckpointInsert(env.getJobID().toShortString(),
getConnections().getFirst());
}
}, numSqlRetries, sqlRetrySleep);
} else {
Expand Down Expand Up @@ -237,7 +240,7 @@ public void close() throws Exception {
@Override
public void disposeAllStateForCurrentJob() throws Exception {
if (nonPartitionedStateBackend == null) {
dbAdapter.disposeAllStateForJob(env.getJobID().toString(), connections.getFirst());
dbAdapter.disposeAllStateForJob(env.getJobID().toShortString(), connections.getFirst());
} else {
nonPartitionedStateBackend.disposeAllStateForCurrentJob();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,4 @@ public Boolean call() throws Exception {
public S getState(ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
return InstantiationUtil.deserializeObject(getBytes(), userCodeClassLoader);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void createKVStateTable(String stateId, Connection con) throws SQLExcepti
validateStateId(stateId);
try (Statement smt = con.createStatement()) {
smt.executeUpdate(
"CREATE TABLE IF NOT EXISTS kvstate_" + stateId
"CREATE TABLE IF NOT EXISTS " + stateId
+ " ("
+ "timestamp bigint, "
+ "k varbinary(256), "
Expand All @@ -131,15 +131,15 @@ public void createKVStateTable(String stateId, Connection con) throws SQLExcepti
@Override
public String prepareKVCheckpointInsert(String stateId) throws SQLException {
validateStateId(stateId);
return "INSERT INTO kvstate_" + stateId + " (timestamp, k, v) VALUES (?,?,?) "
return "INSERT INTO " + stateId + " (timestamp, k, v) VALUES (?,?,?) "
+ "ON DUPLICATE KEY UPDATE v=? ";
}

@Override
public String prepareKeyLookup(String stateId) throws SQLException {
validateStateId(stateId);
return "SELECT v"
+ " FROM kvstate_" + stateId
+ " FROM " + stateId
+ " WHERE k = ?"
+ " AND timestamp <= ?"
+ " ORDER BY timestamp DESC LIMIT 1";
Expand All @@ -165,7 +165,7 @@ public void cleanupFailedCheckpoints(String stateId, Connection con, long checkp
long recoveryTs) throws SQLException {
validateStateId(stateId);
try (Statement smt = con.createStatement()) {
smt.executeUpdate("DELETE FROM kvstate_" + stateId
smt.executeUpdate("DELETE FROM " + stateId
+ " WHERE timestamp > " + checkpointTs
+ " AND timestamp < " + recoveryTs);
}
Expand All @@ -177,10 +177,10 @@ public void compactKvStates(String stateId, Connection con, long lowerId, long u
validateStateId(stateId);

try (Statement smt = con.createStatement()) {
smt.executeUpdate("DELETE state.* FROM kvstate_" + stateId + " AS state"
smt.executeUpdate("DELETE state.* FROM " + stateId + " AS state"
+ " JOIN"
+ " ("
+ " SELECT MAX(timestamp) AS maxts, k FROM kvstate_" + stateId
+ " SELECT MAX(timestamp) AS maxts, k FROM " + stateId
+ " WHERE timestamp BETWEEN " + lowerId + " AND " + upperId
+ " GROUP BY k"
+ " ) m"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ public void testSetupAndSerialization() throws Exception {
backend.initializeForJob(env);

assertNotNull(backend.getConnections());
assertTrue(isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toString()));
assertTrue(isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toShortString()));

backend.disposeAllStateForCurrentJob();
assertFalse(isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toString()));
assertFalse(isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toShortString()));
backend.close();

assertTrue(backend.getConnections().getFirst().isClosed());
Expand Down Expand Up @@ -139,12 +139,12 @@ public void testSerializableState() throws Exception {
assertEquals(state2, handle2.getState(getClass().getClassLoader()));
handle2.discardState();

assertFalse(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toString()));
assertFalse(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toShortString()));

assertEquals(state3, handle3.getState(getClass().getClassLoader()));
handle3.discardState();

assertTrue(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toString()));
assertTrue(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toShortString()));

backend.close();

Expand All @@ -168,7 +168,7 @@ public void testKeyValueState() throws Exception {
LazyDbKvState<Integer, String> kv = backend.createKvState(1, "state1", IntSerializer.INSTANCE,
StringSerializer.INSTANCE, null);

String tableName = "kvstate_" + env.getJobID() + "_1_state1";
String tableName = "state1_1_" + env.getJobID().toShortString();
assertTrue(isTableCreated(backend.getConnections().getFirst(), tableName));

assertEquals(0, kv.size());
Expand Down Expand Up @@ -257,7 +257,7 @@ public void testCleanupTasks() throws Exception {

@Test
public void testCaching() throws Exception {

List<String> urls = Lists.newArrayList(url1, url2);
DbBackendConfig conf = new DbBackendConfig("flink", "flink",
urls);
Expand All @@ -273,18 +273,18 @@ public void testCaching() throws Exception {

Environment env = new DummyEnvironment("test", 2, 0);

String tableName = "kvstate_" + env.getJobID() + "_1_state1";
String tableName = "state1_1_" + env.getJobID().toShortString();
assertFalse(isTableCreated(DriverManager.getConnection(url1, "flink", "flink"), tableName));
assertFalse(isTableCreated(DriverManager.getConnection(url2, "flink", "flink"), tableName));

backend.initializeForJob(env);

LazyDbKvState<Integer, String> kv = backend.createKvState(1, "state1", IntSerializer.INSTANCE,
StringSerializer.INSTANCE, "a");

assertTrue(isTableCreated(DriverManager.getConnection(url1, "flink", "flink"), tableName));
assertTrue(isTableCreated(DriverManager.getConnection(url2, "flink", "flink"), tableName));

Map<Integer, Optional<String>> cache = kv.getStateCache();
Map<Integer, Optional<String>> modified = kv.getModified();

Expand Down Expand Up @@ -432,7 +432,7 @@ private static boolean containsKey(Connection con, String tableName, int key, lo
return smt.executeQuery().next();
}
}

private static String localFileUri(File path) {
return path.toURI().toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void createKVStateTable(String stateId, Connection con) throws SQLExcepti
validateStateId(stateId);
try (Statement smt = con.createStatement()) {
smt.executeUpdate(
"CREATE TABLE kvstate_" + stateId
"CREATE TABLE " + stateId
+ " ("
+ "timestamp bigint, "
+ "k varchar(256) for bit data, "
Expand All @@ -96,7 +96,7 @@ public void createKVStateTable(String stateId, Connection con) throws SQLExcepti
@Override
public String prepareKeyLookup(String stateId) throws SQLException {
validateStateId(stateId);
return "SELECT v " + "FROM kvstate_" + stateId
return "SELECT v " + "FROM " + stateId
+ " WHERE k = ? "
+ " AND timestamp <= ?"
+ " ORDER BY timestamp DESC";
Expand All @@ -108,10 +108,10 @@ public void compactKvStates(String stateId, Connection con, long lowerBound, lon
validateStateId(stateId);

try (Statement smt = con.createStatement()) {
smt.executeUpdate("DELETE FROM kvstate_" + stateId + " t1"
smt.executeUpdate("DELETE FROM " + stateId + " t1"
+ " WHERE EXISTS"
+ " ("
+ " SELECT * FROM kvstate_" + stateId + " t2"
+ " SELECT * FROM " + stateId + " t2"
+ " WHERE t2.k = t1.k"
+ " AND t2.timestamp > t1.timestamp"
+ " AND t2.timestamp <=" + upperBound
Expand All @@ -123,7 +123,7 @@ public void compactKvStates(String stateId, Connection con, long lowerBound, lon
@Override
public String prepareKVCheckpointInsert(String stateId) throws SQLException {
validateStateId(stateId);
return "INSERT INTO kvstate_" + stateId + " (timestamp, k, v) VALUES (?,?,?)";
return "INSERT INTO " + stateId + " (timestamp, k, v) VALUES (?,?,?)";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ TaskDeploymentDescriptor createDeploymentDescriptor(
ExecutionAttemptID executionId,
SimpleSlot targetSlot,
SerializedValue<StateHandle<?>> operatorState,
long nextCheckpointId) {
long recoveryTimestamp) {

// Produced intermediate results
List<ResultPartitionDeploymentDescriptor> producedPartitions = new ArrayList<ResultPartitionDeploymentDescriptor>(resultPartitions.size());
Expand Down Expand Up @@ -652,7 +652,7 @@ TaskDeploymentDescriptor createDeploymentDescriptor(
subTaskIndex, getTotalNumberOfParallelSubtasks(), getExecutionGraph().getJobConfiguration(),
jobVertex.getJobVertex().getConfiguration(), jobVertex.getJobVertex().getInvokableClassName(),
producedPartitions, consumedPartitions, jarFiles, classpaths, targetSlot.getRoot().getSlotNumber(),
operatorState, nextCheckpointId);
operatorState, recoveryTimestamp);
}

// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,14 +538,14 @@ else if (current == ExecutionState.CANCELING) {

// get our private reference onto the stack (be safe against concurrent changes)
SerializedValue<StateHandle<?>> operatorState = this.operatorState;
long nextCheckpointId = this.recoveryTs;
long recoveryTs = this.recoveryTs;

if (operatorState != null) {
if (invokable instanceof StatefulTask) {
try {
StateHandle<?> state = operatorState.deserializeValue(userCodeClassLoader);
StatefulTask<?> op = (StatefulTask<?>) invokable;
StateUtils.setOperatorState(op, state, nextCheckpointId);
StateUtils.setOperatorState(op, state, recoveryTs);
}
catch (Exception e) {
throw new RuntimeException("Failed to deserialize state handle and setup initial operator state.", e);
Expand Down

0 comments on commit cd8be0b

Please sign in to comment.