Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: add demo for OceanBase EE #63

Merged
merged 1 commit into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
docs: add demo for OceanBase EE
  • Loading branch information
whhe committed Apr 24, 2024
commit f31c8cac81730fed042f8a6bc3a6c96dbdf34f48
96 changes: 43 additions & 53 deletions docs/sink/flink-connector-oceanbase.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,25 @@ To use this connector through Flink SQL directly, you need to download the shade
- Release versions: https://repo1.maven.org/maven2/com/oceanbase/flink-sql-connector-oceanbase
- Snapshot versions: https://s01.oss.sonatype.org/content/repositories/snapshots/com/oceanbase/flink-sql-connector-oceanbase

This project has built-in MySQL driver 8.0.28. For users of OceanBase EE who want to use OceanBase JDBC driver, they need to manually introduce the following dependencies:

<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left">Dependency Item</th>
<th class="text-left">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><a href="https://mvnrepository.com/artifact/com.oceanbase/oceanbase-client/2.4.9">com.oceanbase:oceanbase-client:2.4.9</a></td>
<td>Used for connecting to OceanBase EE.</td>
</tr>
</tbody>
</table>
</div>

### Demo

#### Preparation
Expand All @@ -68,59 +87,6 @@ CREATE TABLE `t_sink` (
);
```

#### Java Demo

Take Maven project for example, add the required dependencies to the pom.xml, and then use the following code.

```java
package com.oceanbase;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Main {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
StreamTableEnvironment tEnv =
StreamTableEnvironment.create(
env, EnvironmentSettings.newInstance().inStreamingMode().build());

tEnv.executeSql(
"CREATE TABLE t_sink ( "
+ " id INT,"
+ " username VARCHAR,"
+ " score INT,"
+ " PRIMARY KEY (id) NOT ENFORCED"
+ ") with ("
+ " 'connector' = 'oceanbase',"
+ " 'url' = 'jdbc:mysql:https://127.0.0.1:2881/test',"
+ " 'schema-name'= 'test',"
+ " 'table-name' = 't_sink',"
+ " 'username' = 'root@test#obcluster',"
+ " 'password' = 'pswd',"
+ " 'druid-properties' = 'druid.initialSize=10;druid.maxActive=100',"
+ " 'buffer-flush.interval' = '1s',"
+ " 'buffer-flush.buffer-size' = '5000',"
+ " 'max-retries' = '3'"
+ " );");

tEnv.executeSql(
"INSERT INTO t_sink VALUES "
+ "(1, 'Tom', 99),"
+ "(2, 'Jerry', 88),"
+ "(1, 'Tom', 89);")
.await();
}
}

```

Once executed, the records should have been written to OceanBase.

For more information please refer to [OceanBaseConnectorITCase.java](../../flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseConnectorITCase.java).

#### Flink SQL Demo

Put the JAR files of dependencies to the 'lib' directory of Flink, and then create the destination table with Flink SQL through the sql client.
Expand Down Expand Up @@ -156,6 +122,30 @@ VALUES (1, 'Tom', 99),

Once executed, the records should have been written to OceanBase.

For users of OceanBase EE, you need to specify the `url` and `driver-class-name` corresponding to the OceanBase JDBC driver.

```sql
CREATE TABLE t_sink
(
id INT,
username VARCHAR,
score INT,
PRIMARY KEY (id) NOT ENFORCED
) with (
'connector' = 'oceanbase',
'url' = 'jdbc:oceanbase:https://127.0.0.1:2881/SYS',
'driver-class-name' = 'com.oceanbase.jdbc.Driver',
'schema-name' = 'SYS',
'table-name' = 'T_SINK',
'username' = 'SYS@test#obcluster',
'password' = 'pswd',
'druid-properties' = 'druid.initialSize=10;druid.maxActive=100;',
'buffer-flush.interval' = '1s',
'buffer-flush.buffer-size' = '5000',
'max-retries' = '3'
);
```

## Configuration

| Option | Required by Table API | Required by DataStream | Default | Type | Description |
Expand Down
96 changes: 43 additions & 53 deletions docs/sink/flink-connector-oceanbase_cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,25 @@ mvn clean package -DskipTests
- 正式版本:https://repo1.maven.org/maven2/com/oceanbase/flink-sql-connector-oceanbase
- 快照版本:https://s01.oss.sonatype.org/content/repositories/snapshots/com/oceanbase/flink-sql-connector-oceanbase

本项目内置了 MySQL 驱动 8.0.28,对于想使用 OceanBase JDBC 驱动的 OceanBase 数据库企业版的用户,需要手动引入以下依赖:

<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left">依赖名称</th>
<th class="text-left">说明</th>
</tr>
</thead>
<tbody>
<tr>
<td><a href="https://mvnrepository.com/artifact/com.oceanbase/oceanbase-client/2.4.9">com.oceanbase:oceanbase-client:2.4.9</a></td>
<td>用于连接到 OceanBase 数据库企业版。</td>
</tr>
</tbody>
</table>
</div>

### 示例

#### 准备
Expand All @@ -69,59 +88,6 @@ CREATE TABLE `t_sink`
);
```

