Skip to content

Commit

Permalink
[FLINK-10175][state] Fix concurrent access to shared buffer between R…
Browse files Browse the repository at this point in the history
…ocksDBMapState and queryable state
  • Loading branch information
StefanRRichter committed Aug 22, 2018
1 parent 09eb13a commit 2b6beed
Showing 1 changed file with 72 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ class RocksDBMapState<K, N, UK, UV>
* @param backend The backend to which this state is bound.
*/
private RocksDBMapState(
ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
TypeSerializer<Map<UK, UV>> valueSerializer,
Map<UK, UV> defaultValue,
RocksDBKeyedStateBackend<K> backend) {
ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
TypeSerializer<Map<UK, UV>> valueSerializer,
Map<UK, UV> defaultValue,
RocksDBKeyedStateBackend<K> backend) {

super(columnFamily, namespaceSerializer, valueSerializer, defaultValue, backend);

Expand Down Expand Up @@ -122,14 +122,14 @@ public UV get(UK userKey) throws IOException, RocksDBException {
byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);

return (rawValueBytes == null ? null : deserializeUserValue(rawValueBytes));
return (rawValueBytes == null ? null : deserializeUserValue(dataInputView, rawValueBytes, userValueSerializer));
}

@Override
public void put(UK userKey, UV userValue) throws IOException, RocksDBException {

byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
byte[] rawValueBytes = serializeUserValue(userValue);
byte[] rawValueBytes = serializeUserValue(userValue, userValueSerializer, dataOutputView);

backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
}
Expand All @@ -143,7 +143,7 @@ public void putAll(Map<UK, UV> map) throws IOException, RocksDBException {
try (RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(backend.db, writeOptions)) {
for (Map.Entry<UK, UV> entry : map.entrySet()) {
byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(entry.getKey());
byte[] rawValueBytes = serializeUserValue(entry.getValue());
byte[] rawValueBytes = serializeUserValue(entry.getValue(), userValueSerializer, dataOutputView);
writeBatchWrapper.put(columnFamily, rawKeyBytes, rawValueBytes);
}
}
Expand Down Expand Up @@ -180,7 +180,7 @@ public Iterable<Map.Entry<UK, UV>> entries() throws IOException {
public Iterable<UK> keys() throws IOException {
final byte[] prefixBytes = serializeCurrentKeyAndNamespace();

return () -> new RocksDBMapIterator<UK>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) {
return () -> new RocksDBMapIterator<UK>(backend.db, prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
@Override
public UK next() {
RocksDBMapEntry entry = nextEntry();
Expand All @@ -193,7 +193,7 @@ public UK next() {
public Iterable<UV> values() throws IOException {
final byte[] prefixBytes = serializeCurrentKeyAndNamespace();

return () -> new RocksDBMapIterator<UV>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) {
return () -> new RocksDBMapIterator<UV>(backend.db, prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
@Override
public UV next() {
RocksDBMapEntry entry = nextEntry();
Expand All @@ -206,7 +206,7 @@ public UV next() {
public Iterator<Map.Entry<UK, UV>> iterator() throws IOException {
final byte[] prefixBytes = serializeCurrentKeyAndNamespace();

return new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) {
return new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
@Override
public Map.Entry<UK, UV> next() {
return nextEntry();
Expand All @@ -218,7 +218,7 @@ public Map.Entry<UK, UV> next() {
public void clear() {
try {
try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(backend.db, columnFamily);
WriteBatch writeBatch = new WriteBatch(128)) {
WriteBatch writeBatch = new WriteBatch(128)) {

final byte[] keyPrefixBytes = serializeCurrentKeyAndNamespace();
iterator.seek(keyPrefixBytes);
Expand All @@ -243,10 +243,10 @@ public void clear() {
@Override
@SuppressWarnings("unchecked")
public byte[] getSerializedValue(
final byte[] serializedKeyAndNamespace,
final TypeSerializer<K> safeKeySerializer,
final TypeSerializer<N> safeNamespaceSerializer,
final TypeSerializer<Map<UK, UV>> safeValueSerializer) throws Exception {
final byte[] serializedKeyAndNamespace,
final TypeSerializer<K> safeKeySerializer,
final TypeSerializer<N> safeNamespaceSerializer,
final TypeSerializer<Map<UK, UV>> safeValueSerializer) throws Exception {

Preconditions.checkNotNull(serializedKeyAndNamespace);
Preconditions.checkNotNull(safeKeySerializer);
Expand All @@ -255,19 +255,20 @@ public byte[] getSerializedValue(

//TODO make KvStateSerializer key-group aware to save this round trip and key-group computation
Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer);
serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer);

int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0, backend.getNumberOfKeyGroups());

ByteArrayDataOutputView outputView = new ByteArrayDataOutputView(128);
ByteArrayDataInputView inputView = new ByteArrayDataInputView();

writeKeyWithGroupAndNamespace(
keyGroup,
keyAndNamespace.f0,
safeKeySerializer,
keyAndNamespace.f1,
safeNamespaceSerializer,
outputView);
keyGroup,
keyAndNamespace.f0,
safeKeySerializer,
keyAndNamespace.f1,
safeNamespaceSerializer,
outputView);

final byte[] keyPrefixBytes = outputView.toByteArray();

Expand All @@ -277,10 +278,11 @@ public byte[] getSerializedValue(
final TypeSerializer<UV> dupUserValueSerializer = serializer.getValueSerializer();

final Iterator<Map.Entry<UK, UV>> iterator = new RocksDBMapIterator<Map.Entry<UK, UV>>(
backend.db,
keyPrefixBytes,
dupUserKeySerializer,
dupUserValueSerializer) {
backend.db,
keyPrefixBytes,
dupUserKeySerializer,
dupUserValueSerializer,
inputView) {

@Override
public Map.Entry<UK, UV> next() {
Expand Down Expand Up @@ -313,15 +315,11 @@ private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey) throws IOE
return dataOutputView.toByteArray();
}

private byte[] serializeUserValue(UV userValue) throws IOException {
return serializeUserValue(userValue, userValueSerializer);
}
private static <UV> byte[] serializeUserValue(
UV userValue,
TypeSerializer<UV> valueSerializer,
ByteArrayDataOutputView dataOutputView) throws IOException {

private UV deserializeUserValue(byte[] rawValueBytes) throws IOException {
return deserializeUserValue(rawValueBytes, userValueSerializer);
}

private byte[] serializeUserValue(UV userValue, TypeSerializer<UV> valueSerializer) throws IOException {
dataOutputView.reset();

if (userValue == null) {
Expand All @@ -334,12 +332,20 @@ private byte[] serializeUserValue(UV userValue, TypeSerializer<UV> valueSerializ
return dataOutputView.toByteArray();
}

private UK deserializeUserKey(int userKeyOffset, byte[] rawKeyBytes, TypeSerializer<UK> keySerializer) throws IOException {
private static <UK> UK deserializeUserKey(
ByteArrayDataInputView dataInputView,
int userKeyOffset,
byte[] rawKeyBytes,
TypeSerializer<UK> keySerializer) throws IOException {
dataInputView.setData(rawKeyBytes, userKeyOffset, rawKeyBytes.length - userKeyOffset);
return keySerializer.deserialize(dataInputView);
}

private UV deserializeUserValue(byte[] rawValueBytes, TypeSerializer<UV> valueSerializer) throws IOException {
private static <UV> UV deserializeUserValue(
ByteArrayDataInputView dataInputView,
byte[] rawValueBytes,
TypeSerializer<UV> valueSerializer) throws IOException {

dataInputView.setData(rawValueBytes);

boolean isNull = dataInputView.readBoolean();
Expand Down Expand Up @@ -388,17 +394,22 @@ private class RocksDBMapEntry implements Map.Entry<UK, UV> {
/** The offset of the user key within the raw key bytes. */
private final int userKeyOffset;

private TypeSerializer<UK> keySerializer;
private final TypeSerializer<UK> keySerializer;

private final TypeSerializer<UV> valueSerializer;

private TypeSerializer<UV> valueSerializer;
private final ByteArrayDataInputView dataInputView;
private final ByteArrayDataOutputView dataOutputView;

RocksDBMapEntry(
@Nonnull final RocksDB db,
@Nonnegative final int userKeyOffset,
@Nonnull final byte[] rawKeyBytes,
@Nonnull final byte[] rawValueBytes,
@Nonnull final TypeSerializer<UK> keySerializer,
@Nonnull final TypeSerializer<UV> valueSerializer) {
@Nonnull final RocksDB db,
@Nonnegative final int userKeyOffset,
@Nonnull final byte[] rawKeyBytes,
@Nonnull final byte[] rawValueBytes,
@Nonnull final TypeSerializer<UK> keySerializer,
@Nonnull final TypeSerializer<UV> valueSerializer,
@Nonnull ByteArrayDataInputView dataInputView,
@Nonnull ByteArrayDataOutputView dataOutputView) {
this.db = db;

this.userKeyOffset = userKeyOffset;
Expand All @@ -408,6 +419,8 @@ private class RocksDBMapEntry implements Map.Entry<UK, UV> {
this.rawKeyBytes = rawKeyBytes;
this.rawValueBytes = rawValueBytes;
this.deleted = false;
this.dataInputView = dataInputView;
this.dataOutputView = dataOutputView;
}

public void remove() {
Expand All @@ -425,7 +438,7 @@ public void remove() {
public UK getKey() {
if (userKey == null) {
try {
userKey = deserializeUserKey(userKeyOffset, rawKeyBytes, keySerializer);
userKey = deserializeUserKey(dataInputView, userKeyOffset, rawKeyBytes, keySerializer);
} catch (IOException e) {
throw new FlinkRuntimeException("Error while deserializing the user key.", e);
}
Expand All @@ -441,7 +454,7 @@ public UV getValue() {
} else {
if (userValue == null) {
try {
userValue = deserializeUserValue(rawValueBytes, valueSerializer);
userValue = deserializeUserValue(dataInputView, rawValueBytes, valueSerializer);
} catch (IOException e) {
throw new FlinkRuntimeException("Error while deserializing the user value.", e);
}
Expand All @@ -461,7 +474,7 @@ public UV setValue(UV value) {

try {
userValue = value;
rawValueBytes = serializeUserValue(value, valueSerializer);
rawValueBytes = serializeUserValue(value, valueSerializer, dataOutputView);

db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
} catch (IOException | RocksDBException e) {
Expand Down Expand Up @@ -499,17 +512,20 @@ private abstract class RocksDBMapIterator<T> implements Iterator<T> {

private final TypeSerializer<UK> keySerializer;
private final TypeSerializer<UV> valueSerializer;
private final ByteArrayDataInputView dataInputView;

RocksDBMapIterator(
final RocksDB db,
final byte[] keyPrefixBytes,
final TypeSerializer<UK> keySerializer,
final TypeSerializer<UV> valueSerializer) {
final RocksDB db,
final byte[] keyPrefixBytes,
final TypeSerializer<UK> keySerializer,
final TypeSerializer<UV> valueSerializer,
ByteArrayDataInputView dataInputView) {

this.db = db;
this.keyPrefixBytes = keyPrefixBytes;
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
this.dataInputView = dataInputView;
}

@Override
Expand Down Expand Up @@ -564,7 +580,7 @@ private void loadCache() {
* The iteration starts from the prefix bytes at the first loading. The cache is then
* reloaded when the next entry to return is the last one in the cache. At that time,
* iteration restarts from the last returned entry.
*/
*/
RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? null : cacheEntries.get(cacheEntries.size() - 1);
byte[] startBytes = (lastEntry == null ? keyPrefixBytes : lastEntry.rawKeyBytes);

Expand Down Expand Up @@ -597,7 +613,9 @@ private void loadCache() {
iterator.key(),
iterator.value(),
keySerializer,
valueSerializer);
valueSerializer,
dataInputView,
dataOutputView);

cacheEntries.add(entry);

Expand Down

0 comments on commit 2b6beed

Please sign in to comment.