[FLINK-26592][state/changelog] Use mailbox in FsStateChangelogWriter instead of a lock

When a task thread tries to schedule an upload, it might wait for available capacity.
Capacity is released by the uploading thread on upload completion. After releasing it,
the uploading thread must notify the task thread about the completion.
Both the task thread and the uploading thread acquire FsStateChangelogWriter.lock, which causes
a deadlock if the uploader releases capacity that is still insufficient for the task thread to proceed.

This change removes the lock and makes the uploader thread use mailbox actions instead.
rkhachatryan committed Mar 21, 2022
1 parent 32c2ca7 commit db4d05a
Showing 14 changed files with 185 additions and 95 deletions.
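For context, the sketch below (illustrative only, not taken from the Flink sources) shows the hand-off pattern this commit adopts: instead of the task thread and the uploader thread synchronizing on a shared lock, the uploader enqueues completion handling as an action on a mailbox-style executor, so writer state is only ever mutated from one thread. The class and member names (MailboxPatternSketch, completionListeners, handleUploadSuccess) are stand-ins, and a plain single-threaded executor stands in for Flink's MailboxExecutor, whose actions run on the task thread.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Illustrative sketch of the lock-free hand-off; see the note above. */
class MailboxPatternSketch {

    // Stand-in for Flink's MailboxExecutor; in Flink these actions run on the task thread.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();

    // Mutated only from mailbox actions, so no lock (and no @GuardedBy) is needed.
    private final List<CompletableFuture<String>> completionListeners = new ArrayList<>();
    private boolean closed;

    /** Called by the uploader thread when an upload finishes. */
    void handleUploadSuccess(String result) {
        // Enqueue the state update instead of taking a lock shared with the task thread.
        mailbox.execute(
                () -> {
                    if (closed) {
                        return; // the writer was closed in the meantime; drop the result
                    }
                    completionListeners.forEach(listener -> listener.complete(result));
                    completionListeners.clear();
                });
    }
}

Because completion handling is queued rather than performed under a lock, a task thread waiting for upload capacity can no longer deadlock against the uploader's completion callback. In the annotated diff below, removed lines are prefixed with "-" and added lines with "+".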
@@ -19,6 +19,7 @@

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
+ import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.io.AvailabilityProvider;
@@ -91,11 +92,12 @@ public FsStateChangelogStorage(
}

@Override
- public FsStateChangelogWriter createWriter(String operatorID, KeyGroupRange keyGroupRange) {
+ public FsStateChangelogWriter createWriter(
+ String operatorID, KeyGroupRange keyGroupRange, MailboxExecutor mailboxExecutor) {
UUID logId = new UUID(0, logIdGenerator.getAndIncrement());
LOG.info("createWriter for operator {}/{}: {}", operatorID, keyGroupRange, logId);
return new FsStateChangelogWriter(
- logId, keyGroupRange, uploader, preEmptivePersistThresholdInBytes);
+ logId, keyGroupRange, uploader, preEmptivePersistThresholdInBytes, mailboxExecutor);
}

@Override
@@ -18,6 +18,7 @@
package org.apache.flink.changelog.fs;

import org.apache.flink.annotation.VisibleForTesting;
+ import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask;
import org.apache.flink.runtime.state.KeyGroupRange;
@@ -32,7 +33,6 @@
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
- import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;

import java.io.IOException;
@@ -46,7 +46,6 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

- import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.flink.util.IOUtils.closeAllQuietly;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -94,12 +93,7 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
private final StateChangeUploadScheduler uploader;
private final long preEmptivePersistThresholdInBytes;

- /** Lock to synchronize handling of upload completion with new upload requests. */
- // todo: replace with mailbox executor (after FLINK-23204)
- private final Object lock = new Object();
-
/** A list of listener per upload (~ per checkpoint plus pre-emptive uploads). */
- @GuardedBy("lock")
private final List<UploadCompletionListener> uploadCompletionListeners = new ArrayList<>();

/** Current {@link SequenceNumber}. */
@@ -109,7 +103,6 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
* {@link SequenceNumber} before which changes will NOT be requested, exclusive. Increased after
* materialization.
*/
- @GuardedBy("lock")
private SequenceNumber lowestSequenceNumber = INITIAL_SQN;

/**
@@ -127,28 +120,28 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
private final NavigableMap<SequenceNumber, StateChangeSet> notUploaded = new TreeMap<>();

/** Uploaded changes, ready for use in snapshots. */
- @GuardedBy("lock")
private final NavigableMap<SequenceNumber, UploadResult> uploaded = new TreeMap<>();

/**
* Highest {@link SequenceNumber} for which upload has failed (won't be restarted), inclusive.
*/
- @Nullable
- @GuardedBy("lock")
- private Tuple2<SequenceNumber, Throwable> highestFailed;
+ @Nullable private Tuple2<SequenceNumber, Throwable> highestFailed;

- @GuardedBy("lock")
private boolean closed;

+ private final MailboxExecutor mailboxExecutor;
+
FsStateChangelogWriter(
UUID logId,
KeyGroupRange keyGroupRange,
StateChangeUploadScheduler uploader,
- long preEmptivePersistThresholdInBytes) {
+ long preEmptivePersistThresholdInBytes,
+ MailboxExecutor mailboxExecutor) {
this.logId = logId;
this.keyGroupRange = keyGroupRange;
this.uploader = uploader;
this.preEmptivePersistThresholdInBytes = preEmptivePersistThresholdInBytes;
+ this.mailboxExecutor = mailboxExecutor;
}

@Override
@@ -194,87 +187,90 @@ public CompletableFuture<ChangelogStateHandleStreamImpl> persist(SequenceNumber

private CompletableFuture<ChangelogStateHandleStreamImpl> persistInternal(SequenceNumber from)
throws IOException {
- synchronized (lock) {
- ensureCanPersist(from);
- rollover();
- Map<SequenceNumber, StateChangeSet> toUpload = drainTailMap(notUploaded, from);
- NavigableMap<SequenceNumber, UploadResult> readyToReturn = uploaded.tailMap(from, true);
- LOG.debug("collected readyToReturn: {}, toUpload: {}", readyToReturn, toUpload);
-
- SequenceNumberRange range = SequenceNumberRange.generic(from, activeSequenceNumber);
- if (range.size() == readyToReturn.size()) {
- checkState(toUpload.isEmpty());
- return completedFuture(buildHandle(keyGroupRange, readyToReturn, 0L));
- } else {
- CompletableFuture<ChangelogStateHandleStreamImpl> future =
- new CompletableFuture<>();
- uploadCompletionListeners.add(
- new UploadCompletionListener(keyGroupRange, range, readyToReturn, future));
- if (!toUpload.isEmpty()) {
- uploader.upload(
- new UploadTask(
- toUpload.values(),
- this::handleUploadSuccess,
- this::handleUploadFailure));
- }
- return future;
- }
- }
+ ensureCanPersist(from);
+ rollover();
+ Map<SequenceNumber, StateChangeSet> toUpload = drainTailMap(notUploaded, from);
+ NavigableMap<SequenceNumber, UploadResult> readyToReturn = uploaded.tailMap(from, true);
+ LOG.debug("collected readyToReturn: {}, toUpload: {}", readyToReturn, toUpload);
+
+ SequenceNumberRange range = SequenceNumberRange.generic(from, activeSequenceNumber);
+ if (range.size() == readyToReturn.size()) {
+ checkState(toUpload.isEmpty());
+ return CompletableFuture.completedFuture(buildHandle(keyGroupRange, readyToReturn, 0L));
+ } else {
+ CompletableFuture<ChangelogStateHandleStreamImpl> future = new CompletableFuture<>();
+ uploadCompletionListeners.add(
+ new UploadCompletionListener(keyGroupRange, range, readyToReturn, future));
+ if (!toUpload.isEmpty()) {
+ UploadTask uploadTask =
+ new UploadTask(
+ toUpload.values(),
+ this::handleUploadSuccess,
+ this::handleUploadFailure);
+ uploader.upload(uploadTask);
+ }
+ return future;
+ }
}

private void handleUploadFailure(List<SequenceNumber> failedSqn, Throwable throwable) {
- synchronized (lock) {
- if (closed) {
- return;
- }
- uploadCompletionListeners.removeIf(
- listener -> listener.onFailure(failedSqn, throwable));
- failedSqn.stream()
- .max(Comparator.naturalOrder())
- .filter(sqn -> sqn.compareTo(lowestSequenceNumber) >= 0)
- .filter(sqn -> highestFailed == null || sqn.compareTo(highestFailed.f0) > 0)
- .ifPresent(sqn -> highestFailed = Tuple2.of(sqn, throwable));
- }
+ mailboxExecutor.execute(
+ () -> {
+ if (closed) {
+ return;
+ }
+ uploadCompletionListeners.removeIf(
+ listener -> listener.onFailure(failedSqn, throwable));
+ failedSqn.stream()
+ .max(Comparator.naturalOrder())
+ .filter(sqn -> sqn.compareTo(lowestSequenceNumber) >= 0)
+ .filter(
+ sqn ->
+ highestFailed == null
+ || sqn.compareTo(highestFailed.f0) > 0)
+ .ifPresent(sqn -> highestFailed = Tuple2.of(sqn, throwable));
+ },
+ "handleUploadFailure");
}

private void handleUploadSuccess(List<UploadResult> results) {
- synchronized (lock) {
- if (closed) {
- results.forEach(
- r -> closeAllQuietly(() -> r.getStreamStateHandle().discardState()));
- } else {
- uploadCompletionListeners.removeIf(listener -> listener.onSuccess(results));
- for (UploadResult result : results) {
- if (result.sequenceNumber.compareTo(lowestSequenceNumber) >= 0) {
- uploaded.put(result.sequenceNumber, result);
- }
- }
- }
- }
+ mailboxExecutor.execute(
+ () -> {
+ if (closed) {
+ results.forEach(
+ r ->
+ closeAllQuietly(
+ () -> r.getStreamStateHandle().discardState()));
+ } else {
+ uploadCompletionListeners.removeIf(listener -> listener.onSuccess(results));
+ for (UploadResult result : results) {
+ if (result.sequenceNumber.compareTo(lowestSequenceNumber) >= 0) {
+ uploaded.put(result.sequenceNumber, result);
+ }
+ }
+ }
+ },
+ "handleUploadSuccess");
}

@Override
public void close() {
LOG.debug("close {}", logId);
- synchronized (lock) {
- checkState(!closed);
- closed = true;
- activeChangeSet.clear();
- activeChangeSetSize = 0;
- notUploaded.clear();
- uploaded.clear();
- }
+ checkState(!closed);
+ closed = true;
+ activeChangeSet.clear();
+ activeChangeSetSize = 0;
+ notUploaded.clear();
+ uploaded.clear();
}

@Override
public void truncate(SequenceNumber to) {
LOG.debug("truncate {} to sqn {} (excl.)", logId, to);
checkArgument(to.compareTo(activeSequenceNumber) <= 0);
- synchronized (lock) {
- lowestSequenceNumber = to;
- notUploaded.headMap(lowestSequenceNumber, false).clear();
- uploaded.headMap(lowestSequenceNumber, false).clear();
- }
+ lowestSequenceNumber = to;
+ notUploaded.headMap(lowestSequenceNumber, false).clear();
+ uploaded.headMap(lowestSequenceNumber, false).clear();
}

private void rollover() {
@@ -429,7 +429,7 @@ private Tuple2<Thread, CompletableFuture<Void>> uploadAsync(int limit, TestScena
return Tuple2.of(thread, future);
}

- private static final class BlockingUploader implements StateChangeUploader {
+ static final class BlockingUploader implements StateChangeUploader {
private final AtomicBoolean blocking = new AtomicBoolean(true);
private final AtomicInteger uploadsCounter = new AtomicInteger();

@@ -449,7 +449,7 @@ public UploadTasksResult upload(Collection<UploadTask> tasks) {
@Override
public void close() {}

- private void unblock() {
+ void unblock() {
blocking.set(false);
}

@@ -23,6 +23,7 @@
import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.HistogramStatistics;
+ import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
@@ -61,7 +62,7 @@ public void testUploadsCounter() throws Exception {
try (FsStateChangelogStorage storage =
new FsStateChangelogStorage(
Path.fromLocalFile(temporaryFolder.newFolder()), false, 100, metrics)) {
- FsStateChangelogWriter writer = storage.createWriter("writer", EMPTY_KEY_GROUP_RANGE);
+ FsStateChangelogWriter writer = createWriter(storage);

int numUploads = 5;
for (int i = 0; i < numUploads; i++) {
@@ -82,7 +83,7 @@ public void testUploadSizes() throws Exception {
try (FsStateChangelogStorage storage =
new FsStateChangelogStorage(
Path.fromLocalFile(temporaryFolder.newFolder()), false, 100, metrics)) {
- FsStateChangelogWriter writer = storage.createWriter("writer", EMPTY_KEY_GROUP_RANGE);
+ FsStateChangelogWriter writer = createWriter(storage);

// upload single byte to infer header size
SequenceNumber from = writer.nextSequenceNumber();
@@ -108,7 +109,7 @@ public void testUploadFailuresCounter() throws Exception {
new ChangelogStorageMetricGroup(createUnregisteredTaskManagerJobMetricGroup());
try (FsStateChangelogStorage storage =
new FsStateChangelogStorage(Path.fromLocalFile(file), false, 100, metrics)) {
- FsStateChangelogWriter writer = storage.createWriter("writer", EMPTY_KEY_GROUP_RANGE);
+ FsStateChangelogWriter writer = createWriter(storage);

int numUploads = 5;
for (int i = 0; i < numUploads; i++) {
@@ -149,7 +150,9 @@ public void testUploadBatchSizes() throws Exception {
FsStateChangelogStorage storage = new FsStateChangelogStorage(batcher, Integer.MAX_VALUE);
FsStateChangelogWriter[] writers = new FsStateChangelogWriter[numWriters];
for (int i = 0; i < numWriters; i++) {
- writers[i] = storage.createWriter(Integer.toString(i), EMPTY_KEY_GROUP_RANGE);
+ writers[i] =
+ storage.createWriter(
+ Integer.toString(i), EMPTY_KEY_GROUP_RANGE, new SyncMailboxExecutor());
}

try {
@@ -190,7 +193,7 @@ public void testAttemptsPerUpload() throws Exception {
metrics);

FsStateChangelogStorage storage = new FsStateChangelogStorage(batcher, Integer.MAX_VALUE);
- FsStateChangelogWriter writer = storage.createWriter("writer", EMPTY_KEY_GROUP_RANGE);
+ FsStateChangelogWriter writer = createWriter(storage);

try {
for (int upload = 0; upload < numUploads; upload++) {
@@ -242,7 +245,7 @@ public void testQueueSize() throws Exception {
metrics);
try (FsStateChangelogStorage storage =
new FsStateChangelogStorage(batcher, Long.MAX_VALUE)) {
- FsStateChangelogWriter writer = storage.createWriter("writer", EMPTY_KEY_GROUP_RANGE);
+ FsStateChangelogWriter writer = createWriter(storage);
int numUploads = 11;
for (int i = 0; i < numUploads; i++) {
SequenceNumber from = writer.nextSequenceNumber();
@@ -288,4 +291,8 @@ public void close() {
attemptsPerTask.clear();
}
}

+ private FsStateChangelogWriter createWriter(FsStateChangelogStorage storage) {
+ return storage.createWriter("writer", EMPTY_KEY_GROUP_RANGE, new SyncMailboxExecutor());
+ }
}