From f93346c93c6a841cdce576d48a0b5ca8076cc195 Mon Sep 17 00:00:00 2001 From: Qingsheng Ren Date: Tue, 17 Mar 2020 12:26:50 +0800 Subject: [PATCH] [FLINK-16125][connector/kafka] Remove Kafka connector property zookeeper.connect and clear documentation because Kafka 0.8 connector has been removed. --- docs/dev/table/connect.md | 9 --------- docs/dev/table/connect.zh.md | 9 --------- docs/dev/table/hive/hive_catalog.md | 2 -- docs/dev/table/hive/hive_catalog.zh.md | 2 -- docs/dev/table/sqlClient.md | 2 -- docs/dev/table/sqlClient.zh.md | 2 -- .../kafka/KafkaTestEnvironmentImpl.java | 1 - .../kafka/KafkaTestEnvironmentImpl.java | 3 +-- .../flink/table/descriptors/KafkaValidator.java | 8 ++------ .../connectors/kafka/KafkaConsumerTestBase.java | 1 - .../KafkaTableSourceSinkFactoryTestBase.java | 15 ++++----------- .../connectors/kafka/KafkaTableTestBase.java | 3 --- .../kafka/KafkaTestEnvironmentImpl.java | 1 - .../registry/test/TestAvroConsumerConfluent.java | 4 +--- .../tests/util/kafka/StreamingKafkaITCase.java | 1 - .../test/resources/kafka_json_source_schema.yaml | 3 --- .../kafka/test/base/KafkaExampleUtil.java | 4 ++-- .../flink/streaming/kafka/test/KafkaExample.java | 2 +- .../streaming/kafka/test/Kafka010Example.java | 2 +- .../streaming/kafka/test/Kafka011Example.java | 2 +- .../test-scripts/kafka_sql_common.sh | 1 - .../test_confluent_schema_registry.sh | 2 +- flink-python/pyflink/table/table_environment.py | 1 - .../pyflink/table/tests/test_descriptor.py | 4 +--- .../table/api/java/StreamTableEnvironment.java | 1 - .../apache/flink/table/api/TableEnvironment.java | 1 - .../table/api/scala/StreamTableEnvironment.scala | 1 - 27 files changed, 15 insertions(+), 72 deletions(-) diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md index 204c4cb882e7e..2bb80174d3d16 100644 --- a/docs/dev/table/connect.md +++ b/docs/dev/table/connect.md @@ -159,7 +159,6 @@ CREATE TABLE MyUserTable ( 'connector.version' = '0.10', 'connector.topic' = 'topic_name', 
'connector.startup-mode' = 'earliest-offset', - 'connector.properties.zookeeper.connect' = 'localhost:2181', 'connector.properties.bootstrap.servers' = 'localhost:9092', -- declare a format for this system @@ -177,7 +176,6 @@ tableEnvironment .version("0.10") .topic("test-input") .startFromEarliest() - .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092") ) @@ -211,7 +209,6 @@ table_environment \ .version("0.10") .topic("test-input") .start_from_earliest() - .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092") ) \ .with_format( # declare a format for this system @@ -246,7 +243,6 @@ tables: topic: test-input startup-mode: earliest-offset properties: - zookeeper.connect: localhost:2181 bootstrap.servers: localhost:9092 # declare a format for this system @@ -773,8 +769,6 @@ CREATE TABLE MyUserTable ( 'connector.topic' = 'topic_name', -- required: topic name from which the table is read - -- required: specify the ZooKeeper connection string - 'connector.properties.zookeeper.connect' = 'localhost:2181', -- required: specify the Kafka server connection string 'connector.properties.bootstrap.servers' = 'localhost:9092', -- required for Kafka source, optional for Kafka sink, specify consumer group @@ -814,7 +808,6 @@ CREATE TABLE MyUserTable ( .topic("...") // required: topic name from which the table is read // optional: connector specific properties - .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092") .property("group.id", "testGroup") @@ -844,7 +837,6 @@ CREATE TABLE MyUserTable ( .topic("...") # required: topic name from which the table is read # optional: connector specific properties - .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092") .property("group.id", "testGroup") @@ -874,7 +866,6 @@ connector: topic: ... 
# required: topic name from which the table is read properties: - zookeeper.connect: localhost:2181 # required: specify the ZooKeeper connection string bootstrap.servers: localhost:9092 # required: specify the Kafka server connection string group.id: testGroup # optional: required in Kafka consumer, specify consumer group diff --git a/docs/dev/table/connect.zh.md b/docs/dev/table/connect.zh.md index 66b8d9a74fef2..720ab54d24823 100644 --- a/docs/dev/table/connect.zh.md +++ b/docs/dev/table/connect.zh.md @@ -159,7 +159,6 @@ CREATE TABLE MyUserTable ( 'connector.version' = '0.10', 'connector.topic' = 'topic_name', 'connector.startup-mode' = 'earliest-offset', - 'connector.properties.zookeeper.connect' = 'localhost:2181', 'connector.properties.bootstrap.servers' = 'localhost:9092', -- declare a format for this system @@ -177,7 +176,6 @@ tableEnvironment .version("0.10") .topic("test-input") .startFromEarliest() - .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092") ) @@ -211,7 +209,6 @@ table_environment \ .version("0.10") .topic("test-input") .start_from_earliest() - .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092") ) \ .with_format( # declare a format for this system @@ -246,7 +243,6 @@ tables: topic: test-input startup-mode: earliest-offset properties: - zookeeper.connect: localhost:2181 bootstrap.servers: localhost:9092 # declare a format for this system @@ -773,8 +769,6 @@ CREATE TABLE MyUserTable ( 'connector.topic' = 'topic_name', -- required: topic name from which the table is read - -- required: specify the ZooKeeper connection string - 'connector.properties.zookeeper.connect' = 'localhost:2181', -- required: specify the Kafka server connection string 'connector.properties.bootstrap.servers' = 'localhost:9092', -- required for Kafka source, optional for Kafka sink, specify consumer group @@ -814,7 +808,6 @@ CREATE TABLE MyUserTable ( .topic("...") // required: topic 
name from which the table is read // optional: connector specific properties - .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092") .property("group.id", "testGroup") @@ -844,7 +837,6 @@ CREATE TABLE MyUserTable ( .topic("...") # required: topic name from which the table is read # optional: connector specific properties - .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092") .property("group.id", "testGroup") @@ -874,7 +866,6 @@ connector: topic: ... # required: topic name from which the table is read properties: - zookeeper.connect: localhost:2181 # required: specify the ZooKeeper connection string bootstrap.servers: localhost:9092 # required: specify the Kafka server connection string group.id: testGroup # optional: required in Kafka consumer, specify consumer group diff --git a/docs/dev/table/hive/hive_catalog.md b/docs/dev/table/hive/hive_catalog.md index d2e7d5a652563..d907703fe24b7 100644 --- a/docs/dev/table/hive/hive_catalog.md +++ b/docs/dev/table/hive/hive_catalog.md @@ -190,7 +190,6 @@ Flink SQL> CREATE TABLE mykafka (name String, age Int) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'test', - 'connector.properties.zookeeper.connect' = 'localhost:2181', 'connector.properties.bootstrap.servers' = 'localhost:9092', 'format.type' = 'csv', 'update-mode' = 'append' @@ -227,7 +226,6 @@ Location: ...... 
Table Type: MANAGED_TABLE Table Parameters: flink.connector.properties.bootstrap.servers localhost:9092 - flink.connector.properties.zookeeper.connect localhost:2181 flink.connector.topic test flink.connector.type kafka flink.connector.version universal diff --git a/docs/dev/table/hive/hive_catalog.zh.md b/docs/dev/table/hive/hive_catalog.zh.md index d2e7d5a652563..d907703fe24b7 100644 --- a/docs/dev/table/hive/hive_catalog.zh.md +++ b/docs/dev/table/hive/hive_catalog.zh.md @@ -190,7 +190,6 @@ Flink SQL> CREATE TABLE mykafka (name String, age Int) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'test', - 'connector.properties.zookeeper.connect' = 'localhost:2181', 'connector.properties.bootstrap.servers' = 'localhost:9092', 'format.type' = 'csv', 'update-mode' = 'append' @@ -227,7 +226,6 @@ Location: ...... Table Type: MANAGED_TABLE Table Parameters: flink.connector.properties.bootstrap.servers localhost:9092 - flink.connector.properties.zookeeper.connect localhost:2181 flink.connector.topic test flink.connector.type kafka flink.connector.version universal diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md index ed4ff80f9f561..9ea1bb897a68c 100644 --- a/docs/dev/table/sqlClient.md +++ b/docs/dev/table/sqlClient.md @@ -317,7 +317,6 @@ tables: topic: TaxiRides startup-mode: earliest-offset properties: - zookeeper.connect: localhost:2181 bootstrap.servers: localhost:9092 group.id: testGroup format: @@ -483,7 +482,6 @@ tables: version: "0.11" topic: OutputTopic properties: - zookeeper.connect: localhost:2181 bootstrap.servers: localhost:9092 group.id: testGroup format: diff --git a/docs/dev/table/sqlClient.zh.md b/docs/dev/table/sqlClient.zh.md index 93d506b8acb19..642f44810ec5d 100644 --- a/docs/dev/table/sqlClient.zh.md +++ b/docs/dev/table/sqlClient.zh.md @@ -317,7 +317,6 @@ tables: topic: TaxiRides startup-mode: earliest-offset properties: - zookeeper.connect: localhost:2181 bootstrap.servers: 
localhost:9092 group.id: testGroup format: @@ -483,7 +482,6 @@ tables: version: "0.11" topic: OutputTopic properties: - zookeeper.connect: localhost:2181 bootstrap.servers: localhost:9092 group.id: testGroup format: diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 64649eee6ae07..322c3aa284a61 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -253,7 +253,6 @@ public void prepare(Config config) throws Exception { LOG.info("ZK and KafkaServer started."); standardProps = new Properties(); - standardProps.setProperty("zookeeper.connect", zookeeperConnectionString); standardProps.setProperty("bootstrap.servers", brokerConnectionString); standardProps.setProperty("group.id", "flink-tests"); standardProps.setProperty("enable.auto.commit", "false"); diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index a3982badaec69..478ce38886736 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -134,7 +134,6 @@ public void prepare(Config config) throws Exception { LOG.info("ZK and KafkaServer started."); standardProps = new Properties(); - 
standardProps.setProperty("zookeeper.connect", zookeeperConnectionString); standardProps.setProperty("bootstrap.servers", brokerConnectionString); standardProps.setProperty("group.id", "flink-tests"); standardProps.setProperty("enable.auto.commit", "false"); @@ -393,8 +392,8 @@ protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Except kafkaProperties.put("advertised.host.name", KAFKA_HOST); kafkaProperties.put("broker.id", Integer.toString(brokerId)); kafkaProperties.put("log.dir", tmpFolder.toString()); - kafkaProperties.put("zookeeper.connect", zookeeperConnectionString); kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024)); + kafkaProperties.put("zookeeper.connect", zookeeperConnectionString); kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024)); kafkaProperties.put("transaction.max.timeout.ms", Integer.toString(1000 * 60 * 60 * 2)); // 2hours diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java index f55bc1e923066..158417a30003e 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java @@ -54,7 +54,6 @@ public class KafkaValidator extends ConnectorDescriptorValidator { public static final String CONNECTOR_SPECIFIC_OFFSETS_OFFSET = "offset"; public static final String CONNECTOR_STARTUP_TIMESTAMP_MILLIS = "connector.startup-timestamp-millis"; public static final String CONNECTOR_PROPERTIES = "connector.properties"; - public static final String CONNECTOR_PROPERTIES_ZOOKEEPER_CONNECT = "connector.properties.zookeeper.connect"; public static final String CONNECTOR_PROPERTIES_BOOTSTRAP_SERVER = 
"connector.properties.bootstrap.servers"; public static final String CONNECTOR_PROPERTIES_GROUP_ID = "connector.properties.group.id"; public static final String CONNECTOR_PROPERTIES_KEY = "key"; @@ -136,11 +135,9 @@ private void validateStartupMode(DescriptorProperties properties) { } private void validateKafkaProperties(DescriptorProperties properties) { - if (properties.containsKey(CONNECTOR_PROPERTIES_ZOOKEEPER_CONNECT) - || properties.containsKey(CONNECTOR_PROPERTIES_BOOTSTRAP_SERVER) + if (properties.containsKey(CONNECTOR_PROPERTIES_BOOTSTRAP_SERVER) || properties.containsKey(CONNECTOR_PROPERTIES_GROUP_ID)) { - properties.validateString(CONNECTOR_PROPERTIES_ZOOKEEPER_CONNECT, false); properties.validateString(CONNECTOR_PROPERTIES_BOOTSTRAP_SERVER, false); properties.validateString(CONNECTOR_PROPERTIES_GROUP_ID, true); @@ -235,8 +232,7 @@ public static Map validateAndParseSpecificOffsetsString(Descripto } public static boolean hasConciseKafkaProperties(DescriptorProperties descriptorProperties) { - return descriptorProperties.containsKey(CONNECTOR_PROPERTIES_ZOOKEEPER_CONNECT) || - descriptorProperties.containsKey(CONNECTOR_PROPERTIES_BOOTSTRAP_SERVER) || + return descriptorProperties.containsKey(CONNECTOR_PROPERTIES_BOOTSTRAP_SERVER) || descriptorProperties.containsKey(CONNECTOR_PROPERTIES_GROUP_ID); } } diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 2cf999ba86720..b788fb88eb3bf 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -169,7 +169,6 @@ public void runFailOnNoBrokerTest() throws Exception { // use 
wrong ports for the consumers properties.setProperty("bootstrap.servers", "localhost:80"); - properties.setProperty("zookeeper.connect", "localhost:80"); properties.setProperty("group.id", "test"); properties.setProperty("request.timeout.ms", "3000"); // let the test fail fast properties.setProperty("socket.timeout.ms", "3000"); diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java index 0875c28d4e079..d8eb0112251ea 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java @@ -93,7 +93,6 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger { private static final Properties KAFKA_PROPERTIES = new Properties(); static { - KAFKA_PROPERTIES.setProperty("zookeeper.connect", "dummy"); KAFKA_PROPERTIES.setProperty("group.id", "dummy"); KAFKA_PROPERTIES.setProperty("bootstrap.servers", "dummy"); } @@ -224,7 +223,6 @@ public void testTableSourceWithLegacyProperties() { // use legacy properties legacyPropertiesMap.remove("connector.specific-offsets"); - legacyPropertiesMap.remove("connector.properties.zookeeper.connect"); legacyPropertiesMap.remove("connector.properties.bootstrap.servers"); legacyPropertiesMap.remove("connector.properties.group.id"); @@ -236,12 +234,10 @@ public void testTableSourceWithLegacyProperties() { legacyPropertiesMap.put("connector.specific-offsets.0.offset", "100"); legacyPropertiesMap.put("connector.specific-offsets.1.partition", "1"); legacyPropertiesMap.put("connector.specific-offsets.1.offset", "123"); - 
legacyPropertiesMap.put("connector.properties.0.key", "zookeeper.connect"); + legacyPropertiesMap.put("connector.properties.0.key", "bootstrap.servers"); legacyPropertiesMap.put("connector.properties.0.value", "dummy"); - legacyPropertiesMap.put("connector.properties.1.key", "bootstrap.servers"); + legacyPropertiesMap.put("connector.properties.1.key", "group.id"); legacyPropertiesMap.put("connector.properties.1.value", "dummy"); - legacyPropertiesMap.put("connector.properties.2.key", "group.id"); - legacyPropertiesMap.put("connector.properties.2.value", "dummy"); final TableSource actualSource = TableFactoryService.find(StreamTableSourceFactory.class, legacyPropertiesMap) .createStreamTableSource(legacyPropertiesMap); @@ -330,7 +326,6 @@ public void testTableSinkWithLegacyProperties() { // use legacy properties legacyPropertiesMap.remove("connector.specific-offsets"); - legacyPropertiesMap.remove("connector.properties.zookeeper.connect"); legacyPropertiesMap.remove("connector.properties.bootstrap.servers"); legacyPropertiesMap.remove("connector.properties.group.id"); @@ -342,12 +337,10 @@ public void testTableSinkWithLegacyProperties() { legacyPropertiesMap.put("connector.specific-offsets.0.offset", "100"); legacyPropertiesMap.put("connector.specific-offsets.1.partition", "1"); legacyPropertiesMap.put("connector.specific-offsets.1.offset", "123"); - legacyPropertiesMap.put("connector.properties.0.key", "zookeeper.connect"); + legacyPropertiesMap.put("connector.properties.0.key", "bootstrap.servers"); legacyPropertiesMap.put("connector.properties.0.value", "dummy"); - legacyPropertiesMap.put("connector.properties.1.key", "bootstrap.servers"); + legacyPropertiesMap.put("connector.properties.1.key", "group.id"); legacyPropertiesMap.put("connector.properties.1.value", "dummy"); - legacyPropertiesMap.put("connector.properties.2.key", "group.id"); - legacyPropertiesMap.put("connector.properties.2.value", "dummy"); final TableSink actualSink = 
TableFactoryService.find(StreamTableSinkFactory.class, legacyPropertiesMap) .createStreamTableSink(legacyPropertiesMap); diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableTestBase.java index bd526e9f802c6..a3d0acd912334 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableTestBase.java @@ -66,7 +66,6 @@ public void testKafkaSourceSink() throws Exception { // ---------- Produce an event time stream into Kafka ------------------- String groupId = standardProps.getProperty("group.id"); - String zk = standardProps.getProperty("zookeeper.connect"); String bootstraps = standardProps.getProperty("bootstrap.servers"); // TODO: use DDL to register Kafka once FLINK-15282 is fixed. 
@@ -83,7 +82,6 @@ public void testKafkaSourceSink() throws Exception { properties.put("connector.type", "kafka"); properties.put("connector.topic", topic); properties.put("connector.version", kafkaVersion()); - properties.put("connector.properties.zookeeper.connect", zk); properties.put("connector.properties.bootstrap.servers", bootstraps); properties.put("connector.properties.group.id", groupId); properties.put("connector.startup-mode", "earliest-offset"); @@ -112,7 +110,6 @@ public void testKafkaSourceSink() throws Exception { // " 'connector.type' = 'kafka',\n" + // " 'connector.topic' = '" + topic + "',\n" + // " 'connector.version' = 'universal',\n" + -// " 'connector.properties.zookeeper.connect' = '" + zk + "',\n" + // " 'connector.properties.bootstrap.servers' = '" + bootstraps + "',\n" + // " 'connector.properties.group.id' = '" + groupId + "', \n" + // " 'connector.startup-mode' = 'earliest-offset', \n" + diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 15b159457b853..16cb72401142c 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -137,7 +137,6 @@ public void prepare(Config config) throws Exception { LOG.info("ZK and KafkaServer started."); standardProps = new Properties(); - standardProps.setProperty("zookeeper.connect", zookeeperConnectionString); standardProps.setProperty("bootstrap.servers", brokerConnectionString); standardProps.setProperty("group.id", "flink-tests"); standardProps.setProperty("enable.auto.commit", "false"); diff --git 
a/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java b/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java index 55549dee972fb..dda461797392f 100644 --- a/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java +++ b/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java @@ -37,7 +37,7 @@ * A simple example that shows how to read from and write to Kafka with Confluent Schema Registry. * This will read AVRO messages from the input topic, parse them into a POJO type via checking the Schema by calling Schema registry. * Then this example publish the POJO type to kafka by converting the POJO to AVRO and verifying the schema. - * --input-topic test-input --output-string-topic test-output --output-avro-topic test-avro-output --output-subject --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --schema-registry-url http://localhost:8081 --group.id myconsumer + * --input-topic test-input --output-string-topic test-output --output-avro-topic test-avro-output --output-subject --bootstrap.servers localhost:9092 --schema-registry-url http://localhost:8081 --group.id myconsumer */ public class TestAvroConsumerConfluent { @@ -49,14 +49,12 @@ public static void main(String[] args) throws Exception { System.out.println("Missing parameters!\n" + "Usage: Kafka --input-topic --output-string-topic --output-avro-topic " + "--bootstrap.servers " + - "--zookeeper.connect " + "--schema-registry-url --group.id "); return; } Properties config = new Properties(); config.setProperty("bootstrap.servers", parameterTool.getRequired("bootstrap.servers")); config.setProperty("group.id", parameterTool.getRequired("group.id")); - 
config.setProperty("zookeeper.connect", parameterTool.getRequired("zookeeper.connect")); String schemaRegistryUrl = parameterTool.getRequired("schema-registry-url"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java index a7ffb14f09cfe..22d8506f58a7b 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java @@ -101,7 +101,6 @@ public void testKafka() throws Exception { .addArgument("--output-topic", outputTopic) .addArgument("--prefix", "PREFIX") .addArgument("--bootstrap.servers", kafka.getBootstrapServerAddresses().stream().map(address -> address.getHostString() + ':' + address.getPort()).collect(Collectors.joining(","))) - .addArgument("--zookeeper.connect ", kafka.getZookeeperAddress().getHostString() + ':' + kafka.getZookeeperAddress().getPort()) .addArgument("--group.id", "myconsumer") .addArgument("--auto.offset.reset", "earliest") .addArgument("--transaction.timeout.ms", "900000") diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/resources/kafka_json_source_schema.yaml b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/resources/kafka_json_source_schema.yaml index 6de0c71319c0b..600e3f1c4a3f8 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/resources/kafka_json_source_schema.yaml +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/resources/kafka_json_source_schema.yaml @@ -40,7 +40,6 @@ tables: topic: $TOPIC_NAME 
startup-mode: earliest-offset properties: - zookeeper.connect: $KAFKA_ZOOKEEPER_ADDRESS bootstrap.servers: $KAFKA_BOOTSTRAP_SERVERS format: type: json @@ -86,8 +85,6 @@ tables: topic: test-avro startup-mode: earliest-offset properties: - - key: zookeeper.connect - value: $KAFKA_ZOOKEEPER_ADDRESS - key: bootstrap.servers value: $KAFKA_BOOTSTRAP_SERVERS format: diff --git a/flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java b/flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java index 0e3c4eafca89b..5505a7ad497b1 100644 --- a/flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java +++ b/flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java @@ -34,11 +34,11 @@ public static StreamExecutionEnvironment prepareExecutionEnv(ParameterTool param System.out.println("Missing parameters!\n" + "Usage: Kafka --input-topic --output-topic " + "--bootstrap.servers " + - "--zookeeper.connect --group.id "); + "--group.id "); throw new Exception("Missing parameters!\n" + "Usage: Kafka --input-topic --output-topic " + "--bootstrap.servers " + - "--zookeeper.connect --group.id "); + "--group.id "); } StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); diff --git a/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java b/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java index 3a3be93208c96..f3c844c6e3f7d 100644 --- a/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java +++ 
b/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java @@ -40,7 +40,7 @@ * *

Example usage: * --input-topic test-input --output-topic test-output --bootstrap.servers localhost:9092 - * --zookeeper.connect localhost:2181 --group.id myconsumer + * --group.id myconsumer */ public class KafkaExample extends KafkaExampleUtil { diff --git a/flink-end-to-end-tests/flink-streaming-kafka010-test/src/main/java/org/apache/flink/streaming/kafka/test/Kafka010Example.java b/flink-end-to-end-tests/flink-streaming-kafka010-test/src/main/java/org/apache/flink/streaming/kafka/test/Kafka010Example.java index 0b97179425861..14c9493a21560 100644 --- a/flink-end-to-end-tests/flink-streaming-kafka010-test/src/main/java/org/apache/flink/streaming/kafka/test/Kafka010Example.java +++ b/flink-end-to-end-tests/flink-streaming-kafka010-test/src/main/java/org/apache/flink/streaming/kafka/test/Kafka010Example.java @@ -38,7 +38,7 @@ * the String messages are of formatted as a (word,frequency,timestamp) tuple. * *

Example usage: - * --input-topic test-input --output-topic test-output --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer + * --input-topic test-input --output-topic test-output --bootstrap.servers localhost:9092 --group.id myconsumer */ public class Kafka010Example { diff --git a/flink-end-to-end-tests/flink-streaming-kafka011-test/src/main/java/org/apache/flink/streaming/kafka/test/Kafka011Example.java b/flink-end-to-end-tests/flink-streaming-kafka011-test/src/main/java/org/apache/flink/streaming/kafka/test/Kafka011Example.java index 1f877c5a5b840..fafd3076c69f8 100644 --- a/flink-end-to-end-tests/flink-streaming-kafka011-test/src/main/java/org/apache/flink/streaming/kafka/test/Kafka011Example.java +++ b/flink-end-to-end-tests/flink-streaming-kafka011-test/src/main/java/org/apache/flink/streaming/kafka/test/Kafka011Example.java @@ -38,7 +38,7 @@ * the String messages are of formatted as a (word,frequency,timestamp) tuple. * *

Example usage: - * --input-topic test-input --output-topic test-output --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer + * --input-topic test-input --output-topic test-output --bootstrap.servers localhost:9092 --group.id myconsumer */ public class Kafka011Example { diff --git a/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh b/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh index 560e43e078c61..c7ed12ae4e27b 100644 --- a/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh +++ b/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh @@ -68,7 +68,6 @@ function get_kafka_json_source_schema { topic: $topicName startup-mode: earliest-offset properties: - zookeeper.connect: localhost:2181 bootstrap.servers: localhost:9092 format: type: json diff --git a/flink-end-to-end-tests/test-scripts/test_confluent_schema_registry.sh b/flink-end-to-end-tests/test-scripts/test_confluent_schema_registry.sh index 5d3e9e4970eba..a023f3989d102 100755 --- a/flink-end-to-end-tests/test-scripts/test_confluent_schema_registry.sh +++ b/flink-end-to-end-tests/test-scripts/test_confluent_schema_registry.sh @@ -78,7 +78,7 @@ create_kafka_topic 1 1 test-avro-out # Read Avro message from [test-avro-input], check the schema and send message to [test-string-ou] $FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \ --input-topic test-avro-input --output-string-topic test-string-out --output-avro-topic test-avro-out --output-subject test-output-subject \ - --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer --auto.offset.reset earliest \ + --bootstrap.servers localhost:9092 --group.id myconsumer --auto.offset.reset earliest \ --schema-registry-url ${SCHEMA_REGISTRY_URL} #echo "Reading messages from Kafka topic [test-string-ou] ..." 
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py index 46aa70f4eaca1..7acdaa03f98ab 100644 --- a/flink-python/pyflink/table/table_environment.py +++ b/flink-python/pyflink/table/table_environment.py @@ -522,7 +522,6 @@ def sql_update(self, stmt): ... 'connector.type' = 'kafka', ... 'update-mode' = 'append', ... 'connector.topic' = 'xxx', - ... 'connector.properties.zookeeper.connect' = 'localhost:2181', ... 'connector.properties.bootstrap.servers' = 'localhost:9092' ... ) ... ''' diff --git a/flink-python/pyflink/table/tests/test_descriptor.py b/flink-python/pyflink/table/tests/test_descriptor.py index 002f499866e42..6894e10c62427 100644 --- a/flink-python/pyflink/table/tests/test_descriptor.py +++ b/flink-python/pyflink/table/tests/test_descriptor.py @@ -61,12 +61,10 @@ def test_topic(self): self.assertEqual(expected, properties) def test_properties(self): - kafka = Kafka().properties({"zookeeper.connect": "localhost:2181", - "bootstrap.servers": "localhost:9092"}) + kafka = Kafka().properties({"bootstrap.servers": "localhost:9092"}) properties = kafka.to_properties() expected = {'connector.type': 'kafka', - 'connector.properties.zookeeper.connect': 'localhost:2181', 'connector.properties.bootstrap.servers': 'localhost:9092', 'connector.property-version': '1'} self.assertEqual(expected, properties) diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java index 364e619148850..74f0b654f2134 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java @@ -701,7 +701,6 @@ static StreamTableEnvironment create(StreamExecutionEnvironment 
executionEnviron * new Kafka() * .version("0.11") * .topic("clicks") - * .property("zookeeper.connect", "localhost") * .property("group.id", "click-group") * .startFromEarliest()) * .withFormat( diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java index f086dd8f88c59..c89d76aa6c536 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java @@ -689,7 +689,6 @@ static TableEnvironment create(EnvironmentSettings settings) { * 'connector.type' = 'kafka', * 'update-mode' = 'append', * 'connector.topic' = 'xxx', - * 'connector.properties.zookeeper.connect' = 'localhost:2181', * 'connector.properties.bootstrap.servers' = 'localhost:9092', * ... * )"; diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala index 54d0c97e8087e..d569c8308a633 100644 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala +++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala @@ -458,7 +458,6 @@ trait StreamTableEnvironment extends TableEnvironment { * new Kafka() * .version("0.11") * .topic("clicks") - * .property("zookeeper.connect", "localhost") * .property("group.id", "click-group") * .startFromEarliest()) * .withFormat(