Skip to content

Commit

Permalink
feat: make memstore check optional (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe committed Apr 23, 2023
1 parent 51dd7b6 commit c618300
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 5 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ Once executed, the records should have been written to OceanBase.
| buffer-flush.buffer-size | No | 1000 | Integer | Buffer size |
| buffer-flush.batch-size | No | 100 | Integer | Buffer flush batch size |
| max-retries | No | 3 | Integer | Max retry times on failure |
| memstore-check.enabled | No | true | Boolean | Whether enable memstore check |
| memstore-check.threshold | No | 0.9 | Double | Memstore usage threshold ratio relative to the limit value |
| memstore-check.interval | No | 30s | Duration | Memstore check interval |

Expand Down
1 change: 1 addition & 0 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ VALUES (1, 'Tom', 99),
| buffer-flush.buffer-size || 1000 | Integer | 缓冲区大小 |
| buffer-flush.batch-size || 100 | Integer | 刷新批量数据的批大小 |
| max-retries || 3 | Integer | 失败重试次数 |
| memstore-check.enabled || true | Boolean | 是否开启内存检查 |
| memstore-check.threshold || 0.9 | Double | 内存使用的阈值相对最大限制值的比例 |
| memstore-check.interval || 30s | Duration | 内存使用检查周期 |

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,12 @@ public OceanBaseRowDataStatementExecutor(
options.getTableName(),
tableSchema.getFieldNames(),
tableSchema.getKeyFieldNames());
try {
attemptQueryMemStore();
} catch (SQLException e) {
throw new RuntimeException("Failed to query memstore view", e);
if (options.isMemStoreCheckEnabled()) {
try {
attemptQueryMemStore();
} catch (SQLException e) {
throw new RuntimeException("Failed to query memstore view", e);
}
}
}

Expand Down Expand Up @@ -190,7 +192,9 @@ private String constructKey(RowData rowData, RowData.FieldGetter[] keyFieldGette

@Override
public void executeBatch() throws SQLException {
checkMemStore();
if (options.isMemStoreCheckEnabled()) {
checkMemStore();
}
if (!tableSchema.isHasKey()) {
synchronized (buffer) {
executeBatch(insertStatementSql, buffer, tableSchema.getFieldGetters());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class OceanBaseWriterOptions implements Serializable {
private final int bufferSize;
private final int batchSize;
private final int maxRetries;
private final boolean memStoreCheckEnabled;
private final double memStoreThreshold;
private final long memStoreCheckInterval;

Expand All @@ -33,6 +34,7 @@ public OceanBaseWriterOptions(
int bufferSize,
int batchSize,
int maxRetries,
boolean memStoreCheckEnabled,
double memStoreThreshold,
long memStoreCheckInterval) {
this.tableName = tableName;
Expand All @@ -41,6 +43,7 @@ public OceanBaseWriterOptions(
this.bufferSize = bufferSize;
this.batchSize = batchSize;
this.maxRetries = maxRetries;
this.memStoreCheckEnabled = memStoreCheckEnabled;
this.memStoreThreshold = memStoreThreshold;
this.memStoreCheckInterval = memStoreCheckInterval;
}
Expand Down Expand Up @@ -69,6 +72,10 @@ public int getMaxRetries() {
return maxRetries;
}

public boolean isMemStoreCheckEnabled() {
return memStoreCheckEnabled;
}

public double getMemStoreThreshold() {
return memStoreThreshold;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ public class OceanBaseConnectorOptions implements Serializable {
.defaultValue(3)
.withDescription("The max retry times if writing records to database failed.");

public static final ConfigOption<Boolean> MEMSTORE_CHECK_ENABLED =
ConfigOptions.key("memstore-check.enabled")
.booleanType()
.defaultValue(true)
.withDescription("Whether enable memstore check.");

public static final ConfigOption<Double> MEMSTORE_THRESHOLD =
ConfigOptions.key("memstore-check.threshold")
.doubleType()
Expand Down Expand Up @@ -139,6 +145,7 @@ public OceanBaseWriterOptions getWriterOptions() {
allConfig.get(BUFFER_SIZE),
allConfig.get(BUFFER_BATCH_SIZE),
allConfig.get(MAX_RETRIES),
allConfig.get(MEMSTORE_CHECK_ENABLED),
allConfig.get(MEMSTORE_THRESHOLD),
allConfig.get(MEMSTORE_CHECK_INTERVAL).toMillis());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(OceanBaseConnectorOptions.BUFFER_SIZE);
options.add(OceanBaseConnectorOptions.BUFFER_BATCH_SIZE);
options.add(OceanBaseConnectorOptions.MAX_RETRIES);
options.add(OceanBaseConnectorOptions.MEMSTORE_CHECK_ENABLED);
options.add(OceanBaseConnectorOptions.MEMSTORE_THRESHOLD);
options.add(OceanBaseConnectorOptions.MEMSTORE_CHECK_INTERVAL);
return options;
Expand Down

0 comments on commit c618300

Please sign in to comment.