diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java index 6242a208481ca..a69c7309973e0 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java @@ -524,11 +524,6 @@ public FlinkKafkaProducer011 ignoreFailuresAfterTransactionTimeout() { */ @Override public void open(Configuration configuration) throws Exception { - if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) { - LOG.warn("Using {} semantic, but checkpointing is not enabled. Switching to {} semantic.", semantic, Semantic.NONE); - semantic = Semantic.NONE; - } - if (logFailuresOnly) { callback = new Callback() { @Override @@ -787,6 +782,11 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception { @Override public void initializeState(FunctionInitializationContext context) throws Exception { + if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) { + LOG.warn("Using {} semantic, but checkpointing is not enabled. Switching to {} semantic.", semantic, Semantic.NONE); + semantic = Semantic.NONE; + } + nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState( NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);