Skip to content

Commit

Permalink
[hotfix][kafka,test] Synchronize 0.11 KafkaTestEnvironmentImpl with u…
Browse files Browse the repository at this point in the history
…niversal

This is a pure refactor that reorders the methods so that the two KafkaTestEnvironmentImpl implementations are more closely aligned.
  • Loading branch information
pnowojski committed May 7, 2019
1 parent 3b1976a commit a47b276
Showing 1 changed file with 125 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,125 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
// 6 seconds is default. Seems to be too small for travis. 30 seconds
private int zkTimeout = 30000;
private Config config;

/**
 * Returns the broker bootstrap string assembled in {@code prepare()}.
 * NOTE(review): the string carries a trailing ',' from the assembly loop — apparently tolerated downstream.
 */
public String getBrokerConnectionString() {
	return brokerConnectionString;
}
private static final int DELETE_TIMEOUT_SECONDS = 30;

/**
 * Sets the delivery semantic (e.g. EXACTLY_ONCE / AT_LEAST_ONCE) used when this
 * environment constructs {@code FlinkKafkaProducer011} instances.
 */
public void setProducerSemantic(FlinkKafkaProducer011.Semantic producerSemantic) {
	this.producerSemantic = producerSemantic;
}

/**
 * Starts an embedded ZooKeeper plus the configured number of Kafka brokers in
 * fresh temp directories and populates the standard consumer {@code Properties}.
 *
 * @param config test environment configuration; in secure mode it is mutated to
 *               force a single Kafka server (to avoid Travis ZK timeouts)
 * @throws Exception if ZooKeeper or a broker fails to start
 */
@Override
public void prepare(Config config) throws Exception {
	//increase the timeout since in Travis ZK connection takes long time for secure connection.
	if (config.isSecureMode()) {
		//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
		config.setKafkaServersNumber(1);
		zkTimeout = zkTimeout * 15;
	}
	this.config = config;

	// unique temp dirs so concurrent/repeated test runs do not collide
	File tempDir = new File(System.getProperty("java.io.tmpdir"));
	tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
	assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());

	tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
	assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());

	// one data directory per broker
	tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
	for (int i = 0; i < config.getKafkaServersNumber(); i++) {
		File tmpDir = new File(tmpKafkaParent, "server-" + i);
		assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
		tmpKafkaDirs.add(tmpDir);
	}

	// reset any state left over from a previous prepare() call
	zookeeper = null;
	brokers.clear();

	zookeeper = new TestingServer(-1, tmpZkDir);
	zookeeperConnectionString = zookeeper.getConnectString();
	LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);

	LOG.info("Starting KafkaServer");

	ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
	for (int i = 0; i < config.getKafkaServersNumber(); i++) {
		KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
		brokers.add(kafkaServer);
		// NOTE(review): this leaves a trailing ',' on brokerConnectionString
		brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName));
		brokerConnectionString += ",";
	}

	LOG.info("ZK and KafkaServer started.");

	standardProps = new Properties();
	standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
	standardProps.setProperty("bootstrap.servers", brokerConnectionString);
	standardProps.setProperty("group.id", "flink-tests");
	standardProps.setProperty("enable.auto.commit", "false");
	standardProps.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout));
	standardProps.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout));
	standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning.
	standardProps.setProperty("max.partition.fetch.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
}

/**
 * Creates the given topic and then polls (for up to 30 seconds) until the topic
 * is visible via a fresh ZooKeeper connection, failing the test otherwise.
 *
 * @param topic              name of the topic to create
 * @param numberOfPartitions partition count for the new topic
 * @param replicationFactor  replication factor for the new topic
 * @param topicConfig        additional per-topic configuration
 */
