Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: make 'schema-name' to be a required option #27

Merged
merged 3 commits into from
Jun 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,16 +164,16 @@ Once executed, the records should have been written to OceanBase.

| Option | Required | Default | Type | Description |
|----------------------------|----------|---------|----------|-----------------------------------------------------------------------------------------------------------------------|
| url | Yes | | String | JDBC url, schema name or database name is also required here |
| url | Yes | | String | JDBC url |
| schema-name | Yes | | String | Schema name or database name |
| table-name | Yes | | String | Table name |
| username | Yes | | String | User name |
| password | Yes | | String | Password |
| compatible-mode | Yes | | String | The compatible mode of OceanBase, can be 'mysql' or 'oracle' |
| driver-class | Yes | | String | JDBC driver class name, like 'com.mysql.jdbc.Driver' |
| connection-pool | Yes | | String | Database connection pool type, can be 'druid' or 'hikari' |
| cluster-name | No | | String | The cluster name of OceanBase |
| tenant-name | No | | String | The tenant name of OceanBase |
| schema-name | No | | String | Schema name or database name |
| cluster-name | No | | String | The cluster name of OceanBase, required when partition calculation is enabled |
| tenant-name | No | | String | The tenant name of OceanBase, required when partition calculation is enabled |
| connection-pool-properties | No | | String | Database connection pool properties, need to correspond to pool type, and multiple values are separated by semicolons |
| upsert-mode | No | true | Boolean | Whether to use upsert mode |
| buffer-flush.interval | No | 1s | Duration | Buffer flush interval |
Expand Down
8 changes: 4 additions & 4 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,16 +165,16 @@ VALUES (1, 'Tom', 99),

