Skip to content

Commit

Permalink
[FLINK-9070][state] Improve the performance of RocksDBMapState.clear(…
Browse files Browse the repository at this point in the history
…) with WriteBatch.

This closes apache#5979.
  • Loading branch information
sihuazhou authored and StefanRRichter committed May 18, 2018
1 parent fafef15 commit 87e54eb
Showing 1 changed file with 32 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -221,11 +222,23 @@ public Map.Entry<UK, UV> next() {
@Override
public void clear() {
try {
Iterator<Map.Entry<UK, UV>> iterator = iterator();
try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(backend.db, columnFamily);
WriteBatch writeBatch = new WriteBatch(128)) {

while (iterator.hasNext()) {
iterator.next();
iterator.remove();
final byte[] keyPrefixBytes = serializeCurrentKeyAndNamespace();
iterator.seek(keyPrefixBytes);

while (iterator.isValid()) {
byte[] keyBytes = iterator.key();
if (startWithKeyPrefix(keyPrefixBytes, keyBytes)) {
writeBatch.remove(columnFamily, keyBytes);
} else {
break;
}
iterator.next();
}

backend.db.write(writeOptions, writeBatch);
}
} catch (Exception e) {
LOG.warn("Error while cleaning the state.", e);
Expand Down Expand Up @@ -351,6 +364,20 @@ private UV deserializeUserValue(byte[] rawValueBytes, TypeSerializer<UV> valueSe
return isNull ? null : valueSerializer.deserialize(in);
}

private boolean startWithKeyPrefix(byte[] keyPrefixBytes, byte[] rawKeyBytes) {
if (rawKeyBytes.length < keyPrefixBytes.length) {
return false;
}

for (int i = keyPrefixBytes.length; --i >= backend.getKeyGroupPrefixBytes(); ) {
if (rawKeyBytes[i] != keyPrefixBytes[i]) {
return false;
}
}

return true;
}

// ------------------------------------------------------------------------
// Internal Classes
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -572,7 +599,7 @@ private void loadCache() {
}

while (true) {
if (!iterator.isValid() || !underSameKey(iterator.key())) {
if (!iterator.isValid() || !startWithKeyPrefix(keyPrefixBytes, iterator.key())) {
expired = true;
break;
}
Expand All @@ -595,19 +622,5 @@ private void loadCache() {
}
}
}

private boolean underSameKey(byte[] rawKeyBytes) {
if (rawKeyBytes.length < keyPrefixBytes.length) {
return false;
}

for (int i = keyPrefixBytes.length; --i >= backend.getKeyGroupPrefixBytes(); ) {
if (rawKeyBytes[i] != keyPrefixBytes[i]) {
return false;
}
}

return true;
}
}
}

0 comments on commit 87e54eb

Please sign in to comment.