[FLINK-17547][task] Use iterator for unconsumed buffers.
Motivation: support spilled records
Changes:
1. change SpillingAdaptiveSpanningRecordDeserializer.getUnconsumedBuffer
signature
2. adapt channel state persistence to new types

No changes in existing logic.
rkhachatryan authored and pnowojski committed May 19, 2020
1 parent 824100e commit 37f441a
Showing 23 changed files with 235 additions and 81 deletions.
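For context on change 1, the deserializer's accessor now returns an iterator over the unconsumed (possibly spilled) buffers instead of a single buffer. A rough sketch of the reshaped signature, simplified from the actual interface:

// Sketch only (simplified): the real RecordDeserializer interface has more members.
// Returning a CloseableIterator lets callers stream spilled data lazily, and
// closing the iterator recycles whatever was never consumed.
CloseableIterator<Buffer> getUnconsumedBuffer() throws IOException;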
flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java
@@ -20,10 +20,15 @@

import javax.annotation.Nonnull;

+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Deque;
import java.util.Iterator;
+import java.util.List;
import java.util.function.Consumer;

+import static java.util.Arrays.asList;

/**
* This interface represents an {@link Iterator} that is also {@link AutoCloseable}. A typical use-case for this
* interface are iterators that are based on native-resources such as files, network, or database connections. Clients
@@ -37,14 +42,77 @@ public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {

@Nonnull
static <T> CloseableIterator<T> adapterForIterator(@Nonnull Iterator<T> iterator) {
-    return new IteratorAdapter<>(iterator);
+    return adapterForIterator(iterator, () -> {});
}
+
+static <T> CloseableIterator<T> adapterForIterator(@Nonnull Iterator<T> iterator, AutoCloseable close) {
+    return new IteratorAdapter<>(iterator, close);
+}

+static <T> CloseableIterator<T> fromList(List<T> list, Consumer<T> closeNotConsumed) {
+    return new CloseableIterator<T>() {
+        private final Deque<T> stack = new ArrayDeque<>(list);
+
+        @Override
+        public boolean hasNext() {
+            return !stack.isEmpty();
+        }
+
+        @Override
+        public T next() {
+            return stack.poll();
+        }
+
+        @Override
+        public void close() throws Exception {
+            Exception exception = null;
+            for (T el : stack) {
+                try {
+                    closeNotConsumed.accept(el);
+                } catch (Exception e) {
+                    exception = ExceptionUtils.firstOrSuppressed(e, exception);
+                }
+            }
+            if (exception != null) {
+                throw exception;
+            }
+        }
+    };
+}
+
+@SuppressWarnings("unchecked")
+static <T> CloseableIterator<T> empty() {
+    return (CloseableIterator<T>) EMPTY_INSTANCE;
+}
+
+static <T> CloseableIterator<T> ofElements(Consumer<T> closeNotConsumed, T... elements) {
+    return fromList(asList(elements), closeNotConsumed);
+}
+
+static <E> CloseableIterator<E> ofElement(E element, Consumer<E> closeIfNotConsumed) {
+    return new CloseableIterator<E>() {
+        private boolean hasNext = true;
+
+        @Override
+        public boolean hasNext() {
+            return hasNext;
+        }
+
+        @Override
+        public E next() {
+            hasNext = false;
+            return element;
+        }
+
+        @Override
+        public void close() {
+            if (hasNext) {
+                closeIfNotConsumed.accept(element);
+            }
+        }
+    };
+}

/**
* Adapter from {@link Iterator} to {@link CloseableIterator}. Does nothing on {@link #close()}.
*
@@ -54,9 +122,11 @@ final class IteratorAdapter<E> implements CloseableIterator<E> {

@Nonnull
private final Iterator<E> delegate;
+private final AutoCloseable close;

-IteratorAdapter(@Nonnull Iterator<E> delegate) {
+IteratorAdapter(@Nonnull Iterator<E> delegate, AutoCloseable close) {
    this.delegate = delegate;
+    this.close = close;
}

@Override
@@ -80,7 +150,8 @@ public void forEachRemaining(Consumer<? super E> action) {
}

@Override
-public void close() {
+public void close() throws Exception {
+    close.close();
}
}
}
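A minimal usage sketch of the new factories (hypothetical elements and callback; the enclosing method must declare throws Exception since close() may throw): the close callback runs only for elements that next() never handed out.

try (CloseableIterator<String> it = CloseableIterator.ofElements(
        el -> System.out.println("releasing " + el), "a", "b", "c")) {
    System.out.println(it.next()); // consumes "a"
} // close() runs the callback for the unconsumed "b" and "c"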
7 changes: 7 additions & 0 deletions flink-core/src/main/java/org/apache/flink/util/IOUtils.java
@@ -215,6 +215,13 @@ public static void closeSocket(final Socket sock) {
}
}

+/**
+ * @see #closeAll(Iterable)
+ */
+public static void closeAll(AutoCloseable... closeables) throws Exception {
+    closeAll(asList(closeables));
+}

/**
* Closes all {@link AutoCloseable} objects in the parameter, suppressing exceptions. Exception will be emitted
* after calling close() on every object.
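A small sketch of the new varargs overload in use (releaseBoth is a hypothetical helper): it delegates to closeAll(Iterable), which calls close() on every argument and then rethrows the first failure with any later ones attached as suppressed.

// Both resources get close() called even if the first one throws.
static void releaseBoth(AutoCloseable a, AutoCloseable b) throws Exception {
    IOUtils.closeAll(a, b);
}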
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
@@ -20,23 +20,24 @@
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;

import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;

import static org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.CANCELLED;
import static org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.COMPLETED;
import static org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.EXECUTING;
import static org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.FAILED;
import static org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.NEW;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

interface ChannelStateWriteRequest {
long getCheckpointId();

-void cancel(Throwable cause);
+void cancel(Throwable cause) throws Exception;

static CheckpointInProgressRequest completeInput(long checkpointId) {
return new CheckpointInProgressRequest("completeInput", checkpointId, ChannelStateCheckpointWriter::completeInput, false);
@@ -46,8 +47,24 @@ static CheckpointInProgressRequest completeOutput(long checkpointId) {
return new CheckpointInProgressRequest("completeOutput", checkpointId, ChannelStateCheckpointWriter::completeOutput, false);
}

-static ChannelStateWriteRequest write(long checkpointId, InputChannelInfo info, Buffer... flinkBuffers) {
-    return new CheckpointInProgressRequest("writeInput", checkpointId, writer -> writer.writeInput(info, flinkBuffers), recycle(flinkBuffers), false);
+static ChannelStateWriteRequest write(long checkpointId, InputChannelInfo info, CloseableIterator<Buffer> iterator) {
+    return new CheckpointInProgressRequest(
+        "writeInput",
+        checkpointId,
+        writer -> {
+            while (iterator.hasNext()) {
+                Buffer buffer = iterator.next();
+                try {
+                    checkArgument(buffer.isBuffer());
+                } catch (Exception e) {
+                    buffer.recycleBuffer();
+                    throw e;
+                }
+                writer.writeInput(info, buffer);
+            }
+        },
+        throwable -> iterator.close(),
+        false);
}

static ChannelStateWriteRequest write(long checkpointId, ResultSubpartitionInfo info, Buffer... flinkBuffers) {
@@ -62,7 +79,7 @@ static ChannelStateWriteRequest abort(long checkpointId, Throwable cause) {
return new CheckpointInProgressRequest("abort", checkpointId, writer -> writer.fail(cause), true);
}

-static Consumer<Throwable> recycle(Buffer[] flinkBuffers) {
+static ThrowingConsumer<Throwable, Exception> recycle(Buffer[] flinkBuffers) {
return unused -> {
for (Buffer b : flinkBuffers) {
b.recycleBuffer();
@@ -112,7 +129,7 @@ enum CheckpointInProgressRequestState {

final class CheckpointInProgressRequest implements ChannelStateWriteRequest {
private final ThrowingConsumer<ChannelStateCheckpointWriter, Exception> action;
-private final Consumer<Throwable> discardAction;
+private final ThrowingConsumer<Throwable, Exception> discardAction;
private final long checkpointId;
private final String name;
private final boolean ignoreMissingWriter;
@@ -123,7 +140,7 @@ final class CheckpointInProgressRequest implements ChannelStateWriteRequest {
}, ignoreMissingWriter);
}

-CheckpointInProgressRequest(String name, long checkpointId, ThrowingConsumer<ChannelStateCheckpointWriter, Exception> action, Consumer<Throwable> discardAction, boolean ignoreMissingWriter) {
+CheckpointInProgressRequest(String name, long checkpointId, ThrowingConsumer<ChannelStateCheckpointWriter, Exception> action, ThrowingConsumer<Throwable, Exception> discardAction, boolean ignoreMissingWriter) {
this.checkpointId = checkpointId;
this.action = checkNotNull(action);
this.discardAction = checkNotNull(discardAction);
@@ -137,7 +154,7 @@ public long getCheckpointId() {
}

@Override
-public void cancel(Throwable cause) {
+public void cancel(Throwable cause) throws Exception {
if (state.compareAndSet(NEW, CANCELLED) || state.compareAndSet(FAILED, CANCELLED)) {
discardAction.accept(cause);
}
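The effect of the reworked write request, as a hedged caller sketch (checkpointId, info, b1, and b2 are assumed locals): buffer ownership travels inside the iterator, so cancelling a not-yet-executed request closes the iterator and recycles everything that was never written.

ChannelStateWriteRequest req = ChannelStateWriteRequest.write(
    checkpointId,
    info,
    CloseableIterator.ofElements(Buffer::recycleBuffer, b1, b2));
// The discard action closes the iterator, recycling b1 and b2:
req.cancel(new CancellationException()); // declared to throw Exception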
@@ -51,7 +51,11 @@ public void dispatch(ChannelStateWriteRequest request) throws Exception {
try {
dispatchInternal(request);
} catch (Exception e) {
-request.cancel(e);
+try {
+    request.cancel(e);
+} catch (Exception ex) {
+    e.addSuppressed(ex);
+}
throw e;
}
}
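Because cancel(Throwable) can now itself throw, the dispatcher keeps the dispatch failure primary and attaches the cancel failure via addSuppressed, so neither exception is lost. The same pattern in isolation (doWork and cleanup are hypothetical):

try {
    doWork();
} catch (Exception primary) {
    try {
        cleanup(); // best-effort; its failure must not mask the original
    } catch (Exception secondary) {
        primary.addSuppressed(secondary);
    }
    throw primary;
}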
@@ -32,6 +32,9 @@
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CancellationException;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.IOUtils.closeAll;

/**
* Executes {@link ChannelStateWriteRequest}s in a separate thread. Any exception occurred during execution causes this
@@ -67,8 +70,15 @@ void run() {
} catch (Exception ex) {
thrown = ex;
} finally {
-cleanupRequests();
-dispatcher.fail(thrown == null ? new CancellationException() : thrown);
+try {
+    closeAll(
+        this::cleanupRequests,
+        () -> dispatcher.fail(thrown == null ? new CancellationException() : thrown)
+    );
+} catch (Exception e) {
+    //noinspection NonAtomicOperationOnVolatileField
+    thrown = ExceptionUtils.firstOrSuppressed(e, thrown);
+}
}
LOG.debug("loop terminated");
}
@@ -87,14 +97,12 @@ private void loop() throws Exception {
}
}

-private void cleanupRequests() {
+private void cleanupRequests() throws Exception {
Throwable cause = thrown == null ? new CancellationException() : thrown;
List<ChannelStateWriteRequest> drained = new ArrayList<>();
deque.drainTo(drained);
LOG.info("discarding {} drained requests", drained.size());
-for (ChannelStateWriteRequest request : drained) {
-    request.cancel(cause);
-}
+closeAll(drained.stream().<AutoCloseable>map(request -> () -> request.cancel(cause)).collect(Collectors.toList()));
}

@Override
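The one-liner in cleanupRequests adapts each pending request's cancel call to an AutoCloseable, letting closeAll cancel all of them and aggregate any failures. Unpacked for readability (same semantics, hypothetical local names):

List<AutoCloseable> cancellations = drained.stream()
    .<AutoCloseable>map(request -> () -> request.cancel(cause))
    .collect(Collectors.toList());
closeAll(cancellations); // cancels every request, then rethrows the first failure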
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
@@ -22,6 +22,7 @@
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.state.InputChannelStateHandle;
import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
+import org.apache.flink.util.CloseableIterator;

import java.io.Closeable;
import java.util.Collection;
@@ -99,11 +100,10 @@ boolean isDone() {
* It is intended to use for incremental snapshots.
* If no data is passed it is ignored.
* @param data zero or more <b>data</b> buffers ordered by their sequence numbers
-* @throws IllegalArgumentException if one or more passed buffers {@link Buffer#isBuffer() isn't a buffer}
* @see org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter#SEQUENCE_NUMBER_RESTORED
* @see org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter#SEQUENCE_NUMBER_UNKNOWN
*/
-void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, Buffer... data) throws IllegalArgumentException;
+void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> data);

/**
* Add in-flight buffers from the {@link org.apache.flink.runtime.io.network.partition.ResultSubpartition ResultSubpartition}.
@@ -161,7 +161,7 @@ public void start(long checkpointId, CheckpointOptions checkpointOptions) {
}

@Override
-public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, Buffer... data) {
+public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> data) {
}

@Override
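A hedged caller-side sketch of the new addInputData contract (writer, checkpointId, info, startSeqNum, b1, and b2 are assumed locals): the buffers are handed over inside a CloseableIterator whose close callback recycles anything the writer never consumes.

writer.addInputData(
    checkpointId,
    info,
    startSeqNum,
    CloseableIterator.ofElements(Buffer::recycleBuffer, b1, b2));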
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -24,6 +24,7 @@
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
@@ -103,10 +104,9 @@ public void start(long checkpointId, CheckpointOptions checkpointOptions) {
}

@Override
-public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, Buffer... data) {
-    LOG.debug("add input data, checkpoint id: {}, channel: {}, startSeqNum: {}, num buffers: {}",
-        checkpointId, info, startSeqNum, data == null ? 0 : data.length);
-    enqueue(write(checkpointId, info, checkBufferType(data)), false);
+public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> iterator) {
+    LOG.debug("add input data, checkpoint id: {}, channel: {}, startSeqNum: {}", checkpointId, info, startSeqNum);
+    enqueue(write(checkpointId, info, iterator), false);
}

@Override
@@ -168,8 +168,13 @@ private void enqueue(ChannelStateWriteRequest request, boolean atTheFront) {
executor.submit(request);
}
} catch (Exception e) {
-request.cancel(e);
-throw new RuntimeException("unable to send request to worker", e);
+RuntimeException wrapped = new RuntimeException("unable to send request to worker", e);
+try {
+    request.cancel(e);
+} catch (Exception cancelException) {
+    wrapped.addSuppressed(cancelException);
+}
+throw wrapped;
}
}
