Skip to content

Commit

Permalink
[FLINK-7938] Introduce addAll() to ListState
Browse files Browse the repository at this point in the history
This closes apache#5281.
  • Loading branch information
bowenli86 authored and StefanRRichter committed Jan 19, 2018
1 parent 1f9c2d9 commit 1484080
Show file tree
Hide file tree
Showing 13 changed files with 136 additions and 23 deletions.
2 changes: 1 addition & 1 deletion docs/dev/stream/state/state.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ for each key that the operation sees). The value can be set using `update(T)` an
`T value()`.

* `ListState<T>`: This keeps a list of elements. You can append elements and retrieve an `Iterable`
over all currently stored elements. Elements are added using `add(T)`, the Iterable can
over all currently stored elements. Elements are added using `add(T)` or `addAll(List<T>)`, the Iterable can
be retrieved using `Iterable<T> get()`. You can also override the existing list with `update(List<T>)`

* `ReducingState<T>`: This keeps a single value that represents the aggregation of all values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,12 @@ boolean isClearCalled() {
public void update(List<T> values) throws Exception {
clear();

if (values != null && !values.isEmpty()) {
addAll(values);
}

@Override
public void addAll(List<T> values) throws Exception {
if (values != null) {
list.addAll(values);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,12 @@ public boolean isClearCalled() {
public void update(List<T> values) throws Exception {
list.clear();

if (values != null || !values.isEmpty()) {
addAll(values);
}

@Override
public void addAll(List<T> values) throws Exception {
if (values != null) {
list.addAll(values);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,24 +167,48 @@ public void update(List<V> values) throws Exception {
try {
writeCurrentKeyWithGroupAndNamespace();
byte[] key = keySerializationStream.toByteArray();
DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);

List<byte[]> bytes = new ArrayList<>(values.size());
for (V value : values) {
keySerializationStream.reset();
valueSerializer.serialize(value, out);
bytes.add(keySerializationStream.toByteArray());
byte[] premerge = getPreMergedValue(values);
if (premerge != null) {
backend.db.put(columnFamily, writeOptions, key, premerge);
} else {
throw new IOException("Failed pre-merge values in update()");
}
} catch (IOException | RocksDBException e) {
throw new RuntimeException("Error while updating data to RocksDB", e);
}
}
}

@Override
public void addAll(List<V> values) throws Exception {
if (values != null && !values.isEmpty()) {
try {
writeCurrentKeyWithGroupAndNamespace();
byte[] key = keySerializationStream.toByteArray();

byte[] premerge = MergeUtils.merge(bytes);
byte[] premerge = getPreMergedValue(values);
if (premerge != null) {
backend.db.put(columnFamily, writeOptions, key, premerge);
backend.db.merge(columnFamily, writeOptions, key, premerge);
} else {
throw new IOException("Failed pre-merge values");
throw new IOException("Failed pre-merge values in addAll()");
}
} catch (IOException | RocksDBException e) {
throw new RuntimeException("Error while updating data to RocksDB", e);
}
}
}

private byte[] getPreMergedValue(List<V> values) throws IOException {
DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);

List<byte[]> bytes = new ArrayList<>(values.size());
for (V value : values) {
keySerializationStream.reset();
valueSerializer.serialize(value, out);
bytes.add(keySerializationStream.toByteArray());
}

return MergeUtils.merge(bytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
*
* <p>Computer: MacbookPro (Mid 2015), Flash Storage, Processor 2.5GHz Intel Core i7, Memory 16GB 1600MHz DDR3
* Number of values added | time for add() | time for update() | perf improvement of update() over add()
* 10 236875 ns 17048 ns 13.90x
* 50 312332 ns 14281 ns 21.87x
* 100 393791 ns 18360 ns 21.45x
* 500 978703 ns 55397 ns 17.66x
* 1000 3044179 ns 89474 ns 34.02x
* 5000 9247395 ns 305580 ns 30.26x
Expand Down Expand Up @@ -83,7 +86,7 @@ public void testRocksDbListStateAPIs() throws Exception {
final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);

// The number of values added to ListState. Can be changed for benchmarking
final int num = 1000;
final int num = 10;

try (
final Options options = new Options()
Expand All @@ -103,7 +106,6 @@ public void testRocksDbListStateAPIs() throws Exception {

// ----- add() API -----
log.info("begin add");
System.out.println("begin add");

final long beginInsert1 = System.nanoTime();
for (int i = 0; i < num; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ public interface AppendingState<IN, OUT> extends State {
* to the list of values. The next time {@link #get()} is called (for the same state
* partition) the returned state will represent the updated list.
*
* @param value
* The new value for the state.
* @param value The new value for the state.
*
* @throws IOException Thrown if the system cannot access the state.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,28 @@
@PublicEvolving
public interface ListState<T> extends MergingState<T, Iterable<T>> {
/**
* Updates the state of the current key for the given source namespaces into the state of
* the target namespace.
* Updates the operator state accessible by {@link #get()} by updating existing values to
* to the given list of values. The next time {@link #get()} is called (for the same state
* partition) the returned state will represent the updated list.
*
* If `null` or an empty list is passed in, the state value will be null
*
* @param values The target namespace where the merged state should be stored.
* @param values The new values for the state.
*
* @throws Exception The method may forward exception thrown internally (by I/O or functions).
*/
void update(List<T> values) throws Exception;

/**
* Updates the operator state accessible by {@link #get()} by adding the given values
* to existing list of values. The next time {@link #get()} is called (for the same state
* partition) the returned state will represent the updated list.
*
* If `null` or an empty list is passed in, the state value remains unchanged
*
* @param values The new values to be added to the state.
*
* @throws Exception The method may forward exception thrown internally (by I/O or functions).
*/
void addAll(List<T> values) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,9 @@ public static <V> ImmutableListState<V> createState(
public void update(List<V> values) throws Exception {
throw MODIFICATION_ATTEMPT_ERROR;
}

@Override
public void addAll(List<V> values) throws Exception {
throw MODIFICATION_ATTEMPT_ERROR;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,11 @@ public long[] write(FSDataOutputStream out) throws IOException {
public void update(List<S> values) throws Exception {
internalList.clear();

addAll(values);
}

@Override
public void addAll(List<S> values) throws Exception {
if (values != null && !values.isEmpty()) {
internalList.addAll(values);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,9 @@ public void clear() {
public void update(List<T> values) throws Exception {
originalState.update(values);
}

@Override
public void addAll(List<T> values) throws Exception {
originalState.addAll(values);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,22 @@ public void update(List<V> values) throws Exception {
map.put(namespace, new ArrayList<>(values));
}
}

@Override
public void addAll(List<V> values) throws Exception {
if (values != null && !values.isEmpty()) {
final N namespace = currentNamespace;
final StateTable<K, N, ArrayList<V>> map = stateTable;

ArrayList<V> list = map.get(currentNamespace);

if (list == null) {
list = new ArrayList<>();
}

list.addAll(values);

map.put(namespace, list);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,28 @@
*/
public interface InternalListState<N, T> extends InternalMergingState<N, T, Iterable<T>>, ListState<T> {
/**
* Updates the state of the current key for the given source namespaces into the state of
* the target namespace.
* Updates the operator state accessible by {@link #get()} by updating existing values to
* to the given list of values. The next time {@link #get()} is called (for the same state
* partition) the returned state will represent the updated list.
*
* @param values The target namespace where the merged state should be stored.
* If `null` or an empty list is passed in, the state value will be null
*
* @param values The new values for the state.
*
* @throws Exception The method may forward exception thrown internally (by I/O or functions).
*/
void update(List<T> values) throws Exception;

/**
* Updates the operator state accessible by {@link #get()} by adding the given values
* to existing list of values. The next time {@link #get()} is called (for the same state
* partition) the returned state will represent the updated list.
*
* If `null` or an empty list is passed in, the state value remains unchanged
*
* @param values The new values to be added to the state.
*
* @throws Exception The method may forward exception thrown internally (by I/O or functions).
*/
void addAll(List<T> values) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -1286,7 +1286,7 @@ public void testListState() throws Exception {
}

@Test
public void testListStateAddUpdateAndGet() throws Exception {
public void testListStateAPIs() throws Exception {

AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);

Expand Down Expand Up @@ -1318,7 +1318,22 @@ public void testListStateAddUpdateAndGet() throws Exception {

keyedBackend.setCurrentKey("g");
assertNull(state.get());
state.addAll(null);
assertNull(state.get());
state.addAll(new ArrayList<>());
assertNull(state.get());
state.addAll(Arrays.asList(3L, 4L));
assertThat(state.get(), containsInAnyOrder(3L, 4L));
state.addAll(null);
assertThat(state.get(), containsInAnyOrder(3L, 4L));
state.addAll(new ArrayList<>());
assertThat(state.get(), containsInAnyOrder(3L, 4L));
state.addAll(new ArrayList<>());
assertThat(state.get(), containsInAnyOrder(3L, 4L));
state.addAll(Arrays.asList(5L, 6L));
assertThat(state.get(), containsInAnyOrder(3L, 4L, 5L, 6L));
state.update(Arrays.asList(1L, 2L));
assertThat(state.get(), containsInAnyOrder(1L, 2L));

keyedBackend.setCurrentKey("def");
assertThat(state.get(), containsInAnyOrder(10L, 16L));
Expand Down

0 comments on commit 1484080

Please sign in to comment.