Skip to content

Commit

Permalink
[Refactor] add connector metadata request to simplify interfaces (#53122
Browse files Browse the repository at this point in the history
)

Signed-off-by: yanz <[email protected]>
  • Loading branch information
dirtysalt authored Nov 22, 2024
1 parent 189990c commit 9627535
Show file tree
Hide file tree
Showing 27 changed files with 272 additions and 206 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ public List<String> listTableNames(String dbName) {
}

@Override
public List<String> listPartitionNames(String databaseName, String tableName, TableVersionRange version) {
return normal.listPartitionNames(databaseName, tableName, version);
public List<String> listPartitionNames(String databaseName, String tableName, ConnectorMetadatRequestContext requestContext) {
return normal.listPartitionNames(databaseName, tableName, requestContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.connector;

public class ConnectorMetadatRequestContext {
public static ConnectorMetadatRequestContext DEFAULT = new ConnectorMetadatRequestContext();
TableVersionRange tableVersionRange = TableVersionRange.empty();

public void setTableVersionRange(TableVersionRange value) {
tableVersionRange = value;
}

public TableVersionRange getTableVersionRange() {
return tableVersionRange;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,13 @@ default List<String> listTableNames(String dbName) {
/**
* Return all partition names of the table.
*
* @param databaseName the name of the database
* @param tableName the name of the table
* @param tableVersionRange table version range in the query
* @param databaseName the name of the database
* @param tableName the name of the table
* @param requestContext request context
* @return a list of partition names
*/
default List<String> listPartitionNames(String databaseName, String tableName, TableVersionRange tableVersionRange) {
default List<String> listPartitionNames(String databaseName, String tableName,
ConnectorMetadatRequestContext requestContext) {
return Lists.newArrayList();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.starrocks.common.Pair;
import com.starrocks.common.profile.Timer;
import com.starrocks.common.profile.Tracers;
import com.starrocks.connector.ConnectorMetadatRequestContext;
import com.starrocks.connector.ConnectorMetadata;
import com.starrocks.connector.ConnectorProperties;
import com.starrocks.connector.GetRemoteFilesParams;
Expand Down Expand Up @@ -115,7 +116,7 @@ public Database getDb(String dbName) {
}

@Override
public List<String> listPartitionNames(String databaseName, String tableName, TableVersionRange version) {
public List<String> listPartitionNames(String databaseName, String tableName, ConnectorMetadatRequestContext requestContext) {
return deltaOps.getPartitionKeys(databaseName, tableName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.starrocks.common.Version;
import com.starrocks.common.profile.Timer;
import com.starrocks.common.profile.Tracers;
import com.starrocks.connector.ConnectorMetadatRequestContext;
import com.starrocks.connector.ConnectorMetadata;
import com.starrocks.connector.ConnectorProperties;
import com.starrocks.connector.GetRemoteFilesParams;
Expand Down Expand Up @@ -222,7 +223,7 @@ public boolean tableExists(String dbName, String tblName) {
}

@Override
public List<String> listPartitionNames(String dbName, String tblName, TableVersionRange version) {
public List<String> listPartitionNames(String dbName, String tblName, ConnectorMetadatRequestContext requestContext) {
return hmsOps.getPartitionKeys(dbName, tblName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.starrocks.catalog.PartitionKey;
import com.starrocks.catalog.Table;
import com.starrocks.common.DdlException;
import com.starrocks.connector.ConnectorMetadatRequestContext;
import com.starrocks.connector.ConnectorMetadata;
import com.starrocks.connector.ConnectorProperties;
import com.starrocks.connector.GetRemoteFilesParams;
Expand Down Expand Up @@ -100,7 +101,7 @@ public List<String> listTableNames(String dbName) {
}

@Override
public List<String> listPartitionNames(String dbName, String tblName, TableVersionRange version) {
public List<String> listPartitionNames(String dbName, String tblName, ConnectorMetadatRequestContext requestContext) {
return hmsOps.getPartitionKeys(dbName, tblName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.starrocks.common.profile.Timer;
import com.starrocks.common.profile.Tracers;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.connector.ConnectorMetadatRequestContext;
import com.starrocks.connector.ConnectorMetadata;
import com.starrocks.connector.ConnectorProperties;
import com.starrocks.connector.ConnectorTableVersion;
Expand Down Expand Up @@ -479,14 +480,15 @@ public boolean tableExists(String dbName, String tblName) {
}

@Override
public List<String> listPartitionNames(String dbName, String tblName, TableVersionRange version) {
public List<String> listPartitionNames(String dbName, String tblName, ConnectorMetadatRequestContext requestContext) {
IcebergCatalogType nativeType = icebergCatalog.getIcebergCatalogType();

if (nativeType != HIVE_CATALOG && nativeType != REST_CATALOG && nativeType != GLUE_CATALOG) {
throw new StarRocksConnectorException(
"Do not support get partitions from catalog type: " + nativeType);
}

TableVersionRange version = requestContext.getTableVersionRange();
long snapshotId = version.end().isPresent() ? version.end().get() : -1;
return icebergCatalog.listPartitionNames(dbName, tblName, snapshotId, jobPlanningExecutor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
import com.starrocks.catalog.Type;
import com.starrocks.common.Config;
import com.starrocks.common.DdlException;
import com.starrocks.connector.ConnectorMetadatRequestContext;
import com.starrocks.connector.ConnectorMetadata;
import com.starrocks.connector.ConnectorTableId;
import com.starrocks.connector.PartitionInfo;
import com.starrocks.connector.PartitionUtil;
import com.starrocks.connector.TableVersionRange;
import com.starrocks.connector.exception.StarRocksConnectorException;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
Expand Down Expand Up @@ -64,7 +64,6 @@ public JDBCMetadata(Map<String, String> properties, String catalogName) {
this(properties, catalogName, null);
}


public JDBCMetadata(Map<String, String> properties, String catalogName, HikariDataSource dataSource) {
this.properties = properties;
this.catalogName = catalogName;
Expand Down Expand Up @@ -220,7 +219,7 @@ public Table getTable(String dbName, String tblName) {
}

@Override
public List<String> listPartitionNames(String databaseName, String tableName, TableVersionRange version) {
public List<String> listPartitionNames(String databaseName, String tableName, ConnectorMetadatRequestContext requestContext) {
return partitionNamesCache.get(new JDBCTableName(null, databaseName, tableName),
k -> {
try (Connection connection = getConnection()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.starrocks.catalog.OdpsTable;
import com.starrocks.catalog.PartitionKey;
import com.starrocks.catalog.Table;
import com.starrocks.connector.ConnectorMetadatRequestContext;
import com.starrocks.connector.ConnectorMetadata;
import com.starrocks.connector.ConnectorTableId;
import com.starrocks.connector.GetRemoteFilesParams;
Expand Down Expand Up @@ -221,7 +222,7 @@ private OdpsTable loadTable(OdpsTableName odpsTableName) {
}

@Override
public List<String> listPartitionNames(String databaseName, String tableName, TableVersionRange version) {
public List<String> listPartitionNames(String databaseName, String tableName, ConnectorMetadatRequestContext requestContext) {
OdpsTableName odpsTableName = OdpsTableName.of(databaseName, tableName);
// TODO: perhaps not good to support users to fetch whole tables?
List<Partition> partitions = get(partitionCache, odpsTableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.starrocks.catalog.Table;
import com.starrocks.catalog.Type;
import com.starrocks.connector.ColumnTypeConverter;
import com.starrocks.connector.ConnectorMetadatRequestContext;
import com.starrocks.connector.ConnectorMetadata;
import com.starrocks.connector.ConnectorProperties;
import com.starrocks.connector.GetRemoteFilesParams;
Expand Down Expand Up @@ -179,7 +180,7 @@ private void updatePartitionInfo(String databaseName, String tableName) {
}

@Override
public List<String> listPartitionNames(String databaseName, String tableName, TableVersionRange version) {
public List<String> listPartitionNames(String databaseName, String tableName, ConnectorMetadatRequestContext requestContext) {
updatePartitionInfo(databaseName, tableName);
return new ArrayList<>(this.partitionInfos.keySet());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.starrocks.catalog.NullablePartitionKey;
import com.starrocks.catalog.PartitionKey;
import com.starrocks.common.AnalysisException;
import com.starrocks.connector.ConnectorMetadatRequestContext;
import com.starrocks.connector.PartitionInfo;
import com.starrocks.connector.TableVersionRange;
import com.starrocks.connector.iceberg.IcebergPartitionUtils;
Expand Down Expand Up @@ -80,8 +81,10 @@ public List<String> getPartitionNames() {
IcebergTable icebergTable = (IcebergTable) table;
Optional<Long> snapshotId = Optional.ofNullable(icebergTable.getNativeTable().currentSnapshot())
.map(Snapshot::snapshotId);
ConnectorMetadatRequestContext requestContext = new ConnectorMetadatRequestContext();
requestContext.setTableVersionRange(TableVersionRange.withEnd(snapshotId));
return GlobalStateMgr.getCurrentState().getMetadataMgr().listPartitionNames(
table.getCatalogName(), getDbName(), getTableName(), TableVersionRange.withEnd(snapshotId));
table.getCatalogName(), getDbName(), getTableName(), requestContext);
}

@Override
Expand Down Expand Up @@ -124,9 +127,9 @@ public PartitionKey createPartitionKey(List<String> partitionValues, List<Column
if (field.transform().dedupName().equalsIgnoreCase("time")) {
rawValue = IcebergPartitionUtils.normalizeTimePartitionName(rawValue, field,
icebergTable.getNativeTable().schema(), column.getType());
exprValue = LiteralExpr.create(rawValue, column.getType());
exprValue = LiteralExpr.create(rawValue, column.getType());
} else {
exprValue = LiteralExpr.create(rawValue, column.getType());
exprValue = LiteralExpr.create(rawValue, column.getType());
}
}
partitionKey.pushColumn(exprValue, column.getType().getPrimitiveType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.starrocks.common.DdlException;
import com.starrocks.common.MetaNotFoundException;
import com.starrocks.common.profile.Tracers;
import com.starrocks.connector.ConnectorMetadatRequestContext;
import com.starrocks.connector.ConnectorMetadata;
import com.starrocks.connector.ConnectorTableVersion;
import com.starrocks.connector.GetRemoteFilesParams;
Expand Down Expand Up @@ -150,9 +151,9 @@ public List<String> listTableNames(String dbName) {
}

@Override
public List<String> listPartitionNames(String databaseName, String tableName, TableVersionRange versionRange) {
public List<String> listPartitionNames(String databaseName, String tableName, ConnectorMetadatRequestContext requestContext) {
ConnectorMetadata metadata = metadataOfTable(databaseName, tableName);
return metadata.listPartitionNames(databaseName, tableName, versionRange);
return metadata.listPartitionNames(databaseName, tableName, requestContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.starrocks.catalog.Type;
import com.starrocks.common.UserException;
import com.starrocks.connector.CatalogConnector;
import com.starrocks.connector.ConnectorMetadatRequestContext;
import com.starrocks.connector.GetRemoteFilesParams;
import com.starrocks.connector.RemoteFileInfo;
import com.starrocks.connector.RemoteFileInfoDefaultSource;
Expand Down Expand Up @@ -247,9 +248,11 @@ protected String getNodeExplainString(String prefix, TExplainLevel detailLevel)
}

if (detailLevel == TExplainLevel.VERBOSE && !isResourceMappingCatalog(icebergTable.getCatalogName())) {
ConnectorMetadatRequestContext requestContext = new ConnectorMetadatRequestContext();
requestContext.setTableVersionRange(TableVersionRange.withEnd(snapshotId));
List<String> partitionNames = GlobalStateMgr.getCurrentState().getMetadataMgr().listPartitionNames(
icebergTable.getCatalogName(), icebergTable.getRemoteDbName(),
icebergTable.getRemoteTableName(), TableVersionRange.withEnd(snapshotId));
icebergTable.getRemoteTableName(), requestContext);

if (selectedPartitionCount == -1) {
if (scanRangeSource != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.starrocks.common.UserException;
import com.starrocks.common.profile.Tracers;
import com.starrocks.connector.CatalogConnector;
import com.starrocks.connector.ConnectorMetadatRequestContext;
import com.starrocks.connector.ConnectorMetadata;
import com.starrocks.connector.ConnectorMgr;
import com.starrocks.connector.ConnectorTableVersion;
Expand Down Expand Up @@ -617,15 +618,16 @@ public Pair<Table, MaterializedIndexMeta> getMaterializedViewIndex(String catalo
}

public List<String> listPartitionNames(String catalogName, String dbName, String tableName) {
return listPartitionNames(catalogName, dbName, tableName, TableVersionRange.empty());
return listPartitionNames(catalogName, dbName, tableName, ConnectorMetadatRequestContext.DEFAULT);
}

public List<String> listPartitionNames(String catalogName, String dbName, String tableName, TableVersionRange versionRange) {
public List<String> listPartitionNames(String catalogName, String dbName, String tableName,
ConnectorMetadatRequestContext requestContext) {
Optional<ConnectorMetadata> connectorMetadata = getOptionalMetadata(catalogName);
ImmutableSet.Builder<String> partitionNames = ImmutableSet.builder();
if (connectorMetadata.isPresent()) {
try {
connectorMetadata.get().listPartitionNames(dbName, tableName, versionRange).forEach(partitionNames::add);
connectorMetadata.get().listPartitionNames(dbName, tableName, requestContext).forEach(partitionNames::add);
} catch (Exception e) {
LOG.error("Failed to listPartitionNames on [{}.{}]", catalogName, dbName, e);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ void testMetadataRouting(@Mocked ConnectorMetadata connectorMetadata) throws Use
times = 1;

connectorMetadata.clear();
connectorMetadata.listPartitionNames("test_db", "test_tbl", TableVersionRange.empty());
connectorMetadata.listPartitionNames("test_db", "test_tbl", ConnectorMetadatRequestContext.DEFAULT);
connectorMetadata.dropTable(null);
connectorMetadata.refreshTable("test_db", null, null, false);
connectorMetadata.alterMaterializedView(null);
Expand Down Expand Up @@ -211,7 +211,7 @@ void testMetadataRouting(@Mocked ConnectorMetadata connectorMetadata) throws Use
);

catalogConnectorMetadata.clear();
catalogConnectorMetadata.listPartitionNames("test_db", "test_tbl", TableVersionRange.empty());
catalogConnectorMetadata.listPartitionNames("test_db", "test_tbl", ConnectorMetadatRequestContext.DEFAULT);
catalogConnectorMetadata.dropTable(null);
catalogConnectorMetadata.refreshTable("test_db", null, null, false);
catalogConnectorMetadata.alterMaterializedView(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
import com.google.common.collect.Maps;
import com.starrocks.catalog.DeltaLakeTable;
import com.starrocks.catalog.Table;
import com.starrocks.connector.ConnectorMetadatRequestContext;
import com.starrocks.connector.ConnectorProperties;
import com.starrocks.connector.ConnectorType;
import com.starrocks.connector.HdfsEnvironment;
import com.starrocks.connector.MetastoreType;
import com.starrocks.connector.TableVersionRange;
import com.starrocks.connector.hive.HiveMetaClient;
import com.starrocks.connector.hive.HiveMetastore;
import com.starrocks.connector.hive.HiveMetastoreTest;
Expand Down Expand Up @@ -170,8 +170,8 @@ public void close() {
minTimes = 0;
}
};
List<String> partitionNames = deltaLakeMetadata.listPartitionNames("db1", "table1",
TableVersionRange.empty());
List<String> partitionNames =
deltaLakeMetadata.listPartitionNames("db1", "table1", ConnectorMetadatRequestContext.DEFAULT);
Assert.assertEquals(3, partitionNames.size());
Assert.assertEquals("ts=1999", partitionNames.get(0));
Assert.assertEquals("ts=2000", partitionNames.get(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.starrocks.common.FeConstants;
import com.starrocks.common.MetaNotFoundException;
import com.starrocks.connector.CachingRemoteFileIO;
import com.starrocks.connector.ConnectorMetadatRequestContext;
import com.starrocks.connector.ConnectorProperties;
import com.starrocks.connector.ConnectorType;
import com.starrocks.connector.GetRemoteFilesParams;
Expand Down Expand Up @@ -161,7 +162,8 @@ public void testListTableNames() {
@Test
public void testGetPartitionKeys() {
Assert.assertEquals(
Lists.newArrayList("col1"), hiveMetadata.listPartitionNames("db1", "tbl1", TableVersionRange.empty()));
Lists.newArrayList("col1"),
hiveMetadata.listPartitionNames("db1", "tbl1", ConnectorMetadatRequestContext.DEFAULT));
}

@Test
Expand Down Expand Up @@ -284,11 +286,11 @@ public void testGetTableStatisticsWithUnknown() throws AnalysisException {
public void testShowCreateHiveTbl() {
HiveTable hiveTable = (HiveTable) hiveMetadata.getTable("db1", "table1");
Assert.assertEquals("CREATE TABLE `table1` (\n" +
" `col2` int(11) DEFAULT NULL,\n" +
" `col1` int(11) DEFAULT NULL\n" +
")\n" +
"PARTITION BY (col1)\n" +
"PROPERTIES (\"location\" = \"hdfs:https://127.0.0.1:10000/hive\");",
" `col2` int(11) DEFAULT NULL,\n" +
" `col1` int(11) DEFAULT NULL\n" +
")\n" +
"PARTITION BY (col1)\n" +
"PROPERTIES (\"location\" = \"hdfs:https://127.0.0.1:10000/hive\");",
AstToStringBuilder.getExternalCatalogTableDdlStmt(hiveTable));
}

Expand Down
Loading

0 comments on commit 9627535

Please sign in to comment.