Skip to content

Commit

Permalink
[FLINK-15490][kafka][test-stability] Enable idempotence producing in …
Browse files Browse the repository at this point in the history
…KafkaITCase to avoid intermittent test failure.
  • Loading branch information
becketqin committed Jan 10, 2020
1 parent fb12bcf commit f0f9343
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,14 @@ public void createTestTopic(String topic, int numberOfPartitions, int replicatio

public abstract List<KafkaServer> getBrokers();

public Properties getIdempotentProducerConfig() {
Properties props = new Properties();
props.put("enable.idempotence", "true");
props.put("acks", "all");
props.put("retries", "3");
return props;
}

// -- consumer / producer instances:
public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, DeserializationSchema<T> deserializationSchema, Properties props) {
return getConsumer(topics, new KafkaDeserializationSchemaWrapper<T>(deserializationSchema), props);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ public void cancel() {
if (secureProps != null) {
props.putAll(testServer.getSecureProperties());
}
// Ensure the producer enables idempotence.
props.putAll(testServer.getIdempotentProducerConfig());

stream = stream.rebalance();
testServer.produceIntoKafka(stream, topic,
Expand Down

0 comments on commit f0f9343

Please sign in to comment.