Skip to content

Commit

Permalink
feat: support multiple tables sync and ddl sync (#49)
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe authored Jan 23, 2024
1 parent 775bc52 commit 5ef7ddd
Show file tree
Hide file tree
Showing 58 changed files with 2,437 additions and 2,109 deletions.
87 changes: 45 additions & 42 deletions docs/sink/flink-connector-obkv-hbase.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,20 @@ If you'd rather use the latest snapshots of the upcoming major version, use our

```xml
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>flink-connector-obkv-hbase</artifactId>
<version>${project.version}</version>
<groupId>com.oceanbase</groupId>
<artifactId>flink-connector-obkv-hbase</artifactId>
<version>${project.version}</version>
</dependency>

<repositories>
<repository>
<id>sonatype-snapshots</id>
<name>Sonatype Snapshot Repository</name>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>sonatype-snapshots</id>
<name>Sonatype Snapshot Repository</name>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
```

Expand All @@ -48,6 +48,7 @@ mvn clean package -DskipTests
### SQL JAR

To use this connector through Flink SQL directly, you need to download the shaded jar file named `flink-sql-connector-obkv-hbase-${project.version}.jar`:

- Release versions: https://repo1.maven.org/maven2/com/oceanbase/flink-sql-connector-obkv-hbase
- Snapshot versions: https://s01.oss.sonatype.org/content/repositories/snapshots/com/oceanbase/flink-sql-connector-obkv-hbase

Expand All @@ -59,11 +60,11 @@ Create a table with the name `htable1$family1`, which means the table name is `h
use test;
CREATE TABLE `htable1$family1`
(
`K` varbinary(1024) NOT NULL,
`Q` varbinary(256) NOT NULL,
`T` bigint(20) NOT NULL,
`V` varbinary(1048576) NOT NULL,
PRIMARY KEY (`K`, `Q`, `T`)
`K` varbinary(1024) NOT NULL,
`Q` varbinary(256) NOT NULL,
`T` bigint(20) NOT NULL,
`V` varbinary(1048576) NOT NULL,
PRIMARY KEY (`K`, `Q`, `T`)
)
```

Expand Down Expand Up @@ -93,7 +94,8 @@ public class Main {
+ " PRIMARY KEY (rowkey) NOT ENFORCED"
+ ") with ("
+ " 'connector'='obkv-hbase',"
+ " 'url'='https://127.0.0.1:8080/services?...&database=test',"
+ " 'url'='https://127.0.0.1:8080/services?...',"
+ " 'schema-name'='test',"
+ " 'table-name'='htable1',"
+ " 'username'='root@test',"
+ " 'password'='',"
Expand Down Expand Up @@ -122,19 +124,19 @@ Put the JAR files of dependencies to the 'lib' directory of Flink, and then crea
```sql
CREATE TABLE t_sink
(
rowkey STRING,
family1 ROW <column1 STRING,
column2 STRING >,
PRIMARY KEY (rowkey) NOT ENFORCED
)
with (
'connector'='obkv-hbase',
'url'='https://127.0.0.1:8080/services?...&database=test',
'table-name'='htable1',
'username'='root@test',
'password'='',
'sys.username'='root',
'sys.password'='');
rowkey STRING,
family1 ROW <column1 STRING,
column2 STRING >,
PRIMARY KEY (rowkey) NOT ENFORCED
) with (
'connector'='obkv-hbase',
'url'='https://127.0.0.1:8080/services?...',
'schema-name'='test',
'table-name'='htable1',
'username'='root@test',
'password'='',
'sys.username'='root',
'sys.password'='');
```

Insert records by Flink SQL.
Expand All @@ -150,18 +152,19 @@ Once executed, the records should have been written to OceanBase.

## Configuration

| Option | Required | Default | Type | Description |
|--------------------------|----------|---------|----------|-----------------------------------|
| url | Yes | | String | The config url with database name |
| table-name | Yes | | String | The table name of HBase |
| username | Yes | | String | The username |
| password | Yes | | String | The password |
| sys.username | Yes | | String | The username of sys tenant |
| sys.password | Yes | | String | The password of sys tenant |
| buffer-flush.interval | No | 1s | Duration | Buffer flush interval |
| buffer-flush.buffer-size | No | 1000 | Integer | Buffer size |
| buffer-flush.batch-size | No | 100 | Integer | Buffer flush batch size |
| max-retries | No | 3 | Integer | Max retry times on failure |
| Option | Required | Default | Type | Description |
|--------------------------|----------|---------|----------|--------------------------------------------------------------------------------------------------------------|
| url | Yes | | String | The config url, can be queried by <code>SHOW PARAMETERS LIKE 'obconfig_url'</code>. |
| schema-name | Yes | | String | The database name of OceanBase. |
| table-name | Yes | | String | The table name of HBase, note that the table name in OceanBase is <code>hbase_table_name$family_name</code>. |
| username | Yes | | String | The username of non-sys tenant. |
| password | Yes | | String | The password of non-sys tenant. |
| sys.username | Yes | | String | The username of sys tenant. |
| sys.password | Yes | | String | The password of sys tenant. |
| hbase.properties | No | | String | Properties to configure 'obkv-hbase-client-java', multiple values are separated by semicolons. |
| buffer-flush.interval | No | 1s | Duration | Buffer flush interval. |
| buffer-flush.buffer-size | No | 1000 | Integer | Buffer size. |
| max-retries | No | 3 | Integer | Max retry times on failure. |

## References

Expand Down
87 changes: 45 additions & 42 deletions docs/sink/flink-connector-obkv-hbase_cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,20 @@

```xml
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>flink-connector-obkv-hbase</artifactId>
<version>${project.version}</version>
<groupId>com.oceanbase</groupId>
<artifactId>flink-connector-obkv-hbase</artifactId>
<version>${project.version}</version>
</dependency>

<repositories>
<repository>
<id>sonatype-snapshots</id>
<name>Sonatype Snapshot Repository</name>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>sonatype-snapshots</id>
<name>Sonatype Snapshot Repository</name>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
```

Expand All @@ -48,6 +48,7 @@ mvn clean package -DskipTests
### SQL JAR

要直接通过 Flink SQL 使用此连接器,您需要下载名为`flink-sql-connector-obkv-hbase-${project.version}.jar`的包含所有依赖的 jar 文件:

- 正式版本:https://repo1.maven.org/maven2/com/oceanbase/flink-sql-connector-obkv-hbase
- 快照版本:https://s01.oss.sonatype.org/content/repositories/snapshots/com/oceanbase/flink-sql-connector-obkv-hbase

Expand All @@ -59,11 +60,11 @@ mvn clean package -DskipTests
use test;
CREATE TABLE `htable1$family1`
(
`K` varbinary(1024) NOT NULL,
`Q` varbinary(256) NOT NULL,
`T` bigint(20) NOT NULL,
`V` varbinary(1048576) NOT NULL,
PRIMARY KEY (`K`, `Q`, `T`)
`K` varbinary(1024) NOT NULL,
`Q` varbinary(256) NOT NULL,
`T` bigint(20) NOT NULL,
`V` varbinary(1048576) NOT NULL,
PRIMARY KEY (`K`, `Q`, `T`)
)
```

Expand Down Expand Up @@ -93,7 +94,8 @@ public class Main {
+ " PRIMARY KEY (rowkey) NOT ENFORCED"
+ ") with ("
+ " 'connector'='obkv-hbase',"
+ " 'url'='https://127.0.0.1:8080/services?...&database=test',"
+ " 'url'='https://127.0.0.1:8080/services?...',"
+ " 'schema-name'='test',"
+ " 'table-name'='htable1',"
+ " 'username'='root@test',"
+ " 'password'='',"
Expand Down Expand Up @@ -122,19 +124,19 @@ public class Main {
```sql
CREATE TABLE t_sink
(
rowkey STRING,
family1 ROW <column1 STRING,
column2 STRING >,
PRIMARY KEY (rowkey) NOT ENFORCED
)
with (
'connector'='obkv-hbase',
'url'='https://127.0.0.1:8080/services?...&database=test',
'table-name'='htable1',
'username'='root@test',
'password'='',
'sys.username'='root',
'sys.password'='');
rowkey STRING,
family1 ROW <column1 STRING,
column2 STRING >,
PRIMARY KEY (rowkey) NOT ENFORCED
) with (
'connector'='obkv-hbase',
'url'='https://127.0.0.1:8080/services?...',
'schema-name'='test',
'table-name'='htable1',
'username'='root@test',
'password'='',
'sys.username'='root',
'sys.password'='');
```

插入测试数据
Expand All @@ -150,18 +152,19 @@ VALUES ('1', ROW ('r1f1c1', 'r1f1c2')),

## 配置项

| 参数名 | 是否必需 | 默认值 | 类型 | 描述 |
|--------------------------|------|------|----------|-----------------------------|
| url || | String | 集群的 config url,需要带 database |
| table-name || | String | HBase 表名 |
| username || | String | 用户名 |
| password || | String | 密码 |
| sys.username || | String | sys 租户的用户名 |
| sys.password || | String | sys 租户用户的密码 |
| buffer-flush.interval || 1s | Duration | 缓冲区刷新周期 |
| buffer-flush.buffer-size || 1000 | Integer | 缓冲区大小 |
| buffer-flush.batch-size || 100 | Integer | 刷新批量数据的批大小 |
| max-retries || 3 | Integer | 失败重试次数 |
| 参数名 | 是否必需 | 默认值 | 类型 | 描述 |
|--------------------------|------|------|----------|---------------------------------------------------------------------------|
| url || | String | 集群的 config url,可以通过 <code>SHOW PARAMETERS LIKE 'obconfig_url'</code> 查询。 |
| schema-name || | String | OceanBase 的 db 名。 |
| table-name || | String | HBase 表名,注意在 OceanBase 中表名的结构是 <code>hbase_table_name$family_name</code>。 |
| username || | String | 非 sys 租户的用户名。 |
| password || | String | 非 sys 租户的密码。 |
| sys.username || | String | sys 租户的用户名。 |
| sys.password || | String | sys 租户用户的密码。 |
| hbase.properties || | String | 配置 'obkv-hbase-client-java' 的属性,多个值用分号分隔。 |
| buffer-flush.interval || 1s | Duration | 缓冲区刷新周期。 |
| buffer-flush.buffer-size || 1000 | Integer | 缓冲区大小。 |
| max-retries || 3 | Integer | 失败重试次数。 |

## 参考信息

Expand Down
Loading

0 comments on commit 5ef7ddd

Please sign in to comment.