Skip to content

Commit

Permalink
[FLINK-24234][connectors] Time based flushing for AsyncSinkWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
CrynetLogistics authored and dannycranmer committed Sep 15, 2021
1 parent de36259 commit 583e20d
Show file tree
Hide file tree
Showing 5 changed files with 502 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
* A generic sink writer that handles the general behaviour of a sink such as batching and flushing,
Expand All @@ -50,15 +51,14 @@
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
implements SinkWriter<InputT, Void, Collection<RequestEntryT>> {

private static final int BYTES_IN_MB = 1024 * 1024;

private final MailboxExecutor mailboxExecutor;
private final Sink.ProcessingTimeService timeService;

private final int maxBatchSize;
private final int maxInFlightRequests;
private final int maxBufferedRequests;
private final double flushOnBufferSizeMB;
private final long flushOnBufferSizeInBytes;
private final long maxTimeInBufferMS;

/**
* The ElementConverter provides a mapping from the elements of a stream to request
Expand All @@ -70,7 +70,8 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
private final ElementConverter<InputT, RequestEntryT> elementConverter;

/**
* Buffer to hold request entries that should be persisted into the destination.
* Buffer to hold request entries that should be persisted into the destination, along with its
* size in bytes.
*
* <p>A request entry contains all relevant details to make a call to the destination. E.g., for
* Kinesis Data Streams a request entry contains the payload and partition key.
Expand All @@ -81,7 +82,8 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
* construct a new (retry) request entry from the response and add that back to the queue for
* later retry.
*/
private final Deque<RequestEntryT> bufferedRequestEntries = new ArrayDeque<>();
private final Deque<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries =
new ArrayDeque<>();

/**
* Tracks all pending async calls that have been executed since the last checkpoint. Calls that
Expand All @@ -100,16 +102,11 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable

/**
* Tracks the cumulative size of all elements in {@code bufferedRequestEntries} to facilitate
* the criterion for flushing after {@code flushOnBufferSizeMB} is reached.
* the criterion for flushing after {@code flushOnBufferSizeInBytes} is reached.
*/
private double bufferedRequestEntriesTotalSizeMB;
private double bufferedRequestEntriesTotalSizeInBytes;

/**
* Tracks the size of each element in {@code bufferedRequestEntries}. The sizes are stored in MB
* and the position in the deque reflects the position of the corresponding element in {@code
* bufferedRequestEntries}.
*/
private final Deque<Double> bufferedRequestEntriesSizeMB = new ArrayDeque<>();
private boolean existsActiveTimerCallback = false;

/**
* This method specifies how to persist buffered request entries into the destination. It is
Expand Down Expand Up @@ -143,15 +140,16 @@ protected abstract void submitRequestEntries(
* @param requestEntry the requestEntry for which we want to know the size
* @return the size of the requestEntry, as defined previously
*/
protected abstract int getSizeInBytes(RequestEntryT requestEntry);
protected abstract long getSizeInBytes(RequestEntryT requestEntry);

public AsyncSinkWriter(
ElementConverter<InputT, RequestEntryT> elementConverter,
Sink.InitContext context,
int maxBatchSize,
int maxInFlightRequests,
int maxBufferedRequests,
double flushOnBufferSizeMB) {
long flushOnBufferSizeInBytes,
long maxTimeInBufferMS) {
this.elementConverter = elementConverter;
this.mailboxExecutor = context.getMailboxExecutor();
this.timeService = context.getProcessingTimeService();
Expand All @@ -160,37 +158,49 @@ public AsyncSinkWriter(
Preconditions.checkArgument(maxBatchSize > 0);
Preconditions.checkArgument(maxBufferedRequests > 0);
Preconditions.checkArgument(maxInFlightRequests > 0);
Preconditions.checkArgument(flushOnBufferSizeInBytes > 0);
Preconditions.checkArgument(maxTimeInBufferMS > 0);
Preconditions.checkArgument(
maxBufferedRequests > maxBatchSize,
"The maximum number of requests that may be buffered should be strictly"
+ " greater than the maximum number of requests per batch.");
this.maxBatchSize = maxBatchSize;
this.maxInFlightRequests = maxInFlightRequests;
this.maxBufferedRequests = maxBufferedRequests;
this.flushOnBufferSizeMB = flushOnBufferSizeMB;
this.flushOnBufferSizeInBytes = flushOnBufferSizeInBytes;
this.maxTimeInBufferMS = maxTimeInBufferMS;

this.inFlightRequestsCount = 0;
this.bufferedRequestEntriesTotalSizeMB = 0;
this.bufferedRequestEntriesTotalSizeInBytes = 0;
}

/**
 * Registers a processing-time timer that empties the buffer.
 *
 * <p>The timer is registered with {@code timeService} for the current processing time plus
 * {@code maxTimeInBufferMS}. When it fires, the callback first clears {@code
 * existsActiveTimerCallback} and then calls {@link #flush()} repeatedly until {@code
 * bufferedRequestEntries} is empty. After the timer has been registered, {@code
 * existsActiveTimerCallback} is set so that callers can tell a timer is pending.
 */
private void registerCallback() {
    final long triggerTime = timeService.getCurrentProcessingTime() + maxTimeInBufferMS;

    final Sink.ProcessingTimeService.ProcessingTimeCallback drainBuffer =
            firedAt -> {
                // Clear the marker before draining so that a later insertion into an empty
                // buffer is allowed to register a fresh timer.
                existsActiveTimerCallback = false;
                for (; !bufferedRequestEntries.isEmpty(); ) {
                    flush();
                }
            };

    timeService.registerProcessingTimer(triggerTime, drainBuffer);
    existsActiveTimerCallback = true;
}

@Override
public void write(InputT element, Context context) throws IOException, InterruptedException {
while (bufferedRequestEntries.size() >= maxBufferedRequests) {
mailboxExecutor.yield();
mailboxExecutor.tryYield();
}

RequestEntryT requestEntry = elementConverter.apply(element, context);
double requestEntrySizeMB = getSizeInMB(requestEntry);
bufferedRequestEntries.add(requestEntry);
bufferedRequestEntriesSizeMB.add(requestEntrySizeMB);
bufferedRequestEntriesTotalSizeMB += requestEntrySizeMB;
addEntryToBuffer(elementConverter.apply(element, context), false);

flushIfAble();
}

private void flushIfAble() throws InterruptedException {
private void flushIfAble() {
while (bufferedRequestEntries.size() >= maxBatchSize
|| bufferedRequestEntriesTotalSizeMB >= flushOnBufferSizeMB) {
|| bufferedRequestEntriesTotalSizeInBytes >= flushOnBufferSizeInBytes) {
flush();
}
}
Expand All @@ -201,18 +211,18 @@ private void flushIfAble() throws InterruptedException {
*
* <p>The method blocks if too many async requests are in flight.
*/
private void flush() throws InterruptedException {
private void flush() {
while (inFlightRequestsCount >= maxInFlightRequests) {
mailboxExecutor.yield();
mailboxExecutor.tryYield();
}

List<RequestEntryT> batch = new ArrayList<>(maxBatchSize);

int batchSize = Math.min(maxBatchSize, bufferedRequestEntries.size());
for (int i = 0; i < batchSize; i++) {
batch.add(bufferedRequestEntries.remove());
double elementSizeMB = bufferedRequestEntriesSizeMB.remove();
bufferedRequestEntriesTotalSizeMB -= elementSizeMB;
RequestEntryWrapper<RequestEntryT> elem = bufferedRequestEntries.remove();
batch.add(elem.getRequestEntry());
bufferedRequestEntriesTotalSizeInBytes -= elem.getSize();
}

if (batch.size() == 0) {
Expand All @@ -238,17 +248,24 @@ private void flush() throws InterruptedException {
*/
private void completeRequest(Collection<RequestEntryT> failedRequestEntries) {
inFlightRequestsCount--;
failedRequestEntries.forEach(
failedEntry -> {
bufferedRequestEntries.addFirst(failedEntry);
double sizeOfFailedEntry = getSizeInMB(failedEntry);
bufferedRequestEntriesSizeMB.addFirst(sizeOfFailedEntry);
bufferedRequestEntriesTotalSizeMB += sizeOfFailedEntry;
});
failedRequestEntries.forEach(failedEntry -> addEntryToBuffer(failedEntry, true));
}

private double getSizeInMB(RequestEntryT requestEntry){
return getSizeInBytes(requestEntry) / (double) BYTES_IN_MB;
/**
 * Adds a request entry to {@code bufferedRequestEntries} and updates the buffered-size total.
 *
 * <p>If the buffer is empty and no timer callback is currently marked as active, {@link
 * #registerCallback()} is invoked before the entry is inserted. The entry is then wrapped
 * together with its size, as reported by {@link #getSizeInBytes}, and that size is added to
 * {@code bufferedRequestEntriesTotalSizeInBytes}.
 *
 * @param entry the request entry to buffer
 * @param insertAtHead {@code true} to place the entry at the front of the deque, {@code false}
 *     to append it at the end
 */
private void addEntryToBuffer(RequestEntryT entry, boolean insertAtHead) {
    final boolean timerNeeded = bufferedRequestEntries.isEmpty() && !existsActiveTimerCallback;
    if (timerNeeded) {
        registerCallback();
    }

    final long entrySize = getSizeInBytes(entry);
    final RequestEntryWrapper<RequestEntryT> wrapped =
            new RequestEntryWrapper<>(entry, entrySize);

    if (!insertAtHead) {
        // Deque#add appends at the tail, i.e. it is the same operation as addLast.
        bufferedRequestEntries.addLast(wrapped);
    } else {
        bufferedRequestEntries.addFirst(wrapped);
    }

    bufferedRequestEntriesTotalSizeInBytes += wrapped.getSize();
}

/**
Expand All @@ -260,9 +277,9 @@ private double getSizeInMB(RequestEntryT requestEntry){
* <p>To this end, all in-flight requests need to be completed before proceeding with the commit.
*/
@Override
public List<Void> prepareCommit(boolean flush) throws InterruptedException {
public List<Void> prepareCommit(boolean flush) {
while (inFlightRequestsCount > 0 || bufferedRequestEntries.size() > 0) {
mailboxExecutor.yield();
mailboxExecutor.tryYield();
if (flush) {
flush();
}
Expand All @@ -279,7 +296,10 @@ public List<Void> prepareCommit(boolean flush) throws InterruptedException {
*/
@Override
public List<Collection<RequestEntryT>> snapshotState() {
return Arrays.asList(bufferedRequestEntries);
return Arrays.asList(
bufferedRequestEntries.stream()
.map(RequestEntryWrapper::getRequestEntry)
.collect(Collectors.toList()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.base.sink.writer;

import org.apache.flink.annotation.PublicEvolving;

/**
 * Pairs a {@code RequestEntryT} that is ready to be written by the Sink Writer class with the
 * size of that entry, as defined by the method {@code getSizeInBytes(RequestEntryT)} of the
 * {@code AsyncSinkWriter}.
 *
 * <p>Both values are supplied at construction time and are not changed by this class afterwards.
 *
 * @param <RequestEntryT> Corresponds to the type parameter of the same name in {@code
 *     AsyncSinkWriter}
 */
@PublicEvolving
public class RequestEntryWrapper<RequestEntryT> {

    /** Size associated with {@link #requestEntry}, as passed to the constructor. */
    private final long size;

    /** The wrapped request entry. */
    private final RequestEntryT requestEntry;

    /**
     * Creates a wrapper around the given entry.
     *
     * @param requestEntry the request entry to wrap
     * @param size the size to record for {@code requestEntry}
     */
    public RequestEntryWrapper(RequestEntryT requestEntry, long size) {
        this.size = size;
        this.requestEntry = requestEntry;
    }

    /**
     * Returns the size recorded for the wrapped entry.
     *
     * @return the size given at construction time
     */
    public long getSize() {
        return this.size;
    }

    /**
     * Returns the wrapped request entry.
     *
     * @return the request entry given at construction time
     */
    public RequestEntryT getRequestEntry() {
        return this.requestEntry;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,24 @@ public class ArrayListAsyncSink extends AsyncSinkBase<String, Integer> {
private final int maxBatchSize;
private final int maxInFlightRequests;
private final int maxBufferedRequests;
private final double flushOnBufferSizeMB;
private final long flushOnBufferSizeInBytes;
private final long maxTimeInBufferMS;

public ArrayListAsyncSink() {
this(25, 1, 100, 0.1);
this(25, 1, 100, 100000, 1000);
}

public ArrayListAsyncSink(int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests,
double flushOnBufferSizeMB) {
public ArrayListAsyncSink(
int maxBatchSize,
int maxInFlightRequests,
int maxBufferedRequests,
long flushOnBufferSizeInBytes,
long maxTimeInBufferMS) {
this.maxBatchSize = maxBatchSize;
this.maxInFlightRequests = maxInFlightRequests;
this.maxBufferedRequests = maxBufferedRequests;
this.flushOnBufferSizeMB = flushOnBufferSizeMB;
this.flushOnBufferSizeInBytes = flushOnBufferSizeInBytes;
this.maxTimeInBufferMS = maxTimeInBufferMS;
}

@Override
Expand All @@ -59,7 +65,8 @@ public SinkWriter<String, Void, Collection<Integer>> createWriter(
maxBatchSize,
maxInFlightRequests,
maxBufferedRequests,
flushOnBufferSizeMB) {
flushOnBufferSizeInBytes,
maxTimeInBufferMS) {

@Override
protected void submitRequestEntries(
Expand All @@ -69,7 +76,7 @@ protected void submitRequestEntries(
}

@Override
protected int getSizeInBytes(Integer requestEntry) {
protected long getSizeInBytes(Integer requestEntry) {
return 4;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.junit.Test;

import static org.junit.Assert.assertEquals;
Expand All @@ -41,7 +42,7 @@ public void testWriteTwentyThousandRecordsToGenericSink() throws Exception {
public void testFailuresOnPersistingToDestinationAreCaughtAndRaised() {
env.fromSequence(999_999, 1_000_100)
.map(Object::toString)
.sinkTo(new ArrayListAsyncSink(1, 1, 2, 10));
.sinkTo(new ArrayListAsyncSink(1, 1, 2, 10, 1000));
Exception e =
assertThrows(
JobExecutionException.class,
Expand Down
Loading

0 comments on commit 583e20d

Please sign in to comment.