[FLINK-11044] [docs] Fix registerTableSink docs
This closes apache#7208.
kgorman authored and twalthr committed Dec 4, 2018
1 parent 591160a commit b0d1d33
Showing 1 changed file with 28 additions and 28 deletions.
56 changes: 28 additions & 28 deletions docs/dev/table/connect.md
@@ -312,7 +312,7 @@ The following timestamp extractors are supported:
.timestampsFromField("ts_field") // required: original field name in the input
)

// Converts the assigned timestamps from a DataStream API record into the rowtime attribute
// and thus preserves the assigned timestamps from the source.
// This requires a source that assigns timestamps (e.g., Kafka 0.10+).
.rowtime(
@@ -337,7 +337,7 @@ rowtime:
type: from-field
from: "ts_field" # required: original field name in the input

# Converts the assigned timestamps from a DataStream API record into the rowtime attribute
# and thus preserves the assigned timestamps from the source.
rowtime:
timestamps:
@@ -351,7 +351,7 @@ The following watermark strategies are supported:
<div class="codetabs" markdown="1">
<div data-lang="Java/Scala" markdown="1">
{% highlight java %}
// Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
// observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
// are not late.
.rowtime(
@@ -377,7 +377,7 @@ The following watermark strategies are supported:

<div data-lang="YAML" markdown="1">
{% highlight yaml %}
# Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
# observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
# are not late.
rowtime:
@@ -695,7 +695,7 @@ connector:

**Key extraction:** Flink automatically extracts valid keys from a query. For example, a query `SELECT a, b, c FROM t GROUP BY a, b` defines a composite key of the fields `a` and `b`. The Elasticsearch connector generates a document ID string for every row by concatenating all key fields in the order defined in the query using a key delimiter. A custom representation of null literals for key fields can be defined.

<span class="label label-danger">Attention</span> A JSON format defines how to encode documents for the external system; therefore, it must be added as a [dependency](connect.html#formats).
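
As a rough illustration of the key extraction described above, the sketch below assembles a document ID from the key fields of `SELECT a, b, c FROM t GROUP BY a, b`. The `buildDocumentId` helper, the delimiter `_`, and the null literal `n/a` are illustrative assumptions, not part of the connector's API.

{% highlight java %}
// Illustrative only: mimics the described key-to-document-ID concatenation.
public class DocumentIdSketch {

  static String buildDocumentId(Object[] keyFields, String delimiter, String nullLiteral) {
    StringBuilder id = new StringBuilder();
    for (int i = 0; i < keyFields.length; i++) {
      if (i > 0) {
        id.append(delimiter);
      }
      // replace null key fields with the configured null literal
      id.append(keyFields[i] == null ? nullLiteral : keyFields[i].toString());
    }
    return id.toString();
  }

  public static void main(String[] args) {
    // a row with key fields a = "user-7" and b = null yields "user-7_n/a"
    System.out.println(buildDocumentId(new Object[] {"user-7", null}, "_", "n/a"));
  }
}
{% endhighlight %}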

{% top %}

@@ -717,8 +717,8 @@ The CSV format allows to read and write comma-separated rows.
new Csv()
.field("field1", Types.STRING) // required: ordered format fields
.field("field2", Types.TIMESTAMP)
.fieldDelimiter(",") // optional: string delimiter "," by default
.lineDelimiter("\n") // optional: string delimiter "\n" by default
.quoteCharacter('"') // optional: single character for string values, empty by default
.commentPrefix('#') // optional: string to indicate comments, empty by default
.ignoreFirstLine() // optional: ignore the first line, by default it is not skipped
@@ -736,8 +736,8 @@ format:
type: VARCHAR
- name: field2
type: TIMESTAMP
field-delimiter: "," # optional: string delimiter "," by default
line-delimiter: "\n" # optional: string delimiter "\n" by default
quote-character: '"' # optional: single character for string values, empty by default
comment-prefix: '#' # optional: string to indicate comments, empty by default
ignore-first-line: false # optional: boolean flag to ignore the first line, by default it is not skipped
@@ -992,7 +992,7 @@ These are the additional `TableSink`s which are provided with Flink:
| **Class name** | **Maven dependency** | **Batch?** | **Streaming?** | **Description**
| `CsvTableSink` | `flink-table` | Y | Append | A simple sink for CSV files.
| `JDBCAppendTableSink` | `flink-jdbc` | Y | Append | Writes a Table to a JDBC table.
| `CassandraAppendTableSink` | `flink-connector-cassandra` | N | Append | Writes a Table to a Cassandra table.

### OrcTableSource

@@ -1044,7 +1044,7 @@ val orcTableSource = OrcTableSource.builder()

### CsvTableSink

The `CsvTableSink` emits a `Table` to one or more CSV files.

The sink only supports append-only streaming tables. It cannot be used to emit a `Table` that is continuously updated. See the [documentation on Table to Stream conversions](./streaming/dynamic_tables.html#table-to-stream-conversion) for details. When emitting a streaming table, rows are written at least once (if checkpointing is enabled) and the `CsvTableSink` does not split output files into bucket files but continuously writes to the same files.

@@ -1053,17 +1053,17 @@ The sink only supports append-only streaming tables. It cannot be used to emit a
{% highlight java %}

CsvTableSink sink = new CsvTableSink(
path, // output path
"|", // optional: delimit files by '|'
1, // optional: write to a single file
WriteMode.OVERWRITE); // optional: override existing files

tableEnv.registerTableSink(
"csvOutputTable",
// specify table schema
new String[]{"f0", "f1"},
new TypeInformation[]{Types.STRING, Types.INT},
sink);

Table table = ...
table.insertInto("csvOutputTable");
@@ -1074,17 +1074,17 @@ table.insertInto("csvOutputTable");
{% highlight scala %}

val sink: CsvTableSink = new CsvTableSink(
path, // output path
fieldDelim = "|", // optional: delimit files by '|'
numFiles = 1, // optional: write to a single file
writeMode = WriteMode.OVERWRITE) // optional: override existing files

tableEnv.registerTableSink(
"csvOutputTable",
// specify table schema
Array[String]("f0", "f1"),
Array[TypeInformation[_]](Types.STRING, Types.INT),
sink)

val table: Table = ???
table.insertInto("csvOutputTable")
Expand Down Expand Up @@ -1113,10 +1113,10 @@ JDBCAppendTableSink sink = JDBCAppendTableSink.builder()

tableEnv.registerTableSink(
"jdbcOutputTable",
// specify table schema
new String[]{"id"},
new TypeInformation[]{Types.INT},
sink);

Table table = ...
table.insertInto("jdbcOutputTable");
@@ -1134,18 +1134,18 @@ val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()

tableEnv.registerTableSink(
"jdbcOutputTable",
// specify table schema
Array[String]("id"),
Array[TypeInformation[_]](Types.INT),
sink)

val table: Table = ???
table.insertInto("jdbcOutputTable")
{% endhighlight %}
</div>
</div>

Similar to using <code>JDBCOutputFormat</code>, you have to explicitly specify the name of the JDBC driver, the JDBC URL, the query to be executed, and the field types of the JDBC table.
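
For reference, a minimal sketch of such a builder configuration, assuming an embedded Derby database; the driver class, URL, query, and parameter type are placeholder values rather than anything taken from this commit:

{% highlight java %}
JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") // JDBC driver class name
  .setDBUrl("jdbc:derby:memory:ebookshop")                // JDBC URL
  .setQuery("INSERT INTO books (id) VALUES (?)")          // statement executed for every row
  .setParameterTypes(Types.INT)                           // field types of the JDBC table
  .build();
{% endhighlight %}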

{% top %}

@@ -1164,16 +1164,16 @@ To use the `CassandraAppendTableSink`, you have to add the Cassandra connector d
ClusterBuilder builder = ... // configure Cassandra cluster connection

CassandraAppendTableSink sink = new CassandraAppendTableSink(
builder,
// the query must match the schema of the table
"INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)");

tableEnv.registerTableSink(
"cassandraOutputTable",
// specify table schema
new String[]{"id", "name", "value"},
new TypeInformation[]{Types.INT, Types.STRING, Types.DOUBLE},
sink);

Table table = ...
table.insertInto("cassandraOutputTable");
@@ -1185,16 +1185,16 @@ table.insertInto(cassandraOutputTable);
val builder: ClusterBuilder = ... // configure Cassandra cluster connection

val sink: CassandraAppendTableSink = new CassandraAppendTableSink(
builder,
// the query must match the schema of the table
"INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)")

tableEnv.registerTableSink(
"cassandraOutputTable",
// specify table schema
Array[String]("id", "name", "value"),
Array[TypeInformation[_]](Types.INT, Types.STRING, Types.DOUBLE),
sink)

val table: Table = ???
table.insertInto("cassandraOutputTable")
