Skip to content

Commit

Permalink
query compatibleMode, cluster, tenant by jdbc connection
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe committed Feb 4, 2024
1 parent 294bc27 commit 4b9ab33
Show file tree
Hide file tree
Showing 10 changed files with 252 additions and 178 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2024 OceanBase.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.oceanbase.connector.flink.connection;

import org.apache.commons.lang3.StringUtils;

import javax.annotation.Nonnull;

import java.io.Serializable;

public class OceanBaseUserInfo implements Serializable {

private static final long serialVersionUID = 1L;

private String cluster;
private String tenant;
private final String user;

public OceanBaseUserInfo(String cluster, String tenant, String user) {
this.cluster = cluster;
this.tenant = tenant;
this.user = user;
}

public String getCluster() {
return cluster;
}

public String getTenant() {
return tenant;
}

public String getUser() {
return user;
}

public void setCluster(@Nonnull String cluster) {
this.cluster = cluster;
}

public void setTenant(@Nonnull String tenant) {
this.tenant = tenant;
}

public static OceanBaseUserInfo parse(String username) {
final String sepUserAtTenant = "@";
final String sepTenantAtCluster = "#";
final String sep = ":";
final int expectedSepCount = 2;
if (username.contains(sepTenantAtCluster) && username.contains(sepUserAtTenant)) {
String[] parts = username.split(sepTenantAtCluster);
String[] userAndTenant = parts[0].split(sepUserAtTenant);
return new OceanBaseUserInfo(parts[1], userAndTenant[1], userAndTenant[0]);
} else if (StringUtils.countMatches(username, sep) == expectedSepCount) {
String[] parts = username.split(sep);
return new OceanBaseUserInfo(parts[0], parts[1], parts[2]);
} else if (username.contains(sepUserAtTenant) && !username.contains(sepTenantAtCluster)) {
String[] parts = username.split(sepUserAtTenant);
return new OceanBaseUserInfo(null, parts[1], parts[0]);
} else {
return new OceanBaseUserInfo(null, null, username);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,37 +27,16 @@
import java.util.Map;
import java.util.Properties;

import static org.apache.flink.util.Preconditions.checkNotNull;

public class OceanBaseConnectorOptions extends ConnectorOptions {
private static final long serialVersionUID = 1L;

public static final ConfigOption<String> COMPATIBLE_MODE =
ConfigOptions.key("compatible-mode")
.stringType()
.defaultValue("mysql")
.withDescription(
"The compatible mode of OceanBase, can be 'mysql' or 'oracle', use 'mysql' by default.");

public static final ConfigOption<String> DRIVER_CLASS_NAME =
ConfigOptions.key("driver-class-name")
.stringType()
.defaultValue("com.mysql.cj.jdbc.Driver")
.withDescription(
"JDBC driver class name, use 'com.mysql.cj.jdbc.Driver' by default.");

public static final ConfigOption<String> CLUSTER_NAME =
ConfigOptions.key("cluster-name")
.stringType()
.noDefaultValue()
.withDescription("The cluster name.");

public static final ConfigOption<String> TENANT_NAME =
ConfigOptions.key("tenant-name")
.stringType()
.noDefaultValue()
.withDescription("The tenant name.");

public static final ConfigOption<String> DRUID_PROPERTIES =
ConfigOptions.key("druid-properties")
.stringType()
Expand Down Expand Up @@ -109,12 +88,6 @@ public class OceanBaseConnectorOptions extends ConnectorOptions {
.defaultValue(2882)
.withDescription("Rpc port number used in direct load.");

public static final ConfigOption<String> DIRECT_LOAD_USERNAME =
ConfigOptions.key("direct-load.username")
.stringType()
.noDefaultValue()
.withDescription("Username used in direct load.");

public static final ConfigOption<Integer> DIRECT_LOAD_PARALLEL =
ConfigOptions.key("direct-load.parallel")
.intType()
Expand Down Expand Up @@ -147,60 +120,12 @@ public class OceanBaseConnectorOptions extends ConnectorOptions {

public OceanBaseConnectorOptions(Map<String, String> config) {
super(config);
if (getUrl().contains("mysql")) {
if (!getDriverClassName().contains("mysql")) {
throw new IllegalArgumentException(
"Wrong 'driver-class-name', should use mysql driver for url: " + getUrl());
}
if (!getCompatibleMode().equalsIgnoreCase("mysql")) {
throw new IllegalArgumentException(
"Wrong 'compatible-mode', the mysql driver can only be used on 'mysql' mode.");
}
} else if (getUrl().contains("oceanbase")) {
if (!getDriverClassName().contains("oceanbase")) {
throw new IllegalArgumentException(
"Wrong 'driver-class-name', should use oceanbase driver for url: "
+ getUrl());
}
}

if (getPartitionEnabled()) {
checkNotNull(
getClusterName(),
"'cluster-name' is required when 'partition.enabled' is true.");
checkNotNull(
getTenantName(), "'tenant-name' is required when 'partition.enabled' is true.");
}

if (getDirectLoadEnabled()) {
checkNotNull(
getDirectLoadHost(),
"'direct-load.host' is required when 'direct-load.enabled' is true.");
checkNotNull(
getDirectLoadUsername(),
"'direct-load.username' is required when 'direct-load.enabled' is true.");
checkNotNull(
getTenantName(),
"'tenant-name' is required when 'direct-load.enabled' is true.");
}
}

public String getCompatibleMode() {
return allConfig.get(COMPATIBLE_MODE);
}

public String getDriverClassName() {
return allConfig.get(DRIVER_CLASS_NAME);
}

public String getClusterName() {
return allConfig.get(CLUSTER_NAME);
}

public String getTenantName() {
return allConfig.get(TENANT_NAME);
}

public Properties getDruidProperties() {
return OptionUtils.parseProperties(allConfig.get(DRUID_PROPERTIES));
}
Expand Down Expand Up @@ -233,10 +158,6 @@ public int getDirectLoadPort() {
return allConfig.get(DIRECT_LOAD_PORT);
}

public String getDirectLoadUsername() {
return allConfig.get(DIRECT_LOAD_USERNAME);
}

public int getDirectLoadParallel() {
return allConfig.get(DIRECT_LOAD_PARALLEL);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(OceanBaseConnectorOptions.BUFFER_FLUSH_INTERVAL);
options.add(OceanBaseConnectorOptions.BUFFER_SIZE);
options.add(OceanBaseConnectorOptions.MAX_RETRIES);
options.add(OceanBaseConnectorOptions.COMPATIBLE_MODE);
options.add(OceanBaseConnectorOptions.DRIVER_CLASS_NAME);
options.add(OceanBaseConnectorOptions.CLUSTER_NAME);
options.add(OceanBaseConnectorOptions.TENANT_NAME);
options.add(OceanBaseConnectorOptions.DRUID_PROPERTIES);
options.add(OceanBaseConnectorOptions.MEMSTORE_CHECK_ENABLED);
options.add(OceanBaseConnectorOptions.MEMSTORE_THRESHOLD);
Expand All @@ -89,7 +86,6 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(OceanBaseConnectorOptions.DIRECT_LOAD_ENABLED);
options.add(OceanBaseConnectorOptions.DIRECT_LOAD_HOST);
options.add(OceanBaseConnectorOptions.DIRECT_LOAD_PORT);
options.add(OceanBaseConnectorOptions.DIRECT_LOAD_USERNAME);
options.add(OceanBaseConnectorOptions.DIRECT_LOAD_PARALLEL);
options.add(OceanBaseConnectorOptions.DIRECT_LOAD_MAX_ERROR_ROWS);
options.add(OceanBaseConnectorOptions.DIRECT_LOAD_DUP_ACTION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.oceanbase.connector.flink.dialect.OceanBaseMySQLDialect;
import com.oceanbase.connector.flink.dialect.OceanBaseOracleDialect;
import com.oceanbase.connector.flink.table.TableId;
import com.oceanbase.connector.flink.utils.OceanBaseJdbcUtils;

import com.alibaba.druid.pool.DruidDataSource;
import com.alipay.oceanbase.rpc.table.ObDirectLoadParameter;
Expand All @@ -34,9 +35,7 @@
import javax.sql.DataSource;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

public class OceanBaseConnectionProvider implements ConnectionProvider {
Expand Down Expand Up @@ -88,31 +87,64 @@ public String toString() {
}

private final OceanBaseConnectorOptions options;
private final CompatibleMode compatibleMode;
private final OceanBaseDialect dialect;

private CompatibleMode compatibleMode;
private OceanBaseDialect dialect;
private Version version;
private OceanBaseUserInfo userInfo;

private transient volatile boolean inited = false;
private transient DataSource dataSource;

public OceanBaseConnectionProvider(OceanBaseConnectorOptions options) {
this.options = options;
this.compatibleMode = CompatibleMode.parse(options.getCompatibleMode());
this.dialect =
compatibleMode.isMySqlMode()
? new OceanBaseMySQLDialect()
: new OceanBaseOracleDialect();
}

private CompatibleMode getCompatibleMode() {
if (compatibleMode == null) {
compatibleMode =
CompatibleMode.parse(OceanBaseJdbcUtils.getCompatibleMode(this::getConnection));
}
return compatibleMode;
}

public boolean isMySqlMode() {
return compatibleMode.isMySqlMode();
return getCompatibleMode().isMySqlMode();
}

public OceanBaseDialect getDialect() {
if (dialect == null) {
dialect =
getCompatibleMode().isMySqlMode()
? new OceanBaseMySQLDialect()
: new OceanBaseOracleDialect();
}
return dialect;
}

public Version getVersion() {
if (version == null) {
String versionText = OceanBaseJdbcUtils.getVersionNumber(this::getConnection);
LOG.info("Got OceanBase version number: {}", versionText);
version = new Version(versionText);
}
return version;
}

public OceanBaseUserInfo getUserInfo() {
if (userInfo == null) {
OceanBaseUserInfo user = OceanBaseUserInfo.parse(options.getUsername());
if (user.getCluster() == null) {
user.setCluster(OceanBaseJdbcUtils.getClusterName(this::getConnection));
}
if (user.getTenant() == null) {
user.setTenant(OceanBaseJdbcUtils.getTenantName(this::getConnection, getDialect()));
}
userInfo = user;
}
return userInfo;
}

protected void init() {
if (!inited) {
synchronized (this) {
Expand All @@ -138,58 +170,26 @@ public Connection getConnection() throws SQLException {
return dataSource.getConnection();
}

public Version getVersion() {
init();
if (version == null) {
try {
String versionText = queryVersion();
LOG.info("Got OceanBase version number: {}", versionText);
version = new Version(versionText);
} catch (SQLException e) {
throw new RuntimeException("Failed to query version of OceanBase", e);
}
public ObTableDirectLoad getDirectLoad(TableId tableId) {
int count = OceanBaseJdbcUtils.getTableRowsCount(this::getConnection, tableId.identifier());
if (count != 0) {
throw new RuntimeException(
"Direct load can only work on empty table, while table "
+ tableId.identifier()
+ " has "
+ count
+ " rows");
}
return version;
}

private String queryVersion() throws SQLException {
try (Connection conn = getConnection();
Statement statement = conn.createStatement()) {
try {
ResultSet rs = statement.executeQuery(dialect.getSelectOBVersionStatement());
if (rs.next()) {
return rs.getString(1);
}
} catch (SQLException e) {
if (!e.getMessage().contains("not exist")) {
throw e;
}
}

ResultSet rs = statement.executeQuery(dialect.getQueryVersionCommentStatement());
if (rs.next()) {
String versionComment = rs.getString("VALUE");
String[] parts = StringUtils.split(versionComment, " ");
if (parts != null && parts.length > 1) {
return parts[1];
}
throw new RuntimeException("Illegal 'version_comment': " + versionComment);
}
throw new RuntimeException("'version_comment' not found");
}
}

public ObTableDirectLoad getDirectLoadTable(TableId tableId) {
ObTable table = getTable(tableId.getSchemaName());
ObTable table = getDirectLoadTable(tableId.getSchemaName());
return new ObTableDirectLoad(table, tableId.getTableName(), getDirectLoadParameter(), true);
}

private ObTable getTable(String schemaName) {
private ObTable getDirectLoadTable(String schemaName) {
try {
return new ObTable.Builder(options.getDirectLoadHost(), options.getDirectLoadPort())
.setLoginInfo(
options.getTenantName(),
options.getDirectLoadUsername(),
getUserInfo().getTenant(),
getUserInfo().getUser(),
options.getPassword(),
schemaName)
.build();
Expand Down
Loading

0 comments on commit 4b9ab33

Please sign in to comment.