@Override
public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) {
	// create topic with one client
	LOG.info("Creating topic {}", topic);

	ZkUtils zkUtils = getZkUtils();
	try {
		AdminUtils.createTopic(zkUtils, topic, numberOfPartitions, replicationFactor, topicConfig, kafka.admin.RackAwareMode.Enforced$.MODULE$);
	} finally {
		zkUtils.close();
	}

	// validate that the topic has been created
	final long deadline = System.nanoTime() + 30_000_000_000L;
	do {
		try {
			if (config.isSecureMode()) {
				//increase wait time since in Travis ZK timeout occurs frequently
				int wait = zkTimeout / 100;
				LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
				Thread.sleep(wait);
			} else {
				Thread.sleep(100);
			}
		} catch (InterruptedException e) {
			// restore interrupted state (the original comment promised this but never did it)
			Thread.currentThread().interrupt();
		}
		// we could use AdminUtils.topicExists(zkUtils, topic) here, but its results are
		// not always correct.

		// create a new ZK utils connection per probe so we never read a stale view
		ZkUtils checkZKConn = getZkUtils();
		if (AdminUtils.topicExists(checkZKConn, topic)) {
			checkZKConn.close();
			return;
		}
		checkZKConn.close();
	}
	while (System.nanoTime() < deadline);
	fail("Test topic could not be created");
}

/**
 * Marks the given topic for deletion through the Kafka admin API.
 *
 * <p>Fix: the previous version also constructed a {@code ZkClient} that was never
 * used; its {@code close()} sat inside the {@code try} body (not a {@code finally}),
 * so the connection leaked whenever {@code deleteTopic} threw. The dead client is
 * removed entirely.
 *
 * @param topic name of the topic to delete
 */
@Override
public void deleteTestTopic(String topic) {
	ZkUtils zkUtils = getZkUtils();
	try {
		LOG.info("Deleting topic {}", topic);
		AdminUtils.deleteTopic(zkUtils, topic);
	} finally {
		// always release the ZK connection, even if the delete fails
		zkUtils.close();
	}
}

@Override
public Properties getStandardProperties() {
return standardProps;
Expand All @@ -115,6 +225,11 @@ public Properties getSecureProperties() {
return prop;
}

/**
 * Returns the broker bootstrap string assembled in {@code prepare()}.
 * NOTE(review): the string carries a trailing ',' from the assembly loop — apparently tolerated downstream.
 */
@Override
public String getBrokerConnectionString() {
	return brokerConnectionString;
}

@Override
public String getVersion() {
return "0.11";
Expand Down Expand Up @@ -183,7 +298,12 @@ public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic
@Override
public <T> DataStreamSink<T> writeToKafkaWithTimestamps(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props) {
FlinkKafkaProducer011<T> prod = new FlinkKafkaProducer011<>(
topic, serSchema, props, Optional.of(new FlinkFixedPartitioner<>()), producerSemantic, FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
topic,
serSchema,
props,
Optional.of(new FlinkFixedPartitioner<>()),
producerSemantic,
FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);

prod.setWriteTimestampToKafka(true);

Expand Down Expand Up @@ -233,60 +353,6 @@ public boolean isSecureRunSupported() {
return true;
}

/**
 * Starts an embedded ZooKeeper plus the configured number of Kafka brokers in
 * fresh temp directories and populates the standard consumer {@code Properties}.
 *
 * @param config test environment configuration; in secure mode it is mutated to
 *               force a single Kafka server (to avoid Travis ZK timeouts)
 * @throws Exception if ZooKeeper or a broker fails to start
 */
@Override
public void prepare(Config config) throws Exception {
	//increase the timeout since in Travis ZK connection takes long time for secure connection.
	if (config.isSecureMode()) {
		//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
		config.setKafkaServersNumber(1);
		zkTimeout = zkTimeout * 15;
	}
	this.config = config;

	// unique temp dirs so concurrent/repeated test runs do not collide
	File tempDir = new File(System.getProperty("java.io.tmpdir"));
	tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
	assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());

	tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
	assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());

	// one data directory per broker
	tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
	for (int i = 0; i < config.getKafkaServersNumber(); i++) {
		File tmpDir = new File(tmpKafkaParent, "server-" + i);
		assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
		tmpKafkaDirs.add(tmpDir);
	}

	// reset any state left over from a previous prepare() call
	zookeeper = null;
	brokers.clear();

	zookeeper = new TestingServer(-1, tmpZkDir);
	zookeeperConnectionString = zookeeper.getConnectString();
	LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);

	LOG.info("Starting KafkaServer");

	ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
	for (int i = 0; i < config.getKafkaServersNumber(); i++) {
		KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
		brokers.add(kafkaServer);
		// NOTE(review): this leaves a trailing ',' on brokerConnectionString
		brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName));
		brokerConnectionString += ",";
	}

	LOG.info("ZK and KafkaServer started.");

	standardProps = new Properties();
	standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
	standardProps.setProperty("bootstrap.servers", brokerConnectionString);
	standardProps.setProperty("group.id", "flink-tests");
	standardProps.setProperty("enable.auto.commit", "false");
	standardProps.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout));
	standardProps.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout));
	standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.11 value)
	standardProps.setProperty("max.partition.fetch.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
}

