Skip to content

Commit

Permalink
Reduce InboundBuffer pending queue's allocation pressure
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 authored and vietj committed Feb 21, 2023
1 parent 6f0eb26 commit 0e77586
Showing 1 changed file with 33 additions and 8 deletions.
41 changes: 33 additions & 8 deletions src/main/java/io/vertx/core/streams/impl/InboundBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.vertx.core.impl.ContextInternal;

import java.util.ArrayDeque;
import java.util.Collection;

/**
* A buffer that transfers elements to an handler with back-pressure.
Expand Down Expand Up @@ -71,7 +72,7 @@ public class InboundBuffer<E> {
public static final Object END_SENTINEL = new Object();

private final ContextInternal context;
private final ArrayDeque<E> pending;
private ArrayDeque<E> pending;
private final long highWaterMark;
private long demand;
private Handler<E> handler;
Expand All @@ -95,7 +96,8 @@ public InboundBuffer(Context context, long highWaterMark) {
this.context = (ContextInternal) context;
this.highWaterMark = highWaterMark;
this.demand = Long.MAX_VALUE;
this.pending = new ArrayDeque<>();
// empty ArrayDeque's constructor ArrayDeque allocates 16 elements; let's delay the allocation to be of the proper size
this.pending = null;
}

private void checkThread() {
Expand All @@ -116,6 +118,9 @@ public boolean write(E element) {
Handler<E> handler;
synchronized (this) {
if (demand == 0L || emitting) {
if (pending == null) {
pending = new ArrayDeque<>(1);
}
pending.add(element);
return checkWritable();
} else {
Expand All @@ -134,7 +139,7 @@ private boolean checkWritable() {
if (demand == Long.MAX_VALUE) {
return true;
} else {
long actual = pending.size() - demand;
long actual = size() - demand;
boolean writable = actual < highWaterMark;
overflow |= !writable;
return writable;
Expand All @@ -151,6 +156,15 @@ private boolean checkWritable() {
public boolean write(Iterable<E> elements) {
checkThread();
synchronized (this) {
final int requiredCapacity;
if (pending == null) {
if (elements instanceof Collection) {
requiredCapacity = ((Collection<E>) elements).size();
} else {
requiredCapacity = 1;
}
pending = new ArrayDeque<>(requiredCapacity);
}
for (E element : elements) {
pending.add(element);
}
Expand All @@ -168,7 +182,7 @@ private boolean emitPending() {
Handler<E> h;
while (true) {
synchronized (this) {
int size = pending.size();
int size = size();
if (demand == 0L) {
emitting = false;
boolean writable = size < highWaterMark;
Expand All @@ -181,6 +195,7 @@ private boolean emitPending() {
if (demand != Long.MAX_VALUE) {
demand--;
}
assert pending != null;
element = pending.poll();
h = this.handler;
}
Expand All @@ -201,7 +216,7 @@ private void drain() {
E element;
Handler<E> handler;
synchronized (this) {
int size = pending.size();
int size = size();
if (size == 0) {
emitting = false;
if (overflow) {
Expand All @@ -220,6 +235,7 @@ private void drain() {
if (demand != Long.MAX_VALUE) {
demand--;
}
assert pending != null;
element = pending.poll();
handler = this.handler;
}
Expand Down Expand Up @@ -271,7 +287,7 @@ public boolean fetch(long amount) {
if (demand < 0L) {
demand = Long.MAX_VALUE;
}
if (emitting || (pending.isEmpty() && !overflow)) {
if (emitting || (isEmpty() && !overflow)) {
return false;
}
emitting = true;
Expand All @@ -289,6 +305,9 @@ public boolean fetch(long amount) {
*/
public E read() {
synchronized (this) {
if (isEmpty()) {
return null;
}
return pending.poll();
}
}
Expand All @@ -301,6 +320,9 @@ public E read() {
* @return a reference to this, so the API can be used fluently
*/
public synchronized InboundBuffer<E> clear() {
if (isEmpty()) {
return this;
}
pending.clear();
return this;
}
Expand Down Expand Up @@ -376,14 +398,17 @@ public synchronized InboundBuffer<E> exceptionHandler(Handler<Throwable> handler
* @return whether the buffer is empty
*/
public synchronized boolean isEmpty() {
if (pending == null) {
return true;
}
return pending.isEmpty();
}

/**
* @return whether the buffer is writable
*/
public synchronized boolean isWritable() {
return pending.size() < highWaterMark;
return size() < highWaterMark;
}

/**
Expand All @@ -397,6 +422,6 @@ public synchronized boolean isPaused() {
* @return the actual number of elements in the buffer
*/
public synchronized int size() {
return pending.size();
return pending == null ? 0 : pending.size();
}
}

0 comments on commit 0e77586

Please sign in to comment.