Skip to content

Commit

Permalink
chore: modify classes for serialization (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe authored Jan 29, 2024
1 parent 7554fb6 commit ca7de70
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

public class OBKVHBaseConnectorOptions extends ConnectorOptions {

private static final long serialVersionUID = 1L;

public static final ConfigOption<String> SYS_USERNAME =
ConfigOptions.key("sys.username")
.stringType()
Expand All @@ -44,8 +46,6 @@ public class OBKVHBaseConnectorOptions extends ConnectorOptions {
.noDefaultValue()
.withDescription("Properties to configure 'obkv-hbase-client-java'.");

private static final long serialVersionUID = 1L;

public OBKVHBaseConnectorOptions(Map<String, String> config) {
super(config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,35 +37,27 @@ public class OBKVHBaseConnectionProvider implements ConnectionProvider {

private final OBKVHBaseConnectorOptions options;

private transient TableCache<HTableInterface> hTableCache;
private final TableCache<HTableInterface> tableCache;

public OBKVHBaseConnectionProvider(OBKVHBaseConnectorOptions options) {
this.options = options;
}

private TableCache<HTableInterface> getHTableCache() {
if (hTableCache == null) {
hTableCache = new TableCache<>();
}
return hTableCache;
this.tableCache = new TableCache<>();
}

public HTableInterface getHTableClient(TableId tableId) {
return getHTableCache()
.get(
tableId.identifier(),
() -> {
try {
OHTableClient tableClient =
new OHTableClient(
tableId.getTableName(),
getConfig(tableId.getSchemaName()));
tableClient.init();
return tableClient;
} catch (Exception e) {
throw new RuntimeException("Failed to initialize OHTableClient", e);
}
});
return tableCache.get(
tableId.identifier(),
() -> {
try {
OHTableClient tableClient =
new OHTableClient(
tableId.getTableName(), getConfig(tableId.getSchemaName()));
tableClient.init();
return tableClient;
} catch (Exception e) {
throw new RuntimeException("Failed to initialize OHTableClient", e);
}
});
}

private Configuration getConfig(String databaseName) {
Expand All @@ -89,9 +81,9 @@ private Configuration getConfig(String databaseName) {

@Override
public void close() throws Exception {
for (HTableInterface table : getHTableCache().getAll()) {
for (HTableInterface table : tableCache.getAll()) {
table.close();
}
getHTableCache().clear();
tableCache.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@

public class OBKVHBaseRecordFlusher implements RecordFlusher {

private static final long serialVersionUID = 1L;

private final OBKVHBaseConnectorOptions options;
private final OBKVHBaseConnectionProvider connectionProvider;

Expand All @@ -56,17 +58,17 @@ public void flush(SchemaChangeRecord record) {

@Override
@SuppressWarnings("unchecked")
public void flush(List<DataChangeRecord> batch) throws Exception {
if (batch == null || batch.isEmpty()) {
public void flush(List<DataChangeRecord> records) throws Exception {
if (records == null || records.isEmpty()) {
return;
}

HTableInfo tableInfo = (HTableInfo) batch.get(0).getTable();
HTableInfo tableInfo = (HTableInfo) records.get(0).getTable();

Map<byte[], List<Put>> familyPutListMap = new HashMap<>();
Map<byte[], List<Delete>> familyDeleteListMap = new HashMap<>();

for (DataChangeRecord record : batch) {
for (DataChangeRecord record : records) {
byte[] rowKey = (byte[]) record.getFieldValue(tableInfo.getRowKeyName());
for (String familyName : tableInfo.getFamilyNames()) {
Object familyValue = record.getFieldValue(familyName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

public class HTableInfo implements Table {

private static final long serialVersionUID = 1L;

private final TableId tableId;
private final String rowKeyName;
private final LogicalType rowKeyType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,45 @@

package com.oceanbase.connector.flink.utils;

import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

public class TableCache<T> {
public class TableCache<T> implements Serializable {

private final Map<String, T> cache;
private static final long serialVersionUID = 1L;

public TableCache() {
cache = new ConcurrentHashMap<>();
private transient Map<String, T> cache;

private Map<String, T> getCache() {
if (cache == null) {
cache = new ConcurrentHashMap<>();
}
return cache;
}

public Collection<T> getAll() {
return cache.values();
return getCache().values();
}

public T get(String tableId, Supplier<T> supplier) {
if (cache.containsKey(tableId)) {
return cache.get(tableId);
if (getCache().containsKey(tableId)) {
return getCache().get(tableId);
}
T t = supplier.get();
cache.put(tableId, t);
getCache().put(tableId, t);
return t;
}

public void remove(String tableId) {
cache.remove(tableId);
getCache().remove(tableId);
}

public void clear() {
cache.clear();
if (cache != null) {
cache.clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class OceanBaseRecordFlusher implements RecordFlusher {
private final OceanBaseConnectionProvider connectionProvider;
private final OceanBaseDialect dialect;

private transient TableCache<OceanBaseTablePartInfo> tablePartInfoCache;
private final TableCache<OceanBaseTablePartInfo> tablePartInfoCache;

private volatile long lastCheckMemStoreTime;

Expand All @@ -68,13 +68,7 @@ public OceanBaseRecordFlusher(
this.options = options;
this.connectionProvider = connectionProvider;
this.dialect = connectionProvider.getDialect();
}

private TableCache<OceanBaseTablePartInfo> getTablePartInfoCache() {
if (tablePartInfoCache == null) {
tablePartInfoCache = new TableCache<>();
}
return tablePartInfoCache;
this.tablePartInfoCache = new TableCache<>();
}

@Override
Expand All @@ -84,23 +78,23 @@ public synchronized void flush(SchemaChangeRecord record) throws Exception {
statement.execute(record.getSql());
}
if (record.shouldRefreshSchema()) {
getTablePartInfoCache().remove(record.getTableId().identifier());
tablePartInfoCache.remove(record.getTableId().identifier());
}
LOG.info("Flush SchemaChangeRecord successfully: {}", record);
}

@Override
public synchronized void flush(List<DataChangeRecord> batch) throws Exception {
if (batch == null || batch.isEmpty()) {
public synchronized void flush(List<DataChangeRecord> records) throws Exception {
if (records == null || records.isEmpty()) {
return;
}
checkMemStore();

TableInfo tableInfo = (TableInfo) batch.get(0).getTable();
TableInfo tableInfo = (TableInfo) records.get(0).getTable();
TableId tableId = tableInfo.getTableId();

if (CollectionUtils.isEmpty(tableInfo.getKey())) {
if (batch.stream().anyMatch(data -> !data.isUpsert())) {
if (records.stream().anyMatch(data -> !data.isUpsert())) {
throw new IllegalArgumentException(
"Table without PK must only contain insert records");
}
Expand All @@ -110,13 +104,13 @@ public synchronized void flush(List<DataChangeRecord> batch) throws Exception {
tableId.getTableName(),
tableInfo.getFieldNames()),
tableInfo.getFieldNames(),
batch);
records);
return;
}

List<DataChangeRecord> upsertBatch = new ArrayList<>();
List<DataChangeRecord> deleteBatch = new ArrayList<>();
batch.forEach(
records.forEach(
data -> {
if (data.isUpsert()) {
upsertBatch.add(data);
Expand Down Expand Up @@ -179,16 +173,16 @@ private boolean hasMemStoreReachedThreshold() throws SQLException {
}
}

private void flush(String sql, List<String> statementFields, List<DataChangeRecord> batch)
private void flush(String sql, List<String> statementFields, List<DataChangeRecord> records)
throws Exception {
Map<Long, List<DataChangeRecord>> group = groupRecords(batch);
Map<Long, List<DataChangeRecord>> group = groupRecords(records);
if (group == null) {
return;
}
try (Connection connection = connectionProvider.getConnection();
PreparedStatement statement = connection.prepareStatement(sql)) {
for (List<DataChangeRecord> groupBatch : group.values()) {
for (DataChangeRecord record : groupBatch) {
for (List<DataChangeRecord> groupRecords : group.values()) {
for (DataChangeRecord record : groupRecords) {
for (int i = 0; i < statementFields.size(); i++) {
statement.setObject(i + 1, record.getFieldValue(statementFields.get(i)));
}
Expand Down Expand Up @@ -228,10 +222,9 @@ private OceanBaseTablePartInfo getTablePartInfo(TableId tableId) {
if (!options.getPartitionEnabled()) {
return null;
}
return getTablePartInfoCache()
.get(
tableId.identifier(),
() -> queryTablePartInfo(tableId.getSchemaName(), tableId.getTableName()));
return tablePartInfoCache.get(
tableId.identifier(),
() -> queryTablePartInfo(tableId.getSchemaName(), tableId.getTableName()));
}

private OceanBaseTablePartInfo queryTablePartInfo(String schemaName, String tableName) {
Expand Down

0 comments on commit ca7de70

Please sign in to comment.