diff --git a/flink-connector-obkv-hbase/src/test/java/com/oceanbase/connector/flink/OBKVHBaseConnectorITCase.java b/flink-connector-obkv-hbase/src/test/java/com/oceanbase/connector/flink/OBKVHBaseConnectorITCase.java index da5dcf9..8d0807a 100644 --- a/flink-connector-obkv-hbase/src/test/java/com/oceanbase/connector/flink/OBKVHBaseConnectorITCase.java +++ b/flink-connector-obkv-hbase/src/test/java/com/oceanbase/connector/flink/OBKVHBaseConnectorITCase.java @@ -43,12 +43,22 @@ import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; import org.junit.Before; +import org.junit.ClassRule; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -56,30 +66,39 @@ public class OBKVHBaseConnectorITCase extends OceanBaseMySQLTestBase { - public static final String CLUSTER_NAME = "obcluster"; - public static final String CONFIG_URL = - "http://127.0.0.1:8080/services?Action=ObRootServiceInfo&ObCluster=" + CLUSTER_NAME; - - @Override - protected String getTestTable() { - return "htable"; - } - - @Override - protected String getUrl() { - return CONFIG_URL; - } - - @Override - protected String getUsername() { - return OB_SERVER.getUsername() + "#" + CLUSTER_NAME; + private static final Logger LOG = LoggerFactory.getLogger(OBKVHBaseConnectorITCase.class); + + @ClassRule public static final GenericContainer CONTAINER = container("sql/init.sql"); + + private static final String TEST_TABLE = "htable"; + + protected String getConfigUrl() { + try (Connection connection = + DriverManager.getConnection( + getJdbcUrl(CONTAINER), SYS_USERNAME, SYS_PASSWORD); + Statement statement = connection.createStatement()) { + ResultSet rs = statement.executeQuery("SHOW PARAMETERS LIKE 'obconfig_url'"); + String configUrl = rs.next() ? rs.getString("VALUE") : null; + if (configUrl == null || configUrl.isEmpty()) { + throw new RuntimeException("obconfig_url not found"); + } + LOG.info("Got obconfig_url: {}", configUrl); + return configUrl; + } catch (SQLException e) { + throw new RuntimeException(e); + } } @Override protected Map getOptions() { - Map options = super.getOptions(); - options.put("sys.username", OB_SERVER.getSysUsername()); - options.put("sys.password", OB_SERVER.getSysPassword()); + Map options = new HashMap<>(); + options.put("url", getConfigUrl()); + options.put("sys.username", SYS_USERNAME); + options.put("sys.password", SYS_PASSWORD); + options.put("username", TEST_USERNAME + "#" + CLUSTER_NAME); + options.put("password", TEST_PASSWORD); + options.put("schema-name", TEST_DATABASE); + options.put("table-name", TEST_TABLE); return options; } @@ -95,6 +114,9 @@ public void before() throws Exception { @After public void after() throws Exception { + if (client == null) { + return; + } client.delete( Arrays.asList( deleteFamily("1", "family1"), diff --git a/flink-connector-oceanbase-base/src/test/java/com/oceanbase/connector/flink/OceanBaseContainer.java b/flink-connector-oceanbase-base/src/test/java/com/oceanbase/connector/flink/OceanBaseContainer.java deleted file mode 100644 index c68b630..0000000 --- a/flink-connector-oceanbase-base/src/test/java/com/oceanbase/connector/flink/OceanBaseContainer.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Copyright 2024 OceanBase. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.oceanbase.connector.flink; - -import org.testcontainers.containers.JdbcDatabaseContainer; -import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.utility.DockerImageName; - -import java.time.Duration; - -public class OceanBaseContainer extends JdbcDatabaseContainer { - - public static final String DOCKER_IMAGE_NAME = "oceanbase/oceanbase-ce"; - - private static final DockerImageName DEFAULT_IMAGE_NAME = - DockerImageName.parse(DOCKER_IMAGE_NAME); - - private static final Integer SQL_PORT = 2881; - private static final Integer RPC_PORT = 2882; - - private static final String DEFAULT_USERNAME = "root"; - private static final String DEFAULT_PASSWORD = ""; - private static final String DEFAULT_TENANT_NAME = "test"; - private static final String DEFAULT_DATABASE_NAME = "test"; - - private String sysPassword = DEFAULT_PASSWORD; - - public OceanBaseContainer(String dockerImageName) { - this(DockerImageName.parse(dockerImageName)); - } - - public OceanBaseContainer(DockerImageName dockerImageName) { - super(dockerImageName); - dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME); - - this.waitStrategy = - Wait.forLogMessage(".*boot success!.*", 1) - .withStartupTimeout(Duration.ofMinutes(5)); - - addExposedPorts(SQL_PORT, RPC_PORT); - } - - @Override - public String getDriverClassName() { - return "com.oceanbase.jdbc.Driver"; - } - - public Integer getSqlPort() { - return getActualPort(SQL_PORT); - } - - public Integer getActualPort(int port) { - return "host".equals(getNetworkMode()) ? port : getMappedPort(port); - } - - @Override - public String getJdbcUrl() { - return getJdbcUrl(DEFAULT_DATABASE_NAME); - } - - public String getJdbcUrl(String databaseName) { - String additionalUrlParams = constructUrlParameters("?", "&"); - return "jdbc:mysql://" - + getHost() - + ":" - + getSqlPort() - + "/" - + databaseName - + additionalUrlParams; - } - - public OceanBaseContainer withSysPassword(String sysPassword) { - this.sysPassword = sysPassword; - return this; - } - - public String getSysUsername() { - return DEFAULT_USERNAME; - } - - public String getSysPassword() { - return sysPassword; - } - - @Override - public String getDatabaseName() { - return DEFAULT_DATABASE_NAME; - } - - @Override - public String getUsername() { - return DEFAULT_USERNAME + "@" + DEFAULT_TENANT_NAME; - } - - @Override - public String getPassword() { - return DEFAULT_PASSWORD; - } - - @Override - protected String getTestQueryString() { - return "SELECT 1"; - } - - @Override - protected void waitUntilContainerStarted() { - getWaitStrategy().waitUntilReady(this); - } - - @Override - protected void configure() { - withEnv("MODE", "slim"); - withEnv("OB_ROOT_PASSWORD", sysPassword); - } -} diff --git a/flink-connector-oceanbase-base/src/test/java/com/oceanbase/connector/flink/OceanBaseMySQLTestBase.java b/flink-connector-oceanbase-base/src/test/java/com/oceanbase/connector/flink/OceanBaseMySQLTestBase.java index 7ba8700..ffc453f 100644 --- a/flink-connector-oceanbase-base/src/test/java/com/oceanbase/connector/flink/OceanBaseMySQLTestBase.java +++ b/flink-connector-oceanbase-base/src/test/java/com/oceanbase/connector/flink/OceanBaseMySQLTestBase.java @@ -18,13 +18,15 @@ import org.apache.flink.util.TestLogger; -import org.junit.ClassRule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.utility.MountableFile; -import java.util.HashMap; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -37,44 +39,49 @@ public abstract class OceanBaseMySQLTestBase extends TestLogger { private static final Logger LOG = LoggerFactory.getLogger(OceanBaseMySQLTestBase.class); - public static final String IMAGE_TAG = "4.2.1_bp2"; - - @ClassRule - public static final OceanBaseContainer OB_SERVER = - new OceanBaseContainer(OceanBaseContainer.DOCKER_IMAGE_NAME + ":" + IMAGE_TAG) - .withNetworkMode("host") - .withSysPassword("123456") - .withCopyFileToContainer( - MountableFile.forClasspathResource("sql/init.sql"), - "/root/boot/init.d/init.sql") - .withLogConsumer(new Slf4jLogConsumer(LOG)); - - protected String getUrl() { - return OB_SERVER.getJdbcUrl(); - } - - protected abstract String getTestTable(); - - protected String getUsername() { - return OB_SERVER.getUsername(); + protected static final Network NETWORK = Network.newNetwork(); + + public static final int SQL_PORT = 2881; + public static final int RPC_PORT = 2882; + public static final int CONFIG_SERVER_PORT = 8080; + + public static final String CLUSTER_NAME = "github-action"; + public static final String SYS_USERNAME = "root"; + public static final String SYS_PASSWORD = "123456"; + public static final String TEST_TENANT = "flink"; + public static final String TEST_USERNAME = "root@" + TEST_TENANT; + public static final String TEST_PASSWORD = "654321"; + public static final String TEST_DATABASE = "test"; + + @SuppressWarnings("resource") + protected static GenericContainer container(String initSqlFile) { + GenericContainer container = + new GenericContainer<>("oceanbase/oceanbase-ce") + .withNetwork(NETWORK) + .withExposedPorts(SQL_PORT, RPC_PORT, CONFIG_SERVER_PORT) + .withEnv("MODE", "mini") + .withEnv("OB_CLUSTER_NAME", CLUSTER_NAME) + .withEnv("OB_SYS_PASSWORD", SYS_PASSWORD) + .withEnv("OB_TENANT_NAME", TEST_TENANT) + .withEnv("OB_TENANT_PASSWORD", TEST_PASSWORD) + .waitingFor(Wait.forLogMessage(".*boot success!.*", 1)) + .withStartupTimeout(Duration.ofMinutes(4)) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + if (initSqlFile != null) { + container.withCopyFileToContainer( + MountableFile.forClasspathResource(initSqlFile), "/root/boot/init.d/init.sql"); + } + return container; } - protected String getPassword() { - return OB_SERVER.getPassword(); - } + protected abstract Map getOptions(); - protected String getDatabaseName() { - return OB_SERVER.getDatabaseName(); + protected String getJdbcUrl(GenericContainer container) { + return getJdbcUrl(container.getHost(), container.getMappedPort(SQL_PORT)); } - protected Map getOptions() { - Map options = new HashMap<>(); - options.put("url", getUrl()); - options.put("username", getUsername()); - options.put("password", getPassword()); - options.put("schema-name", getDatabaseName()); - options.put("table-name", getTestTable()); - return options; + protected String getJdbcUrl(String host, int port) { + return "jdbc:mysql://" + host + ":" + port + "/" + TEST_DATABASE + "?useSSL=false"; } protected String getOptionsString() { diff --git a/flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseMySQLConnectorITCase.java b/flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseMySQLConnectorITCase.java index 912e916..5bdc538 100644 --- a/flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseMySQLConnectorITCase.java +++ b/flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseMySQLConnectorITCase.java @@ -46,7 +46,9 @@ import org.apache.commons.collections.CollectionUtils; import org.junit.After; import org.junit.Assert; +import org.junit.ClassRule; import org.junit.Test; +import org.testcontainers.containers.GenericContainer; import java.math.BigDecimal; import java.sql.Connection; @@ -58,6 +60,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -66,23 +69,32 @@ public class OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase { - @Override - protected String getTestTable() { - return "products"; - } + @ClassRule public static final GenericContainer CONTAINER = container("sql/init.sql"); + + private static final String TEST_TABLE = "products"; @Override protected Map getOptions() { - Map options = super.getOptions(); + Map options = new HashMap<>(); + options.put("url", getJdbcUrl(CONTAINER)); + options.put("username", TEST_USERNAME); + options.put("password", TEST_PASSWORD); + options.put("schema-name", TEST_DATABASE); + options.put("table-name", TEST_TABLE); options.put("druid-properties", "druid.initialSize=4;druid.maxActive=20;"); return options; } + protected Connection getConnection() throws SQLException { + return DriverManager.getConnection(getJdbcUrl(CONTAINER), TEST_USERNAME, TEST_PASSWORD); + } + @After public void after() throws Exception { try (Connection connection = getConnection(); Statement statement = connection.createStatement()) { - statement.execute("DELETE FROM " + getTestTable()); + statement.execute("DELETE FROM products"); + statement.execute("DELETE FROM gis_types"); } } @@ -101,10 +113,10 @@ public void testMultipleTableSink() throws Exception { new OceanBaseRecordFlusher(connectorOptions)); OceanBaseDialect dialect = new OceanBaseMySQLDialect(); - String database = getDatabaseName(); - String tableA = getTestTable() + "A"; - String tableB = getTestTable() + "B"; - String tableC = getTestTable() + "C"; + String database = TEST_DATABASE; + String tableA = TEST_TABLE + "A"; + String tableB = TEST_TABLE + "B"; + String tableC = TEST_TABLE + "C"; String tableFullNameA = dialect.getFullTableName(database, tableA); String tableFullNameB = dialect.getFullTableName(database, tableB); @@ -397,10 +409,10 @@ public void testDirectLoadSink() throws Exception { + " 'connector'='oceanbase'," + " 'direct-load.enabled'='true'," + " 'direct-load.host'='" - + OB_SERVER.getHost() + + CONTAINER.getHost() + "'," + " 'direct-load.port'='" - + OB_SERVER.getActualPort(2882) + + CONTAINER.getMappedPort(RPC_PORT) + "'," + getOptionsString() + ");"); @@ -470,9 +482,9 @@ private void validateSinkResults() throws SQLException, InterruptedException { "108,jacket,water resistent black wind breaker,0.1000000000", "109,spare tire,24 inch spare tire,22.2000000000"); - waitingAndAssertTableCount(getTestTable(), expected.size()); + waitingAndAssertTableCount(TEST_TABLE, expected.size()); - List actual = queryTable(getTestTable()); + List actual = queryTable(TEST_TABLE); assertEqualsInAnyOrder(expected, actual); } @@ -515,8 +527,4 @@ public List queryTable(String tableName, List fields) throws SQL } return result; } - - protected Connection getConnection() throws SQLException { - return DriverManager.getConnection(getUrl(), getUsername(), getPassword()); - } } diff --git a/pom.xml b/pom.xml index a5056a1..bcbb01f 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ under the License. org.testcontainers testcontainers-bom - 1.15.3 + 1.18.3 pom import