
Optimized delete table in hive using direct-sql. (#243)
* Optimized delete table in hive using direct-sql.
ajoymajumdar committed Mar 12, 2018
1 parent b9d99f3 commit 210671d
Showing 8 changed files with 209 additions and 17 deletions.
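In short, the commit adds a direct-SQL drop-table path to the fast Hive connector: the connector table service delegates to DirectSqlTable.delete(name), which first drops all partitions in batches through the new DirectSqlSavePartition.delete(tableQName) and then removes the table's rows from the metastore tables. A minimal sketch of the new call path follows; it mirrors the directSqlTable(...) bean wiring in the first changed file, while the helper method, the QualifiedName.ofTable call, and the literal catalog/database/table names are illustrative assumptions rather than code from this commit.

    // Sketch only — mirrors the constructor wiring shown in the Spring configuration below.
    // Imports are the same classes referenced in this commit (DirectSqlTable, DirectSqlSavePartition,
    // ConnectorContext, JdbcTemplate, HiveConnectorFastServiceMetric, QualifiedName).
    static void dropTableViaDirectSql(final ConnectorContext connectorContext,
                                      final JdbcTemplate hiveJdbcTemplate,
                                      final HiveConnectorFastServiceMetric serviceMetric,
                                      final DirectSqlSavePartition directSqlSavePartition) {
        final DirectSqlTable directSqlTable = new DirectSqlTable(
            connectorContext, hiveJdbcTemplate, serviceMetric, directSqlSavePartition);
        // Drops every partition in batches, then deletes the table's metastore rows
        // (TBLS, SDS, SERDES, CDS and their child tables) with plain JDBC statements.
        directSqlTable.delete(QualifiedName.ofTable("hive", "mydb", "mytable"));
    }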
File 1 of 8 — Hive connector Spring bean configuration (directSqlTable bean)
@@ -158,18 +158,21 @@ public SequenceGeneration sequenceGeneration(
* @param connectorContext connector config
* @param hiveJdbcTemplate hive JDBC template
* @param serviceMetric fast service metric
* @param directSqlSavePartition partition service that uses direct SQL
* @return DirectSqlTable
*/
@Bean
public DirectSqlTable directSqlTable(
final ConnectorContext connectorContext,
@Qualifier("hiveJdbcTemplate") final JdbcTemplate hiveJdbcTemplate,
final HiveConnectorFastServiceMetric serviceMetric
final HiveConnectorFastServiceMetric serviceMetric,
final DirectSqlSavePartition directSqlSavePartition
) {
return new DirectSqlTable(
connectorContext,
hiveJdbcTemplate,
serviceMetric
serviceMetric,
directSqlSavePartition
);
}

File 2 of 8 — DirectSqlSavePartition (direct-SQL partition service)
@@ -322,6 +322,34 @@ private List<PartitionSequenceIds> getPartitionSequenceIds(final QualifiedName t
rs.getLong("serde_id")));
}

/**
* Deletes all the partitions for the given table <code>tableQName</code>.
*
* @param tableQName table name
*/
public void delete(final QualifiedName tableQName) {
final long start = registry.clock().wallTime();
try {
List<PartitionSequenceIds> partitionSequenceIds = getPartitionSequenceIds(tableQName);
while (!partitionSequenceIds.isEmpty()) {
_delete(partitionSequenceIds);
partitionSequenceIds = getPartitionSequenceIds(tableQName);
}
} finally {
this.fastServiceMetric.recordTimer(
HiveMetrics.TagDropHivePartitions.getMetricName(), registry.clock().wallTime() - start);
}
}

private List<PartitionSequenceIds> getPartitionSequenceIds(final QualifiedName tableQName) {
return jdbcTemplate.query(
String.format(SQL.PARTITIONS_SELECT_ALL, this.batchSize),
new Object[]{tableQName.getDatabaseName(), tableQName.getTableName()},
new int[]{Types.VARCHAR, Types.VARCHAR},
(rs, rowNum) -> new PartitionSequenceIds(rs.getLong("part_id"), rs.getLong("sd_id"),
rs.getLong("serde_id")));
}

@SuppressWarnings("checkstyle:methodname")
private void _delete(final List<PartitionSequenceIds> subPartitionIds) {
final List<String> paramVariables = subPartitionIds.stream().map(s -> "?").collect(Collectors.toList());
@@ -418,6 +446,10 @@ private static class SQL {
"INSERT INTO PARTITION_KEY_VALS(PART_ID,PART_KEY_VAL,INTEGER_IDX) VALUES (?,?,?)";
static final String PARTITION_KEY_VALS_DELETES =
"DELETE FROM PARTITION_KEY_VALS WHERE PART_ID in (%s)";
static final String PARTITIONS_SELECT_ALL =
"SELECT P.PART_ID, P.SD_ID, S.SERDE_ID FROM DBS D JOIN TBLS T ON D.DB_ID=T.DB_ID "
+ "JOIN PARTITIONS P ON T.TBL_ID=P.TBL_ID JOIN SDS S ON P.SD_ID=S.SD_ID "
+ "WHERE D.NAME=? and T.TBL_NAME=? limit %d";
static final String PARTITIONS_SELECT =
"SELECT P.PART_ID, P.SD_ID, S.SERDE_ID FROM DBS D JOIN TBLS T ON D.DB_ID=T.DB_ID "
+ "JOIN PARTITIONS P ON T.TBL_ID=P.TBL_ID JOIN SDS S ON P.SD_ID=S.SD_ID "
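The new DirectSqlSavePartition.delete(tableQName) above intentionally works in bounded rounds rather than one large statement: each round selects at most batchSize partition/storage-descriptor/serde id triples (PARTITIONS_SELECT_ALL is rendered with the configured batch size, e.g. the -Dmetacat.hive.metastore.batchSize=10 setting added to the docker-compose file later in this commit) and hands them to _delete, repeating until the query returns nothing. That keeps the IN (...) lists and per-statement row counts small even for tables with very large partition counts. The underlying pattern, stated as a generic sketch (BatchedDeleteSketch, deleteInBatches, fetchBatch and deleteBatch are illustrative names, not Metacat methods):

    import java.util.List;
    import java.util.function.Consumer;
    import java.util.function.Supplier;

    final class BatchedDeleteSketch {
        // Generic form of the loop in delete(tableQName): fetch up to batchSize ids,
        // delete them, and repeat until the fetch comes back empty.
        static <T> void deleteInBatches(final Supplier<List<T>> fetchBatch,
                                        final Consumer<List<T>> deleteBatch) {
            List<T> batch = fetchBatch.get();  // e.g. getPartitionSequenceIds(tableQName), limited to batchSize rows
            while (!batch.isEmpty()) {
                deleteBatch.accept(batch);     // e.g. _delete(batch) with bounded IN (...) lists
                batch = fetchBatch.get();      // next batch, if any partitions remain
            }
        }

        private BatchedDeleteSketch() { }
    }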
File 3 of 8 — DirectSqlTable
@@ -22,12 +22,14 @@
import com.google.common.collect.Maps;
import com.netflix.metacat.common.QualifiedName;
import com.netflix.metacat.common.server.connectors.ConnectorContext;
import com.netflix.metacat.common.server.connectors.exception.ConnectorException;
import com.netflix.metacat.common.server.connectors.exception.InvalidMetaException;
import com.netflix.metacat.common.server.connectors.exception.TableNotFoundException;
import com.netflix.metacat.connector.hive.monitoring.HiveMetrics;
import com.netflix.metacat.connector.hive.util.HiveConnectorFastServiceMetric;
import com.netflix.spectator.api.Registry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.ResultSetExtractor;
@@ -60,23 +62,27 @@ public class DirectSqlTable {
private final JdbcTemplate jdbcTemplate;
private final HiveConnectorFastServiceMetric fastServiceMetric;
private final String catalogName;
private final DirectSqlSavePartition directSqlSavePartition;

/**
* Constructor.
*
* @param connectorContext server context
* @param jdbcTemplate JDBC template
* @param fastServiceMetric fast service metric
* @param connectorContext server context
* @param jdbcTemplate JDBC template
* @param fastServiceMetric fast service metric
* @param directSqlSavePartition direct sql partition service
*/
public DirectSqlTable(
final ConnectorContext connectorContext,
final JdbcTemplate jdbcTemplate,
final HiveConnectorFastServiceMetric fastServiceMetric
final HiveConnectorFastServiceMetric fastServiceMetric,
final DirectSqlSavePartition directSqlSavePartition
) {
this.catalogName = connectorContext.getCatalogName();
this.registry = connectorContext.getRegistry();
this.jdbcTemplate = jdbcTemplate;
this.fastServiceMetric = fastServiceMetric;
this.directSqlSavePartition = directSqlSavePartition;
}

/**
@@ -222,6 +228,54 @@ public Long getTableId(final QualifiedName tableName) {
}
}

/**
* Deletes all the table-related information from the store.
* @param tableName table name
*/
public void delete(final QualifiedName tableName) {
try {
final TableSequenceIds ids = getSequenceIds(tableName);
directSqlSavePartition.delete(tableName);
jdbcTemplate.update(SQL.UPDATE_SDS_CD, new SqlParameterValue(Types.BIGINT, null),
new SqlParameterValue(Types.BIGINT, ids.getSdsId()));
jdbcTemplate.update(SQL.UPDATE_SDS_SERDE, new SqlParameterValue(Types.BIGINT, null),
new SqlParameterValue(Types.BIGINT, ids.getSdsId()));
jdbcTemplate.update(SQL.DELETE_COLUMNS_V2, new SqlParameterValue(Types.BIGINT, ids.getCdId()));
jdbcTemplate.update(SQL.DELETE_CDS, new SqlParameterValue(Types.BIGINT, ids.getCdId()));
jdbcTemplate.update(SQL.DELETE_PARTITION_KEYS, new SqlParameterValue(Types.BIGINT, ids.getTableId()));
jdbcTemplate.update(SQL.DELETE_TABLE_PARAMS, new SqlParameterValue(Types.BIGINT, ids.getTableId()));
jdbcTemplate.update(SQL.DELETE_TAB_COL_STATS, new SqlParameterValue(Types.BIGINT, ids.getTableId()));
jdbcTemplate.update(SQL.UPDATE_TABLE_SD, new SqlParameterValue(Types.BIGINT, null),
new SqlParameterValue(Types.BIGINT, ids.getTableId()));
jdbcTemplate.update(SQL.DELETE_SKEWED_COL_NAMES, new SqlParameterValue(Types.BIGINT, ids.getSdsId()));
jdbcTemplate.update(SQL.DELETE_BUCKETING_COLS, new SqlParameterValue(Types.BIGINT, ids.getSdsId()));
jdbcTemplate.update(SQL.DELETE_SORT_COLS, new SqlParameterValue(Types.BIGINT, ids.getSdsId()));
jdbcTemplate.update(SQL.DELETE_SD_PARAMS, new SqlParameterValue(Types.BIGINT, ids.getSdsId()));
jdbcTemplate.update(SQL.DELETE_SKEWED_COL_VALUE_LOC_MAP,
new SqlParameterValue(Types.BIGINT, ids.getSdsId()));
jdbcTemplate.update(SQL.DELETE_SKEWED_VALUES, new SqlParameterValue(Types.BIGINT, ids.getSdsId()));
jdbcTemplate.update(SQL.DELETE_SERDE_PARAMS, new SqlParameterValue(Types.BIGINT, ids.getSerdeId()));
jdbcTemplate.update(SQL.DELETE_SERDES, new SqlParameterValue(Types.BIGINT, ids.getSerdeId()));
jdbcTemplate.update(SQL.DELETE_SDS, new SqlParameterValue(Types.BIGINT, ids.getSdsId()));
jdbcTemplate.update(SQL.DELETE_TBLS, new SqlParameterValue(Types.BIGINT, ids.getTableId()));
} catch (DataAccessException e) {
throw new ConnectorException(String.format("Failed to delete hive table %s", tableName), e);
}
}

private TableSequenceIds getSequenceIds(final QualifiedName tableName) {
try {
return jdbcTemplate.queryForObject(
SQL.TABLE_SEQUENCE_IDS,
new Object[]{tableName.getDatabaseName(), tableName.getTableName()},
new int[]{Types.VARCHAR, Types.VARCHAR},
(rs, rowNum) -> new TableSequenceIds(rs.getLong("tbl_id"), rs.getLong("cd_id"),
rs.getLong("sd_id"), rs.getLong("serde_id")));
} catch (EmptyResultDataAccessException e) {
throw new TableNotFoundException(tableName);
}
}

@VisibleForTesting
private static class SQL {
static final String GET_TABLE_NAMES_BY_URI =
@@ -237,5 +291,26 @@ private static class SQL {
"update TABLE_PARAMS set param_value=? WHERE tbl_id=? and param_key=?";
static final String INSERT_TABLE_PARAMS =
"insert into TABLE_PARAMS(tbl_id,param_key,param_value) values (?,?,?)";
static final String UPDATE_SDS_CD = "UPDATE SDS SET CD_ID=? WHERE SD_ID=?";
static final String DELETE_COLUMNS_V2 = "DELETE FROM COLUMNS_V2 WHERE CD_ID=?";
static final String DELETE_CDS = "DELETE FROM CDS WHERE CD_ID=?";
static final String DELETE_PARTITION_KEYS = "DELETE FROM PARTITION_KEYS WHERE TBL_ID=?";
static final String DELETE_TABLE_PARAMS = "DELETE FROM TABLE_PARAMS WHERE TBL_ID=?";
static final String DELETE_TAB_COL_STATS = "DELETE FROM TAB_COL_STATS WHERE TBL_ID=?";
static final String UPDATE_TABLE_SD = "UPDATE TBLS SET SD_ID=? WHERE TBL_ID=?";
static final String DELETE_SKEWED_COL_NAMES = "DELETE FROM SKEWED_COL_NAMES WHERE SD_ID=?";
static final String DELETE_BUCKETING_COLS = "DELETE FROM BUCKETING_COLS WHERE SD_ID=?";
static final String DELETE_SORT_COLS = "DELETE FROM SORT_COLS WHERE SD_ID=?";
static final String DELETE_SD_PARAMS = "DELETE FROM SD_PARAMS WHERE SD_ID=?";
static final String DELETE_SKEWED_COL_VALUE_LOC_MAP = "DELETE FROM SKEWED_COL_VALUE_LOC_MAP WHERE SD_ID=?";
static final String DELETE_SKEWED_VALUES = "DELETE FROM SKEWED_VALUES WHERE SD_ID_OID=?";
static final String UPDATE_SDS_SERDE = "UPDATE SDS SET SERDE_ID=? WHERE SD_ID=?";
static final String DELETE_SERDE_PARAMS = "DELETE FROM SERDE_PARAMS WHERE SERDE_ID=?";
static final String DELETE_SERDES = "DELETE FROM SERDES WHERE SERDE_ID=?";
static final String DELETE_SDS = "DELETE FROM SDS WHERE SD_ID=?";
static final String DELETE_TBLS = "DELETE FROM TBLS WHERE TBL_ID=?";
static final String TABLE_SEQUENCE_IDS = "select t.tbl_id, s.sd_id, s.cd_id, s.serde_id"
+ " from DBS d join TBLS t on d.db_id=t.db_id join SDS s on t.sd_id=s.sd_id"
+ " where d.name=? and t.tbl_name=?";
}
}
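Two details of DirectSqlTable.delete are worth calling out. First, the statements run in dependency order: SDS.CD_ID and SDS.SERDE_ID are nulled and TBLS.SD_ID is cleared before the CDS, SERDES, SDS and TBLS rows are removed, so the deletes can proceed without violating foreign-key references between these tables. Second, failures surface through two distinct exceptions, as in the hedged caller sketch below (the wrapper method is illustrative; the exception mapping itself comes from delete and getSequenceIds above).

    // Illustrative caller; only the exception mapping is defined by this commit.
    static void dropTable(final DirectSqlTable directSqlTable, final QualifiedName table) {
        try {
            directSqlTable.delete(table);
        } catch (TableNotFoundException e) {
            // TABLE_SEQUENCE_IDS returned no row for the database/table pair
            // (EmptyResultDataAccessException inside getSequenceIds).
        } catch (ConnectorException e) {
            // Any DataAccessException from the partition or table deletes is wrapped here.
        }
    }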
File 4 of 8 — Hive connector table service (delete override)
@@ -152,4 +152,12 @@ private boolean isIcebergTable(final TableInfo tableInfo) {
&& DirectSqlTable.ICEBERG_TABLE_TYPE
.equalsIgnoreCase(tableInfo.getMetadata().get(DirectSqlTable.PARAM_TABLE_TYPE));
}

/**
* {@inheritDoc}.
*/
@Override
public void delete(final ConnectorRequestContext requestContext, final QualifiedName name) {
directSqlTable.delete(name);
}
}
File 5 of 8 — TableSequenceIds
@@ -15,15 +15,46 @@
*/
package com.netflix.metacat.connector.hive.sql;

import lombok.Data;
import lombok.Getter;

import javax.annotation.Nullable;

/**
* Class representing the ids for a table.
*
* @author amajumdar
*/
@Data
@Getter
public class TableSequenceIds {
private final Long tableId;
private final Long cdId;
private final Long sdsId;
private final Long serdeId;

/**
* Constructor.
* @param tableId table id
* @param cdId column id
*/
public TableSequenceIds(final Long tableId,
final Long cdId) {
this(tableId, cdId, null, null);
}

/**
* Constructor.
* @param tableId table id
* @param cdId column id
* @param sdsId sds id
* @param serdeId serde id
*/
public TableSequenceIds(final Long tableId,
final Long cdId,
@Nullable final Long sdsId,
@Nullable final Long serdeId) {
this.tableId = tableId;
this.cdId = cdId;
this.sdsId = sdsId;
this.serdeId = serdeId;
}
}
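TableSequenceIds now carries the storage-descriptor and serde ids in addition to the table and column-descriptor ids, exposes only getters (@Getter rather than @Data), and keeps a two-argument constructor, presumably for existing callers that have no sds/serde ids. A small sketch of both forms (the literal ids are illustrative):

    // Illustrative ids only; both constructors are defined in the class above.
    final TableSequenceIds partialIds = new TableSequenceIds(100L, 200L);            // sdsId and serdeId remain null
    final TableSequenceIds fullIds = new TableSequenceIds(100L, 200L, 300L, 400L);   // as built from TABLE_SEQUENCE_IDS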
File 6 of 8 — DirectSqlTableSpec (Spock unit test)
@@ -3,11 +3,15 @@ package com.netflix.metacat.connector.hive.sql
import com.google.common.collect.Maps
import com.netflix.metacat.common.QualifiedName
import com.netflix.metacat.common.server.connectors.ConnectorContext
import com.netflix.metacat.common.server.connectors.exception.ConnectorException
import com.netflix.metacat.common.server.connectors.exception.InvalidMetaException
import com.netflix.metacat.common.server.connectors.exception.TableNotFoundException
import com.netflix.metacat.common.server.properties.DefaultConfigImpl
import com.netflix.metacat.common.server.properties.MetacatProperties
import com.netflix.metacat.connector.hive.util.HiveConnectorFastServiceMetric
import com.netflix.spectator.api.NoopRegistry
import org.springframework.dao.CannotAcquireLockException
import org.springframework.dao.DataAccessException
import org.springframework.dao.EmptyResultDataAccessException
import org.springframework.jdbc.core.JdbcTemplate
import spock.lang.Specification
@@ -25,7 +29,8 @@ class DirectSqlTableSpec extends Specification {
def context = new ConnectorContext('test', 'test', 'hive', config, registry, Maps.newHashMap())
def metric = new HiveConnectorFastServiceMetric(registry)
def jdbcTemplate = Mock(JdbcTemplate)
def service = new DirectSqlTable(context, jdbcTemplate, metric)
def directSqlSavePartition = Mock(DirectSqlSavePartition)
def service = new DirectSqlTable(context, jdbcTemplate, metric, directSqlSavePartition)
def catalogName = 'c'
def databaseName = 'd'
def tableName = 't'
@@ -120,4 +125,23 @@
1 * jdbcTemplate.update(DirectSqlTable.SQL.UPDATE_TABLE_PARAMS,_) >> {throw new Exception()}
thrown(Exception)
}

def "Test delete table"() {
when:
service.delete(qualifiedName)
then:
1 * jdbcTemplate.queryForObject(DirectSqlTable.SQL.TABLE_SEQUENCE_IDS,_,_,_) >> {throw new EmptyResultDataAccessException(1)}
thrown(TableNotFoundException)
when:
service.delete(qualifiedName)
then:
1 * jdbcTemplate.queryForObject(DirectSqlTable.SQL.TABLE_SEQUENCE_IDS,_,_,_) >> new TableSequenceIds(1,1,1,1)
1 * directSqlSavePartition.delete(qualifiedName) >> {throw new CannotAcquireLockException('a')}
thrown(ConnectorException)
when:
service.delete(qualifiedName)
then:
1 * jdbcTemplate.queryForObject(DirectSqlTable.SQL.TABLE_SEQUENCE_IDS,_,_,_) >> new TableSequenceIds(1,1,1,1)
noExceptionThrown()
}
}
File 7 of 8 — docker-compose configuration
@@ -53,6 +53,7 @@ services:
-Dmetacat.type.converter=com.netflix.metacat.connector.pig.converters.PigTypeConverter
-Dmetacat.definition.metadata.delete.enableForTable=false
-Dmetacat.definition.metadata.delete.enableDeleteForQualifiedNames=hive-metastore/hsmoke_ddb,hive-metastore/hsmoke_ddb1/test_create_table1,cassandra-310,embedded-hive-metastore,embedded-fast-hive-metastore,s3-mysql-db,mysql-56-db
-Dmetacat.hive.metastore.batchSize=10
-Dmetacat.usermetadata.config.location=/etc/metacat/usermetadata.properties'
labels:
- "com.netflix.metacat.oss.test"
File 8 of 8 — MetacatSmokeSpec (functional smoke test)
@@ -316,21 +316,30 @@ class MetacatSmokeSpec extends Specification {
given:
def name = catalogName + '/' + databaseName + '/' + tableName
createTable(catalogName, databaseName, tableName)
def partitions = PigDataDtoProvider.getPartitions(catalogName, databaseName, tableName, 'field1=xyz/field3=abc', isLocalEnv ? 'file:/tmp/abc' : null, count)
partitionApi.savePartitions(catalogName, databaseName, tableName, new PartitionsSaveRequestDto(partitions: partitions))
api.deleteTable(catalogName, databaseName, tableName)
def definitions = metadataApi.getDefinitionMetadataList(null, null, null, null, null, null, name,null)
expect:
definitions.size() == result
cleanup:
metadataApi.deleteDefinitionMetadata(name, true)
where:
catalogName | databaseName | tableName | result
'embedded-hive-metastore' | 'smoke_ddb1' | 'test_create_table' | 0
'embedded-fast-hive-metastore' | 'fsmoke_ddb1' | 'test_create_table' | 0
'embedded-fast-hive-metastore' | 'shard' | 'test_create_table' | 0
'hive-metastore' | 'hsmoke_ddb' | 'test_create_table' | 0
'hive-metastore' | 'hsmoke_ddb1' | 'test_create_table1' | 0
'hive-metastore' | 'hsmoke_ddb1' | 'test_create_table2' | 1
's3-mysql-db' | 'smoke_ddb1' | 'test_create_table' | 0
catalogName | databaseName | tableName | count | result
'embedded-hive-metastore' | 'smoke_ddb1' | 'test_create_table' | 15 | 0
'embedded-fast-hive-metastore' | 'fsmoke_ddb1' | 'test_create_table' | 15 | 0
'embedded-fast-hive-metastore' | 'shard' | 'test_create_table' | 15 | 0
'hive-metastore' | 'hsmoke_ddb' | 'test_create_table' | 15 | 0
'hive-metastore' | 'hsmoke_ddb1' | 'test_create_table1' | 15 | 0
'hive-metastore' | 'hsmoke_ddb1' | 'test_create_table2' | 15 | 1
's3-mysql-db' | 'smoke_ddb1' | 'test_create_table' | 15 | 0
'embedded-hive-metastore' | 'smoke_ddb1' | 'test_create_table' | 10 | 0
'embedded-fast-hive-metastore' | 'fsmoke_ddb1' | 'test_create_table' | 10 | 0
'embedded-fast-hive-metastore' | 'shard' | 'test_create_table' | 10 | 0
'hive-metastore' | 'hsmoke_ddb' | 'test_create_table' | 10 | 0
'hive-metastore' | 'hsmoke_ddb1' | 'test_create_table1' | 10 | 0
'hive-metastore' | 'hsmoke_ddb1' | 'test_create_table2' | 10 | 1
's3-mysql-db' | 'smoke_ddb1' | 'test_create_table' | 10 | 0
}

@Unroll
@@ -957,6 +966,15 @@ class MetacatSmokeSpec extends Specification {
'hive-metastore' | 'hsmoke_db5' | 'part' | 'one=xyz' | 10 | 0
'hive-metastore' | 'hsmoke_db5' | 'part' | 'one=xyz' | 10 | 10
'hive-metastore' | 'hsmoke_db5' | 'part' | 'one=xyz' | 10 | 5
'embedded-hive-metastore' | 'smoke_db5' | 'part' | 'one=xyz' | 15 | 0
'embedded-hive-metastore' | 'smoke_db5' | 'part' | 'one=xyz' | 15 | 15
'embedded-hive-metastore' | 'smoke_db5' | 'part' | 'one=xyz' | 15 | 5
'embedded-fast-hive-metastore' | 'fsmoke_db5' | 'part' | 'one=xyz' | 15 | 0
'embedded-fast-hive-metastore' | 'fsmoke_db5' | 'part' | 'one=xyz' | 15 | 15
'embedded-fast-hive-metastore' | 'fsmoke_db5' | 'part' | 'one=xyz' | 15 | 5
'hive-metastore' | 'hsmoke_db5' | 'part' | 'one=xyz' | 15 | 0
'hive-metastore' | 'hsmoke_db5' | 'part' | 'one=xyz' | 15 | 15
'hive-metastore' | 'hsmoke_db5' | 'part' | 'one=xyz' | 15 | 5
}
@Unroll
