From 75a9a9126f6a81ad287ae327453c72a1cfc7551e Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Fri, 14 Jun 2024 11:08:24 +0530 Subject: [PATCH] Drop table should not clean the folder for Nessie catalog The same table might still be live in other branches or tags. Therefore, dropping the table should not clean up the files as it is not reference-aware. Use the Nessie GC tool to clean up expired files. This behavior is consistent with the Spark Nessie integration. --- .../catalog/nessie/TrinoNessieCatalog.java | 9 +- .../BaseIcebergConnectorSmokeTest.java | 2 +- ...cebergNessieCatalogConnectorSmokeTest.java | 126 ++++++++++++++++-- 3 files changed, 122 insertions(+), 15 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java index 446abfc38c20d..ec79604110b01 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java @@ -77,7 +77,6 @@ public class TrinoNessieCatalog private final String warehouseLocation; private final NessieIcebergClient nessieClient; - private final TrinoFileSystemFactory fileSystemFactory; private final Cache tableMetadataCache = EvictableCacheBuilder.newBuilder() .maximumSize(PER_QUERY_CACHE_SIZE) @@ -93,7 +92,6 @@ public TrinoNessieCatalog( boolean useUniqueTableLocation) { super(catalogName, typeManager, tableOperationsProvider, fileSystemFactory, useUniqueTableLocation); - this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); this.warehouseLocation = requireNonNull(warehouseLocation, "warehouseLocation is null"); this.nessieClient = requireNonNull(nessieClient, "nessieClient is null"); } @@ -237,7 +235,8 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) BaseTable table = (BaseTable) loadTable(session, schemaTableName); validateTableCanBeDropped(table); nessieClient.dropTable(toIdentifier(schemaTableName), true); - deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, table.location()); + // The table folder may be referenced by other branches. Therefore, dropping the table should not delete the data. + // Nessie GC tool can be used to clean up the expired data. invalidateTableCache(schemaTableName); } @@ -249,8 +248,8 @@ public void dropCorruptedTable(ConnectorSession session, SchemaTableName schemaT throw new TableNotFoundException(schemaTableName); } nessieClient.dropTable(toIdentifier(schemaTableName), true); - String tableLocation = table.getMetadataLocation().replaceFirst("/metadata/[^/]*$", ""); - deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, tableLocation); + // The table folder may be referenced by other branches. Therefore, dropping the table should not delete the data. + // Nessie GC tool can be used to clean up the expired data. invalidateTableCache(schemaTableName); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java index 2a33e78d49e8d..51e667f23f3f6 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java @@ -859,7 +859,7 @@ private ZonedDateTime getSnapshotTime(String tableName, long snapshotId) .getOnlyColumnAsSet()); } - private String getTableLocation(String tableName) + protected String getTableLocation(String tableName) { return (String) computeScalar("SELECT DISTINCT regexp_replace(\"$path\", '/[^/]*/[^/]*$', '') FROM " + tableName); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestIcebergNessieCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestIcebergNessieCatalogConnectorSmokeTest.java index be81d7cb60d0c..de0f6cb6de035 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestIcebergNessieCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestIcebergNessieCatalogConnectorSmokeTest.java @@ -15,15 +15,24 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import io.trino.filesystem.FileIterator; import io.trino.filesystem.Location; import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest; import io.trino.plugin.iceberg.IcebergConfig; import io.trino.plugin.iceberg.IcebergQueryRunner; import io.trino.plugin.iceberg.SchemaInitializer; import io.trino.plugin.iceberg.containers.NessieContainer; +import io.trino.plugin.iceberg.fileio.ForwardingFileIo; import io.trino.testing.QueryRunner; import io.trino.testing.TestingConnectorBehavior; import io.trino.tpch.TpchTable; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.nessie.NessieCatalog; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; @@ -38,8 +47,14 @@ import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; import static io.trino.plugin.iceberg.IcebergTestUtils.checkOrcFileSorting; import static io.trino.plugin.iceberg.IcebergTestUtils.checkParquetFileSorting; +import static io.trino.testing.TestingNames.randomNameSuffix; import static java.lang.String.format; +import static org.apache.iceberg.CatalogProperties.CATALOG_IMPL; +import static org.apache.iceberg.CatalogProperties.URI; +import static org.apache.iceberg.CatalogProperties.WAREHOUSE_LOCATION; +import static org.apache.iceberg.CatalogUtil.buildIcebergCatalog; import static org.apache.iceberg.FileFormat.PARQUET; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assumptions.abort; import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; @@ -51,6 +66,7 @@ public class TestIcebergNessieCatalogConnectorSmokeTest extends BaseIcebergConnectorSmokeTest { private Path tempDir; + private NessieCatalog catalog; public TestIcebergNessieCatalogConnectorSmokeTest() { @@ -73,6 +89,13 @@ protected QueryRunner createQueryRunner() tempDir = Files.createTempDirectory("test_trino_nessie_catalog"); + catalog = (NessieCatalog) buildIcebergCatalog("tpch", ImmutableMap.builder() + .put(CATALOG_IMPL, NessieCatalog.class.getName()) + .put(URI, nessieContainer.getRestApiUri()) + .put(WAREHOUSE_LOCATION, tempDir.toString()) + .buildOrThrow(), + new Configuration(false)); + return IcebergQueryRunner.builder() .setBaseDataDir(Optional.of(tempDir)) .setIcebergProperties( @@ -140,8 +163,8 @@ protected void dropTableFromMetastore(String tableName) @Override protected String getMetadataLocation(String tableName) { - // used when registering a table, which is not supported by the Nessie catalog - throw new UnsupportedOperationException("metadata location for register_table is not supported"); + BaseTable table = (BaseTable) catalog.loadTable(TableIdentifier.of("tpch", tableName)); + return table.operations().current().metadataFileLocation(); } @Test @@ -197,7 +220,7 @@ public void testRegisterTableWithDifferentTableName() public void testRegisterTableWithMetadataFile() { assertThatThrownBy(super::testRegisterTableWithMetadataFile) - .hasMessageContaining("metadata location for register_table is not supported"); + .hasMessageContaining("register_table procedure is disabled"); } @Test @@ -243,25 +266,110 @@ public void testRepeatUnregisterTable() @Test @Override public void testDropTableWithMissingMetadataFile() + throws Exception { - assertThatThrownBy(super::testDropTableWithMissingMetadataFile) - .hasMessageMatching("metadata location for register_table is not supported"); + String tableName = "test_drop_table_with_missing_metadata_file_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 x, 'INDIA' y", 1); + + Location metadataLocation = Location.of(getMetadataLocation(tableName)); + Location tableLocation = Location.of(getTableLocation(tableName)); + + // Delete current metadata file + fileSystem.deleteFile(metadataLocation); + assertThat(fileSystem.newInputFile(metadataLocation).exists()) + .describedAs("Current metadata file should not exist") + .isFalse(); + + // try to drop table + assertUpdate("DROP TABLE " + tableName); + assertThat(getQueryRunner().tableExists(getSession(), tableName)).isFalse(); + assertThat(fileSystem.listFiles(tableLocation).hasNext()) + .describedAs("Table location should exist") + .isTrue(); } @Test @Override public void testDropTableWithMissingSnapshotFile() + throws Exception { - assertThatThrownBy(super::testDropTableWithMissingSnapshotFile) - .hasMessageMatching("metadata location for register_table is not supported"); + String tableName = "test_drop_table_with_missing_snapshot_file_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 x, 'INDIA' y", 1); + + String metadataLocation = getMetadataLocation(tableName); + TableMetadata tableMetadata = TableMetadataParser.read(new ForwardingFileIo(fileSystem), metadataLocation); + Location tableLocation = Location.of(tableMetadata.location()); + Location currentSnapshotFile = Location.of(tableMetadata.currentSnapshot().manifestListLocation()); + + // Delete current snapshot file + fileSystem.deleteFile(currentSnapshotFile); + assertThat(fileSystem.newInputFile(currentSnapshotFile).exists()) + .describedAs("Current snapshot file should not exist") + .isFalse(); + + // try to drop table + assertUpdate("DROP TABLE " + tableName); + assertThat(getQueryRunner().tableExists(getSession(), tableName)).isFalse(); + assertThat(fileSystem.listFiles(tableLocation).hasNext()) + .describedAs("Table location should exist") + .isTrue(); } @Test @Override public void testDropTableWithMissingManifestListFile() + throws Exception + { + String tableName = "test_drop_table_with_missing_manifest_list_file_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 x, 'INDIA' y", 1); + + String metadataLocation = getMetadataLocation(tableName); + FileIO fileIo = new ForwardingFileIo(fileSystem); + TableMetadata tableMetadata = TableMetadataParser.read(fileIo, metadataLocation); + Location tableLocation = Location.of(tableMetadata.location()); + Location manifestListFile = Location.of(tableMetadata.currentSnapshot().allManifests(fileIo).get(0).path()); + + // Delete Manifest List file + fileSystem.deleteFile(manifestListFile); + assertThat(fileSystem.newInputFile(manifestListFile).exists()) + .describedAs("Manifest list file should not exist") + .isFalse(); + + // try to drop table + assertUpdate("DROP TABLE " + tableName); + assertThat(getQueryRunner().tableExists(getSession(), tableName)).isFalse(); + assertThat(fileSystem.listFiles(tableLocation).hasNext()) + .describedAs("Table location should exist") + .isTrue(); + } + + @Test + @Override + public void testDropTableWithMissingDataFile() + throws Exception { - assertThatThrownBy(super::testDropTableWithMissingManifestListFile) - .hasMessageContaining("metadata location for register_table is not supported"); + String tableName = "test_drop_table_with_missing_data_file_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 x, 'INDIA' y", 1); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'POLAND')", 1); + + Location tableLocation = Location.of(getTableLocation(tableName)); + Location tableDataPath = tableLocation.appendPath("data"); + FileIterator fileIterator = fileSystem.listFiles(tableDataPath); + assertThat(fileIterator.hasNext()).isTrue(); + Location dataFile = fileIterator.next().location(); + + // Delete data file + fileSystem.deleteFile(dataFile); + assertThat(fileSystem.newInputFile(dataFile).exists()) + .describedAs("Data file should not exist") + .isFalse(); + + // try to drop table + assertUpdate("DROP TABLE " + tableName); + assertThat(getQueryRunner().tableExists(getSession(), tableName)).isFalse(); + assertThat(fileSystem.listFiles(tableLocation).hasNext()) + .describedAs("Table location should exist") + .isTrue(); } @Override