Skip to content

Commit

Permalink
refactor: use compatible mode enum (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe committed Apr 26, 2023
1 parent c618300 commit 8bf74be
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2023 OceanBase
* flink-connector-oceanbase is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http:https://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/

package com.oceanbase.connector.flink.connection;

import org.apache.commons.lang3.StringUtils;

public enum OceanBaseCompatibleMode {
MYSQL,
ORACLE;

public static OceanBaseCompatibleMode fromString(String text) {
if (StringUtils.isBlank(text)) {
throw new IllegalArgumentException("Compatible mode should not be blank");
}
switch (text.trim().toUpperCase()) {
case "MYSQL":
return MYSQL;
case "ORACLE":
return ORACLE;
default:
throw new UnsupportedOperationException("Unsupported compatible mode: " + text);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import com.alibaba.druid.pool.DruidDataSource;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.commons.lang3.StringUtils;

import javax.sql.DataSource;

Expand All @@ -29,7 +28,7 @@ public class OceanBaseConnectionPool implements OceanBaseConnectionProvider, Ser
private final OceanBaseConnectionOptions options;
private DataSource dataSource;
private volatile boolean inited = false;
private String compatibleMode;
private OceanBaseCompatibleMode compatibleMode;

public OceanBaseConnectionPool(OceanBaseConnectionOptions options) {
this.options = options;
Expand Down Expand Up @@ -81,8 +80,8 @@ public Connection getConnection() throws SQLException {
}

@Override
public String getCompatibleMode() throws SQLException {
if (StringUtils.isBlank(compatibleMode)) {
public OceanBaseCompatibleMode getCompatibleMode() throws SQLException {
if (compatibleMode == null) {
compatibleMode = OceanBaseConnectionProvider.super.getCompatibleMode();
}
return compatibleMode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ public interface OceanBaseConnectionProvider extends AutoCloseable {
* @return compatible mode
* @throws SQLException if a database access error occurs
*/
default String getCompatibleMode() throws SQLException {
default OceanBaseCompatibleMode getCompatibleMode() throws SQLException {
try (Connection conn = getConnection();
Statement statement = conn.createStatement()) {
ResultSet rs = statement.executeQuery("SHOW VARIABLES LIKE 'ob_compatibility_mode'");
if (rs.next()) {
return rs.getString("Value");
return OceanBaseCompatibleMode.fromString(rs.getString("Value"));
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;

import com.oceanbase.connector.flink.connection.OceanBaseCompatibleMode;
import com.oceanbase.connector.flink.connection.OceanBaseConnectionProvider;
import com.oceanbase.connector.flink.dialect.OceanBaseDialect;
import com.oceanbase.connector.flink.table.OceanBaseTableSchema;
Expand Down Expand Up @@ -93,9 +94,9 @@ public OceanBaseRowDataStatementExecutor(
}

private void attemptQueryMemStore() throws SQLException {
String compatibleMode = connectionProvider.getCompatibleMode();
OceanBaseCompatibleMode compatibleMode = connectionProvider.getCompatibleMode();
String view, legacyView;
if ("mysql".equalsIgnoreCase(compatibleMode)) {
if (OceanBaseCompatibleMode.MYSQL.equals(compatibleMode)) {
view = "oceanbase.GV$OB_MEMSTORE";
legacyView = "oceanbase.gv$memstore";
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.table.data.RowData;

import com.oceanbase.connector.flink.connection.OceanBaseCompatibleMode;
import com.oceanbase.connector.flink.connection.OceanBaseConnectionProvider;
import com.oceanbase.connector.flink.dialect.OceanBaseDialect;
import com.oceanbase.connector.flink.dialect.OceanBaseMySQLDialect;
import com.oceanbase.connector.flink.dialect.OceanBaseOracleDialect;
import com.oceanbase.connector.flink.table.OceanBaseTableSchema;
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.sql.SQLException;
Expand Down Expand Up @@ -59,25 +59,15 @@ public SinkWriter<RowData> createWriter(InitContext context) throws IOException

private OceanBaseDialect getDialect(OceanBaseConnectionProvider connectionProvider)
throws IOException {
String compatibleMode;
OceanBaseCompatibleMode compatibleMode;
try {
compatibleMode = connectionProvider.getCompatibleMode();
} catch (SQLException e) {
throw new IOException("Failed to get compatible mode", e);
}

if (StringUtils.isBlank(compatibleMode)) {
throw new RuntimeException("Got empty 'ob_compatibility_mode'");
}

switch (compatibleMode.toLowerCase()) {
case "mysql":
return new OceanBaseMySQLDialect();
case "oracle":
return new OceanBaseOracleDialect();
default:
throw new UnsupportedOperationException(
"Unsupported compatible mode: " + compatibleMode);
if (OceanBaseCompatibleMode.MYSQL.equals(compatibleMode)) {
return new OceanBaseMySQLDialect();
}
return new OceanBaseOracleDialect();
}
}

0 comments on commit 8bf74be

Please sign in to comment.