[FLINK-15933][catalog][doc] Update content of how generic table schema is stored in hive via HiveCatalog

closes apache#11029
lirui-apache authored and bowenli86 committed Feb 11, 2020
1 parent 5614f21 commit 3766daa
Showing 4 changed files with 302 additions and 199 deletions.
155 changes: 151 additions & 4 deletions docs/dev/table/hive/hive_catalog.md
@@ -52,6 +52,21 @@ as those of an overall Flink-Hive integration.
Once configured properly, `HiveCatalog` should just work out of the box. Users can create Flink meta-objects with DDL, and should
see them immediately afterwards.

`HiveCatalog` can be used to handle two kinds of tables: Hive-compatible tables and generic tables. Hive-compatible tables
are those stored in a Hive-compatible way, in terms of both metadata and data in the storage layer. Therefore, Hive-compatible
tables created via Flink can be queried from the Hive side.

Generic tables, on the other hand, are specific to Flink. When creating generic tables with `HiveCatalog`, we're just using
HMS to persist the metadata. While these tables are visible to Hive, Hive is unlikely to understand
the metadata, and therefore using such tables in Hive leads to undefined behavior.

Flink uses the property `is_generic` to tell whether a table is Hive-compatible or generic. When creating a table with
`HiveCatalog`, it is considered generic by default. If you'd like to create a Hive-compatible table, make sure to set
`is_generic` to false in your table properties.
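
For instance, here is a minimal, hedged DDL sketch of creating a Hive-compatible table by setting `is_generic` to false. The table name and columns are illustrative, not taken from this page:

{% highlight sql %}
-- Sketch only: force a Hive-compatible table via the is_generic property.
-- Hive should then be able to read this table's metadata and data directly.
CREATE TABLE hive_compatible_tbl (
  name STRING,
  age  INT
) WITH (
  'is_generic' = 'false'
);
{% endhighlight %}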

As stated above, generic tables shouldn't be used from Hive. In the Hive CLI, you can run `DESCRIBE FORMATTED` on a table and
determine whether it's generic by checking the `is_generic` property: generic tables will have `is_generic=true`.

### Example

We will walk through a simple example here.
@@ -190,18 +205,55 @@ root

{% endhighlight %}

Verify the table is also visible to Hive via the Hive CLI, and note that the table has the property `is_generic=true`:

{% highlight bash %}
hive> show tables;
OK
mykafka
Time taken: 0.038 seconds, Fetched: 1 row(s)

hive> describe formatted mykafka;
OK
# col_name data_type comment


# Detailed Table Information
Database: default
Owner: null
CreateTime: ......
LastAccessTime: UNKNOWN
Retention: 0
Location: ......
Table Type: MANAGED_TABLE
Table Parameters:
flink.connector.properties.bootstrap.servers localhost:9092
flink.connector.properties.zookeeper.connect localhost:2181
flink.connector.topic test
flink.connector.type kafka
flink.connector.version universal
flink.format.type csv
flink.generic.table.schema.0.data-type VARCHAR(2147483647)
flink.generic.table.schema.0.name name
flink.generic.table.schema.1.data-type INT
flink.generic.table.schema.1.name age
flink.update-mode append
is_generic true
transient_lastDdlTime ......

# Storage Information
SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
Compressed: No
Num Buckets: -1
Bucket Columns: []
Sort Columns: []
Storage Desc Params:
serialization.format 1
Time taken: 0.158 seconds, Fetched: 36 row(s)

{% endhighlight %}

Please note that since this is a generic table, Hive doesn't understand it, and using this table in Hive leads to undefined behavior.


#### step 5: run Flink SQL to query the Kafka table
@@ -243,3 +295,98 @@ You should see results produced by Flink in SQL Client now, as:
kaiky 18

{% endhighlight %}

## Supported Types

`HiveCatalog` supports all Flink types for generic tables.
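
As a hedged illustration, the sketch below creates a generic table (the default) using a Flink type that has no Hive counterpart. The table name, columns, and connector properties are made up for this sketch, echoing the Kafka-style properties from the example above:

{% highlight sql %}
-- Sketch only: a generic table may use any Flink type, including types
-- Hive cannot represent, such as MULTISET. HMS merely persists the metadata.
CREATE TABLE generic_demo (
  user_name   STRING,
  page_visits MULTISET<STRING>,
  event_time  TIMESTAMP(3)
) WITH (
  'connector.type'    = 'kafka',      -- illustrative connector properties
  'connector.version' = 'universal',
  'connector.topic'   = 'demo',
  'format.type'       = 'csv'
);
{% endhighlight %}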

For Hive-compatible tables, `HiveCatalog` needs to map Flink data types to corresponding Hive types as described in
the following table:

<table class="table table-bordered">
<thead>
<tr>
<th class="text-center" style="width: 25%">Flink Data Type</th>
<th class="text-center">Hive Data Type</th>
</tr>
</thead>
<tbody>
<tr>
<td class="text-center">CHAR(p)</td>
<td class="text-center">CHAR(p)</td>
</tr>
<tr>
<td class="text-center">VARCHAR(p)</td>
<td class="text-center">VARCHAR(p)</td>
</tr>
<tr>
<td class="text-center">STRING</td>
<td class="text-center">STRING</td>
</tr>
<tr>
<td class="text-center">BOOLEAN</td>
<td class="text-center">BOOLEAN</td>
</tr>
<tr>
<td class="text-center">TINYINT</td>
<td class="text-center">TINYINT</td>
</tr>
<tr>
<td class="text-center">SMALLINT</td>
<td class="text-center">SMALLINT</td>
</tr>
<tr>
<td class="text-center">INT</td>
<td class="text-center">INT</td>
</tr>
<tr>
<td class="text-center">BIGINT</td>
<td class="text-center">LONG</td>
</tr>
<tr>
<td class="text-center">FLOAT</td>
<td class="text-center">FLOAT</td>
</tr>
<tr>
<td class="text-center">DOUBLE</td>
<td class="text-center">DOUBLE</td>
</tr>
<tr>
<td class="text-center">DECIMAL(p, s)</td>
<td class="text-center">DECIMAL(p, s)</td>
</tr>
<tr>
<td class="text-center">DATE</td>
<td class="text-center">DATE</td>
</tr>
<tr>
<td class="text-center">TIMESTAMP(9)</td>
<td class="text-center">TIMESTAMP</td>
</tr>
<tr>
<td class="text-center">BYTES</td>
<td class="text-center">BINARY</td>
</tr>
<tr>
<td class="text-center">ARRAY&lt;T&gt;</td>
<td class="text-center">LIST&lt;T&gt;</td>
</tr>
<tr>
<td class="text-center">MAP<K, V></td>
<td class="text-center">MAP<K, V></td>
</tr>
<tr>
<td class="text-center">ROW</td>
<td class="text-center">STRUCT</td>
</tr>
</tbody>
</table>

A few things to note about the type mapping (see the sketch after this list):
* Hive's `CHAR(p)` has a maximum length of 255
* Hive's `VARCHAR(p)` has a maximum length of 65535
* Hive's `MAP` only supports primitive key types, while Flink's `MAP` keys can be of any data type
* Hive's `UNION` type is not supported
* Hive's `TIMESTAMP` always has precision 9 and doesn't support other precisions. Hive UDFs, on the other hand, can process `TIMESTAMP` values with a precision <= 9.
* Hive doesn't support Flink's `TIMESTAMP_WITH_TIME_ZONE`, `TIMESTAMP_WITH_LOCAL_TIME_ZONE`, and `MULTISET`
* Flink's `INTERVAL` type cannot be mapped to Hive's `INTERVAL` type yet
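
To make the mapping concrete, here is a hedged DDL sketch of a Hive-compatible table whose Flink column types all have Hive counterparts per the table above. All names are illustrative, and the `is_generic` property is the one described earlier on this page:

{% highlight sql %}
-- Sketch only: every Flink type below maps to a Hive type in the table above.
CREATE TABLE mapping_demo (
  id      BIGINT,                      -- Hive LONG
  name    VARCHAR(100),                -- within Hive's 65535 VARCHAR limit
  initial CHAR(1),                     -- within Hive's 255 CHAR limit
  score   DECIMAL(10, 2),
  born    DATE,
  updated TIMESTAMP(9),                -- Hive TIMESTAMP is always precision 9
  payload BYTES,                       -- Hive BINARY
  tags    ARRAY<STRING>,
  attrs   MAP<STRING, INT>,            -- Hive MAP keys must be primitive
  address ROW<city STRING, zip INT>    -- Hive STRUCT
) WITH (
  'is_generic' = 'false'               -- required for a Hive-compatible table
);
{% endhighlight %}
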
155 changes: 151 additions & 4 deletions docs/dev/table/hive/hive_catalog.zh.md
@@ -52,6 +52,21 @@ as those of an overall Flink-Hive integration.
Once configured properly, `HiveCatalog` should just work out of the box. Users can create Flink meta-objects with DDL, and should
see them immediately afterwards.

`HiveCatalog` can be used to handle two kinds of tables: Hive-compatible tables and generic tables. Hive-compatible tables
are those stored in a Hive-compatible way, in terms of both metadata and data in the storage layer. Therefore, Hive-compatible
tables created via Flink can be queried from the Hive side.

Generic tables, on the other hand, are specific to Flink. When creating generic tables with `HiveCatalog`, we're just using
HMS to persist the metadata. While these tables are visible to Hive, Hive is unlikely to understand
the metadata, and therefore using such tables in Hive leads to undefined behavior.

Flink uses the property `is_generic` to tell whether a table is Hive-compatible or generic. When creating a table with
`HiveCatalog`, it is considered generic by default. If you'd like to create a Hive-compatible table, make sure to set
`is_generic` to false in your table properties.
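
For instance, here is a minimal, hedged DDL sketch of creating a Hive-compatible table by setting `is_generic` to false. The table name and columns are illustrative, not taken from this page:

{% highlight sql %}
-- Sketch only: force a Hive-compatible table via the is_generic property.
-- Hive should then be able to read this table's metadata and data directly.
CREATE TABLE hive_compatible_tbl (
  name STRING,
  age  INT
) WITH (
  'is_generic' = 'false'
);
{% endhighlight %}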

As stated above, generic tables shouldn't be used from Hive. In the Hive CLI, you can run `DESCRIBE FORMATTED` on a table and
determine whether it's generic by checking the `is_generic` property: generic tables will have `is_generic=true`.

### Example

We will walk through a simple example here.
@@ -190,18 +205,55 @@ root

{% endhighlight %}

Verify the table is also visible to Hive via the Hive CLI, and note that the table has the property `is_generic=true`:

{% highlight bash %}
hive> show tables;
OK
mykafka
Time taken: 0.038 seconds, Fetched: 1 row(s)

hive> describe formatted mykafka;
OK
# col_name data_type comment


# Detailed Table Information
Database: default
Owner: null
CreateTime: ......
LastAccessTime: UNKNOWN
Retention: 0
Location: ......
Table Type: MANAGED_TABLE
Table Parameters:
flink.connector.properties.bootstrap.servers localhost:9092
flink.connector.properties.zookeeper.connect localhost:2181
flink.connector.topic test
flink.connector.type kafka
flink.connector.version universal
flink.format.type csv
flink.generic.table.schema.0.data-type VARCHAR(2147483647)
flink.generic.table.schema.0.name name
flink.generic.table.schema.1.data-type INT
flink.generic.table.schema.1.name age
flink.update-mode append
is_generic true
transient_lastDdlTime ......

# Storage Information
SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
Compressed: No
Num Buckets: -1
Bucket Columns: []
Sort Columns: []
Storage Desc Params:
serialization.format 1
Time taken: 0.158 seconds, Fetched: 36 row(s)

{% endhighlight %}

Please note that since this is a generic table, Hive doesn't understand it, and using this table in Hive leads to undefined behavior.


#### step 5: run Flink SQL to query the Kafka table
@@ -243,3 +295,98 @@ You should see results produced by Flink in SQL Client now, as:
kaiky 18

{% endhighlight %}

## Supported Types

`HiveCatalog` supports all Flink types for generic tables.
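
As a hedged illustration, the sketch below creates a generic table (the default) using a Flink type that has no Hive counterpart. The table name, columns, and connector properties are made up for this sketch, echoing the Kafka-style properties from the example above:

{% highlight sql %}
-- Sketch only: a generic table may use any Flink type, including types
-- Hive cannot represent, such as MULTISET. HMS merely persists the metadata.
CREATE TABLE generic_demo (
  user_name   STRING,
  page_visits MULTISET<STRING>,
  event_time  TIMESTAMP(3)
) WITH (
  'connector.type'    = 'kafka',      -- illustrative connector properties
  'connector.version' = 'universal',
  'connector.topic'   = 'demo',
  'format.type'       = 'csv'
);
{% endhighlight %}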

For Hive-compatible tables, `HiveCatalog` needs to map Flink data types to corresponding Hive types as described in
the following table:

<table class="table table-bordered">
<thead>
<tr>
<th class="text-center" style="width: 25%">Flink Data Type</th>
<th class="text-center">Hive Data Type</th>
</tr>
</thead>
<tbody>
<tr>
<td class="text-center">CHAR(p)</td>
<td class="text-center">CHAR(p)</td>
</tr>
<tr>
<td class="text-center">VARCHAR(p)</td>
<td class="text-center">VARCHAR(p)</td>
</tr>
<tr>
<td class="text-center">STRING</td>
<td class="text-center">STRING</td>
</tr>
<tr>
<td class="text-center">BOOLEAN</td>
<td class="text-center">BOOLEAN</td>
</tr>
<tr>
<td class="text-center">TINYINT</td>
<td class="text-center">TINYINT</td>
</tr>
<tr>
<td class="text-center">SMALLINT</td>
<td class="text-center">SMALLINT</td>
</tr>
<tr>
<td class="text-center">INT</td>
<td class="text-center">INT</td>
</tr>
<tr>
<td class="text-center">BIGINT</td>
<td class="text-center">LONG</td>
</tr>
<tr>
<td class="text-center">FLOAT</td>
<td class="text-center">FLOAT</td>
</tr>
<tr>
<td class="text-center">DOUBLE</td>
<td class="text-center">DOUBLE</td>
</tr>
<tr>
<td class="text-center">DECIMAL(p, s)</td>
<td class="text-center">DECIMAL(p, s)</td>
</tr>
<tr>
<td class="text-center">DATE</td>
<td class="text-center">DATE</td>
</tr>
<tr>
<td class="text-center">TIMESTAMP(9)</td>
<td class="text-center">TIMESTAMP</td>
</tr>
<tr>
<td class="text-center">BYTES</td>
<td class="text-center">BINARY</td>
</tr>
<tr>
<td class="text-center">ARRAY&lt;T&gt;</td>
<td class="text-center">LIST&lt;T&gt;</td>
</tr>
<tr>
<td class="text-center">MAP<K, V></td>
<td class="text-center">MAP<K, V></td>
</tr>
<tr>
<td class="text-center">ROW</td>
<td class="text-center">STRUCT</td>
</tr>
</tbody>
</table>

A few things to note about the type mapping (see the sketch after this list):
* Hive's `CHAR(p)` has a maximum length of 255
* Hive's `VARCHAR(p)` has a maximum length of 65535
* Hive's `MAP` only supports primitive key types, while Flink's `MAP` keys can be of any data type
* Hive's `UNION` type is not supported
* Hive's `TIMESTAMP` always has precision 9 and doesn't support other precisions. Hive UDFs, on the other hand, can process `TIMESTAMP` values with a precision <= 9.
* Hive doesn't support Flink's `TIMESTAMP_WITH_TIME_ZONE`, `TIMESTAMP_WITH_LOCAL_TIME_ZONE`, and `MULTISET`
* Flink's `INTERVAL` type cannot be mapped to Hive's `INTERVAL` type yet
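
To make the mapping concrete, here is a hedged DDL sketch of a Hive-compatible table whose Flink column types all have Hive counterparts per the table above. All names are illustrative, and the `is_generic` property is the one described earlier on this page:

{% highlight sql %}
-- Sketch only: every Flink type below maps to a Hive type in the table above.
CREATE TABLE mapping_demo (
  id      BIGINT,                      -- Hive LONG
  name    VARCHAR(100),                -- within Hive's 65535 VARCHAR limit
  initial CHAR(1),                     -- within Hive's 255 CHAR limit
  score   DECIMAL(10, 2),
  born    DATE,
  updated TIMESTAMP(9),                -- Hive TIMESTAMP is always precision 9
  payload BYTES,                       -- Hive BINARY
  tags    ARRAY<STRING>,
  attrs   MAP<STRING, INT>,            -- Hive MAP keys must be primitive
  address ROW<city STRING, zip INT>    -- Hive STRUCT
) WITH (
  'is_generic' = 'false'               -- required for a Hive-compatible table
);
{% endhighlight %}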