[FLINK-16125][connector/kafka] Remove Kafka connector property zookeeper.connect and clear documentation because the Kafka 0.8 connector has been removed.
PatrickRen authored and becketqin committed Apr 2, 2020
1 parent d6d7158 commit f93346c
Showing 27 changed files with 15 additions and 72 deletions.
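For orientation before the per-file diffs: after this commit, the connector needs only the Kafka client connection properties. A minimal sketch in Java of the descriptor-based setup, assembled from the documentation changes below (the surrounding TableEnvironment.connect(...) call and the topic/server values are illustrative, not part of this commit):

import org.apache.flink.table.descriptors.Kafka;

// Kafka connector descriptor without zookeeper.connect; the property is
// obsolete now that the ZooKeeper-based 0.8 connector is gone.
Kafka kafka = new Kafka()
    .version("0.10")
    .topic("test-input")
    .startFromEarliest()
    .property("bootstrap.servers", "localhost:9092"); // the only required connection property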
9 changes: 0 additions & 9 deletions docs/dev/table/connect.md
@@ -159,7 +159,6 @@ CREATE TABLE MyUserTable (
'connector.version' = '0.10',
'connector.topic' = 'topic_name',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',

-- declare a format for this system
@@ -177,7 +176,6 @@ tableEnvironment
.version("0.10")
.topic("test-input")
.startFromEarliest()
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
)

@@ -211,7 +209,6 @@ table_environment \
.version("0.10")
.topic("test-input")
.start_from_earliest()
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
) \
.with_format( # declare a format for this system
@@ -246,7 +243,6 @@ tables:
topic: test-input
startup-mode: earliest-offset
properties:
zookeeper.connect: localhost:2181
bootstrap.servers: localhost:9092

# declare a format for this system
@@ -773,8 +769,6 @@ CREATE TABLE MyUserTable (

'connector.topic' = 'topic_name', -- required: topic name from which the table is read

-- required: specify the ZooKeeper connection string
'connector.properties.zookeeper.connect' = 'localhost:2181',
-- required: specify the Kafka server connection string
'connector.properties.bootstrap.servers' = 'localhost:9092',
-- required for Kafka source, optional for Kafka sink, specify consumer group
@@ -814,7 +808,6 @@ CREATE TABLE MyUserTable (
.topic("...") // required: topic name from which the table is read

// optional: connector specific properties
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
.property("group.id", "testGroup")

@@ -844,7 +837,6 @@ CREATE TABLE MyUserTable (
.topic("...") # required: topic name from which the table is read

# optional: connector specific properties
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
.property("group.id", "testGroup")

@@ -874,7 +866,6 @@ connector:
topic: ... # required: topic name from which the table is read

properties:
zookeeper.connect: localhost:2181 # required: specify the ZooKeeper connection string
bootstrap.servers: localhost:9092 # required: specify the Kafka server connection string
group.id: testGroup # optional: required in Kafka consumer, specify consumer group

9 changes: 0 additions & 9 deletions docs/dev/table/connect.zh.md
@@ -159,7 +159,6 @@ CREATE TABLE MyUserTable (
'connector.version' = '0.10',
'connector.topic' = 'topic_name',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',

-- declare a format for this system
@@ -177,7 +176,6 @@ tableEnvironment
.version("0.10")
.topic("test-input")
.startFromEarliest()
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
)

@@ -211,7 +209,6 @@ table_environment \
.version("0.10")
.topic("test-input")
.start_from_earliest()
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
) \
.with_format( # declare a format for this system
@@ -246,7 +243,6 @@ tables:
topic: test-input
startup-mode: earliest-offset
properties:
zookeeper.connect: localhost:2181
bootstrap.servers: localhost:9092

# declare a format for this system
@@ -773,8 +769,6 @@ CREATE TABLE MyUserTable (

'connector.topic' = 'topic_name', -- required: topic name from which the table is read

-- required: specify the ZooKeeper connection string
'connector.properties.zookeeper.connect' = 'localhost:2181',
-- required: specify the Kafka server connection string
'connector.properties.bootstrap.servers' = 'localhost:9092',
-- required for Kafka source, optional for Kafka sink, specify consumer group
@@ -814,7 +808,6 @@ CREATE TABLE MyUserTable (
.topic("...") // required: topic name from which the table is read

// optional: connector specific properties
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
.property("group.id", "testGroup")

@@ -844,7 +837,6 @@ CREATE TABLE MyUserTable (
.topic("...") # required: topic name from which the table is read

# optional: connector specific properties
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
.property("group.id", "testGroup")

@@ -874,7 +866,6 @@ connector:
topic: ... # required: topic name from which the table is read

properties:
zookeeper.connect: localhost:2181 # required: specify the ZooKeeper connection string
bootstrap.servers: localhost:9092 # required: specify the Kafka server connection string
group.id: testGroup # optional: required in Kafka consumer, specify consumer group

2 changes: 0 additions & 2 deletions docs/dev/table/hive/hive_catalog.md
@@ -190,7 +190,6 @@ Flink SQL> CREATE TABLE mykafka (name String, age Int) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'test',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'format.type' = 'csv',
'update-mode' = 'append'
@@ -227,7 +226,6 @@ Location: ......
Table Type: MANAGED_TABLE
Table Parameters:
flink.connector.properties.bootstrap.servers localhost:9092
flink.connector.properties.zookeeper.connect localhost:2181
flink.connector.topic test
flink.connector.type kafka
flink.connector.version universal
2 changes: 0 additions & 2 deletions docs/dev/table/hive/hive_catalog.zh.md
@@ -190,7 +190,6 @@ Flink SQL> CREATE TABLE mykafka (name String, age Int) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'test',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'format.type' = 'csv',
'update-mode' = 'append'
@@ -227,7 +226,6 @@ Location: ......
Table Type: MANAGED_TABLE
Table Parameters:
flink.connector.properties.bootstrap.servers localhost:9092
flink.connector.properties.zookeeper.connect localhost:2181
flink.connector.topic test
flink.connector.type kafka
flink.connector.version universal
2 changes: 0 additions & 2 deletions docs/dev/table/sqlClient.md
@@ -317,7 +317,6 @@ tables:
topic: TaxiRides
startup-mode: earliest-offset
properties:
zookeeper.connect: localhost:2181
bootstrap.servers: localhost:9092
group.id: testGroup
format:
@@ -483,7 +482,6 @@ tables:
version: "0.11"
topic: OutputTopic
properties:
zookeeper.connect: localhost:2181
bootstrap.servers: localhost:9092
group.id: testGroup
format:
2 changes: 0 additions & 2 deletions docs/dev/table/sqlClient.zh.md
@@ -317,7 +317,6 @@ tables:
topic: TaxiRides
startup-mode: earliest-offset
properties:
zookeeper.connect: localhost:2181
bootstrap.servers: localhost:9092
group.id: testGroup
format:
@@ -483,7 +482,6 @@ tables:
version: "0.11"
topic: OutputTopic
properties:
zookeeper.connect: localhost:2181
bootstrap.servers: localhost:9092
group.id: testGroup
format:
@@ -253,7 +253,6 @@ public void prepare(Config config) throws Exception {
LOG.info("ZK and KafkaServer started.");

standardProps = new Properties();
standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
standardProps.setProperty("bootstrap.servers", brokerConnectionString);
standardProps.setProperty("group.id", "flink-tests");
standardProps.setProperty("enable.auto.commit", "false");
@@ -134,7 +134,6 @@ public void prepare(Config config) throws Exception {
LOG.info("ZK and KafkaServer started.");

standardProps = new Properties();
standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
standardProps.setProperty("bootstrap.servers", brokerConnectionString);
standardProps.setProperty("group.id", "flink-tests");
standardProps.setProperty("enable.auto.commit", "false");
@@ -393,8 +392,8 @@ protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception {
kafkaProperties.put("advertised.host.name", KAFKA_HOST);
kafkaProperties.put("broker.id", Integer.toString(brokerId));
kafkaProperties.put("log.dir", tmpFolder.toString());
kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024));
kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
kafkaProperties.put("transaction.max.timeout.ms", Integer.toString(1000 * 60 * 60 * 2)); // 2hours

@@ -54,7 +54,6 @@ public class KafkaValidator extends ConnectorDescriptorValidator {
public static final String CONNECTOR_SPECIFIC_OFFSETS_OFFSET = "offset";
public static final String CONNECTOR_STARTUP_TIMESTAMP_MILLIS = "connector.startup-timestamp-millis";
public static final String CONNECTOR_PROPERTIES = "connector.properties";
public static final String CONNECTOR_PROPERTIES_ZOOKEEPER_CONNECT = "connector.properties.zookeeper.connect";
public static final String CONNECTOR_PROPERTIES_BOOTSTRAP_SERVER = "connector.properties.bootstrap.servers";
public static final String CONNECTOR_PROPERTIES_GROUP_ID = "connector.properties.group.id";
public static final String CONNECTOR_PROPERTIES_KEY = "key";
@@ -136,11 +135,9 @@ private void validateStartupMode(DescriptorProperties properties) {
}

private void validateKafkaProperties(DescriptorProperties properties) {
if (properties.containsKey(CONNECTOR_PROPERTIES_ZOOKEEPER_CONNECT)
|| properties.containsKey(CONNECTOR_PROPERTIES_BOOTSTRAP_SERVER)
if (properties.containsKey(CONNECTOR_PROPERTIES_BOOTSTRAP_SERVER)
|| properties.containsKey(CONNECTOR_PROPERTIES_GROUP_ID)) {

properties.validateString(CONNECTOR_PROPERTIES_ZOOKEEPER_CONNECT, false);
properties.validateString(CONNECTOR_PROPERTIES_BOOTSTRAP_SERVER, false);
properties.validateString(CONNECTOR_PROPERTIES_GROUP_ID, true);

@@ -235,8 +232,7 @@ public static Map<Integer, Long> validateAndParseSpecificOffsetsString(DescriptorProperties descriptorProperties) {
}

public static boolean hasConciseKafkaProperties(DescriptorProperties descriptorProperties) {
return descriptorProperties.containsKey(CONNECTOR_PROPERTIES_ZOOKEEPER_CONNECT) ||
descriptorProperties.containsKey(CONNECTOR_PROPERTIES_BOOTSTRAP_SERVER) ||
return descriptorProperties.containsKey(CONNECTOR_PROPERTIES_BOOTSTRAP_SERVER) ||
descriptorProperties.containsKey(CONNECTOR_PROPERTIES_GROUP_ID);
}
}
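With CONNECTOR_PROPERTIES_ZOOKEEPER_CONNECT removed, the concise property format is detected from bootstrap.servers or group.id alone. A minimal sketch of the keys validateKafkaProperties now inspects (a complete table definition additionally needs connector.type, connector.version, and connector.topic, omitted here):

import org.apache.flink.table.descriptors.DescriptorProperties;

DescriptorProperties props = new DescriptorProperties();
// zookeeper.connect is no longer recognized or required.
props.putString("connector.properties.bootstrap.servers", "localhost:9092"); // validated as required
props.putString("connector.properties.group.id", "testGroup");               // validated as optional
// hasConciseKafkaProperties(props) returns true from these two keys alone.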
@@ -169,7 +169,6 @@ public void runFailOnNoBrokerTest() throws Exception {

// use wrong ports for the consumers
properties.setProperty("bootstrap.servers", "localhost:80");
properties.setProperty("zookeeper.connect", "localhost:80");
properties.setProperty("group.id", "test");
properties.setProperty("request.timeout.ms", "3000"); // let the test fail fast
properties.setProperty("socket.timeout.ms", "3000");
@@ -93,7 +93,6 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {

private static final Properties KAFKA_PROPERTIES = new Properties();
static {
KAFKA_PROPERTIES.setProperty("zookeeper.connect", "dummy");
KAFKA_PROPERTIES.setProperty("group.id", "dummy");
KAFKA_PROPERTIES.setProperty("bootstrap.servers", "dummy");
}
@@ -224,7 +223,6 @@ public void testTableSourceWithLegacyProperties() {

// use legacy properties
legacyPropertiesMap.remove("connector.specific-offsets");
legacyPropertiesMap.remove("connector.properties.zookeeper.connect");
legacyPropertiesMap.remove("connector.properties.bootstrap.servers");
legacyPropertiesMap.remove("connector.properties.group.id");

@@ -236,12 +234,10 @@ public void testTableSourceWithLegacyProperties() {
legacyPropertiesMap.put("connector.specific-offsets.0.offset", "100");
legacyPropertiesMap.put("connector.specific-offsets.1.partition", "1");
legacyPropertiesMap.put("connector.specific-offsets.1.offset", "123");
legacyPropertiesMap.put("connector.properties.0.key", "zookeeper.connect");
legacyPropertiesMap.put("connector.properties.0.key", "bootstrap.servers");
legacyPropertiesMap.put("connector.properties.0.value", "dummy");
legacyPropertiesMap.put("connector.properties.1.key", "bootstrap.servers");
legacyPropertiesMap.put("connector.properties.1.key", "group.id");
legacyPropertiesMap.put("connector.properties.1.value", "dummy");
legacyPropertiesMap.put("connector.properties.2.key", "group.id");
legacyPropertiesMap.put("connector.properties.2.value", "dummy");

final TableSource<?> actualSource = TableFactoryService.find(StreamTableSourceFactory.class, legacyPropertiesMap)
.createStreamTableSource(legacyPropertiesMap);
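The legacy indexed format pairs connector.properties.N.key with connector.properties.N.value, so dropping the zookeeper.connect entry shifts every index down by one, which is exactly what the rewritten expectations above encode. A sketch of the resulting legacy map in Java (values mirror the test's dummies):

import java.util.HashMap;
import java.util.Map;

// Indexed legacy properties after the removal: numbering starts at
// bootstrap.servers because the zookeeper.connect pair is gone.
Map<String, String> legacyProps = new HashMap<>();
legacyProps.put("connector.properties.0.key", "bootstrap.servers");
legacyProps.put("connector.properties.0.value", "dummy");
legacyProps.put("connector.properties.1.key", "group.id");
legacyProps.put("connector.properties.1.value", "dummy");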
@@ -330,7 +326,6 @@ public void testTableSinkWithLegacyProperties() {

// use legacy properties
legacyPropertiesMap.remove("connector.specific-offsets");
legacyPropertiesMap.remove("connector.properties.zookeeper.connect");
legacyPropertiesMap.remove("connector.properties.bootstrap.servers");
legacyPropertiesMap.remove("connector.properties.group.id");

@@ -342,12 +337,10 @@ public void testTableSinkWithLegacyProperties() {
legacyPropertiesMap.put("connector.specific-offsets.0.offset", "100");
legacyPropertiesMap.put("connector.specific-offsets.1.partition", "1");
legacyPropertiesMap.put("connector.specific-offsets.1.offset", "123");
legacyPropertiesMap.put("connector.properties.0.key", "zookeeper.connect");
legacyPropertiesMap.put("connector.properties.0.key", "bootstrap.servers");
legacyPropertiesMap.put("connector.properties.0.value", "dummy");
legacyPropertiesMap.put("connector.properties.1.key", "bootstrap.servers");
legacyPropertiesMap.put("connector.properties.1.key", "group.id");
legacyPropertiesMap.put("connector.properties.1.value", "dummy");
legacyPropertiesMap.put("connector.properties.2.key", "group.id");
legacyPropertiesMap.put("connector.properties.2.value", "dummy");

final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, legacyPropertiesMap)
.createStreamTableSink(legacyPropertiesMap);
@@ -66,7 +66,6 @@ public void testKafkaSourceSink() throws Exception {

// ---------- Produce an event time stream into Kafka -------------------
String groupId = standardProps.getProperty("group.id");
String zk = standardProps.getProperty("zookeeper.connect");
String bootstraps = standardProps.getProperty("bootstrap.servers");

// TODO: use DDL to register Kafka once FLINK-15282 is fixed.
@@ -83,7 +82,6 @@
properties.put("connector.type", "kafka");
properties.put("connector.topic", topic);
properties.put("connector.version", kafkaVersion());
properties.put("connector.properties.zookeeper.connect", zk);
properties.put("connector.properties.bootstrap.servers", bootstraps);
properties.put("connector.properties.group.id", groupId);
properties.put("connector.startup-mode", "earliest-offset");
@@ -112,7 +110,6 @@ public void testKafkaSourceSink() throws Exception {
// " 'connector.type' = 'kafka',\n" +
// " 'connector.topic' = '" + topic + "',\n" +
// " 'connector.version' = 'universal',\n" +
// " 'connector.properties.zookeeper.connect' = '" + zk + "',\n" +
// " 'connector.properties.bootstrap.servers' = '" + bootstraps + "',\n" +
// " 'connector.properties.group.id' = '" + groupId + "', \n" +
// " 'connector.startup-mode' = 'earliest-offset', \n" +
@@ -137,7 +137,6 @@ public void prepare(Config config) throws Exception {
LOG.info("ZK and KafkaServer started.");

standardProps = new Properties();
standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
standardProps.setProperty("bootstrap.servers", brokerConnectionString);
standardProps.setProperty("group.id", "flink-tests");
standardProps.setProperty("enable.auto.commit", "false");
@@ -37,7 +37,7 @@
* A simple example that shows how to read from and write to Kafka with Confluent Schema Registry.
* This will read AVRO messages from the input topic, parse them into a POJO type via checking the Schema by calling Schema registry.
* Then this example publish the POJO type to kafka by converting the POJO to AVRO and verifying the schema.
* --input-topic test-input --output-string-topic test-output --output-avro-topic test-avro-output --output-subject --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --schema-registry-url http://localhost:8081 --group.id myconsumer
* --input-topic test-input --output-string-topic test-output --output-avro-topic test-avro-output --output-subject --bootstrap.servers localhost:9092 --schema-registry-url http://localhost:8081 --group.id myconsumer
*/
public class TestAvroConsumerConfluent {

@@ -49,14 +49,12 @@ public static void main(String[] args) throws Exception {
System.out.println("Missing parameters!\n" +
"Usage: Kafka --input-topic <topic> --output-string-topic <topic> --output-avro-topic <topic> " +
"--bootstrap.servers <kafka brokers> " +
"--zookeeper.connect <zk quorum> " +
"--schema-registry-url <confluent schema registry> --group.id <some id>");
return;
}
Properties config = new Properties();
config.setProperty("bootstrap.servers", parameterTool.getRequired("bootstrap.servers"));
config.setProperty("group.id", parameterTool.getRequired("group.id"));
config.setProperty("zookeeper.connect", parameterTool.getRequired("zookeeper.connect"));
String schemaRegistryUrl = parameterTool.getRequired("schema-registry-url");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -101,7 +101,6 @@ public void testKafka() throws Exception {
.addArgument("--output-topic", outputTopic)
.addArgument("--prefix", "PREFIX")
.addArgument("--bootstrap.servers", kafka.getBootstrapServerAddresses().stream().map(address -> address.getHostString() + ':' + address.getPort()).collect(Collectors.joining(",")))
.addArgument("--zookeeper.connect ", kafka.getZookeeperAddress().getHostString() + ':' + kafka.getZookeeperAddress().getPort())
.addArgument("--group.id", "myconsumer")
.addArgument("--auto.offset.reset", "earliest")
.addArgument("--transaction.timeout.ms", "900000")
@@ -40,7 +40,6 @@ tables:
topic: $TOPIC_NAME
startup-mode: earliest-offset
properties:
zookeeper.connect: $KAFKA_ZOOKEEPER_ADDRESS
bootstrap.servers: $KAFKA_BOOTSTRAP_SERVERS
format:
type: json
@@ -86,8 +85,6 @@ tables:
topic: test-avro
startup-mode: earliest-offset
properties:
- key: zookeeper.connect
value: $KAFKA_ZOOKEEPER_ADDRESS
- key: bootstrap.servers
value: $KAFKA_BOOTSTRAP_SERVERS
format: