Skip to content

Commit

Permalink
feat: use OceanBaseConnectionInfo to maintain server information (#20)
Browse files Browse the repository at this point in the history
* add OceanBaseConnectionInfo

* fix typo
  • Loading branch information
whhe committed Apr 27, 2023
1 parent 8bf74be commit cc1cdda
Show file tree
Hide file tree
Showing 9 changed files with 256 additions and 122 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright (c) 2023 OceanBase
* flink-connector-oceanbase is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http:https://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/

package com.oceanbase.connector.flink.connection;

import com.oceanbase.connector.flink.dialect.OceanBaseDialect;
import org.apache.commons.lang3.StringUtils;

import java.io.Serializable;

public class OceanBaseConnectionInfo implements Serializable {

private static final long serialVersionUID = 1L;

public enum CompatibleMode {
MYSQL,
ORACLE;

public static CompatibleMode fromString(String text) {
if (StringUtils.isBlank(text)) {
throw new IllegalArgumentException("Compatible mode should not be blank");
}
switch (text.trim().toUpperCase()) {
case "MYSQL":
return MYSQL;
case "ORACLE":
return ORACLE;
default:
throw new UnsupportedOperationException("Unsupported compatible mode: " + text);
}
}

public boolean isMySqlMode() {
return CompatibleMode.MYSQL.equals(this);
}
}

public enum Version {
LEGACY,
V4;

public static Version fromString(String text) {
return (StringUtils.isBlank(text) || !text.startsWith("4.")) ? LEGACY : V4;
}

public boolean isV4() {
return Version.V4.equals(this);
}
}

private final String clusterName;
private final String tenantName;
private final OceanBaseDialect dialect;
private final Version version;

public OceanBaseConnectionInfo(String username, OceanBaseDialect dialect, Version version) {
String clusterName = "", tenantName = "sys";
try {
if (username.contains("@")) {
int i = username.indexOf("@");
String s = username.substring(i + 1);
String[] arr = s.split("#");
if (arr.length > 0) {
tenantName = arr[0];
}
if (arr.length > 1) {
clusterName = arr[1];
}
} else {
String[] arr = username.split(":");
if (arr.length == 3) {
clusterName = arr[0];
tenantName = arr[1];
}
}
} catch (Exception e) {
throw new IllegalArgumentException("Failed to parse username", e);
}
this.clusterName = clusterName;
this.tenantName = tenantName;
this.dialect = dialect;
this.version = version;
}

public String getClusterName() {
return clusterName;
}

public String getTenantName() {
return tenantName;
}

public OceanBaseDialect getDialect() {
return dialect;
}

public Version getVersion() {
return version;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
package com.oceanbase.connector.flink.connection;

import com.alibaba.druid.pool.DruidDataSource;
import com.oceanbase.connector.flink.dialect.OceanBaseDialect;
import com.oceanbase.connector.flink.dialect.OceanBaseMySQLDialect;
import com.oceanbase.connector.flink.dialect.OceanBaseOracleDialect;
import com.zaxxer.hikari.HikariDataSource;

import javax.sql.DataSource;
Expand All @@ -28,7 +31,7 @@ public class OceanBaseConnectionPool implements OceanBaseConnectionProvider, Ser
private final OceanBaseConnectionOptions options;
private DataSource dataSource;
private volatile boolean inited = false;
private OceanBaseCompatibleMode compatibleMode;
private OceanBaseConnectionInfo connectionInfo;

public OceanBaseConnectionPool(OceanBaseConnectionOptions options) {
this.options = options;
Expand Down Expand Up @@ -80,11 +83,24 @@ public Connection getConnection() throws SQLException {
}

@Override
public OceanBaseCompatibleMode getCompatibleMode() throws SQLException {
if (compatibleMode == null) {
compatibleMode = OceanBaseConnectionProvider.super.getCompatibleMode();
public OceanBaseConnectionInfo getConnectionInfo() {
if (connectionInfo == null) {
try {
OceanBaseConnectionInfo.CompatibleMode compatibleMode =
OceanBaseConnectionProvider.super.getCompatibleMode();
OceanBaseDialect dialect =
compatibleMode.isMySqlMode()
? new OceanBaseMySQLDialect()
: new OceanBaseOracleDialect();
OceanBaseConnectionInfo.Version version =
OceanBaseConnectionProvider.super.getVersion(dialect);
connectionInfo =
new OceanBaseConnectionInfo(options.getUsername(), dialect, version);
} catch (SQLException e) {
throw new RuntimeException("Failed to get connection info", e);
}
}
return compatibleMode;
return connectionInfo;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@

package com.oceanbase.connector.flink.connection;

import com.oceanbase.connector.flink.dialect.OceanBaseDialect;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLSyntaxErrorException;
import java.sql.Statement;

public interface OceanBaseConnectionProvider extends AutoCloseable {
Expand All @@ -27,20 +30,50 @@ public interface OceanBaseConnectionProvider extends AutoCloseable {
*/
Connection getConnection() throws SQLException;

/**
* Get connection info
*
* @return connection info
*/
OceanBaseConnectionInfo getConnectionInfo();

/**
* Attempts to get the compatible mode of OceanBase
*
* @return compatible mode
* @throws SQLException if a database access error occurs
*/
default OceanBaseCompatibleMode getCompatibleMode() throws SQLException {
default OceanBaseConnectionInfo.CompatibleMode getCompatibleMode() throws SQLException {
try (Connection conn = getConnection();
Statement statement = conn.createStatement()) {
ResultSet rs = statement.executeQuery("SHOW VARIABLES LIKE 'ob_compatibility_mode'");
if (rs.next()) {
return OceanBaseCompatibleMode.fromString(rs.getString("Value"));
return OceanBaseConnectionInfo.CompatibleMode.fromString(rs.getString("Value"));
}
return null;
}
}

/**
* Attempts to get the version of OceanBase
*
* @return version
* @throws SQLException if a database access error occurs
*/
default OceanBaseConnectionInfo.Version getVersion(OceanBaseDialect dialect)
throws SQLException {
try (Connection conn = getConnection();
Statement statement = conn.createStatement()) {
ResultSet rs = statement.executeQuery(dialect.getSelectOBVersionStatement());
if (rs.next()) {
return OceanBaseConnectionInfo.Version.fromString(rs.getString(1));
}
return OceanBaseConnectionInfo.Version.LEGACY;
} catch (SQLSyntaxErrorException e) {
if (e.getMessage().contains("not exist")) {
return OceanBaseConnectionInfo.Version.LEGACY;
}
throw e;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,32 @@ default String getDeleteStatement(
.collect(Collectors.joining(" AND "));
return "DELETE FROM " + quoteIdentifier(tableName) + " WHERE " + conditionClause;
}

/**
* Get the system database name
*
* @return the system database name
*/
String getSysDatabase();

default String getMemStoreExistStatement(double threshold) {
return "SELECT 1 FROM "
+ getSysDatabase()
+ ".GV$OB_MEMSTORE WHERE MEMSTORE_USED > MEMSTORE_LIMIT * "
+ threshold;
}

default String getLegacyMemStoreExistStatement(double threshold) {
return "SELECT 1 FROM "
+ getSysDatabase()
+ ".GV$MEMSTORE WHERE TOTAL > MEM_LIMIT * "
+ threshold;
}

/**
* Get the select statement for OB_VERSION() function
*
* @return the select statement for OB_VERSION() function
*/
String getSelectOBVersionStatement();
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,14 @@ public String getUpsertStatement(
+ " ON DUPLICATE KEY UPDATE "
+ updateClause;
}

@Override
public String getSysDatabase() {
return "oceanbase";
}

@Override
public String getSelectOBVersionStatement() {
return "SELECT OB_VERSION()";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,14 @@ public String getUpsertStatement(
+ valuesClause
+ ")";
}

@Override
public String getSysDatabase() {
return "SYS";
}

@Override
public String getSelectOBVersionStatement() {
return "SELECT OB_VERSION() FROM DUAL";
}
}
Loading

0 comments on commit cc1cdda

Please sign in to comment.