@Override
public void shutdown() throws Exception {
for (KafkaServer broker : brokers) {
Expand Down Expand Up @@ -333,65 +399,6 @@ public ZkUtils getZkUtils() {
return ZkUtils.apply(creator, false);
}

/**
 * Creates the given topic and then polls (for up to 30 seconds) until the topic
 * is visible via a fresh ZooKeeper connection, failing the test otherwise.
 *
 * @param topic              name of the topic to create
 * @param numberOfPartitions partition count for the new topic
 * @param replicationFactor  replication factor for the new topic
 * @param topicConfig        additional per-topic configuration
 */
@Override
public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) {
	// create topic with one client
	LOG.info("Creating topic {}", topic);

	ZkUtils zkUtils = getZkUtils();
	try {
		AdminUtils.createTopic(zkUtils, topic, numberOfPartitions, replicationFactor, topicConfig, kafka.admin.RackAwareMode.Enforced$.MODULE$);
	} finally {
		zkUtils.close();
	}

	// validate that the topic has been created
	final long deadline = System.nanoTime() + 30_000_000_000L;
	do {
		try {
			if (config.isSecureMode()) {
				//increase wait time since in Travis ZK timeout occurs frequently
				int wait = zkTimeout / 100;
				LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
				Thread.sleep(wait);
			} else {
				Thread.sleep(100);
			}
		} catch (InterruptedException e) {
			// restore interrupted state (the original comment promised this but never did it)
			Thread.currentThread().interrupt();
		}
		// we could use AdminUtils.topicExists(zkUtils, topic) here, but its results are
		// not always correct.

		// create a new ZK utils connection per probe so we never read a stale view
		ZkUtils checkZKConn = getZkUtils();
		if (AdminUtils.topicExists(checkZKConn, topic)) {
			checkZKConn.close();
			return;
		}
		checkZKConn.close();
	}
	while (System.nanoTime() < deadline);
	fail("Test topic could not be created");
}

/**
 * Marks the given topic for deletion through the Kafka admin API.
 *
 * <p>Fix: the previous version also constructed a {@code ZkClient} that was never
 * used; its {@code close()} sat inside the {@code try} body (not a {@code finally}),
 * so the connection leaked whenever {@code deleteTopic} threw. The dead client is
 * removed entirely.
 *
 * @param topic name of the topic to delete
 */
@Override
public void deleteTestTopic(String topic) {
	ZkUtils zkUtils = getZkUtils();
	try {
		LOG.info("Deleting topic {}", topic);
		AdminUtils.deleteTopic(zkUtils, topic);
	} finally {
		// always release the ZK connection, even if the delete fails
		zkUtils.close();
	}
}

/**
* Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed).
*/
Expand Down Expand Up @@ -486,5 +493,4 @@ public void close() {
offsetClient.close();
}
}

}

0 comments on commit a47b276

Please sign in to comment.