Skip to content

Commit

Permalink
[FLINK-23473][runtime] Do not create transaction in TwoPhaseCommitSin…
Browse files Browse the repository at this point in the history
…kFunction after finish
  • Loading branch information
curcur committed Aug 10, 2021
1 parent d759a5a commit aa88f9f
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichS

private final ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor;

/**
* Current Transaction Holder, including three states: 1. Normal Transaction: created when a new
* snapshot is taken during normal task running 2. Empty Transaction: created when a new
* snapshot is taken after the task is finished. At this point, there is no need to initiate
* real transactions due to no more input data. 3. null: After task/function is closed.
*/
private TransactionHolder<TXN> currentTransactionHolder;

/** Specifies the maximum time a transaction should remain open. */
Expand All @@ -107,6 +113,9 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichS
*/
private double transactionTimeoutWarningRatio = -1;

/** Whether this sink function as well as its task is finished. */
private boolean finished = false;

/**
* Use default {@link ListStateDescriptor} for internal state serialization. Helpful utilities
* for using this constructor are {@link TypeInformation#of(Class)}, {@link
Expand Down Expand Up @@ -230,11 +239,16 @@ public final void invoke(IN value) throws Exception {}

@Override
public final void invoke(IN value, Context context) throws Exception {
invoke(currentTransactionHolder.handle, value, context);
TXN currentTransaction = currentTransaction();
checkNotNull(
currentTransaction,
"two phase commit sink function with null transaction should not be invoked! ");
invoke(currentTransaction, value, context);
}

@Override
public final void finish() throws Exception {
finished = true;
finishProcessing(currentTransaction());
}

Expand Down Expand Up @@ -332,11 +346,18 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
context.getCheckpointId(),
currentTransactionHolder);

preCommit(currentTransactionHolder.handle);
pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);
if (!currentTransactionHolder.equals(TransactionHolder.empty())) {
preCommit(currentTransactionHolder.handle);
pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);
}

currentTransactionHolder = beginTransactionInternal();
// no need to start new transactions after sink function is closed (no more input data)
if (!finished) {
currentTransactionHolder = beginTransactionInternal();
} else {
currentTransactionHolder = TransactionHolder.empty();
}
LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);

state.clear();
Expand Down Expand Up @@ -383,12 +404,14 @@ public void initializeState(FunctionInitializationContext context) throws Except

{
TXN transaction = operatorState.getPendingTransaction().handle;
recoverAndAbort(transaction);
handledTransactions.add(transaction);
LOG.info(
"{} aborted recovered transaction {}",
name(),
operatorState.getPendingTransaction());
if (transaction != null) {
recoverAndAbort(transaction);
handledTransactions.add(transaction);
LOG.info(
"{} aborted recovered transaction {}",
name(),
operatorState.getPendingTransaction());
}
}

if (userContext.isPresent()) {
Expand Down Expand Up @@ -460,10 +483,12 @@ private void logWarningIfTimeoutAlmostReached(TransactionHolder<TXN> transaction
public void close() throws Exception {
super.close();

if (currentTransactionHolder != null) {
abort(currentTransactionHolder.handle);
currentTransactionHolder = null;
TXN currentTransaction = currentTransaction();
if (currentTransaction != null) {
abort(currentTransaction);
}

currentTransactionHolder = null;
}

/**
Expand Down Expand Up @@ -607,6 +632,8 @@ public static final class TransactionHolder<TXN> {

private final TXN handle;

private static final TransactionHolder<?> EMPTY = new TransactionHolder<>(null, -1);

/**
* The system time when {@link #handle} was created. Used to determine if the current
* transaction has exceeded its timeout specified by {@link #transactionTimeout}.
Expand All @@ -623,6 +650,11 @@ long elapsedTime(Clock clock) {
return clock.millis() - transactionStartTime;
}

@SuppressWarnings("unchecked")
public static <TXN> TransactionHolder<TXN> empty() {
return (TransactionHolder<TXN>) EMPTY;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,28 @@ public void testSubsumedNotificationOfPreviousCheckpoint() throws Exception {
assertEquals(1, tmpDirectory.listFiles().size()); // one for currentTransaction
}

@Test
public void testNoTransactionAfterSinkFunctionFinish() throws Exception {
harness.open();
harness.processElement("42", 0);
harness.snapshot(0, 1);
harness.processElement("43", 2);
harness.snapshot(1, 3);
harness.processElement("44", 4);

// do not expect new input after finish()
sinkFunction.finish();

harness.snapshot(2, 5);
harness.notifyOfCompletedCheckpoint(1);

// Checkpoint2 has not complete
assertExactlyOnce(Arrays.asList("42", "43"));

// transaction for checkpoint2
assertEquals(1, tmpDirectory.listFiles().size());
}

@Test
public void testNotifyOfCompletedCheckpoint() throws Exception {
harness.open();
Expand Down

0 comments on commit aa88f9f

Please sign in to comment.