Skip to content

Commit

Permalink
[FLINK-9981][state] Testing and performance tuning for RocksDB-based …
Browse files Browse the repository at this point in the history
…priority queue

This closes apache#6438.
  • Loading branch information
StefanRRichter committed Aug 2, 2018
1 parent db00cb4 commit a20fd1d
Show file tree
Hide file tree
Showing 37 changed files with 1,279 additions and 1,819 deletions.
4 changes: 2 additions & 2 deletions docs/_includes/generated/rocks_db_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
<td>The local directory (on the TaskManager) where RocksDB puts its files.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.timer-service.impl</h5></td>
<td><h5>state.backend.rocksdb.timer-service.factory</h5></td>
<td style="word-wrap: break-word;">"HEAP"</td>
<td>This determines the timer service implementation. Options are either HEAP (heap-based, default) or ROCKSDB for an implementation based on RocksDB.</td>
<td>This determines the factory for timer service state implementation. Options are either HEAP (heap-based, default) or ROCKSDB for an implementation based on RocksDB .</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,23 @@
@Internal
public class ByteArrayInputStreamWithPos extends InputStream {

private static final byte[] EMPTY = new byte[0];

protected byte[] buffer;
protected int position;
protected int count;
protected int mark = 0;

public ByteArrayInputStreamWithPos() {
this(EMPTY);
}

public ByteArrayInputStreamWithPos(byte[] buffer) {
this(buffer, 0, buffer.length);
}

public ByteArrayInputStreamWithPos(byte[] buffer, int offset, int length) {
this.position = offset;
this.buffer = buffer;
this.mark = offset;
this.count = Math.min(buffer.length, offset + length);
setBuffer(buffer, offset, length);
}

@Override
Expand Down Expand Up @@ -122,4 +125,11 @@ public void setPosition(int pos) {
Preconditions.checkArgument(pos >= 0 && pos <= count, "Position out of bounds.");
this.position = pos;
}

public void setBuffer(byte[] buffer, int offset, int length) {
this.count = Math.min(buffer.length, offset + length);
setPosition(offset);
this.buffer = buffer;
this.mark = offset;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,18 @@ public void testSetNegativePosition() throws Exception {
thrown.expectMessage("Position out of bounds.");
stream.setPosition(-1);
}

@Test
public void testSetBuffer() {
ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos();
Assert.assertEquals(-1, in.read());
byte[] testData = new byte[]{0x42, 0x43, 0x44, 0x45};
int off = 1;
int len = 2;
in.setBuffer(testData, off, len);
for (int i = 0; i < len; ++i) {
Assert.assertEquals(testData[i + off], in.read());
}
Assert.assertEquals(-1, in.read());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ backup_config
change_conf "taskmanager.numberOfTaskSlots" "1" "${NUM_SLOTS}"

if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'rocks' ]; then
set_conf "state.backend.rocksdb.timer-service.impl" "rocksdb"
set_conf "state.backend.rocksdb.timer-service.factory" "rocksdb"
fi

setup_flink_slf4j_metric_reporter
Expand Down
6 changes: 0 additions & 6 deletions flink-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,6 @@ under the License.
<!-- Version is set in root POM -->
</dependency>

<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
<version>8.2.1</version>
</dependency>

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import javax.annotation.Nullable;

import java.util.Collection;
import java.util.function.Consumer;
import java.util.function.Predicate;

/**
* Interface for collection that gives in order access to elements w.r.t their priority.
Expand All @@ -37,16 +35,6 @@
@Internal
public interface InternalPriorityQueue<T> {

/**
* Polls from the top of the queue as long as the the queue is not empty and passes the elements to
* {@link Consumer} until a {@link Predicate} rejects an offered element. The rejected element is not
* removed from the queue and becomes the new head.
*
* @param canConsume bulk polling ends once this returns false. The rejected element is nor removed and not consumed.
* @param consumer consumer function for elements accepted by canConsume.
*/
void bulkPoll(@Nonnull Predicate<T> canConsume, @Nonnull Consumer<T> consumer);

/**
* Retrieves and removes the first element (w.r.t. the order) of this set,
* or returns {@code null} if this set is empty.
Expand Down

This file was deleted.

Loading

0 comments on commit a20fd1d

Please sign in to comment.