Skip to content

Commit

Permalink
fixup! [FLINK-23473][runtime] Do not create transaction in TwoPhaseCo…
Browse files Browse the repository at this point in the history
…mmitSinkFunction after finish
  • Loading branch information
curcur committed Aug 13, 2021
1 parent 4d0feaf commit e817803
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public final void invoke(IN value, Context context) throws Exception {
TXN currentTransaction = currentTransaction();
checkNotNull(
currentTransaction,
"two phase commit sink function with null transaction should not be invoked! ");
"Two phase commit sink function with null transaction should not be invoked! ");
invoke(currentTransaction, value, context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,15 @@ public void testNoTransactionAfterSinkFunctionFinish() throws Exception {
harness.snapshot(2, 5);
harness.notifyOfCompletedCheckpoint(1);

// make sure the previous empty transaction will not be pre-committed
harness.snapshot(3, 6);

try {
harness.processElement("45", 7);
fail("processElement should fail after finish!");
} catch (NullPointerException e) {
}

// Checkpoint2 has not complete
assertExactlyOnce(Arrays.asList("42", "43"));

Expand Down

0 comments on commit e817803

Please sign in to comment.