Skip to content

Commit

Permalink
fix: use transient fields for tablePartInfo and ExecutorService (#36)
Browse files Browse the repository at this point in the history
* fix: use transient fields for tablePartInfo and executor service

* ignore docs and pom files in pre hook scan
  • Loading branch information
whhe committed Aug 19, 2023
1 parent 164ef39 commit 3333754
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 61 deletions.
4 changes: 4 additions & 0 deletions .scanignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# git pre hook scan ignore configuration

*.md
pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;

import com.oceanbase.connector.flink.connection.OBKVHBaseConnectionOptions;
import com.oceanbase.connector.flink.sink.OBKVHBaseStatementOptions;
Expand All @@ -35,9 +34,6 @@ public OBKVHBaseConnectorOptions(Map<String, String> config) {
super(config);
}

@Override
protected void validate(ReadableConfig config) {}

public OBKVHBaseConnectionOptions getConnectionOptions() {
return new OBKVHBaseConnectionOptions(
allConfig.get(URL),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,11 @@ public abstract class AbstractOceanBaseConnectorOptions implements Serializable

protected final ReadableConfig allConfig;

protected abstract void validate(ReadableConfig config);
protected void validateConfig() {}

public AbstractOceanBaseConnectorOptions(Map<String, String> config) {
ReadableConfig readableConfig = Configuration.fromMap(config);
validate(readableConfig);
this.allConfig = readableConfig;
this.allConfig = Configuration.fromMap(config);
validateConfig();
}

public OceanBaseWriterOptions getWriterOptions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;

import com.oceanbase.connector.flink.connection.OceanBaseConnectionOptions;
import com.oceanbase.connector.flink.sink.OceanBaseStatementOptions;
Expand Down Expand Up @@ -112,7 +111,7 @@ public OceanBaseConnectorOptions(Map<String, String> config) {
}

@Override
protected void validate(ReadableConfig config) {
protected void validateConfig() {
if (allConfig.get(PARTITION_ENABLED)
&& (allConfig.get(CLUSTER_NAME) == null || allConfig.get(TENANT_NAME) == null)) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@

import javax.annotation.Nonnull;

import java.io.Serializable;

public class OceanBaseConnectionInfo implements Serializable {

private static final long serialVersionUID = 1L;
public class OceanBaseConnectionInfo {

public enum CompatibleMode {
MYSQL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,20 @@

import javax.sql.DataSource;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.SQLException;

public class OceanBaseConnectionPool implements OceanBaseConnectionProvider, Serializable {
public class OceanBaseConnectionPool implements OceanBaseConnectionProvider {

private static final Logger LOG = LoggerFactory.getLogger(OceanBaseConnectionPool.class);

private static final long serialVersionUID = 1L;

private final OceanBaseConnectionOptions options;
private DataSource dataSource;
private volatile boolean inited = false;
private OceanBaseConnectionInfo connectionInfo;
private OceanBaseTablePartInfo tablePartitionInfo;
private transient volatile boolean inited = false;
private transient OceanBaseConnectionInfo connectionInfo;
private transient DataSource dataSource;
private transient OceanBaseTablePartInfo tablePartitionInfo;

public OceanBaseConnectionPool(OceanBaseConnectionOptions options) {
this.options = options;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
import com.oceanbase.connector.flink.dialect.OceanBaseDialect;
import org.apache.commons.lang3.StringUtils;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public interface OceanBaseConnectionProvider extends AutoCloseable {
public interface OceanBaseConnectionProvider extends AutoCloseable, Serializable {

/**
* Attempts to establish a connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,11 @@
import com.oceanbase.partition.calculator.model.TableEntry;
import com.oceanbase.partition.metadata.desc.ObTablePart;

import java.io.Serializable;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Stream;

public class OceanBaseTablePartInfo implements Serializable {

private static final long serialVersionUID = 1L;
public class OceanBaseTablePartInfo {

private final ObPartIdCalculator partIdCalculator;
private final Map<String, Integer> partColumnIndexMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.oceanbase.connector.flink.connection.OceanBaseTablePartInfo;
import com.oceanbase.connector.flink.connection.OceanBaseTableSchema;
import com.oceanbase.connector.flink.dialect.OceanBaseDialect;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -55,15 +56,14 @@ public class OceanBaseStatementExecutor implements StatementExecutor<RowData> {
private final String deleteStatementSql;
private final String upsertStatementSql;
private final String queryMemStoreSql;

private final OceanBaseTablePartInfo tablePartInfo;
private final List<Integer> partColumnIndexes;
private final ExecutorService statementExecutorService;

private final List<RowData> buffer = new ArrayList<>();
private final Map<String, Tuple2<Boolean, RowData>> reduceBuffer = new HashMap<>();

private transient volatile boolean closed = false;
private transient OceanBaseTablePartInfo tablePartInfo;
private transient ExecutorService statementExecutorService;
private volatile long lastCheckMemStoreTime;
private volatile SQLException statementException;
private SinkWriterMetricGroup metricGroup;
Expand Down Expand Up @@ -103,42 +103,42 @@ public OceanBaseStatementExecutor(
connectionInfo.getVersion().isV4()
? dialect.getMemStoreExistStatement(options.getMemStoreThreshold())
: dialect.getLegacyMemStoreExistStatement(options.getMemStoreThreshold());
this.tablePartInfo =
options.isPartitionEnabled() ? connectionProvider.getTablePartInfo() : null;
if (this.tablePartInfo != null && !this.tablePartInfo.getPartColumnIndexMap().isEmpty()) {
if (getTablePartInfo() != null && !getTablePartInfo().getPartColumnIndexMap().isEmpty()) {
LOG.info(
"Table partition info loaded, part columns: {}",
tablePartInfo.getPartColumnIndexMap().keySet());
getTablePartInfo().getPartColumnIndexMap().keySet());

this.partColumnIndexes =
IntStream.range(0, tableSchema.getFieldNames().size())
.filter(
i ->
tablePartInfo
getTablePartInfo()
.getPartColumnIndexMap()
.containsKey(
tableSchema.getFieldNames().get(i)))
.boxed()
.collect(Collectors.toList());

if (options.getPartitionNumber() > 1) {
this.statementExecutorService =
Executors.newFixedThreadPool(options.getPartitionNumber());
LOG.info(
"ExecutorService set with {} threads, will flush records by partitions in parallel",
options.getPartitionNumber());
} else {
this.statementExecutorService = null;
LOG.info(
"ExecutorService not set, will flush records by partitions in main thread");
}
} else {
this.partColumnIndexes = null;
this.statementExecutorService = null;
LOG.info("No table partition info loaded, will flush records directly");
}
}

/**
 * Lazily resolves the table partition info via the connection provider.
 *
 * <p>When partition calculation is disabled in the options, no lookup is performed and the
 * (transient) field is returned as-is — NOTE(review): from the visible code it is only ever
 * assigned here, so it is expected to be {@code null} in that case; confirm against the
 * constructor.
 *
 * <p>NOTE(review): the lazy initialization is not synchronized — assumes single-threaded
 * access from the writer; confirm.
 *
 * @return the partition info, or {@code null} when partitioning is disabled
 */
private OceanBaseTablePartInfo getTablePartInfo() {
    if (!options.isPartitionEnabled()) {
        return tablePartInfo;
    }
    if (tablePartInfo == null) {
        tablePartInfo = connectionProvider.getTablePartInfo();
    }
    return tablePartInfo;
}

/**
 * Lazily creates the fixed-size thread pool used to flush partitioned records in parallel.
 *
 * <p>A pool is only created when there are partition columns to route by and the configured
 * partition number is greater than one; otherwise {@code null} is returned and callers flush
 * in the main thread.
 *
 * <p>NOTE(review): the lazy initialization is not synchronized — assumes single-threaded
 * access from the writer; confirm.
 *
 * @return the executor service, or {@code null} when parallel flushing is not applicable
 */
private ExecutorService getStatementExecutorService() {
    // Equivalent to CollectionUtils.isNotEmpty(partColumnIndexes), spelled out explicitly.
    boolean hasPartColumns = partColumnIndexes != null && !partColumnIndexes.isEmpty();
    if (hasPartColumns
            && options.getPartitionNumber() > 1
            && statementExecutorService == null) {
        statementExecutorService = Executors.newFixedThreadPool(options.getPartitionNumber());
    }
    return statementExecutorService;
}

@Override
public void setContext(Sink.InitContext context) {
this.metricGroup = context.metricGroup();
Expand Down Expand Up @@ -315,27 +315,28 @@ private void executeBatch(
Object[] record = new Object[tableSchema.getFieldNames().size()];
for (Integer i : partColumnIndexes) {
Integer columnIndex =
tablePartInfo
getTablePartInfo()
.getPartColumnIndexMap()
.get(tableSchema.getFieldNames().get(i));
record[columnIndex] = tableSchema.getFieldGetters()[i].getFieldOrNull(rowData);
}
Long partId = tablePartInfo.getPartIdCalculator().calculatePartId(record);
Long partId = getTablePartInfo().getPartIdCalculator().calculatePartId(record);
partRowDataMap.computeIfAbsent(partId, k -> new ArrayList<>()).add(rowData);
}
if (statementExecutorService != null) {
if (getStatementExecutorService() != null) {
CountDownLatch latch = new CountDownLatch(partRowDataMap.size());
for (List<RowData> partRowDataList : partRowDataMap.values()) {
statementExecutorService.execute(
() -> {
try {
execute(sql, partRowDataList, fieldGetters);
} catch (SQLException e) {
statementException = e;
} finally {
latch.countDown();
}
});
getStatementExecutorService()
.execute(
() -> {
try {
execute(sql, partRowDataList, fieldGetters);
} catch (SQLException e) {
statementException = e;
} finally {
latch.countDown();
}
});
}
try {
latch.await();
Expand Down

0 comments on commit 3333754

Please sign in to comment.