From a47b276314e24fc9f153bc163d8470ce24e36b2f Mon Sep 17 00:00:00 2001 From: Piotr Nowojski Date: Fri, 8 Feb 2019 15:28:33 +0100 Subject: [PATCH] [hotfix][kafka,test] Synchronize 0.11 KafkaTestEnvironmentImpl with universal This is a pure refactor, that reorders the method so that those two KafkaTestEnvironmentImpl implementations are more inline --- .../kafka/KafkaTestEnvironmentImpl.java | 244 +++++++++--------- 1 file changed, 125 insertions(+), 119 deletions(-) diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 160adf5886782..ae237f67dfd35 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -85,15 +85,125 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { // 6 seconds is default. Seems to be too small for travis. 30 seconds private int zkTimeout = 30000; private Config config; - - public String getBrokerConnectionString() { - return brokerConnectionString; - } + private static final int DELETE_TIMEOUT_SECONDS = 30; public void setProducerSemantic(FlinkKafkaProducer011.Semantic producerSemantic) { this.producerSemantic = producerSemantic; } + @Override + public void prepare(Config config) throws Exception { + //increase the timeout since in Travis ZK connection takes long time for secure connection. + if (config.isSecureMode()) { + //run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout + config.setKafkaServersNumber(1); + zkTimeout = zkTimeout * 15; + } + this.config = config; + + File tempDir = new File(System.getProperty("java.io.tmpdir")); + tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString())); + assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs()); + + tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString())); + assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs()); + + tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber()); + for (int i = 0; i < config.getKafkaServersNumber(); i++) { + File tmpDir = new File(tmpKafkaParent, "server-" + i); + assertTrue("cannot create kafka temp dir", tmpDir.mkdir()); + tmpKafkaDirs.add(tmpDir); + } + + zookeeper = null; + brokers.clear(); + + zookeeper = new TestingServer(-1, tmpZkDir); + zookeeperConnectionString = zookeeper.getConnectString(); + LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString); + + LOG.info("Starting KafkaServer"); + + ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT); + for (int i = 0; i < config.getKafkaServersNumber(); i++) { + KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i)); + brokers.add(kafkaServer); + brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName)); + brokerConnectionString += ","; + } + + LOG.info("ZK and KafkaServer started."); + + standardProps = new Properties(); + standardProps.setProperty("zookeeper.connect", zookeeperConnectionString); + standardProps.setProperty("bootstrap.servers", brokerConnectionString); + standardProps.setProperty("group.id", "flink-tests"); + standardProps.setProperty("enable.auto.commit", "false"); + standardProps.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout)); + standardProps.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout)); + standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. + standardProps.setProperty("max.partition.fetch.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!) + } + + @Override + public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) { + // create topic with one client + LOG.info("Creating topic {}", topic); + + ZkUtils zkUtils = getZkUtils(); + try { + AdminUtils.createTopic(zkUtils, topic, numberOfPartitions, replicationFactor, topicConfig, kafka.admin.RackAwareMode.Enforced$.MODULE$); + } finally { + zkUtils.close(); + } + + // validate that the topic has been created + final long deadline = System.nanoTime() + 30_000_000_000L; + do { + try { + if (config.isSecureMode()) { + //increase wait time since in Travis ZK timeout occurs frequently + int wait = zkTimeout / 100; + LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic); + Thread.sleep(wait); + } else { + Thread.sleep(100); + } + } catch (InterruptedException e) { + // restore interrupted state + } + // we could use AdminUtils.topicExists(zkUtils, topic) here, but it's results are + // not always correct. + + // create a new ZK utils connection + ZkUtils checkZKConn = getZkUtils(); + if (AdminUtils.topicExists(checkZKConn, topic)) { + checkZKConn.close(); + return; + } + checkZKConn.close(); + } + while (System.nanoTime() < deadline); + fail("Test topic could not be created"); + } + + @Override + public void deleteTestTopic(String topic) { + ZkUtils zkUtils = getZkUtils(); + try { + LOG.info("Deleting topic {}", topic); + + ZkClient zk = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")), + Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer()); + + AdminUtils.deleteTopic(zkUtils, topic); + + zk.close(); + } finally { + zkUtils.close(); + } + } + @Override public Properties getStandardProperties() { return standardProps; @@ -115,6 +225,11 @@ public Properties getSecureProperties() { return prop; } + @Override + public String getBrokerConnectionString() { + return brokerConnectionString; + } + @Override public String getVersion() { return "0.11"; @@ -183,7 +298,12 @@ public DataStreamSink produceIntoKafka(DataStream stream, String topic @Override public DataStreamSink writeToKafkaWithTimestamps(DataStream stream, String topic, KeyedSerializationSchema serSchema, Properties props) { FlinkKafkaProducer011 prod = new FlinkKafkaProducer011<>( - topic, serSchema, props, Optional.of(new FlinkFixedPartitioner<>()), producerSemantic, FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); + topic, + serSchema, + props, + Optional.of(new FlinkFixedPartitioner<>()), + producerSemantic, + FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); prod.setWriteTimestampToKafka(true); @@ -233,60 +353,6 @@ public boolean isSecureRunSupported() { return true; } - @Override - public void prepare(Config config) throws Exception { - //increase the timeout since in Travis ZK connection takes long time for secure connection. - if (config.isSecureMode()) { - //run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout - config.setKafkaServersNumber(1); - zkTimeout = zkTimeout * 15; - } - this.config = config; - - File tempDir = new File(System.getProperty("java.io.tmpdir")); - tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString())); - assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs()); - - tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString())); - assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs()); - - tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber()); - for (int i = 0; i < config.getKafkaServersNumber(); i++) { - File tmpDir = new File(tmpKafkaParent, "server-" + i); - assertTrue("cannot create kafka temp dir", tmpDir.mkdir()); - tmpKafkaDirs.add(tmpDir); - } - - zookeeper = null; - brokers.clear(); - - zookeeper = new TestingServer(-1, tmpZkDir); - zookeeperConnectionString = zookeeper.getConnectString(); - LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString); - - LOG.info("Starting KafkaServer"); - - ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT); - for (int i = 0; i < config.getKafkaServersNumber(); i++) { - KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i)); - brokers.add(kafkaServer); - brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName)); - brokerConnectionString += ","; - } - - LOG.info("ZK and KafkaServer started."); - - standardProps = new Properties(); - standardProps.setProperty("zookeeper.connect", zookeeperConnectionString); - standardProps.setProperty("bootstrap.servers", brokerConnectionString); - standardProps.setProperty("group.id", "flink-tests"); - standardProps.setProperty("enable.auto.commit", "false"); - standardProps.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout)); - standardProps.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout)); - standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.11 value) - standardProps.setProperty("max.partition.fetch.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!) - } - @Override public void shutdown() throws Exception { for (KafkaServer broker : brokers) { @@ -333,65 +399,6 @@ public ZkUtils getZkUtils() { return ZkUtils.apply(creator, false); } - @Override - public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) { - // create topic with one client - LOG.info("Creating topic {}", topic); - - ZkUtils zkUtils = getZkUtils(); - try { - AdminUtils.createTopic(zkUtils, topic, numberOfPartitions, replicationFactor, topicConfig, kafka.admin.RackAwareMode.Enforced$.MODULE$); - } finally { - zkUtils.close(); - } - - // validate that the topic has been created - final long deadline = System.nanoTime() + 30_000_000_000L; - do { - try { - if (config.isSecureMode()) { - //increase wait time since in Travis ZK timeout occurs frequently - int wait = zkTimeout / 100; - LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic); - Thread.sleep(wait); - } else { - Thread.sleep(100); - } - } catch (InterruptedException e) { - // restore interrupted state - } - // we could use AdminUtils.topicExists(zkUtils, topic) here, but it's results are - // not always correct. - - // create a new ZK utils connection - ZkUtils checkZKConn = getZkUtils(); - if (AdminUtils.topicExists(checkZKConn, topic)) { - checkZKConn.close(); - return; - } - checkZKConn.close(); - } - while (System.nanoTime() < deadline); - fail("Test topic could not be created"); - } - - @Override - public void deleteTestTopic(String topic) { - ZkUtils zkUtils = getZkUtils(); - try { - LOG.info("Deleting topic {}", topic); - - ZkClient zk = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")), - Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer()); - - AdminUtils.deleteTopic(zkUtils, topic); - - zk.close(); - } finally { - zkUtils.close(); - } - } - /** * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed). */ @@ -486,5 +493,4 @@ public void close() { offsetClient.close(); } } - }