#### Java 应用示例

以 Maven 项目为例,将需要的依赖加入到应用的 pom.xml 文件中,然后使用以下代码。

```java
package com.oceanbase;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Main {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
StreamTableEnvironment tEnv =
StreamTableEnvironment.create(
env, EnvironmentSettings.newInstance().inStreamingMode().build());

tEnv.executeSql(
"CREATE TABLE t_sink ( "
+ " id INT,"
+ " username VARCHAR,"
+ " score INT,"
+ " PRIMARY KEY (id) NOT ENFORCED"
+ ") with ("
+ " 'connector' = 'oceanbase',"
+ " 'url' = 'jdbc:mysql:https://127.0.0.1:2881/test',"
+ " 'schema-name'= 'test',"
+ " 'table-name' = 't_sink',"
+ " 'username' = 'root@test#obcluster',"
+ " 'password' = 'pswd',"
+ " 'druid-properties' = 'druid.initialSize=10;druid.maxActive=100',"
+ " 'buffer-flush.interval' = '1s',"
+ " 'buffer-flush.buffer-size' = '5000',"
+ " 'max-retries' = '3'"
+ " );");

tEnv.executeSql(
"INSERT INTO t_sink VALUES "
+ "(1, 'Tom', 99),"
+ "(2, 'Jerry', 88),"
+ "(1, 'Tom', 89);")
.await();
}
}

```

执行完成后,即可在 OceanBase 中检索验证。

更多信息请参考 [OceanBaseConnectorITCase.java](../../flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseConnectorITCase.java)。

#### Flink SQL 示例

将需要用到的依赖的 JAR 文件放到 Flink 的 lib 目录下,之后通过 SQL Client 在 Flink 中创建目的表。
Expand Down Expand Up @@ -158,6 +124,30 @@ VALUES (1, 'Tom', 99),

执行完成后,即可在 OceanBase 中检索验证。

对于 OceanBase 数据库企业版的用户,需要指定 OceanBase JDBC 驱动对应的 `url` 和 `driver-class-name`。

```sql
CREATE TABLE t_sink
(
id INT,
username VARCHAR,
score INT,
PRIMARY KEY (id) NOT ENFORCED
) with (
'connector' = 'oceanbase',
'url' = 'jdbc:oceanbase:https://127.0.0.1:2881/SYS',
'driver-class-name' = 'com.oceanbase.jdbc.Driver',
'schema-name' = 'SYS',
'table-name' = 'T_SINK',
'username' = 'SYS@test#obcluster',
'password' = 'pswd',
'druid-properties' = 'druid.initialSize=10;druid.maxActive=100;',
'buffer-flush.interval' = '1s',
'buffer-flush.buffer-size' = '5000',
'max-retries' = '3'
);
```

## 配置项

| 参数名 | Table API 必需 | DataStream 必需 | 默认值 | 类型 | 描述 |
Expand Down