From db2a964a450c05cb2aad3843999d994e4b8e5ef5 Mon Sep 17 00:00:00 2001 From: Gyula Fora Date: Sat, 21 Nov 2015 18:29:55 +0100 Subject: [PATCH] [FLINK-2924] [streaming] Execute compactions in background thread + keep connections alive on empty snapshots Closes #1305 --- .../contrib/streaming/state/DbAdapter.java | 7 ++ .../streaming/state/DbBackendConfig.java | 76 ----------- .../streaming/state/DbStateBackend.java | 4 +- .../streaming/state/DbStateHandle.java | 8 +- .../streaming/state/LazyDbKvState.java | 118 ++++++++++++++---- .../contrib/streaming/state/MySqlAdapter.java | 7 ++ .../streaming/state/DbStateBackendTest.java | 95 +++++++++----- .../flink/runtime/state/StateBackend.java | 4 +- .../state/filesystem/FsStateBackend.java | 2 +- .../state/memory/MemoryStateBackend.java | 2 +- .../runtime/state/FileStateBackendTest.java | 6 +- .../runtime/state/MemoryStateBackendTest.java | 6 +- .../api/operators/AbstractStreamOperator.java | 4 +- 13 files changed, 188 insertions(+), 151 deletions(-) diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java index 2162f32622aa1..26c27ddd0b1ca 100644 --- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java +++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java @@ -178,4 +178,11 @@ void insertBatch(String stateId, DbBackendConfig conf, Connection con, PreparedS */ void compactKvStates(String kvStateId, Connection con, long lowerTs, long upperTs) throws SQLException; + /** + * Execute a simple operation to refresh the current database connection in + * case no data is written for a longer time period. Usually something like + * "select 1" + */ + void keepAlive(Connection con) throws SQLException; + } diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbBackendConfig.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbBackendConfig.java index 55ecf836cf9b5..883b65ab8ff12 100644 --- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbBackendConfig.java +++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbBackendConfig.java @@ -327,80 +327,4 @@ public ShardedConnection createShardedConnection() throws SQLException { return new ShardedConnection(shardUrls, userName, userPassword, shardPartitioner); } } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (!(obj instanceof DbBackendConfig)) { - return false; - } - DbBackendConfig other = (DbBackendConfig) obj; - if (JDBCDriver == null) { - if (other.JDBCDriver != null) { - return false; - } - } else if (!JDBCDriver.equals(other.JDBCDriver)) { - return false; - } - if (dbAdapter == null) { - if (other.dbAdapter != null) { - return false; - } - } else if (!dbAdapter.getClass().equals(other.dbAdapter.getClass())) { - return false; - } - if (kvStateCacheSize != other.kvStateCacheSize) { - return false; - } - if (kvStateCompactionFreq != other.kvStateCompactionFreq) { - return false; - } - if (Float.floatToIntBits(maxKvEvictFraction) != Float.floatToIntBits(other.maxKvEvictFraction)) { - return false; - } - if (maxKvInsertBatchSize != other.maxKvInsertBatchSize) { - return false; - } - if (maxNumberOfSqlRetries != other.maxNumberOfSqlRetries) { - return false; - } - if (shardPartitioner == null) { - if (other.shardPartitioner != null) { - return false; - } - } else if (!shardPartitioner.getClass().equals(other.shardPartitioner.getClass())) { - return false; - } - if (shardUrls == null) { - if (other.shardUrls != null) { - return false; - } - } else if (!shardUrls.equals(other.shardUrls)) { - return false; - } - if (sleepBetweenSqlRetries != other.sleepBetweenSqlRetries) { - return false; - } - if (userName == null) { - if (other.userName != null) { - return false; - } - } else if (!userName.equals(other.userName)) { - return false; - } - if (userPassword == null) { - if (other.userPassword != null) { - return false; - } - } else if (!userPassword.equals(other.userPassword)) { - return false; - } - return true; - } - } diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java index dce0df883a463..72482aedeb0c0 100644 --- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java +++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java @@ -177,10 +177,10 @@ public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkp } @Override - public LazyDbKvState createKvState(int operatorId, String stateName, + public LazyDbKvState createKvState(String stateId, String stateName, TypeSerializer keySerializer, TypeSerializer valueSerializer, V defaultValue) throws IOException { return new LazyDbKvState( - stateName + "_" + operatorId + "_" + env.getJobID().toShortString(), + stateId + "_" + env.getJobID().toShortString(), env.getIndexInSubtaskGroup() == 0, getConnections(), getConfiguration(), diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java index fa300a4169e2e..2ecfcc4b7840d 100644 --- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java +++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java @@ -25,7 +25,8 @@ import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.util.InstantiationUtil; -import org.eclipse.jetty.util.log.Log; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * State handle implementation for storing checkpoints as byte arrays in @@ -35,6 +36,7 @@ public class DbStateHandle implements Serializable, StateHandle { private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(DbStateHandle.class); private final String jobId; private final DbBackendConfig dbConfig; @@ -75,8 +77,8 @@ public Boolean call() throws Exception { }, dbConfig.getMaxNumberOfSqlRetries(), dbConfig.getSleepBetweenSqlRetries()); } catch (IOException e) { // We don't want to fail the job here, but log the error. - if (Log.isDebugEnabled()) { - Log.debug("Could not discard state."); + if (LOG.isDebugEnabled()) { + LOG.debug("Could not discard state."); } } } diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java index 12a3332dca3ae..3d7abff1c9ffd 100644 --- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java +++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java @@ -30,6 +30,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; @@ -75,6 +77,8 @@ public class LazyDbKvState implements KvState, Check private final int maxInsertBatchSize; // We will do database compaction every so many checkpoints private final int compactEvery; + // Executor for automatic compactions + private ExecutorService executor = null; // Database properties private final DbBackendConfig conf; @@ -96,7 +100,7 @@ public class LazyDbKvState implements KvState, Check private long nextTs; private Map completedCheckpoints = new HashMap<>(); - private long lastCompactedTs; + private volatile long lastCompactedTs; // ------------------------------------------------------ @@ -119,6 +123,10 @@ public LazyDbKvState(String kvStateId, boolean compact, ShardedConnection cons, this.kvStateId = kvStateId; this.compact = compact; + if (compact) { + // Compactions will run in a seperate thread + executor = Executors.newSingleThreadExecutor(); + } this.keySerializer = keySerializer; this.valueSerializer = valueSerializer; @@ -186,13 +194,28 @@ public DbKvStateSnapshot snapshot(long checkpointId, long timestamp) throw + "this should not happen."); } - // We insert the modified elements to the database with the current - // timestamp then clear the modified states - for (Entry> state : cache.modified.entrySet()) { - batchInsert.add(state, timestamp); + // If there are any modified states we perform the inserts + if (!cache.modified.isEmpty()) { + // We insert the modified elements to the database with the current + // timestamp then clear the modified states + for (Entry> state : cache.modified.entrySet()) { + batchInsert.add(state, timestamp); + } + batchInsert.flush(timestamp); + cache.modified.clear(); + } else if (compact) { + // Otherwise we call the keep alive method to avoid dropped + // connections (only call this on the compactor instance) + for (final Connection c : connections.connections()) { + SQLRetrier.retry(new Callable() { + @Override + public Void call() throws Exception { + dbAdapter.keepAlive(c); + return null; + } + }, numSqlRetries, sqlRetrySleep); + } } - batchInsert.flush(timestamp); - cache.modified.clear(); nextTs = timestamp + 1; completedCheckpoints.put(checkpointId, timestamp); @@ -240,23 +263,14 @@ public Void call() throws Exception { @Override public void notifyCheckpointComplete(long checkpointId) { - Long ts = completedCheckpoints.remove(checkpointId); + final Long ts = completedCheckpoints.remove(checkpointId); if (ts == null) { LOG.warn("Complete notification for missing checkpoint: " + checkpointId); - ts = 0L; - } - // If compaction is turned on we compact on the first subtask - if (compactEvery > 0 && compact && checkpointId % compactEvery == 0) { - try { - for (Connection c : connections.connections()) { - dbAdapter.compactKvStates(kvStateId, c, lastCompactedTs, ts); - } - lastCompactedTs = ts; - if (LOG.isDebugEnabled()) { - LOG.debug("State succesfully compacted for {}.", kvStateId); - } - } catch (SQLException e) { - LOG.warn("State compaction failed due: {}", e); + } else { + // If compaction is turned on we compact on the compactor subtask + // asynchronously in the background + if (compactEvery > 0 && compact && checkpointId % compactEvery == 0) { + executor.execute(new Compactor(ts)); } } } @@ -275,6 +289,10 @@ public void dispose() { } catch (SQLException e) { // There is not much to do about this } + + if (executor != null) { + executor.shutdown(); + } } /** @@ -294,15 +312,25 @@ public Map> getModified() { return cache.modified; } - public boolean isCompacter() { + /** + * Used for testing purposes + */ + public boolean isCompactor() { return compact; } /** - * Snapshot that stores a specific checkpoint timestamp and state id, and also - * rolls back the database to that point upon restore. The rollback is done - * by removing all state checkpoints that have timestamps between the checkpoint - * and recovery timestamp. + * Used for testing purposes + */ + public ExecutorService getExecutor() { + return executor; + } + + /** + * Snapshot that stores a specific checkpoint timestamp and state id, and + * also rolls back the database to that point upon restore. The rollback is + * done by removing all state checkpoints that have timestamps between the + * checkpoint and recovery timestamp. * */ private static class DbKvStateSnapshot implements KvStateSnapshot { @@ -557,4 +585,40 @@ public void flush(long timestamp) throws IOException { } } + + private class Compactor implements Runnable { + + private long upperBound; + + public Compactor(long upperBound) { + this.upperBound = upperBound; + } + + @Override + public void run() { + // We create new database connections to make sure we don't + // interfere with the checkpointing (connections are not thread + // safe) + try (ShardedConnection sc = conf.createShardedConnection()) { + for (final Connection c : sc.connections()) { + SQLRetrier.retry(new Callable() { + @Override + public Void call() throws Exception { + dbAdapter.compactKvStates(kvStateId, c, lastCompactedTs, upperBound); + return null; + } + }, numSqlRetries, sqlRetrySleep); + } + if (LOG.isInfoEnabled()) { + LOG.info("State succesfully compacted for {} between {} and {}.", kvStateId, + lastCompactedTs, + upperBound); + } + lastCompactedTs = upperBound; + } catch (SQLException | IOException e) { + LOG.warn("State compaction failed due: {}", e); + } + } + + } } diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java index 7d3eca0bd9ec3..9eaa2833d13de 100644 --- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java +++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java @@ -234,5 +234,12 @@ private void setKvInsertParams(String stateId, PreparedStatement insertStatement insertStatement.setNull(4, Types.BLOB); } } + + @Override + public void keepAlive(Connection con) throws SQLException { + try(Statement smt = con.createStatement()) { + smt.executeQuery("SELECT 1"); + } + } } diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java index 5f8610e75baad..209086f8d94a0 100644 --- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java +++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java @@ -30,13 +30,14 @@ import java.net.UnknownHostException; import java.sql.Connection; import java.sql.DriverManager; -import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.io.FileUtils; import org.apache.derby.drda.NetworkServerControl; @@ -51,7 +52,6 @@ import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.shaded.com.google.common.collect.Lists; -import org.apache.flink.util.InstantiationUtil; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -100,16 +100,17 @@ public void testSetupAndSerialization() throws Exception { // serialize / copy the backend DbStateBackend backend = CommonTestUtils.createCopySerializable(dbBackend); assertFalse(backend.isInitialized()); - assertEquals(dbBackend.getConfiguration(), backend.getConfiguration()); Environment env = new DummyEnvironment("test", 1, 0); backend.initializeForJob(env); assertNotNull(backend.getConnections()); - assertTrue(isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toShortString())); + assertTrue( + isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toShortString())); backend.disposeAllStateForCurrentJob(); - assertFalse(isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toShortString())); + assertFalse( + isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toShortString())); backend.close(); assertTrue(backend.getConnections().getFirst().isClosed()); @@ -165,7 +166,7 @@ public void testKeyValueState() throws Exception { backend.initializeForJob(env); - LazyDbKvState kv = backend.createKvState(1, "state1", IntSerializer.INSTANCE, + LazyDbKvState kv = backend.createKvState("state1_1", "state1", IntSerializer.INSTANCE, StringSerializer.INSTANCE, null); String tableName = "state1_1_" + env.getJobID().toShortString(); @@ -196,19 +197,9 @@ public void testKeyValueState() throws Exception { kv.setCurrentKey(3); kv.update("u3"); - assertTrue(containsKey(backend.getConnections().getFirst(), tableName, 1, 100)); - - kv.notifyCheckpointComplete(682375462378L); - // draw another snapshot KvStateSnapshot snapshot2 = kv.snapshot(682375462379L, 200); - assertTrue(containsKey(backend.getConnections().getFirst(), tableName, 1, 100)); - assertTrue(containsKey(backend.getConnections().getFirst(), tableName, 1, 200)); - kv.notifyCheckpointComplete(682375462379L); - // Compaction should be performed - assertFalse(containsKey(backend.getConnections().getFirst(), tableName, 1, 100)); - assertTrue(containsKey(backend.getConnections().getFirst(), tableName, 1, 200)); // validate the original state assertEquals(3, kv.size()); @@ -238,9 +229,11 @@ public void testKeyValueState() throws Exception { } @Test - public void testCleanupTasks() throws Exception { + public void testCompaction() throws Exception { DbBackendConfig conf = new DbBackendConfig("flink", "flink", url1); - conf.setDbAdapter(new DerbyAdapter()); + MockAdapter adapter = new MockAdapter(); + conf.setKvStateCompactionFrequency(2); + conf.setDbAdapter(adapter); DbStateBackend backend1 = new DbStateBackend(conf); DbStateBackend backend2 = new DbStateBackend(conf); @@ -250,9 +243,40 @@ public void testCleanupTasks() throws Exception { backend2.initializeForJob(new DummyEnvironment("test", 3, 1)); backend3.initializeForJob(new DummyEnvironment("test", 3, 2)); - assertTrue(backend1.createKvState(1, "a", null, null, null).isCompacter()); - assertFalse(backend2.createKvState(1, "a", null, null, null).isCompacter()); - assertFalse(backend3.createKvState(1, "a", null, null, null).isCompacter()); + LazyDbKvState s1 = backend1.createKvState("a_1", "a", null, null, null); + LazyDbKvState s2 = backend2.createKvState("a_1", "a", null, null, null); + LazyDbKvState s3 = backend3.createKvState("a_1", "a", null, null, null); + + assertTrue(s1.isCompactor()); + assertFalse(s2.isCompactor()); + assertFalse(s3.isCompactor()); + assertNotNull(s1.getExecutor()); + assertNull(s2.getExecutor()); + assertNull(s3.getExecutor()); + + s1.snapshot(1, 100); + s1.notifyCheckpointComplete(1); + s1.snapshot(2, 200); + s1.snapshot(3, 300); + s1.notifyCheckpointComplete(2); + s1.notifyCheckpointComplete(3); + s1.snapshot(4, 400); + s1.snapshot(5, 500); + s1.notifyCheckpointComplete(4); + s1.notifyCheckpointComplete(5); + + s1.dispose(); + s2.dispose(); + s3.dispose(); + + // Wait until the compaction completes + s1.getExecutor().awaitTermination(5, TimeUnit.SECONDS); + assertEquals(2, adapter.numCompcations.get()); + assertEquals(5, adapter.keptAlive); + + backend1.close(); + backend2.close(); + backend3.close(); } @Test @@ -279,7 +303,7 @@ public void testCaching() throws Exception { backend.initializeForJob(env); - LazyDbKvState kv = backend.createKvState(1, "state1", IntSerializer.INSTANCE, + LazyDbKvState kv = backend.createKvState("state1_1", "state1", IntSerializer.INSTANCE, StringSerializer.INSTANCE, "a"); assertTrue(isTableCreated(DriverManager.getConnection(url1, "flink", "flink"), tableName)); @@ -423,16 +447,6 @@ private static boolean isTableEmpty(Connection con, String tableName) throws SQL } } - private static boolean containsKey(Connection con, String tableName, int key, long ts) - throws SQLException, IOException { - try (PreparedStatement smt = con - .prepareStatement("select * from " + tableName + " where k=? and timestamp=?")) { - smt.setBytes(1, InstantiationUtil.serializeToByteArray(IntSerializer.INSTANCE, key)); - smt.setLong(2, ts); - return smt.executeQuery().next(); - } - } - private static String localFileUri(File path) { return path.toURI().toString(); } @@ -444,4 +458,21 @@ private static void deleteDirectorySilently(File dir) { } } + private static class MockAdapter extends DerbyAdapter { + + private static final long serialVersionUID = 1L; + public AtomicInteger numCompcations = new AtomicInteger(0); + public int keptAlive = 0; + + @Override + public void compactKvStates(String kvStateId, Connection con, long lowerTs, long upperTs) throws SQLException { + numCompcations.incrementAndGet(); + } + + @Override + public void keepAlive(Connection con) throws SQLException { + keptAlive++; + } + } + } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java index 6f72bce3620a4..293de956a055c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java @@ -76,7 +76,7 @@ public abstract class StateBackend> implem /** * Creates a key/value state backed by this state backend. * - * @param operatorId Unique id for the operator creating the state + * @param stateId Unique id that identifies the kv state in the streaming program. * @param stateName Name of the created state * @param keySerializer The serializer for the key. * @param valueSerializer The serializer for the value. @@ -88,7 +88,7 @@ public abstract class StateBackend> implem * * @throws Exception Exceptions may occur during initialization of the state and should be forwarded. */ - public abstract KvState createKvState(int operatorId, String stateName, + public abstract KvState createKvState(String stateId, String stateName, TypeSerializer keySerializer, TypeSerializer valueSerializer, V defaultValue) throws Exception; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java index 6a94a80e740a0..25c63e5d2252a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java @@ -238,7 +238,7 @@ public void close() throws Exception {} // ------------------------------------------------------------------------ @Override - public FsHeapKvState createKvState(int operatorId, String stateName, + public FsHeapKvState createKvState(String stateId, String stateName, TypeSerializer keySerializer, TypeSerializer valueSerializer, V defaultValue) throws Exception { return new FsHeapKvState(keySerializer, valueSerializer, defaultValue, this); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java index f3e7552cae517..2963237738311 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java @@ -83,7 +83,7 @@ public void close() throws Exception {} // ------------------------------------------------------------------------ @Override - public MemHeapKvState createKvState(int operatorId, String stateName, + public MemHeapKvState createKvState(String stateId, String stateName, TypeSerializer keySerializer, TypeSerializer valueSerializer, V defaultValue) { return new MemHeapKvState(keySerializer, valueSerializer, defaultValue); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java index 4f10acd10362c..37ccde2bb5d19 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java @@ -235,7 +235,7 @@ public void testKeyValueState() { File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath()); KvState kv = - backend.createKvState(0, "a", IntSerializer.INSTANCE, StringSerializer.INSTANCE, null); + backend.createKvState("0", "a", IntSerializer.INSTANCE, StringSerializer.INSTANCE, null); assertEquals(0, kv.size()); @@ -324,7 +324,7 @@ public void testRestoreWithWrongSerializers() { File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath()); KvState kv = - backend.createKvState(0, "a", IntSerializer.INSTANCE, StringSerializer.INSTANCE, null); + backend.createKvState("a_0", "a", IntSerializer.INSTANCE, StringSerializer.INSTANCE, null); kv.setCurrentKey(1); kv.update("1"); @@ -394,7 +394,7 @@ public void testCopyDefaultValue() { backend.initializeForJob(new DummyEnvironment("test", 0, 0)); KvState kv = - backend.createKvState(0, "a", IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new IntValue(-1)); + backend.createKvState("a_0", "a", IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new IntValue(-1)); kv.setCurrentKey(1); IntValue default1 = kv.value(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java index 8ffe617536f43..4b5aebd0c74cf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java @@ -147,7 +147,7 @@ public void testKeyValueState() { MemoryStateBackend backend = new MemoryStateBackend(); KvState kv = - backend.createKvState(0, "s", IntSerializer.INSTANCE, StringSerializer.INSTANCE, null); + backend.createKvState("s_0", "s", IntSerializer.INSTANCE, StringSerializer.INSTANCE, null); assertEquals(0, kv.size()); @@ -222,7 +222,7 @@ public void testRestoreWithWrongSerializers() { try { MemoryStateBackend backend = new MemoryStateBackend(); KvState kv = - backend.createKvState(0, "s", IntSerializer.INSTANCE, StringSerializer.INSTANCE, null); + backend.createKvState("s_0", "s", IntSerializer.INSTANCE, StringSerializer.INSTANCE, null); kv.setCurrentKey(1); kv.update("1"); @@ -282,7 +282,7 @@ public void testCopyDefaultValue() { try { MemoryStateBackend backend = new MemoryStateBackend(); KvState kv = - backend.createKvState(0, "a", IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new IntValue(-1)); + backend.createKvState("a_0", "a", IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new IntValue(-1)); kv.setCurrentKey(1); IntValue default1 = kv.value(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 025b44a764d6a..3f1cfae68485e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -330,8 +330,10 @@ else if (this.keySerializer != null) { } if (kvstate == null) { + // create unique state id from operator id + state name + String stateId = name + "_" + getOperatorConfig().getVertexID(); // create a new blank key/value state - kvstate = stateBackend.createKvState(getOperatorConfig().getVertexID() ,name , keySerializer, valueSerializer, defaultValue); + kvstate = stateBackend.createKvState(stateId ,name , keySerializer, valueSerializer, defaultValue); } if (keyValueStatesByName == null) {