[FLINK-8132][kafka] Re-initialize transactional KafkaProducer on each checkpoint

Previously, the following scenario was faulty with a producer pool of size 2:

1. started transaction 1 with producerA, wrote record 42
2. checkpoint 1 triggered, pre-committing txn1; started txn2 with producerB, wrote record 43
3. checkpoint 1 completed, committing txn1 and returning producerA to the pool
4. checkpoint 2 triggered, pre-committing txn2; started txn3 with producerA, wrote record 44
5. crash
6. recovery to checkpoint 1: txn1 from producerA is found in "pendingCommitTransactions", so recoverAndCommit(txn1) is attempted
7. unfortunately, txn1 and txn3 come from the same producer and are indistinguishable from the Kafka broker's perspective, so txn3 is committed instead

The result is that both records 42 and 44 are committed.

With this fix, the producer is re-initialized on each checkpoint, so after re-initialization txn3 has producerId/epoch counters different from txn1's.
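
The fencing mechanism the fix relies on can be sketched with the plain Kafka producer API. This is a minimal illustration rather than Flink code; the broker address, transactional.id, and class name below are made-up placeholders:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class TransactionFencingSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("transactional.id", "my-sink-0");       // the id a pooled producer would reuse
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        // First incarnation: initTransactions() registers the transactional.id
        // with the coordinator and obtains a producerId/epoch pair.
        KafkaProducer<byte[], byte[]> first = new KafkaProducer<>(props);
        first.initTransactions();
        first.beginTransaction();

        // Second incarnation with the SAME transactional.id: initTransactions()
        // bumps the epoch, aborts the first incarnation's open transaction and
        // fences the old producer (further use throws ProducerFencedException).
        // Re-initializing per checkpoint therefore guarantees that txn3 cannot
        // be mistaken for txn1 by the broker.
        KafkaProducer<byte[], byte[]> second = new KafkaProducer<>(props);
        second.initTransactions();

        second.close();
        first.close();
    }
}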
pnowojski authored and aljoscha committed Nov 23, 2017
1 parent 2ac32c5 commit f214e7d
Showing 2 changed files with 91 additions and 59 deletions.
First changed file (FlinkKafkaProducer011):

@@ -66,7 +66,6 @@
 
 import javax.annotation.Nullable;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -223,15 +222,10 @@ public enum Semantic {
     private final int kafkaProducersPoolSize;
 
     /**
-     * Available transactional ids.
+     * Pool of available transactional ids.
      */
     private final BlockingDeque<String> availableTransactionalIds = new LinkedBlockingDeque<>();
 
-    /**
-     * Pool of KafkaProducers objects.
-     */
-    private transient Optional<ProducersPool> producersPool = Optional.empty();
-
     /**
      * Flag controlling whether we are writing the Flink record's timestamp into Kafka.
      */
@@ -599,12 +593,6 @@ public void close() throws FlinkKafka011Exception {
         catch (Exception e) {
             asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
         }
-        try {
-            producersPool.ifPresent(pool -> pool.close());
-        }
-        catch (Exception e) {
-            asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
-        }
         // make sure we propagate pending errors
         checkErroneous();
     }
@@ -615,7 +603,7 @@ public void close() throws FlinkKafka011Exception {
     protected KafkaTransactionState beginTransaction() throws FlinkKafka011Exception {
         switch (semantic) {
             case EXACTLY_ONCE:
-                FlinkKafkaProducer<byte[], byte[]> producer = createOrGetProducerFromPool();
+                FlinkKafkaProducer<byte[], byte[]> producer = createTransactionalProducer();
                 producer.beginTransaction();
                 return new KafkaTransactionState(producer.getTransactionalId(), producer);
             case AT_LEAST_ONCE:
@@ -631,21 +619,6 @@ protected KafkaTransactionState beginTransaction() throws FlinkKafka011Exception
         }
     }
 
-    private FlinkKafkaProducer<byte[], byte[]> createOrGetProducerFromPool() throws FlinkKafka011Exception {
-        FlinkKafkaProducer<byte[], byte[]> producer = getProducersPool().poll();
-        if (producer == null) {
-            String transactionalId = availableTransactionalIds.poll();
-            if (transactionalId == null) {
-                throw new FlinkKafka011Exception(
-                    FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY,
-                    "Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.");
-            }
-            producer = initTransactionalProducer(transactionalId, true);
-            producer.initTransactions();
-        }
-        return producer;
-    }
-
     @Override
     protected void preCommit(KafkaTransactionState transaction) throws FlinkKafka011Exception {
         switch (semantic) {
@@ -666,7 +639,7 @@ protected void commit(KafkaTransactionState transaction) {
         switch (semantic) {
             case EXACTLY_ONCE:
                 transaction.producer.commitTransaction();
-                getProducersPool().add(transaction.producer);
+                recycleTransactionalProducer(transaction.producer);
                 break;
             case AT_LEAST_ONCE:
             case NONE:
@@ -706,7 +679,7 @@ protected void abort(KafkaTransactionState transaction) {
         switch (semantic) {
             case EXACTLY_ONCE:
                 transaction.producer.abortTransaction();
-                getProducersPool().add(transaction.producer);
+                recycleTransactionalProducer(transaction.producer);
                 break;
             case AT_LEAST_ONCE:
             case NONE:
@@ -796,10 +769,7 @@ public void initializeState(FunctionInitializationContext context) throws Exception {
 
         if (semantic != Semantic.EXACTLY_ONCE) {
             nextTransactionalIdHint = null;
-            producersPool = Optional.empty();
         } else {
-            producersPool = Optional.of(new ProducersPool());
-
             ArrayList<NextTransactionalIdHint> transactionalIdHints = Lists.newArrayList(nextTransactionalIdHintState.get());
             if (transactionalIdHints.size() > 1) {
                 throw new IllegalStateException(
@@ -883,16 +853,33 @@ int getTransactionCoordinatorId() {
         return currentTransaction.producer.getTransactionCoordinatorId();
     }
 
+    /**
+     * For each checkpoint we create new {@link FlinkKafkaProducer} so that new transactions will not clash
+     * with transactions created during previous checkpoints ({@code producer.initTransactions()} assures that we
+     * obtain new producerId and epoch counters).
+     */
+    private FlinkKafkaProducer<byte[], byte[]> createTransactionalProducer() throws FlinkKafka011Exception {
+        String transactionalId = availableTransactionalIds.poll();
+        if (transactionalId == null) {
+            throw new FlinkKafka011Exception(
+                FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY,
+                "Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.");
+        }
+        FlinkKafkaProducer<byte[], byte[]> producer = initTransactionalProducer(transactionalId, true);
+        producer.initTransactions();
+        return producer;
+    }
+
+    private void recycleTransactionalProducer(FlinkKafkaProducer<byte[], byte[]> producer) {
+        availableTransactionalIds.add(producer.getTransactionalId());
+        producer.close();
+    }
+
     private FlinkKafkaProducer<byte[], byte[]> initTransactionalProducer(String transactionalId, boolean registerMetrics) {
         producerConfig.put("transactional.id", transactionalId);
         return initProducer(registerMetrics);
     }
 
-    private ProducersPool getProducersPool() {
-        checkState(producersPool.isPresent(), "Trying to access uninitialized producer pool");
-        return producersPool.get();
-    }
-
     private FlinkKafkaProducer<byte[], byte[]> initProducer(boolean registerMetrics) {
         FlinkKafkaProducer<byte[], byte[]> producer = new FlinkKafkaProducer<>(this.producerConfig);
 
@@ -951,7 +938,6 @@ private void checkErroneous() throws FlinkKafka011Exception {
 
     private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
         in.defaultReadObject();
-        producersPool = Optional.empty();
     }
 
     private static Properties getPropertiesFromBrokerList(String brokerList) {
@@ -1264,25 +1250,6 @@ public boolean canEqual(Object obj) {
         }
     }
 
-    static class ProducersPool implements Closeable {
-        private final LinkedBlockingDeque<FlinkKafkaProducer<byte[], byte[]>> pool = new LinkedBlockingDeque<>();
-
-        public void add(FlinkKafkaProducer<byte[], byte[]> producer) {
-            pool.add(producer);
-        }
-
-        public FlinkKafkaProducer<byte[], byte[]> poll() {
-            return pool.poll();
-        }
-
-        @Override
-        public void close() {
-            while (!pool.isEmpty()) {
-                pool.poll().close();
-            }
-        }
-    }
-
     /**
      * Keep information required to deduce next safe to use transactional id.
      */
Second changed file (IT cases for FlinkKafkaProducer011):

@@ -52,6 +52,7 @@
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 
 /**
  * IT cases for the {@link FlinkKafkaProducer011}.
@@ -79,6 +80,49 @@ public void before() {
         extraProperties.put("isolation.level", "read_committed");
     }
 
+    /**
+     * This test ensures that transactions reusing transactional.ids (after returning to the pool) will not clash
+     * with previous transactions using the same transactional.ids.
+     */
+    @Test(timeout = 120_000L)
+    public void testRestoreToCheckpointAfterExceedingProducersPool() throws Exception {
+        String topic = "flink-kafka-producer-fail-before-notify";
+
+        try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 = createTestHarness(topic)) {
+            testHarness1.setup();
+            testHarness1.open();
+            testHarness1.processElement(42, 0);
+            OperatorStateHandles snapshot = testHarness1.snapshot(0, 0);
+            testHarness1.processElement(43, 0);
+            testHarness1.notifyOfCompletedCheckpoint(0);
+            try {
+                for (int i = 0; i < FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE; i++) {
+                    testHarness1.snapshot(i + 1, 0);
+                    testHarness1.processElement(i, 0);
+                }
+                throw new IllegalStateException("This should not be reached.");
+            }
+            catch (Exception ex) {
+                assertIsCausedBy(FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY, ex);
+            }
+
+            // Resume transactions before testHarness1 is closed (in case of failures close() might not be called)
+            try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness2 = createTestHarness(topic)) {
+                testHarness2.setup();
+                // restore from snapshot1, transactions with records 43 and 44 should be aborted
+                testHarness2.initializeState(snapshot);
+                testHarness2.open();
+            }
+
+            assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42), 30_000L);
+            deleteTestTopic(topic);
+        }
+        catch (Exception ex) {
+            // testHarness1 will be fenced off after creating and closing testHarness2
+            assertIsCausedBy(ProducerFencedException.class, ex);
+        }
+    }
+
     @Test(timeout = 120_000L)
     public void testFlinkKafkaProducer011FailBeforeNotify() throws Exception {
         String topic = "flink-kafka-producer-fail-before-notify";
@@ -563,4 +607,25 @@ private Properties createProperties() {
         properties.put(FlinkKafkaProducer011.KEY_DISABLE_METRICS, "true");
         return properties;
     }
+
+    private void assertIsCausedBy(Class<?> clazz, Throwable ex) {
+        for (int depth = 0; depth < 50 && ex != null; depth++) {
+            if (clazz.isInstance(ex)) {
+                return;
+            }
+            ex = ex.getCause();
+        }
+        fail(String.format("Exception [%s] was not caused by [%s]", ex, clazz));
+    }
+
+    private void assertIsCausedBy(FlinkKafka011ErrorCode expectedErrorCode, Throwable ex) {
+        for (int depth = 0; depth < 50 && ex != null; depth++) {
+            if (ex instanceof FlinkKafka011Exception) {
+                assertEquals(expectedErrorCode, ((FlinkKafka011Exception) ex).getErrorCode());
+                return;
+            }
+            ex = ex.getCause();
+        }
+        fail(String.format("Exception [%s] was not caused by FlinkKafka011Exception[errorCode=%s]", ex, expectedErrorCode));
+    }
 }
