Skip to content

Commit

Permalink
[FLINK-16810][jdbc] add back PostgresCatalogITCase
Browse files Browse the repository at this point in the history
add back PostgresCatalogITCase which is supposed to be part of apache#11468
  • Loading branch information
bowenli86 committed Mar 26, 2020
1 parent 6a08051 commit 8cf8d8d
Showing 1 changed file with 72 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package org.apache.flink.api.java.io.jdbc.catalog;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableUtils;
import org.apache.flink.types.Row;

import org.junit.Test;

import java.util.List;

import static org.apache.flink.api.java.io.jdbc.catalog.PostgresCatalog.DEFAULT_DATABASE;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
import static org.junit.Assert.assertEquals;

/**
* E2E test for {@link PostgresCatalog}.
*/
public class PostgresCatalogITCase extends PostgresCatalogTestBase {

@Test
public void test_withoutSchema() throws Exception {
TableEnvironment tEnv = getTableEnvWithPgCatalog();

List<Row> results = TableUtils.collectToList(
tEnv.sqlQuery(String.format("select * from %s", TABLE1)));
assertEquals("[1]", results.toString());
}

@Test
public void test_withSchema() throws Exception {
TableEnvironment tEnv = getTableEnvWithPgCatalog();

List<Row> results = TableUtils.collectToList(
tEnv.sqlQuery(String.format("select * from `%s`", PostgresTablePath.fromFlinkTableName(TABLE1))));
assertEquals("[1]", results.toString());
}

@Test
public void test_fullPath() throws Exception {
TableEnvironment tEnv = getTableEnvWithPgCatalog();

List<Row> results = TableUtils.collectToList(
tEnv.sqlQuery(String.format("select * from %s.%s.`%s`",
TEST_CATALOG_NAME,
DEFAULT_DATABASE,
PostgresTablePath.fromFlinkTableName(TABLE1))));
assertEquals("[1]", results.toString());
}

@Test
public void test_insert() throws Exception {
TableEnvironment tEnv = getTableEnvWithPgCatalog();

tEnv.sqlUpdate(String.format("insert into %s select * from `%s`", TABLE4, TABLE1));
tEnv.execute("test");

List<Row> results = TableUtils.collectToList(
tEnv.sqlQuery(String.format("select * from %s", TABLE1)));
assertEquals("[1]", results.toString());
}

private TableEnvironment getTableEnvWithPgCatalog() {
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1);

tableEnv.registerCatalog(TEST_CATALOG_NAME, catalog);
tableEnv.useCatalog(TEST_CATALOG_NAME);
return tableEnv;
}
}

0 comments on commit 8cf8d8d

Please sign in to comment.