Skip to content

Commit

Permalink
[FLINK-8537] [table] Add a Kafka table source factory with Avro forma…
Browse files Browse the repository at this point in the history
…t support

This closes apache#5610.
  • Loading branch information
xccui authored and twalthr committed Apr 24, 2018
1 parent cdf4744 commit 614b1e2
Show file tree
Hide file tree
Showing 33 changed files with 1,288 additions and 433 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kafka;

import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_010;

/**
* Factory for creating configured instances of {@link Kafka010AvroTableSource}.
*/
public class Kafka010AvroTableSourceFactory extends KafkaAvroTableSourceFactory {

@Override
protected KafkaAvroTableSource.Builder createKafkaAvroBuilder() {
return new Kafka010AvroTableSource.Builder();
}

@Override
protected String kafkaVersion() {
return CONNECTOR_VERSION_VALUE_010;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
public class Kafka010JsonTableSourceFactory extends KafkaJsonTableSourceFactory {

@Override
protected KafkaJsonTableSource.Builder createBuilder() {
protected KafkaJsonTableSource.Builder createKafkaJsonBuilder() {
return new Kafka010JsonTableSource.Builder();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@
# limitations under the License.

org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSourceFactory
org.apache.flink.streaming.connectors.kafka.Kafka010AvroTableSourceFactory
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kafka;

import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_010;

/**
* Tests for {@link Kafka010AvroTableSourceFactory}.
*/
public class Kafka010AvroTableSourceFactoryTest extends KafkaAvroTableSourceFactoryTestBase {

@Override
protected String version() {
return CONNECTOR_VERSION_VALUE_010;
}

@Override
protected KafkaAvroTableSource.Builder builder() {
return Kafka010AvroTableSource.builder();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kafka;

import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_011;

/**
* Factory for creating configured instances of {@link Kafka011AvroTableSource}.
*/
public class Kafka011AvroTableSourceFactory extends KafkaAvroTableSourceFactory {

@Override
protected KafkaAvroTableSource.Builder createKafkaAvroBuilder() {
return new Kafka011AvroTableSource.Builder();
}

@Override
protected String kafkaVersion() {
return CONNECTOR_VERSION_VALUE_011;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
public class Kafka011JsonTableSourceFactory extends KafkaJsonTableSourceFactory {

@Override
protected KafkaJsonTableSource.Builder createBuilder() {
protected KafkaJsonTableSource.Builder createKafkaJsonBuilder() {
return new Kafka011JsonTableSource.Builder();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@
# limitations under the License.

org.apache.flink.streaming.connectors.kafka.Kafka011JsonTableSourceFactory
org.apache.flink.streaming.connectors.kafka.Kafka011AvroTableSourceFactory
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kafka;

import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_011;

/**
* Tests for {@link Kafka011AvroTableSourceFactory}.
*/
public class Kafka011AvroTableSourceFactoryTest extends KafkaAvroTableSourceFactoryTestBase {

@Override
protected String version() {
return CONNECTOR_VERSION_VALUE_011;
}

@Override
protected KafkaAvroTableSource.Builder builder() {
return Kafka011AvroTableSource.builder();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kafka;

import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08;

/**
* Factory for creating configured instances of {@link Kafka08AvroTableSource}.
*/
public class Kafka08AvroTableSourceFactory extends KafkaAvroTableSourceFactory {

@Override
protected KafkaAvroTableSource.Builder createKafkaAvroBuilder() {
return new Kafka08AvroTableSource.Builder();
}

@Override
protected String kafkaVersion() {
return CONNECTOR_VERSION_VALUE_08;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
public class Kafka08JsonTableSourceFactory extends KafkaJsonTableSourceFactory {

@Override
protected KafkaJsonTableSource.Builder createBuilder() {
protected KafkaJsonTableSource.Builder createKafkaJsonBuilder() {
return new Kafka08JsonTableSource.Builder();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@
# limitations under the License.

org.apache.flink.streaming.connectors.kafka.Kafka08JsonTableSourceFactory
org.apache.flink.streaming.connectors.kafka.Kafka08AvroTableSourceFactory
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kafka;

import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08;

/**
* Tests for {@link Kafka08AvroTableSourceFactory}.
*/
public class Kafka08AvroTableSourceFactoryTest extends KafkaAvroTableSourceFactoryTestBase {

@Override
protected String version() {
return CONNECTOR_VERSION_VALUE_08;
}

@Override
protected KafkaAvroTableSource.Builder builder() {
return Kafka08AvroTableSource.builder();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kafka;

import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_09;

/**
* Factory for creating configured instances of {@link Kafka09AvroTableSource}.
*/
public class Kafka09AvroTableSourceFactory extends KafkaAvroTableSourceFactory {
@Override
protected KafkaAvroTableSource.Builder createKafkaAvroBuilder() {
return new Kafka09AvroTableSource.Builder();
}

@Override
protected String kafkaVersion() {
return CONNECTOR_VERSION_VALUE_09;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
public class Kafka09JsonTableSourceFactory extends KafkaJsonTableSourceFactory {

@Override
protected KafkaJsonTableSource.Builder createBuilder() {
protected KafkaJsonTableSource.Builder createKafkaJsonBuilder() {
return new Kafka09JsonTableSource.Builder();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@
# limitations under the License.

org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSourceFactory
org.apache.flink.streaming.connectors.kafka.Kafka09AvroTableSourceFactory
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kafka;

import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_09;

/**
* Tests for {@link Kafka09AvroTableSourceFactory}.
*/
public class Kafka09AvroTableSourceFactoryTest extends KafkaAvroTableSourceFactoryTestBase {

@Override
protected String version() {
return CONNECTOR_VERSION_VALUE_09;
}

@Override
protected KafkaAvroTableSource.Builder builder() {
return Kafka09AvroTableSource.builder();
}
}
Loading

0 comments on commit 614b1e2

Please sign in to comment.