Skip to content

Commit

Permalink
[hotfix] [kafka-tests] Clean up FlinkKafkaProducer011Tests
Browse files Browse the repository at this point in the history
  • Loading branch information
GJL authored and tzulitai committed Nov 2, 2017
1 parent 8cdf2ff commit b7d3589
Showing 1 changed file with 1 addition and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
Expand All @@ -36,13 +35,10 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -68,7 +64,7 @@ public class FlinkKafkaProducer011Tests extends KafkaTestBase {
protected TypeInformationSerializationSchema<Integer> integerSerializationSchema =
new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
protected KeyedSerializationSchema<Integer> integerKeyedSerializationSchema =
new KeyedSerializationSchemaWrapper(integerSerializationSchema);
new KeyedSerializationSchemaWrapper<>(integerSerializationSchema);

@Before
public void before() {
Expand All @@ -83,49 +79,6 @@ public void before() {
extraProperties.put("isolation.level", "read_committed");
}

@Test(timeout = 30000L)
public void testHappyPath() throws IOException {
String topicName = "flink-kafka-producer-happy-path";
try (Producer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) {
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
kafkaProducer.commitTransaction();
}
assertRecord(topicName, "42", "42");
deleteTestTopic(topicName);
}

@Test(timeout = 30000L)
public void testResumeTransaction() throws IOException {
String topicName = "flink-kafka-producer-resume-transaction";
try (FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) {
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
kafkaProducer.flush();
long producerId = kafkaProducer.getProducerId();
short epoch = kafkaProducer.getEpoch();

try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) {
resumeProducer.resumeTransaction(producerId, epoch);
resumeProducer.commitTransaction();
}

assertRecord(topicName, "42", "42");

// this shouldn't throw - in case of network split, old producer might attempt to commit it's transaction
kafkaProducer.commitTransaction();

// this shouldn't fail also, for same reason as above
try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) {
resumeProducer.resumeTransaction(producerId, epoch);
resumeProducer.commitTransaction();
}
}
deleteTestTopic(topicName);
}

@Test(timeout = 120_000L)
public void testFlinkKafkaProducer011FailBeforeNotify() throws Exception {
String topic = "flink-kafka-producer-fail-before-notify";
Expand Down

0 comments on commit b7d3589

Please sign in to comment.