Drop table should not clean the folder for Nessie catalog
The same table might still be live on other branches or tags.
Therefore, dropping the table should not clean up its files, since
that cleanup would not be reference-aware. Use the Nessie GC tool to
clean up expired files instead. This behavior is consistent with the
Spark Nessie integration.
ajantha-bhat authored and wendigo committed Jul 4, 2024
1 parent f8fd9c6 commit 75a9a91
Showing 3 changed files with 122 additions and 15 deletions.
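
For illustration only (not part of this commit): a minimal, self-contained sketch of the drop semantics the commit relies on, built with the same Iceberg NessieCatalog setup that the test below uses. The class name, Nessie URI, warehouse path, and table name are placeholder assumptions.

import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.nessie.NessieCatalog;

import static org.apache.iceberg.CatalogProperties.CATALOG_IMPL;
import static org.apache.iceberg.CatalogProperties.URI;
import static org.apache.iceberg.CatalogProperties.WAREHOUSE_LOCATION;
import static org.apache.iceberg.CatalogUtil.buildIcebergCatalog;

public class NessieDropTableSketch
{
    public static void main(String[] args)
    {
        // Build an Iceberg NessieCatalog, mirroring the test setup added in this commit.
        Catalog catalog = buildIcebergCatalog("tpch", ImmutableMap.<String, String>builder()
                .put(CATALOG_IMPL, NessieCatalog.class.getName())
                .put(URI, "http://localhost:19120/api/v2") // placeholder Nessie REST endpoint
                .put(WAREHOUSE_LOCATION, "/tmp/warehouse")  // placeholder warehouse directory
                .buildOrThrow(),
                new Configuration(false));

        // Drop without purge: only the table entry on the current Nessie reference is removed;
        // data and metadata files stay in the warehouse, where other branches or tags may still
        // reference them. Trino's dropTable now follows the same contract and leaves file
        // cleanup to the Nessie GC tool.
        catalog.dropTable(TableIdentifier.of("tpch", "region"), false /* purge */);
    }
}
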
TrinoNessieCatalog.java
@@ -77,7 +77,6 @@ public class TrinoNessieCatalog

private final String warehouseLocation;
private final NessieIcebergClient nessieClient;
private final TrinoFileSystemFactory fileSystemFactory;

private final Cache<SchemaTableName, TableMetadata> tableMetadataCache = EvictableCacheBuilder.newBuilder()
.maximumSize(PER_QUERY_CACHE_SIZE)
@@ -93,7 +92,6 @@ public TrinoNessieCatalog(
boolean useUniqueTableLocation)
{
super(catalogName, typeManager, tableOperationsProvider, fileSystemFactory, useUniqueTableLocation);
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.warehouseLocation = requireNonNull(warehouseLocation, "warehouseLocation is null");
this.nessieClient = requireNonNull(nessieClient, "nessieClient is null");
}
@@ -237,7 +235,8 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
BaseTable table = (BaseTable) loadTable(session, schemaTableName);
validateTableCanBeDropped(table);
nessieClient.dropTable(toIdentifier(schemaTableName), true);
deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, table.location());
// The table folder may be referenced by other branches. Therefore, dropping the table should not delete the data.
// Nessie GC tool can be used to clean up the expired data.
invalidateTableCache(schemaTableName);
}

@@ -249,8 +248,8 @@ public void dropCorruptedTable(ConnectorSession session, SchemaTableName schemaTableName)
throw new TableNotFoundException(schemaTableName);
}
nessieClient.dropTable(toIdentifier(schemaTableName), true);
String tableLocation = table.getMetadataLocation().replaceFirst("/metadata/[^/]*$", "");
deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, tableLocation);
// The table folder may be referenced by other branches. Therefore, dropping the table should not delete the data.
// Nessie GC tool can be used to clean up the expired data.
invalidateTableCache(schemaTableName);
}

BaseIcebergConnectorSmokeTest.java
@@ -859,7 +859,7 @@ private ZonedDateTime getSnapshotTime(String tableName, long snapshotId)
.getOnlyColumnAsSet());
}

private String getTableLocation(String tableName)
protected String getTableLocation(String tableName)
{
return (String) computeScalar("SELECT DISTINCT regexp_replace(\"$path\", '/[^/]*/[^/]*$', '') FROM " + tableName);
}
TestIcebergNessieCatalogConnectorSmokeTest.java
@@ -15,15 +15,24 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest;
import io.trino.plugin.iceberg.IcebergConfig;
import io.trino.plugin.iceberg.IcebergQueryRunner;
import io.trino.plugin.iceberg.SchemaInitializer;
import io.trino.plugin.iceberg.containers.NessieContainer;
import io.trino.plugin.iceberg.fileio.ForwardingFileIo;
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingConnectorBehavior;
import io.trino.tpch.TpchTable;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.nessie.NessieCatalog;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
@@ -38,8 +47,14 @@
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.trino.plugin.iceberg.IcebergTestUtils.checkOrcFileSorting;
import static io.trino.plugin.iceberg.IcebergTestUtils.checkParquetFileSorting;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static java.lang.String.format;
import static org.apache.iceberg.CatalogProperties.CATALOG_IMPL;
import static org.apache.iceberg.CatalogProperties.URI;
import static org.apache.iceberg.CatalogProperties.WAREHOUSE_LOCATION;
import static org.apache.iceberg.CatalogUtil.buildIcebergCatalog;
import static org.apache.iceberg.FileFormat.PARQUET;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assumptions.abort;
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
@@ -51,6 +66,7 @@ public class TestIcebergNessieCatalogConnectorSmokeTest
extends BaseIcebergConnectorSmokeTest
{
private Path tempDir;
private NessieCatalog catalog;

public TestIcebergNessieCatalogConnectorSmokeTest()
{
@@ -73,6 +89,13 @@ protected QueryRunner createQueryRunner()

tempDir = Files.createTempDirectory("test_trino_nessie_catalog");

catalog = (NessieCatalog) buildIcebergCatalog("tpch", ImmutableMap.<String, String>builder()
.put(CATALOG_IMPL, NessieCatalog.class.getName())
.put(URI, nessieContainer.getRestApiUri())
.put(WAREHOUSE_LOCATION, tempDir.toString())
.buildOrThrow(),
new Configuration(false));

return IcebergQueryRunner.builder()
.setBaseDataDir(Optional.of(tempDir))
.setIcebergProperties(
@@ -140,8 +163,8 @@ protected void dropTableFromMetastore(String tableName)
@Override
protected String getMetadataLocation(String tableName)
{
// used when registering a table, which is not supported by the Nessie catalog
throw new UnsupportedOperationException("metadata location for register_table is not supported");
BaseTable table = (BaseTable) catalog.loadTable(TableIdentifier.of("tpch", tableName));
return table.operations().current().metadataFileLocation();
}

@Test
@@ -197,7 +220,7 @@ public void testRegisterTableWithDifferentTableName()
public void testRegisterTableWithMetadataFile()
{
assertThatThrownBy(super::testRegisterTableWithMetadataFile)
.hasMessageContaining("metadata location for register_table is not supported");
.hasMessageContaining("register_table procedure is disabled");
}

@Test
@@ -243,25 +266,110 @@ public void testRepeatUnregisterTable()
@Test
@Override
public void testDropTableWithMissingMetadataFile()
throws Exception
{
assertThatThrownBy(super::testDropTableWithMissingMetadataFile)
.hasMessageMatching("metadata location for register_table is not supported");
String tableName = "test_drop_table_with_missing_metadata_file_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 x, 'INDIA' y", 1);

Location metadataLocation = Location.of(getMetadataLocation(tableName));
Location tableLocation = Location.of(getTableLocation(tableName));

// Delete current metadata file
fileSystem.deleteFile(metadataLocation);
assertThat(fileSystem.newInputFile(metadataLocation).exists())
.describedAs("Current metadata file should not exist")
.isFalse();

// try to drop table
assertUpdate("DROP TABLE " + tableName);
assertThat(getQueryRunner().tableExists(getSession(), tableName)).isFalse();
assertThat(fileSystem.listFiles(tableLocation).hasNext())
.describedAs("Table location should exist")
.isTrue();
}

@Test
@Override
public void testDropTableWithMissingSnapshotFile()
throws Exception
{
assertThatThrownBy(super::testDropTableWithMissingSnapshotFile)
.hasMessageMatching("metadata location for register_table is not supported");
String tableName = "test_drop_table_with_missing_snapshot_file_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 x, 'INDIA' y", 1);

String metadataLocation = getMetadataLocation(tableName);
TableMetadata tableMetadata = TableMetadataParser.read(new ForwardingFileIo(fileSystem), metadataLocation);
Location tableLocation = Location.of(tableMetadata.location());
Location currentSnapshotFile = Location.of(tableMetadata.currentSnapshot().manifestListLocation());

// Delete current snapshot file
fileSystem.deleteFile(currentSnapshotFile);
assertThat(fileSystem.newInputFile(currentSnapshotFile).exists())
.describedAs("Current snapshot file should not exist")
.isFalse();

// try to drop table
assertUpdate("DROP TABLE " + tableName);
assertThat(getQueryRunner().tableExists(getSession(), tableName)).isFalse();
assertThat(fileSystem.listFiles(tableLocation).hasNext())
.describedAs("Table location should exist")
.isTrue();
}

@Test
@Override
public void testDropTableWithMissingManifestListFile()
throws Exception
{
String tableName = "test_drop_table_with_missing_manifest_list_file_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 x, 'INDIA' y", 1);

String metadataLocation = getMetadataLocation(tableName);
FileIO fileIo = new ForwardingFileIo(fileSystem);
TableMetadata tableMetadata = TableMetadataParser.read(fileIo, metadataLocation);
Location tableLocation = Location.of(tableMetadata.location());
Location manifestListFile = Location.of(tableMetadata.currentSnapshot().allManifests(fileIo).get(0).path());

// Delete Manifest List file
fileSystem.deleteFile(manifestListFile);
assertThat(fileSystem.newInputFile(manifestListFile).exists())
.describedAs("Manifest list file should not exist")
.isFalse();

// try to drop table
assertUpdate("DROP TABLE " + tableName);
assertThat(getQueryRunner().tableExists(getSession(), tableName)).isFalse();
assertThat(fileSystem.listFiles(tableLocation).hasNext())
.describedAs("Table location should exist")
.isTrue();
}

@Test
@Override
public void testDropTableWithMissingDataFile()
throws Exception
{
assertThatThrownBy(super::testDropTableWithMissingManifestListFile)
.hasMessageContaining("metadata location for register_table is not supported");
String tableName = "test_drop_table_with_missing_data_file_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 x, 'INDIA' y", 1);
assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'POLAND')", 1);

Location tableLocation = Location.of(getTableLocation(tableName));
Location tableDataPath = tableLocation.appendPath("data");
FileIterator fileIterator = fileSystem.listFiles(tableDataPath);
assertThat(fileIterator.hasNext()).isTrue();
Location dataFile = fileIterator.next().location();

// Delete data file
fileSystem.deleteFile(dataFile);
assertThat(fileSystem.newInputFile(dataFile).exists())
.describedAs("Data file should not exist")
.isFalse();

// try to drop table
assertUpdate("DROP TABLE " + tableName);
assertThat(getQueryRunner().tableExists(getSession(), tableName)).isFalse();
assertThat(fileSystem.listFiles(tableLocation).hasNext())
.describedAs("Table location should exist")
.isTrue();
}

@Override