| 参数名 | 是否必需 | 默认值 | 类型 | 描述 |
|----------------------------|------|-------|----------|--------------------------------------|
| url | 是 | | String | 数据库的 JDBC url,需要包含 Schema 名或库名 |
| url | 是 | | String | 数据库的 JDBC url |
| schema-name | 是 | | String | 连接的 Schema 名或库名 |
| table-name | 是 | | String | 表名 |
| username | 是 | | String | 连接用户名 |
| password | 是 | | String | 连接密码 |
| compatible-mode | 是 | | String | 兼容模式,可以是 'mysql' 或 'oracle' |
| driver-class | 是 | | String | JDBC 驱动的类名,如 'com.mysql.jdbc.Driver' |
| connection-pool | 是 | | String | 连接池类型,可以是 'druid' 或 'hikari' |
| cluster-name | 否 | | String | 集群名 |
| tenant-name | 否 | | String | 租户名 |
| schema-name | 否 | | String | 连接的 Schema 名或库名 |
| cluster-name | 否 | | String | 集群名,开启分区计算功能时为必填 |
| tenant-name | 否 | | String | 租户名,开启分区计算功能时为必填 |
| connection-pool-properties | 否 | | String | 连接池属性,需要根据连接池类型进行配置,多个值用分号分隔 |
| upsert-mode | 否 | true | Boolean | 是否使用 upsert 模式 |
| buffer-flush.interval | 否 | 1s | Duration | 缓冲区刷新周期 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,9 @@ public class OceanBaseConnectorOptions implements Serializable {
public OceanBaseConnectorOptions(Map<String, String> allOptions) {
this.allConfig = Configuration.fromMap(allOptions);
if (allConfig.get(PARTITION_ENABLED)
&& (allConfig.get(CLUSTER_NAME) == null
|| allConfig.get(TENANT_NAME) == null
|| allConfig.get(SCHEMA_NAME) == null)) {
&& (allConfig.get(CLUSTER_NAME) == null || allConfig.get(TENANT_NAME) == null)) {
throw new IllegalArgumentException(
"'cluster-name', 'tenant-name' and 'schema-name' are required when 'partition.enabled' is true.");
"'cluster-name' and 'tenant-name' are required when 'partition.enabled' is true.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public String factoryIdentifier() {
public Set<ConfigOption<?>> requiredOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(OceanBaseConnectorOptions.URL);
options.add(OceanBaseConnectorOptions.SCHEMA_NAME);
options.add(OceanBaseConnectorOptions.TABLE_NAME);
options.add(OceanBaseConnectorOptions.USERNAME);
options.add(OceanBaseConnectorOptions.PASSWORD);
Expand All @@ -69,7 +70,6 @@ public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(OceanBaseConnectorOptions.CLUSTER_NAME);
options.add(OceanBaseConnectorOptions.TENANT_NAME);
options.add(OceanBaseConnectorOptions.SCHEMA_NAME);
options.add(OceanBaseConnectorOptions.COMPATIBLE_MODE);
options.add(OceanBaseConnectorOptions.CONNECTION_POOL_PROPERTIES);
options.add(OceanBaseConnectorOptions.UPSERT_MODE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,49 +28,71 @@ public interface OceanBaseDialect {
*/
String quoteIdentifier(@Nonnull String identifier);

/**
* Get the full table name
*
* @param schemaName schema name
* @param tableName table name
* @return full table name
*/
default String getFullTableName(@Nonnull String schemaName, @Nonnull String tableName) {
return String.format("%s.%s", quoteIdentifier(schemaName), quoteIdentifier(tableName));
}

/**
* Gets the upsert statement
*
* @param schemaName schema name
* @param tableName table name
* @param fieldNames field names list
* @param uniqueKeyFields unique key field names list
* @return the statement string
*/
String getUpsertStatement(
@Nonnull String schemaName,
@Nonnull String tableName,
@Nonnull List<String> fieldNames,
@Nonnull List<String> uniqueKeyFields);

/**
* Gets the exist statement
*
* @param schemaName schema name
* @param tableName table name
* @param uniqueKeyFields unique key field names list
* @return the statement string
*/
default String getExistStatement(
@Nonnull String tableName, @Nonnull List<String> uniqueKeyFields) {
@Nonnull String schemaName,
@Nonnull String tableName,
@Nonnull List<String> uniqueKeyFields) {
String conditionClause =
uniqueKeyFields.stream()
.map(f -> String.format("%s = ?", quoteIdentifier(f)))
.collect(Collectors.joining(" AND "));
return "SELECT 1 FROM " + quoteIdentifier(tableName) + " WHERE " + conditionClause;
return "SELECT 1 FROM "
+ getFullTableName(schemaName, tableName)
+ " WHERE "
+ conditionClause;
}

/**
* Gets the insert statement
*
* @param schemaName schema name
* @param tableName table name
* @param fieldNames field names list
* @return the statement string
*/
default String getInsertIntoStatement(
@Nonnull String tableName, @Nonnull List<String> fieldNames) {
@Nonnull String schemaName,
@Nonnull String tableName,
@Nonnull List<String> fieldNames) {
String columns =
fieldNames.stream().map(this::quoteIdentifier).collect(Collectors.joining(", "));
String placeholders = String.join(", ", Collections.nCopies(fieldNames.size(), "?"));
return "INSERT INTO "
+ quoteIdentifier(tableName)
+ getFullTableName(schemaName, tableName)
+ "("
+ columns
+ ")"
Expand All @@ -82,12 +104,14 @@ default String getInsertIntoStatement(
/**
* Gets the update statement
*
* @param schemaName schema name
* @param tableName table name
* @param fieldNames field names list
* @param uniqueKeyFields unique key field names list
* @return the statement string
*/
default String getUpdateStatement(
@Nonnull String schemaName,
@Nonnull String tableName,
@Nonnull List<String> fieldNames,
@Nonnull List<String> uniqueKeyFields) {
Expand All @@ -101,7 +125,7 @@ default String getUpdateStatement(
.map(f -> String.format("%s = ?", quoteIdentifier(f)))
.collect(Collectors.joining(" AND "));
return "UPDATE "
+ quoteIdentifier(tableName)
+ getFullTableName(schemaName, tableName)
+ " SET "
+ setClause
+ " WHERE "
Expand All @@ -111,17 +135,23 @@ default String getUpdateStatement(
/**
* Gets the delete statement
*
* @param schemaName schema name
* @param tableName table name
* @param uniqueKeyFields unique key field names list
* @return the statement string
*/
default String getDeleteStatement(
@Nonnull String tableName, @Nonnull List<String> uniqueKeyFields) {
@Nonnull String schemaName,
@Nonnull String tableName,
@Nonnull List<String> uniqueKeyFields) {
String conditionClause =
uniqueKeyFields.stream()
.map(f -> String.format("%s = ?", quoteIdentifier(f)))
.collect(Collectors.joining(" AND "));
return "DELETE FROM " + quoteIdentifier(tableName) + " WHERE " + conditionClause;
return "DELETE FROM "
+ getFullTableName(schemaName, tableName)
+ " WHERE "
+ conditionClause;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public String quoteIdentifier(@Nonnull String identifier) {

@Override
public String getUpsertStatement(
@Nonnull String schemaName,
@Nonnull String tableName,
@Nonnull List<String> fieldNames,
@Nonnull List<String> uniqueKeyFields) {
Expand All @@ -34,7 +35,7 @@ public String getUpsertStatement(
.filter(f -> !uniqueKeyFields.contains(f))
.map(f -> quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")")
.collect(Collectors.joining(", "));
return getInsertIntoStatement(tableName, fieldNames)
return getInsertIntoStatement(schemaName, tableName, fieldNames)
+ " ON DUPLICATE KEY UPDATE "
+ updateClause;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public String quoteIdentifier(@Nonnull String identifier) {

@Override
public String getUpsertStatement(
@Nonnull String schemaName,
@Nonnull String tableName,
@Nonnull List<String> fieldNames,
@Nonnull List<String> uniqueKeyFields) {
Expand Down Expand Up @@ -54,7 +55,7 @@ public String getUpsertStatement(
.collect(Collectors.joining(", "));

return "MERGE INTO "
+ tableName
+ getFullTableName(schemaName, tableName)
+ " t "
+ " USING (SELECT "
+ sourceFields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,27 @@ public OceanBaseRowDataStatementExecutor(

OceanBaseConnectionInfo connectionInfo = connectionProvider.getConnectionInfo();
OceanBaseDialect dialect = connectionInfo.getDialect();
String schemaName = connectionInfo.getTableEntryKey().getSchemaName();
String tableName = connectionInfo.getTableEntryKey().getTableName();

this.existStatementSql =
dialect.getExistStatement(tableName, tableSchema.getKeyFieldNames());
dialect.getExistStatement(schemaName, tableName, tableSchema.getKeyFieldNames());
this.insertStatementSql =
dialect.getInsertIntoStatement(tableName, tableSchema.getFieldNames());
dialect.getInsertIntoStatement(schemaName, tableName, tableSchema.getFieldNames());
this.updateStatementSql =
dialect.getUpdateStatement(
tableName, tableSchema.getFieldNames(), tableSchema.getKeyFieldNames());
schemaName,
tableName,
tableSchema.getFieldNames(),
tableSchema.getKeyFieldNames());
this.deleteStatementSql =
dialect.getDeleteStatement(tableName, tableSchema.getKeyFieldNames());
dialect.getDeleteStatement(schemaName, tableName, tableSchema.getKeyFieldNames());
this.upsertStatementSql =
dialect.getUpsertStatement(
tableName, tableSchema.getFieldNames(), tableSchema.getKeyFieldNames());
schemaName,
tableName,
tableSchema.getFieldNames(),
tableSchema.getKeyFieldNames());
this.queryMemStoreSql =
connectionInfo.getVersion().isV4()
? dialect.getMemStoreExistStatement(options.getMemStoreThreshold())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.oceanbase.connector.flink.dialect.OceanBaseMySQLDialect;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
Expand All @@ -29,6 +31,7 @@

@Ignore
public class OceanBaseSinkTest {
private static final Logger LOG = LoggerFactory.getLogger(OceanBaseSinkTest.class);

private static final OceanBaseDialect DIALECT = new OceanBaseMySQLDialect();

Expand Down Expand Up @@ -57,10 +60,7 @@ public void testSink() throws Exception {
+ "height double,"
+ "birthday date)";

String fullTableName =
String.format(
"%s.%s",
DIALECT.quoteIdentifier(schemaName), DIALECT.quoteIdentifier(tableName));
String fullTableName = DIALECT.getFullTableName(schemaName, tableName);

try (Connection connection = DriverManager.getConnection(JDBC_URL, USERNAME, PASSWORD);
Statement statement = connection.createStatement()) {
Expand Down Expand Up @@ -110,10 +110,18 @@ public void testSink() throws Exception {
Statement statement = connection.createStatement()) {
ResultSet rs = statement.executeQuery(String.format("SELECT * FROM %s", fullTableName));
ResultSetMetaData metaData = rs.getMetaData();
int count = 0;
while (rs.next()) {
StringBuilder sb = new StringBuilder("Row ").append(count++).append(": { ");
for (int i = 0; i < metaData.getColumnCount(); i++) {
System.out.printf("%s: %s", metaData.getColumnName(i + 1), rs.getObject(i + 1));
if (i != 0) {
sb.append(", ");
}
sb.append(metaData.getColumnName(i + 1))
.append(": ")
.append(rs.getObject(i + 1));
}
LOG.info(sb.append("}").toString());
}
}
}
Expand Down
Loading