diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java index 1c06b0ddd4cf5f..a1e347a80a4c8f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java @@ -90,6 +90,12 @@ public abstract class TwoPhaseCommitSinkFunction extends RichS private final ListStateDescriptor> stateDescriptor; + /** + * Current Transaction Holder, including three states: 1. Normal Transaction: created when a new + * snapshot is taken during normal task running 2. Empty Transaction: created when a new + * snapshot is taken after the task is finished. At this point, there is no need to initiate + * real transactions due to no more input data. 3. null: After task/function is closed. + */ private TransactionHolder currentTransactionHolder; /** Specifies the maximum time a transaction should remain open. */ @@ -107,6 +113,9 @@ public abstract class TwoPhaseCommitSinkFunction extends RichS */ private double transactionTimeoutWarningRatio = -1; + /** Whether this sink function as well as its task is finished. */ + private boolean finished = false; + /** * Use default {@link ListStateDescriptor} for internal state serialization. Helpful utilities * for using this constructor are {@link TypeInformation#of(Class)}, {@link @@ -230,11 +239,16 @@ public final void invoke(IN value) throws Exception {} @Override public final void invoke(IN value, Context context) throws Exception { - invoke(currentTransactionHolder.handle, value, context); + TXN currentTransaction = currentTransaction(); + checkNotNull( + currentTransaction, + "two phase commit sink function with null transaction should not be invoked! 
"); + invoke(currentTransaction, value, context); } @Override public final void finish() throws Exception { + finished = true; finishProcessing(currentTransaction()); } @@ -332,11 +346,18 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception { context.getCheckpointId(), currentTransactionHolder); - preCommit(currentTransactionHolder.handle); - pendingCommitTransactions.put(checkpointId, currentTransactionHolder); - LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions); + if (!currentTransactionHolder.equals(TransactionHolder.empty())) { + preCommit(currentTransactionHolder.handle); + pendingCommitTransactions.put(checkpointId, currentTransactionHolder); + LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions); + } - currentTransactionHolder = beginTransactionInternal(); + // no need to start new transactions after sink function is closed (no more input data) + if (!finished) { + currentTransactionHolder = beginTransactionInternal(); + } else { + currentTransactionHolder = TransactionHolder.empty(); + } LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder); state.clear(); @@ -383,12 +404,14 @@ public void initializeState(FunctionInitializationContext context) throws Except { TXN transaction = operatorState.getPendingTransaction().handle; - recoverAndAbort(transaction); - handledTransactions.add(transaction); - LOG.info( - "{} aborted recovered transaction {}", - name(), - operatorState.getPendingTransaction()); + if (transaction != null) { + recoverAndAbort(transaction); + handledTransactions.add(transaction); + LOG.info( + "{} aborted recovered transaction {}", + name(), + operatorState.getPendingTransaction()); + } } if (userContext.isPresent()) { @@ -460,10 +483,12 @@ private void logWarningIfTimeoutAlmostReached(TransactionHolder transaction public void close() throws Exception { super.close(); - if (currentTransactionHolder != null) { - 
abort(currentTransactionHolder.handle); - currentTransactionHolder = null; + TXN currentTransaction = currentTransaction(); + if (currentTransaction != null) { + abort(currentTransaction); } + + currentTransactionHolder = null; } /** @@ -607,6 +632,8 @@ public static final class TransactionHolder { private final TXN handle; + private static final TransactionHolder EMPTY = new TransactionHolder<>(null, -1); + /** * The system time when {@link #handle} was created. Used to determine if the current * transaction has exceeded its timeout specified by {@link #transactionTimeout}. @@ -623,6 +650,11 @@ long elapsedTime(Clock clock) { return clock.millis() - transactionStartTime; } + @SuppressWarnings("unchecked") + public static TransactionHolder empty() { + return (TransactionHolder) EMPTY; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java index 898c08bf4d37f9..580b0e014e5362 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java @@ -116,6 +116,28 @@ public void testSubsumedNotificationOfPreviousCheckpoint() throws Exception { assertEquals(1, tmpDirectory.listFiles().size()); // one for currentTransaction } + @Test + public void testNoTransactionAfterSinkFunctionFinish() throws Exception { + harness.open(); + harness.processElement("42", 0); + harness.snapshot(0, 1); + harness.processElement("43", 2); + harness.snapshot(1, 3); + harness.processElement("44", 4); + + // do not expect new input after finish() + sinkFunction.finish(); + + harness.snapshot(2, 5); + harness.notifyOfCompletedCheckpoint(1); + + // 
checkpoint 2 has not completed yet, so only the elements of checkpoints 0 and 1 are committed + assertExactlyOnce(Arrays.asList("42", "43")); + + // only the pending transaction of checkpoint 2 remains; no new one is begun after finish() + assertEquals(1, tmpDirectory.listFiles().size()); + } + @Test public void testNotifyOfCompletedCheckpoint() throws Exception { harness.open();