diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index f349df3a8de3d..c27c620f3dd01 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -45,7 +45,6 @@
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.NetUtils;
-import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
@@ -226,7 +225,7 @@ public enum Semantic {
 	/**
	 * Pool of KafkaProducers objects.
	 */
-	private transient ProducersPool producersPool = new ProducersPool();
+	private transient Optional<ProducersPool> producersPool = Optional.empty();
 
 	/**
	 * Flag controlling whether we are writing the Flink record's timestamp into Kafka.
@@ -596,7 +595,7 @@ public void close() throws Exception {
 			asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
 		}
 		try {
-			producersPool.close();
+			producersPool.ifPresent(pool -> pool.close());
 		}
 		catch (Exception e) {
 			asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
@@ -628,7 +627,7 @@ protected KafkaTransactionState beginTransaction() throws Exception {
 	}
 
 	private FlinkKafkaProducer<byte[], byte[]> createOrGetProducerFromPool() throws Exception {
-		FlinkKafkaProducer<byte[], byte[]> producer = producersPool.poll();
+		FlinkKafkaProducer<byte[], byte[]> producer = getProducersPool().poll();
 		if (producer == null) {
 			String transactionalId = availableTransactionalIds.poll();
 			if (transactionalId == null) {
@@ -661,7 +660,7 @@ protected void commit(KafkaTransactionState transaction) {
 		switch (semantic) {
 			case EXACTLY_ONCE:
 				transaction.producer.commitTransaction();
-				producersPool.add(transaction.producer);
+				getProducersPool().add(transaction.producer);
 				break;
 			case AT_LEAST_ONCE:
 			case NONE:
@@ -703,11 +702,10 @@ protected void abort(KafkaTransactionState transaction) {
 		switch (semantic) {
 			case EXACTLY_ONCE:
 				transaction.producer.abortTransaction();
-				producersPool.add(transaction.producer);
+				getProducersPool().add(transaction.producer);
 				break;
 			case AT_LEAST_ONCE:
 			case NONE:
-				producersPool.add(transaction.producer);
 				break;
 			default:
 				throw new UnsupportedOperationException("Not implemented semantic");
@@ -760,7 +758,8 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
 		nextTransactionalIdHintState.clear();
 		// To avoid duplication only first subtask keeps track of next transactional id hint. Otherwise all of the
 		// subtasks would write exactly same information.
-		if (getRuntimeContext().getIndexOfThisSubtask() == 0 && nextTransactionalIdHint != null) {
+		if (getRuntimeContext().getIndexOfThisSubtask() == 0 && semantic == Semantic.EXACTLY_ONCE) {
+			checkState(nextTransactionalIdHint != null, "nextTransactionalIdHint must be set for EXACTLY_ONCE");
 			long nextFreeTransactionalId = nextTransactionalIdHint.nextFreeTransactionalId;
 
 			// If we scaled up, some (unknown) subtask must have created new transactional ids from scratch. In that
@@ -788,7 +787,10 @@ public void initializeState(FunctionInitializationContext context) throws Exception {
 
 		if (semantic != Semantic.EXACTLY_ONCE) {
 			nextTransactionalIdHint = null;
+			producersPool = Optional.empty();
 		} else {
+			producersPool = Optional.of(new ProducersPool());
+
 			ArrayList<NextTransactionalIdHint> transactionalIdHints = Lists.newArrayList(nextTransactionalIdHintState.get());
 			if (transactionalIdHints.size() > 1) {
 				throw new IllegalStateException(
@@ -829,8 +831,7 @@ protected Optional<KafkaTransactionContext> initializeUserContext() {
 	}
 
 	private Set<String> generateNewTransactionalIds() {
-		Preconditions.checkState(nextTransactionalIdHint != null,
-			"nextTransactionalIdHint must be present for EXACTLY_ONCE");
+		checkState(nextTransactionalIdHint != null, "nextTransactionalIdHint must be present for EXACTLY_ONCE");
 
 		// range of available transactional ids is:
 		// [nextFreeTransactionalId, nextFreeTransactionalId + parallelism * kafkaProducersPoolSize)
@@ -903,6 +904,11 @@ private FlinkKafkaProducer<byte[], byte[]> initTransactionalProducer(String transactionalId, boolean registerMetrics) {
 		return initProducer(registerMetrics);
 	}
 
+	private ProducersPool getProducersPool() {
+		checkState(producersPool.isPresent(), "Trying to access uninitialized producer pool");
+		return producersPool.get();
+	}
+
 	private FlinkKafkaProducer<byte[], byte[]> initProducer(boolean registerMetrics) {
 		FlinkKafkaProducer<byte[], byte[]> producer = new FlinkKafkaProducer<>(this.producerConfig);
@@ -958,7 +964,7 @@ private void checkErroneous() throws Exception {
 
 	private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
 		in.defaultReadObject();
-		producersPool = new ProducersPool();
+		producersPool = Optional.empty();
 	}
 
 	private static Properties getPropertiesFromBrokerList(String brokerList) {
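
Reviewer note on the pattern (a sketch, not part of the patch): the change replaces an eagerly constructed transient ProducersPool with an Optional<ProducersPool> that initializeState() fills only for Semantic.EXACTLY_ONCE, routes every access through a checkState-guarded accessor or ifPresent, and re-seeds the Optional in readObject(). The following minimal, self-contained Java sketch shows the same idiom with hypothetical names (LazyPoolHolder, initialize, offer, getPool); it illustrates the technique under those assumptions and is not Flink code.

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.Optional;

public class LazyPoolHolder implements Serializable {

	private static final long serialVersionUID = 1L;

	// Transient: never serialized. Stays Optional.empty() until initialize() runs,
	// mirroring how the patch seeds producersPool in initializeState().
	private transient Optional<ArrayDeque<String>> pool = Optional.empty();

	// Only the code path that actually needs a pool pays for creating one.
	public void initialize(boolean poolNeeded) {
		pool = poolNeeded ? Optional.of(new ArrayDeque<>()) : Optional.empty();
	}

	// Fail fast with a clear message instead of a NullPointerException,
	// analogous to the checkState guard in getProducersPool().
	private ArrayDeque<String> getPool() {
		if (!pool.isPresent()) {
			throw new IllegalStateException("Trying to access uninitialized pool");
		}
		return pool.get();
	}

	public void offer(String item) {
		getPool().add(item);
	}

	// close() must tolerate a pool that was never created, hence ifPresent,
	// as in the patched close() of the producer.
	public void close() {
		pool.ifPresent(ArrayDeque::clear);
	}

	// Default Java deserialization leaves transient fields at their type default
	// (null here) and does not re-run field initializers, so the Optional must be
	// re-seeded, exactly as the patched readObject() does.
	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
		in.defaultReadObject();
		pool = Optional.empty();
	}
}

The fail-fast accessor is what makes the abort() fix meaningful: a NONE or AT_LEAST_ONCE path that accidentally touches the pool (which only EXACTLY_ONCE creates) now surfaces immediately as an IllegalStateException with a descriptive message, rather than as a latent NullPointerException or a silently growing pool.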