Skip to content

Commit

Permalink
feat: add memstore check (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe committed Apr 20, 2023
1 parent f5af623 commit 51dd7b6
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 2 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ Once executed, the records should have been written to OceanBase.
| buffer-flush.buffer-size | No | 1000 | Integer | Buffer size |
| buffer-flush.batch-size | No | 100 | Integer | Buffer flush batch size |
| max-retries | No | 3 | Integer | Max retry times on failure |
| memstore-check.threshold | No | 0.9 | Double | Memstore usage threshold ratio relative to the limit value |
| memstore-check.interval | No | 30s | Duration | Memstore check interval |

## Community

Expand Down
2 changes: 2 additions & 0 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ VALUES (1, 'Tom', 99),
| buffer-flush.buffer-size || 1000 | Integer | 缓冲区大小 |
| buffer-flush.batch-size || 100 | Integer | 刷新批量数据的批大小 |
| max-retries || 3 | Integer | 失败重试次数 |
| memstore-check.threshold || 0.9 | Double | 内存使用的阈值相对最大限制值的比例 |
| memstore-check.interval || 30s | Duration | 内存使用检查周期 |

## 社区

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import com.alibaba.druid.pool.DruidDataSource;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.commons.lang3.StringUtils;

import javax.sql.DataSource;

Expand All @@ -28,6 +29,7 @@ public class OceanBaseConnectionPool implements OceanBaseConnectionProvider, Ser
private final OceanBaseConnectionOptions options;
private DataSource dataSource;
private volatile boolean inited = false;
private String compatibleMode;

public OceanBaseConnectionPool(OceanBaseConnectionOptions options) {
this.options = options;
Expand Down Expand Up @@ -78,6 +80,14 @@ public Connection getConnection() throws SQLException {
return dataSource.getConnection();
}

@Override
public String getCompatibleMode() throws SQLException {
if (StringUtils.isBlank(compatibleMode)) {
compatibleMode = OceanBaseConnectionProvider.super.getCompatibleMode();
}
return compatibleMode;
}

@Override
public void close() throws Exception {
if (dataSource != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,22 @@
import com.oceanbase.connector.flink.connection.OceanBaseConnectionProvider;
import com.oceanbase.connector.flink.dialect.OceanBaseDialect;
import com.oceanbase.connector.flink.table.OceanBaseTableSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OceanBaseRowDataStatementExecutor implements OceanBaseStatementExecutor<RowData> {
private static final Logger LOG =
LoggerFactory.getLogger(OceanBaseRowDataStatementExecutor.class);

private static final long serialVersionUID = 1L;

Expand All @@ -49,6 +54,8 @@ public class OceanBaseRowDataStatementExecutor implements OceanBaseStatementExec
private final Map<String, Tuple2<Boolean, RowData>> reduceBuffer = new HashMap<>();

private transient volatile boolean closed = false;
private String queryMemStoreSql;
private volatile long lastCheckMemStoreTime;

public OceanBaseRowDataStatementExecutor(
Sink.InitContext context,
Expand Down Expand Up @@ -76,6 +83,45 @@ public OceanBaseRowDataStatementExecutor(
options.getTableName(),
tableSchema.getFieldNames(),
tableSchema.getKeyFieldNames());
try {
attemptQueryMemStore();
} catch (SQLException e) {
throw new RuntimeException("Failed to query memstore view", e);
}
}

private void attemptQueryMemStore() throws SQLException {
String compatibleMode = connectionProvider.getCompatibleMode();
String view, legacyView;
if ("mysql".equalsIgnoreCase(compatibleMode)) {
view = "oceanbase.GV$OB_MEMSTORE";
legacyView = "oceanbase.gv$memstore";
} else {
view = "SYS.GV$OB_MEMSTORE";
legacyView = "SYS.GV$MEMSTORE";
}
queryMemStoreSql =
String.format(
"SELECT 1 FROM %s WHERE MEMSTORE_USED > MEMSTORE_LIMIT * %f",
view, options.getMemStoreThreshold());
try {
hasMemStoreReachedThreshold();
} catch (SQLException e) {
LOG.warn("Failed to query memstore view {}, try {} instead", view, legacyView, e);
queryMemStoreSql =
String.format(
"SELECT 1 FROM %s WHERE TOTAL > MEM_LIMIT * %f",
legacyView, options.getMemStoreThreshold());
hasMemStoreReachedThreshold();
}
}

private boolean hasMemStoreReachedThreshold() throws SQLException {
try (Connection connection = connectionProvider.getConnection();
Statement statement = connection.createStatement()) {
ResultSet resultSet = statement.executeQuery(queryMemStoreSql);
return resultSet.next();
}
}

@Override
Expand Down Expand Up @@ -144,6 +190,7 @@ private String constructKey(RowData rowData, RowData.FieldGetter[] keyFieldGette

@Override
public void executeBatch() throws SQLException {
checkMemStore();
if (!tableSchema.isHasKey()) {
synchronized (buffer) {
executeBatch(insertStatementSql, buffer, tableSchema.getFieldGetters());
Expand Down Expand Up @@ -188,6 +235,25 @@ public void executeBatch() throws SQLException {
}
}

private void checkMemStore() throws SQLException {
long now = System.currentTimeMillis();
if (closed || now - lastCheckMemStoreTime < options.getMemStoreCheckInterval()) {
return;
}
while (!closed && hasMemStoreReachedThreshold()) {
LOG.warn(
"Memstore reaches threshold {}, thread will sleep {} milliseconds",
options.getMemStoreThreshold(),
options.getMemStoreCheckInterval());
try {
Thread.sleep(options.getMemStoreCheckInterval());
} catch (InterruptedException e) {
LOG.warn(e.getMessage());
}
}
lastCheckMemStoreTime = System.currentTimeMillis();
}

/**
* Checks whether the row data exists
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,26 @@ public class OceanBaseWriterOptions implements Serializable {
private final int bufferSize;
private final int batchSize;
private final int maxRetries;
private final double memStoreThreshold;
private final long memStoreCheckInterval;

public OceanBaseWriterOptions(
String tableName,
boolean upsertMode,
long batchIntervalMs,
int bufferSize,
int batchSize,
int maxRetries) {
int maxRetries,
double memStoreThreshold,
long memStoreCheckInterval) {
this.tableName = tableName;
this.upsertMode = upsertMode;
this.batchIntervalMs = batchIntervalMs;
this.bufferSize = bufferSize;
this.batchSize = batchSize;
this.maxRetries = maxRetries;
this.memStoreThreshold = memStoreThreshold;
this.memStoreCheckInterval = memStoreCheckInterval;
}

public String getTableName() {
Expand All @@ -62,4 +68,12 @@ public int getBatchSize() {
public int getMaxRetries() {
return maxRetries;
}

public double getMemStoreThreshold() {
return memStoreThreshold;
}

public long getMemStoreCheckInterval() {
return memStoreCheckInterval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,20 @@ public class OceanBaseConnectorOptions implements Serializable {
.intType()
.defaultValue(3)
.withDescription("The max retry times if writing records to database failed.");

public static final ConfigOption<Double> MEMSTORE_THRESHOLD =
ConfigOptions.key("memstore-check.threshold")
.doubleType()
.defaultValue(0.9)
.withDescription("Memory usage threshold ratio relative to the limit value.");

public static final ConfigOption<Duration> MEMSTORE_CHECK_INTERVAL =
ConfigOptions.key("memstore-check.interval")
.durationType()
.defaultValue(Duration.ofSeconds(30))
.withDescription(
"The check interval mills, over this time, the writer will check if memstore reaches threshold.");

private final ReadableConfig allConfig;

public OceanBaseConnectorOptions(Map<String, String> allOptions) {
Expand All @@ -124,7 +138,9 @@ public OceanBaseWriterOptions getWriterOptions() {
allConfig.get(BUFFER_FLUSH_INTERVAL).toMillis(),
allConfig.get(BUFFER_SIZE),
allConfig.get(BUFFER_BATCH_SIZE),
allConfig.get(MAX_RETRIES));
allConfig.get(MAX_RETRIES),
allConfig.get(MEMSTORE_THRESHOLD),
allConfig.get(MEMSTORE_CHECK_INTERVAL).toMillis());
}

private Properties parseProperties(String propsStr) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(OceanBaseConnectorOptions.BUFFER_SIZE);
options.add(OceanBaseConnectorOptions.BUFFER_BATCH_SIZE);
options.add(OceanBaseConnectorOptions.MAX_RETRIES);
options.add(OceanBaseConnectorOptions.MEMSTORE_THRESHOLD);
options.add(OceanBaseConnectorOptions.MEMSTORE_CHECK_INTERVAL);
return options;
}
}

0 comments on commit 51dd7b6

Please sign in to comment.