Skip to content

Commit

Permalink
[hotfix][kafka] Do not return producers to a pool in abort for non EX…
Browse files Browse the repository at this point in the history
…ACTLY_ONCE modes

Previously on abort(...) producers were returned to the pool. This was minor bug,
probably without any negative side effect, however this patch fixes it
and adds additional sanity checks to guard against similar bugs
in the future.

This closes apache#4915.
  • Loading branch information
pnowojski authored and tzulitai committed Nov 2, 2017
1 parent 5058c3f commit 51e9a01
Showing 1 changed file with 17 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;

Expand Down Expand Up @@ -226,7 +225,7 @@ public enum Semantic {
/**
* Pool of KafkaProducers objects.
*/
private transient ProducersPool producersPool = new ProducersPool();
private transient Optional<ProducersPool> producersPool = Optional.empty();

/**
* Flag controlling whether we are writing the Flink record's timestamp into Kafka.
Expand Down Expand Up @@ -596,7 +595,7 @@ public void close() throws Exception {
asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
}
try {
producersPool.close();
producersPool.ifPresent(pool -> pool.close());
}
catch (Exception e) {
asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
Expand Down Expand Up @@ -628,7 +627,7 @@ protected KafkaTransactionState beginTransaction() throws Exception {
}

private FlinkKafkaProducer<byte[], byte[]> createOrGetProducerFromPool() throws Exception {
FlinkKafkaProducer<byte[], byte[]> producer = producersPool.poll();
FlinkKafkaProducer<byte[], byte[]> producer = getProducersPool().poll();
if (producer == null) {
String transactionalId = availableTransactionalIds.poll();
if (transactionalId == null) {
Expand Down Expand Up @@ -661,7 +660,7 @@ protected void commit(KafkaTransactionState transaction) {
switch (semantic) {
case EXACTLY_ONCE:
transaction.producer.commitTransaction();
producersPool.add(transaction.producer);
getProducersPool().add(transaction.producer);
break;
case AT_LEAST_ONCE:
case NONE:
Expand Down Expand Up @@ -703,11 +702,10 @@ protected void abort(KafkaTransactionState transaction) {
switch (semantic) {
case EXACTLY_ONCE:
transaction.producer.abortTransaction();
producersPool.add(transaction.producer);
getProducersPool().add(transaction.producer);
break;
case AT_LEAST_ONCE:
case NONE:
producersPool.add(transaction.producer);
break;
default:
throw new UnsupportedOperationException("Not implemented semantic");
Expand Down Expand Up @@ -760,7 +758,8 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
nextTransactionalIdHintState.clear();
// To avoid duplication only first subtask keeps track of next transactional id hint. Otherwise all of the
// subtasks would write exactly same information.
if (getRuntimeContext().getIndexOfThisSubtask() == 0 && nextTransactionalIdHint != null) {
if (getRuntimeContext().getIndexOfThisSubtask() == 0 && semantic == Semantic.EXACTLY_ONCE) {
checkState(nextTransactionalIdHint != null, "nextTransactionalIdHint must be set for EXACTLY_ONCE");
long nextFreeTransactionalId = nextTransactionalIdHint.nextFreeTransactionalId;

// If we scaled up, some (unknown) subtask must have created new transactional ids from scratch. In that
Expand Down Expand Up @@ -788,7 +787,10 @@ public void initializeState(FunctionInitializationContext context) throws Except

if (semantic != Semantic.EXACTLY_ONCE) {
nextTransactionalIdHint = null;
producersPool = Optional.empty();
} else {
producersPool = Optional.of(new ProducersPool());

ArrayList<NextTransactionalIdHint> transactionalIdHints = Lists.newArrayList(nextTransactionalIdHintState.get());
if (transactionalIdHints.size() > 1) {
throw new IllegalStateException(
Expand Down Expand Up @@ -829,8 +831,7 @@ protected Optional<KafkaTransactionContext> initializeUserContext() {
}

private Set<String> generateNewTransactionalIds() {
Preconditions.checkState(nextTransactionalIdHint != null,
"nextTransactionalIdHint must be present for EXACTLY_ONCE");
checkState(nextTransactionalIdHint != null, "nextTransactionalIdHint must be present for EXACTLY_ONCE");

// range of available transactional ids is:
// [nextFreeTransactionalId, nextFreeTransactionalId + parallelism * kafkaProducersPoolSize)
Expand Down Expand Up @@ -903,6 +904,11 @@ private FlinkKafkaProducer<byte[], byte[]> initTransactionalProducer(String tran
return initProducer(registerMetrics);
}

private ProducersPool getProducersPool() {
checkState(producersPool.isPresent(), "Trying to access uninitialized producer pool");
return producersPool.get();
}

private FlinkKafkaProducer<byte[], byte[]> initProducer(boolean registerMetrics) {
FlinkKafkaProducer<byte[], byte[]> producer = new FlinkKafkaProducer<>(this.producerConfig);

Expand Down Expand Up @@ -958,7 +964,7 @@ private void checkErroneous() throws Exception {

private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
producersPool = new ProducersPool();
producersPool = Optional.empty();
}

private static Properties getPropertiesFromBrokerList(String brokerList) {
Expand Down

0 comments on commit 51e9a01

Please sign in to comment.