[FLINK-8118] [table] Fix documentation mistakes
twalthr committed Nov 23, 2017
1 parent 4083c70 commit 7ff3f37
Showing 2 changed files with 31 additions and 30 deletions.
56 changes: 28 additions & 28 deletions docs/dev/table/sourceSinks.md
@@ -63,7 +63,7 @@ A `KafkaJsonTableSource` is created and configured using a builder. The followin
<div data-lang="java" markdown="1">
{% highlight java %}
// create builder
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
// set Kafka topic
.forTopic("sensors")
// set Kafka consumer properties
@@ -80,7 +80,7 @@ TableSource source = Kafka010JsonTableSource.builder()
<div data-lang="scala" markdown="1">
{% highlight scala %}
// create builder
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
// set Kafka topic
.forTopic("sensors")
// set Kafka consumer properties
@@ -108,7 +108,7 @@ Map<String, String> mapping = new HashMap<>();
mapping.put("sensorId", "id");
mapping.put("temperature", "temp");

-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
// ...
// set Table schema
.withSchema(TableSchema.builder()
@@ -126,7 +126,7 @@ TableSource source = Kafka010JsonTableSource.builder()

<div data-lang="scala" markdown="1">
{% highlight scala %}
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
// ...
// set Table schema
.withSchema(TableSchema.builder()
@@ -150,7 +150,7 @@ val source: TableSource[_] = Kafka010JsonTableSource.builder()
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
// ...
// configure missing field behavior
.failOnMissingField(true)
@@ -160,7 +160,7 @@ TableSource source = Kafka010JsonTableSource.builder()

<div data-lang="scala" markdown="1">
{% highlight scala %}
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
// ...
// configure missing field behavior
.failOnMissingField(true)
@@ -174,20 +174,20 @@ val source: TableSource[_] = Kafka010JsonTableSource.builder()
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
// ...
// start reading from the earliest offset
-.startReadingFromEarliest()
+.fromEarliest()
.build();
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
// ...
// start reading from the earliest offset
-.startReadingFromEarliest()
+.fromEarliest()
.build()
{% endhighlight %}
</div>
@@ -205,7 +205,7 @@ A `KafkaAvroTableSource` is created and configured using a builder. The followin
<div data-lang="java" markdown="1">
{% highlight java %}
// create builder
-TableSource source = Kafka010AvroTableSource.builder()
+KafkaTableSource source = Kafka010AvroTableSource.builder()
// set Kafka topic
.forTopic("sensors")
// set Kafka consumer properties
@@ -224,7 +224,7 @@ TableSource source = Kafka010AvroTableSource.builder()
<div data-lang="scala" markdown="1">
{% highlight scala %}
// create builder
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
// set Kafka topic
.forTopic("sensors")
// set Kafka consumer properties
@@ -256,32 +256,32 @@ Map<String, String> mapping = new HashMap<>();
mapping.put("sensorId", "id");
mapping.put("temperature", "temp");

-TableSource source = Kafka010AvroTableSource.builder()
+KafkaTableSource source = Kafka010AvroTableSource.builder()
// ...
// set Table schema
.withSchema(TableSchema.builder()
.field("sensorId", Types.LONG())
.field("temperature", Types.DOUBLE()).build())
// set class of Avro record with fields [id, temp]
.forAvroRecordClass(SensorReading.class)
-// set mapping from table fields to JSON fields
-.withTableToJsonMapping(mapping)
+// set mapping from table fields to Avro fields
+.withTableToAvroMapping(mapping)
.build();
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
val source: TableSource[_] = Kafka010AvroTableSource.builder()
val source: KafkaTableSource = Kafka010AvroTableSource.builder()
// ...
// set Table schema
.withSchema(TableSchema.builder()
.field("sensorId", Types.LONG)
.field("temperature", Types.DOUBLE).build())
// set class of Avro record with fields [id, temp]
.forAvroRecordClass(classOf[SensorReading])
-// set mapping from table fields to JSON fields
-.withTableToJsonMapping(Map(
+// set mapping from table fields to Avro fields
+.withTableToAvroMapping(Map(
"sensorId" -> "id",
"temperature" -> "temp").asJava)
.build()
@@ -294,20 +294,20 @@ val source: TableSource[_] = Kafka010AvroTableSource.builder()
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010AvroTableSource.builder()
// ...
// start reading from the earliest offset
-.startReadingFromEarliest()
+.fromEarliest()
.build();
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010AvroTableSource.builder()
// ...
// start reading from the earliest offset
-.startReadingFromEarliest()
+.fromEarliest()
.build()
{% endhighlight %}
</div>
@@ -326,7 +326,7 @@ A table schema field of type `SQL_TIMESTAMP` can be declared as a processing tim
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
// ...
.withSchema(TableSchema.builder()
.field("sensorId", Types.LONG())
@@ -341,7 +341,7 @@ TableSource source = Kafka010JsonTableSource.builder()

<div data-lang="scala" markdown="1">
{% highlight scala %}
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
// ...
.withSchema(TableSchema.builder()
.field("sensorId", Types.LONG)
@@ -372,7 +372,7 @@ The following example shows how to configure a rowtime attribute.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
// ...
.withSchema(TableSchema.builder()
.field("sensorId", Types.LONG())
@@ -392,7 +392,7 @@ TableSource source = Kafka010JsonTableSource.builder()

<div data-lang="scala" markdown="1">
{% highlight scala %}
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
// ...
.withSchema(TableSchema.builder()
.field("sensorId", Types.LONG)
@@ -418,7 +418,7 @@ Since Kafka 0.10, Kafka messages have a timestamp as metadata that specifies whe
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
// ...
.withSchema(TableSchema.builder()
.field("sensorId", Types.LONG())
@@ -437,7 +437,7 @@ TableSource source = Kafka010JsonTableSource.builder()

<div data-lang="scala" markdown="1">
{% highlight scala %}
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
// ...
.withSchema(TableSchema.builder()
.field("sensorId", Types.LONG)
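For reference, the corrected JSON snippets above compose into a single builder call roughly as follows. This is a minimal sketch assembled from the documentation hunks in this commit; the import paths, the consumer properties, and the `withKafkaProperties(...)` call (which is cut off in the excerpt) are assumptions for illustration rather than part of the diff.

```java
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSource;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;

public class SensorJsonSourceExample {

  public static KafkaTableSource createSource() {
    // Kafka consumer properties (values are illustrative placeholders)
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "sensor-reader");

    // The builder result is typed as KafkaTableSource, as corrected by this commit.
    return Kafka010JsonTableSource.builder()
        // set Kafka topic
        .forTopic("sensors")
        // set Kafka consumer properties (call assumed; it is cut off in the hunks above)
        .withKafkaProperties(props)
        // set Table schema
        .withSchema(TableSchema.builder()
            .field("sensorId", Types.LONG())
            .field("temperature", Types.DOUBLE()).build())
        // fail if a JSON field is missing instead of setting it to null
        .failOnMissingField(true)
        // start reading from the earliest offset (renamed from startReadingFromEarliest())
        .fromEarliest()
        .build();
  }
}
```

The hunks below come from the commit's second changed file, a Kafka table source test, where two test methods are renamed and an `@SuppressWarnings("unchecked")` annotation is added.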
@@ -159,7 +159,7 @@ public void testRowtimeAttribute() {
}

@Test
-public void testKafkaTSRowtimeAttribute() {
+public void testRowtimeAttribute2() {
KafkaTableSource.Builder b = getBuilder();
configureBuilder(b);

@@ -191,7 +191,8 @@ public void testKafkaTSRowtimeAttribute() {
}

@Test
-public void testKafkaTSSetConsumeOffsets() {
+@SuppressWarnings("unchecked")
+public void testConsumerOffsets() {
KafkaTableSource.Builder b = getBuilder();
configureBuilder(b);

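Likewise, the corrected Avro example assembles into a sketch like the one below, using only builder calls that appear in the hunks above (`forAvroRecordClass`, `withTableToAvroMapping`, `fromEarliest`). `SensorReading` stands in for a generated Avro record with fields `id` and `temp`, as in the documentation; its package and the import paths are assumed.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.connectors.kafka.Kafka010AvroTableSource;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;

public class SensorAvroSourceExample {

  public static KafkaTableSource createSource() {
    // map table fields to fields of the Avro record [id, temp]
    Map<String, String> mapping = new HashMap<>();
    mapping.put("sensorId", "id");
    mapping.put("temperature", "temp");

    return Kafka010AvroTableSource.builder()
        // set Kafka topic
        .forTopic("sensors")
        // set Table schema
        .withSchema(TableSchema.builder()
            .field("sensorId", Types.LONG())
            .field("temperature", Types.DOUBLE()).build())
        // set class of the Avro record; SensorReading is a placeholder for a
        // generated Avro class assumed to be on the classpath
        .forAvroRecordClass(SensorReading.class)
        // set mapping from table fields to Avro fields (renamed by this commit)
        .withTableToAvroMapping(mapping)
        // start reading from the earliest offset
        .fromEarliest()
        .build();
  }
}
```

In both sketches the commit's actual fixes are visible: the builder result is declared as `KafkaTableSource`, the offset method is `fromEarliest()`, and the Avro field mapping uses `withTableToAvroMapping(...)`.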
