Skip to content

Commit

Permalink
[FLINK-8538] [table] Improve unified table sources
Browse files Browse the repository at this point in the history
This closes apache#5564.
  • Loading branch information
twalthr committed Feb 27, 2018
1 parent 1d26062 commit db2c510
Show file tree
Hide file tree
Showing 71 changed files with 2,561 additions and 1,451 deletions.
8 changes: 8 additions & 0 deletions flink-connectors/flink-connector-kafka-0.10/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,14 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@

package org.apache.flink.streaming.connectors.kafka;

import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_010;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_010;

/**
* Factory for creating configured instances of {@link Kafka010JsonTableSource}.
*/
public class Kafka010JsonTableSourceFactory extends KafkaJsonTableSourceFactory {

@Override
protected KafkaJsonTableSource.Builder createBuilder() {
return new Kafka010JsonTableSource.Builder();
}

@Override
protected String kafkaVersion() {
return KAFKA_VERSION_VALUE_010;
return CONNECTOR_VERSION_VALUE_010;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,20 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.table.descriptors.Kafka;

import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_010;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_010;

/**
* Tests for {@link Kafka010JsonTableSourceFactory}.
*/
public class Kafka010TableSourceFactoryTest extends KafkaJsonTableFromDescriptorTestBase {
protected String versionForTest() {
return KAFKA_VERSION_VALUE_010;
}
public class Kafka010JsonTableSourceFactoryTest extends KafkaJsonTableSourceFactoryTestBase {

protected KafkaJsonTableSource.Builder builderForTest() {
return Kafka010JsonTableSource.builder();
@Override
protected String version() {
return CONNECTOR_VERSION_VALUE_010;
}

@Override
protected void extraSettings(KafkaTableSource.Builder builder, Kafka kafka) {
// no extra settings
protected KafkaJsonTableSource.Builder builder() {
return Kafka010JsonTableSource.builder();
}
}
8 changes: 8 additions & 0 deletions flink-connectors/flink-connector-kafka-0.11/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,14 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@

package org.apache.flink.streaming.connectors.kafka;

import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_011;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_011;

/**
* Factory for creating configured instances of {@link Kafka011JsonTableSource}.
*/
public class Kafka011JsonTableSourceFactory extends KafkaJsonTableSourceFactory {

@Override
protected KafkaJsonTableSource.Builder createBuilder() {
return new Kafka011JsonTableSource.Builder();
}

@Override
protected String kafkaVersion() {
return KAFKA_VERSION_VALUE_011;
return CONNECTOR_VERSION_VALUE_011;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,20 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.table.descriptors.Kafka;

import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_011;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_011;

/**
* Tests for {@link Kafka011JsonTableSourceFactory}.
*/
public class Kafka011TableSourceFactoryTest extends KafkaJsonTableFromDescriptorTestBase {
protected String versionForTest() {
return KAFKA_VERSION_VALUE_011;
}
public class Kafka011JsonTableSourceFactoryTest extends KafkaJsonTableSourceFactoryTestBase {

protected KafkaJsonTableSource.Builder builderForTest() {
return Kafka011JsonTableSource.builder();
@Override
protected String version() {
return CONNECTOR_VERSION_VALUE_011;
}

@Override
protected void extraSettings(KafkaTableSource.Builder builder, Kafka kafka) {
// no extra settings
protected KafkaJsonTableSource.Builder builder() {
return Kafka011JsonTableSource.builder();
}
}
8 changes: 8 additions & 0 deletions flink-connectors/flink-connector-kafka-0.8/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,14 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@

package org.apache.flink.streaming.connectors.kafka;

import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_08;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08;

/**
* Factory for creating configured instances of {@link Kafka08JsonTableSource}.
*/
public class Kafka08JsonTableSourceFactory extends KafkaJsonTableSourceFactory {

@Override
protected KafkaJsonTableSource.Builder createBuilder() {
return new Kafka08JsonTableSource.Builder();
}

@Override
protected String kafkaVersion() {
return KAFKA_VERSION_VALUE_08;
return CONNECTOR_VERSION_VALUE_08;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,20 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.table.descriptors.Kafka;

import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_08;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08;

/**
* Tests for {@link Kafka08JsonTableSourceFactory}.
*/
public class Kafka08TableSourceFactoryTest extends KafkaJsonTableFromDescriptorTestBase {
protected String versionForTest() {
return KAFKA_VERSION_VALUE_08;
}
public class Kafka08JsonTableSourceFactoryTest extends KafkaJsonTableSourceFactoryTestBase {

protected KafkaJsonTableSource.Builder builderForTest() {
return Kafka08JsonTableSource.builder();
@Override
protected String version() {
return CONNECTOR_VERSION_VALUE_08;
}

@Override
protected void extraSettings(KafkaTableSource.Builder builder, Kafka kafka) {
builder.getKafkaProps().put("zookeeper.connect", "localhost:1111");
kafka.zookeeperConnect("localhost:1111");
protected KafkaJsonTableSource.Builder builder() {
return Kafka08JsonTableSource.builder();
}
}
8 changes: 8 additions & 0 deletions flink-connectors/flink-connector-kafka-0.9/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,14 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@

package org.apache.flink.streaming.connectors.kafka;

import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_09;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_09;

/**
* Factory for creating configured instances of {@link Kafka09JsonTableSource}.
*/
public class Kafka09JsonTableSourceFactory extends KafkaJsonTableSourceFactory {

@Override
protected KafkaJsonTableSource.Builder createBuilder() {
return new Kafka09JsonTableSource.Builder();
}

@Override
protected String kafkaVersion() {
return KAFKA_VERSION_VALUE_09;
return CONNECTOR_VERSION_VALUE_09;
}
}

This file was deleted.

Loading

0 comments on commit db2c510

Please sign in to comment.