[FLINK-6563] [table] Add builders with time attribute support for KafkaTableSources.

This closes apache#4638.
fhueske committed Oct 31, 2017
1 parent 505d478 commit 0e92b66
Showing 43 changed files with 2,392 additions and 340 deletions.
400 changes: 344 additions & 56 deletions docs/dev/table/sourceSinks.md

Large diffs are not rendered by default.
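
Since the reworked documentation is not rendered here, the following is a minimal usage sketch of the new API, based only on the constructor and setters visible in the diffs below. The broker properties, the field names, the TableSchema array constructor, and the Types constants are assumptions for illustration:

import java.util.Properties;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSource;
import org.apache.flink.table.api.TableSchema;

public class ProctimeSourceSketch {

    public static Kafka010JsonTableSource create() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.setProperty("group.id", "sample-group");            // assumed consumer group

        // Physical schema of the JSON messages (field names and types are assumed).
        TableSchema jsonSchema = new TableSchema(
            new String[] {"id", "temperature"},
            new TypeInformation<?>[] {Types.LONG, Types.DOUBLE});

        // Schema of the produced table: the JSON fields plus a processing-time column.
        TableSchema tableSchema = new TableSchema(
            new String[] {"id", "temperature", "proctime"},
            new TypeInformation<?>[] {Types.LONG, Types.DOUBLE, Types.SQL_TIMESTAMP});

        Kafka010JsonTableSource source =
            new Kafka010JsonTableSource("sensors", props, tableSchema, jsonSchema);

        // Declare the table column that carries processing time (setter added in this commit).
        source.setProctimeAttribute("proctime");
        return source;
    }
}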

Kafka010AvroTableSource.java
@@ -19,12 +19,17 @@
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

/**
@@ -37,22 +42,100 @@ public class Kafka010AvroTableSource extends KafkaAvroTableSource {
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param schema Schema of the produced table.
* @param record Avro specific record.
*/
public Kafka010AvroTableSource(
String topic,
Properties properties,
TableSchema schema,
Class<? extends SpecificRecordBase> record) {

super(
topic,
properties,
schema,
record);
}

/**
* Sets a mapping from schema fields to fields of the produced Avro record.
*
* <p>A field mapping is required if the fields of the produced table should be named differently than
* the fields of the Avro record.
* The key of the provided Map refers to the field of the table schema,
* the value to the field of the Avro record.</p>
*
* @param fieldMapping A mapping from schema fields to Avro fields.
*/
@Override
public void setFieldMapping(Map<String, String> fieldMapping) {
super.setFieldMapping(fieldMapping);
}

/**
* Declares a field of the schema to be a processing time attribute.
*
* @param proctimeAttribute The name of the field that becomes the processing time field.
*/
@Override
public void setProctimeAttribute(String proctimeAttribute) {
super.setProctimeAttribute(proctimeAttribute);
}

/**
* Declares a field of the schema to be a rowtime attribute.
*
* @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
*/
public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
}

@Override
FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
}

/**
* Returns a builder to configure and create a {@link Kafka010AvroTableSource}.
* @return A builder to configure and create a {@link Kafka010AvroTableSource}.
*/
public static Builder builder() {
return new Builder();
}

/**
* A builder to configure and create a {@link Kafka010AvroTableSource}.
*/
public static class Builder extends KafkaAvroTableSource.Builder<Kafka010AvroTableSource, Kafka010AvroTableSource.Builder> {

@Override
protected boolean supportsKafkaTimestamps() {
return true;
}

@Override
protected Kafka010AvroTableSource.Builder builder() {
return this;
}

/**
* Builds and configures a {@link Kafka010AvroTableSource}.
*
* @return A configured {@link Kafka010AvroTableSource}.
*/
@Override
public Kafka010AvroTableSource build() {
Kafka010AvroTableSource tableSource = new Kafka010AvroTableSource(
getTopic(),
getKafkaProps(),
getTableSchema(),
getAvroRecordClass());
super.configureTableSource(tableSource);
return tableSource;
}
}
}
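
As a complement, a hedged sketch of attaching an event-time attribute to the new Avro source. Only the constructor and setRowtimeAttributeDescriptor are confirmed by this diff; the RowtimeAttributeDescriptor constructor arguments and the ExistingField / AscendingTimestamps helpers are assumptions, and OrderRecord stands in for a generated Avro SpecificRecord class:

import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.Kafka010AvroTableSource;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;

public class RowtimeSourceSketch {

    public static Kafka010AvroTableSource create(Properties props, TableSchema schema) {
        // Assumed constructor: attribute name, timestamp extractor, watermark strategy.
        RowtimeAttributeDescriptor rowtime = new RowtimeAttributeDescriptor(
            "eventTime",                 // table field that becomes the rowtime attribute
            new ExistingField("ts"),     // read timestamps from the record field "ts" (assumed helper)
            new AscendingTimestamps());  // generate watermarks for ascending timestamps (assumed helper)

        // OrderRecord is a hypothetical generated Avro record class.
        Kafka010AvroTableSource source = new Kafka010AvroTableSource(
            "orders", props, schema, OrderRecord.class);
        source.setRowtimeAttributeDescriptor(rowtime);
        return source;
    }
}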

Kafka010JsonTableSource.java
@@ -18,11 +18,15 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

/**
@@ -33,21 +37,103 @@ public class Kafka010JsonTableSource extends KafkaJsonTableSource {
/**
* Creates a Kafka 0.10 JSON {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param typeInfo Type information describing the result type. The field names are used
* to parse the JSON file and so are the types.
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param tableSchema The schema of the table.
* @param jsonSchema The schema of the JSON messages to decode from Kafka.
*/
public Kafka010JsonTableSource(
String topic,
Properties properties,
TypeInformation<Row> typeInfo) {
String topic,
Properties properties,
TableSchema tableSchema,
TableSchema jsonSchema) {

super(topic, properties, typeInfo);
super(topic, properties, tableSchema, jsonSchema);
}

/**
* Sets the flag that specifies the behavior in case of missing fields.
* If set to true, the TableSource fails for missing fields; if set to false, missing fields are set to null.
*
* @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
*/
@Override
public void setFailOnMissingField(boolean failOnMissingField) {
super.setFailOnMissingField(failOnMissingField);
}

/**
* Sets the mapping from table schema fields to JSON schema fields.
*
* @param fieldMapping The mapping from table schema fields to JSON schema fields.
*/
@Override
public void setFieldMapping(Map<String, String> fieldMapping) {
super.setFieldMapping(fieldMapping);
}

/**
* Declares a field of the schema to be a processing time attribute.
*
* @param proctimeAttribute The name of the field that becomes the processing time field.
*/
@Override
public void setProctimeAttribute(String proctimeAttribute) {
super.setProctimeAttribute(proctimeAttribute);
}

/**
* Declares a field of the schema to be a rowtime attribute.
*
* @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
*/
public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
}

@Override
FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
}

/**
* Returns a builder to configure and create a {@link Kafka010JsonTableSource}.
* @return A builder to configure and create a {@link Kafka010JsonTableSource}.
*/
public static Kafka010JsonTableSource.Builder builder() {
return new Kafka010JsonTableSource.Builder();
}

/**
* A builder to configure and create a {@link Kafka010JsonTableSource}.
*/
public static class Builder extends KafkaJsonTableSource.Builder<Kafka010JsonTableSource, Kafka010JsonTableSource.Builder> {

@Override
protected boolean supportsKafkaTimestamps() {
return true;
}

@Override
protected Kafka010JsonTableSource.Builder builder() {
return this;
}

/**
* Builds and configures a {@link Kafka010JsonTableSource}.
*
* @return A configured {@link Kafka010JsonTableSource}.
*/
@Override
public Kafka010JsonTableSource build() {
Kafka010JsonTableSource tableSource = new Kafka010JsonTableSource(
getTopic(),
getKafkaProps(),
getTableSchema(),
getJsonSchema());
super.configureTableSource(tableSource);
return tableSource;
}
}
}
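
The static builder() is the intended entry point going forward. Only builder() and build() appear in this diff; the fluent configuration methods below are taken from the reworked sourceSinks.md documentation in this commit and should be treated as assumptions here:

import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSource;
import org.apache.flink.table.api.TableSchema;

public class BuilderSketch {

    public static Kafka010JsonTableSource create(
            Properties props, TableSchema tableSchema, TableSchema jsonSchema) {
        // Fluent configuration; method names other than builder()/build() are assumptions.
        return Kafka010JsonTableSource.builder()
            .forTopic("sensors")                // Kafka topic (assumed method)
            .withKafkaProperties(props)         // consumer properties (assumed method)
            .withSchema(tableSchema)            // schema of the produced table (assumed method)
            .forJsonSchema(jsonSchema)          // physical schema of the JSON messages (assumed method)
            .withProctimeAttribute("proctime")  // declare a processing-time column (assumed method)
            .build();
    }
}
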
Kafka010TableSource.java
@@ -20,6 +20,7 @@

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

@@ -28,7 +29,10 @@
/**
* Kafka {@link StreamTableSource} for Kafka 0.10.
*/
public class Kafka010TableSource extends Kafka09TableSource {
public abstract class Kafka010TableSource extends KafkaTableSource {

// The deserialization schema for the Kafka records
private final DeserializationSchema<Row> deserializationSchema;

/**
* Creates a Kafka 0.10 {@link StreamTableSource}.
@@ -43,9 +47,17 @@ public Kafka010TableSource(
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema,
TableSchema schema,
TypeInformation<Row> typeInfo) {

super(topic, properties, deserializationSchema, typeInfo);
super(topic, properties, schema, typeInfo);

this.deserializationSchema = deserializationSchema;
}

@Override
public DeserializationSchema<Row> getDeserializationSchema() {
return this.deserializationSchema;
}

@Override
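
Kafka010TableSource now extends KafkaTableSource directly instead of Kafka09TableSource and is abstract, holding the DeserializationSchema itself. A hypothetical format-specific subclass only needs to forward its schema; all names below are illustrative:

import java.util.Properties;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.Kafka010TableSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

// Hypothetical subclass: it supplies a concrete DeserializationSchema, while the
// Kafka 0.10 consumer creation is inherited from Kafka010TableSource.
public class MyFormat010TableSource extends Kafka010TableSource {

    public MyFormat010TableSource(
            String topic,
            Properties properties,
            DeserializationSchema<Row> deserializationSchema,
            TableSchema schema,
            TypeInformation<Row> typeInfo) {
        super(topic, properties, deserializationSchema, schema, typeInfo);
    }
}
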
Kafka010AvroTableSourceTest.java
@@ -18,25 +18,18 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.types.Row;

import java.util.Properties;

/**
* Tests for the {@link Kafka010AvroTableSource}.
*/
public class Kafka010AvroTableSourceTest extends KafkaTableSourceTestBase {
public class Kafka010AvroTableSourceTest extends KafkaAvroTableSourceTestBase {

@Override
protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) {

return new Kafka010AvroTableSource(
topic,
properties,
AvroSpecificRecord.class);
protected KafkaTableSource.Builder getBuilder() {
return Kafka010AvroTableSource.builder();
}

@Override
Kafka010JsonTableSourceTest.java
@@ -18,21 +18,18 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
import org.apache.flink.types.Row;

import java.util.Properties;

/**
* Tests for the {@link Kafka010JsonTableSource}.
*/
public class Kafka010JsonTableSourceTest extends KafkaTableSourceTestBase {
public class Kafka010JsonTableSourceTest extends KafkaJsonTableSourceTestBase {

@Override
protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) {
return new Kafka010JsonTableSource(topic, properties, typeInfo);
protected KafkaTableSource.Builder getBuilder() {
return Kafka010JsonTableSource.builder();
}

@Override