Skip to content

Commit

Permalink
[hotfix][kafka] Move checkpointing enable checking to initializeState
Browse files Browse the repository at this point in the history
initializeState is called before open and since both of those functions
relay on chosen semantic, that means checkpointing enable check should
happen in initializeState.
  • Loading branch information
pnowojski authored and tzulitai committed Nov 2, 2017
1 parent 856b6ba commit 425ffe2
Showing 1 changed file with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -524,11 +524,6 @@ public FlinkKafkaProducer011<IN> ignoreFailuresAfterTransactionTimeout() {
*/
@Override
public void open(Configuration configuration) throws Exception {
if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
LOG.warn("Using {} semantic, but checkpointing is not enabled. Switching to {} semantic.", semantic, Semantic.NONE);
semantic = Semantic.NONE;
}

if (logFailuresOnly) {
callback = new Callback() {
@Override
Expand Down Expand Up @@ -787,6 +782,11 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
LOG.warn("Using {} semantic, but checkpointing is not enabled. Switching to {} semantic.", semantic, Semantic.NONE);
semantic = Semantic.NONE;
}

nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);

Expand Down

0 comments on commit 425ffe2

Please sign in to comment.