Skip to content

Commit

Permalink
[FLINK-14662][table api][hive] Distinguish unknown CatalogTableStatis…
Browse files Browse the repository at this point in the history
…tics and zero

This closes apache#10380
  • Loading branch information
zjuwangg authored and KurtYoung committed Dec 3, 2019
1 parent 9576a3f commit ed96fea
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1115,7 +1115,8 @@ public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics ta
throw new CatalogException("Alter table stats is not supported in Hive version " + hiveVersion);
}
// Set table stats
if (compareAndUpdateStatisticsProperties(tableStatistics, hiveTable.getParameters())) {
if (statsChanged(tableStatistics, hiveTable.getParameters())) {
updateStats(tableStatistics, hiveTable.getParameters());
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), hiveTable);
}
} catch (TableNotExistException e) {
Expand Down Expand Up @@ -1147,42 +1148,49 @@ public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatis
}

/**
* Determine if statistics is need to be updated, if it needs to be updated and updated its parameters.
* @param statistics original ``hive table statistics.
* @param parameters new catalog table statistics parameters.
* @return needUpdateStatistics flag which indicates whether need to update stats.
* Determine if statistics need to be updated or not.
* @param newTableStats new catalog table statistics.
* @param parameters original hive table statistics parameters.
* @return whether need to update stats.
*/
private static boolean compareAndUpdateStatisticsProperties(CatalogTableStatistics statistics, Map<String, String> parameters) {
boolean needUpdateStatistics;
String oldRowCount = parameters.getOrDefault(StatsSetupConst.ROW_COUNT, HiveStatsUtil.DEFAULT_STATS_ZERO_CONST);
String oldTotalSize = parameters.getOrDefault(StatsSetupConst.TOTAL_SIZE, HiveStatsUtil.DEFAULT_STATS_ZERO_CONST);
String oldNumFiles = parameters.getOrDefault(StatsSetupConst.NUM_FILES, HiveStatsUtil.DEFAULT_STATS_ZERO_CONST);
String oldRawDataSize = parameters.getOrDefault(StatsSetupConst.RAW_DATA_SIZE, HiveStatsUtil.DEFAULT_STATS_ZERO_CONST);
needUpdateStatistics = statistics.getRowCount() != Long.parseLong(oldRowCount) || statistics.getTotalSize() != Long.parseLong(oldTotalSize)
|| statistics.getFileCount() != Integer.parseInt(oldNumFiles) || statistics.getRawDataSize() != Long.parseLong(oldRawDataSize);
if (needUpdateStatistics) {
parameters.put(StatsSetupConst.ROW_COUNT, String.valueOf(statistics.getRowCount()));
parameters.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(statistics.getTotalSize()));
parameters.put(StatsSetupConst.NUM_FILES, String.valueOf(statistics.getFileCount()));
parameters.put(StatsSetupConst.RAW_DATA_SIZE, String.valueOf(statistics.getRawDataSize()));
}
return needUpdateStatistics;
private boolean statsChanged(CatalogTableStatistics newTableStats, Map<String, String> parameters) {
String oldRowCount = parameters.getOrDefault(StatsSetupConst.ROW_COUNT, HiveStatsUtil.DEFAULT_UNKNOWN_STATS_CONST);
String oldTotalSize = parameters.getOrDefault(StatsSetupConst.TOTAL_SIZE, HiveStatsUtil.DEFAULT_UNKNOWN_STATS_CONST);
String oldNumFiles = parameters.getOrDefault(StatsSetupConst.NUM_FILES, HiveStatsUtil.DEFAULT_UNKNOWN_STATS_CONST);
String oldRawDataSize = parameters.getOrDefault(StatsSetupConst.RAW_DATA_SIZE, HiveStatsUtil.DEFAULT_UNKNOWN_STATS_CONST);
return newTableStats.getRowCount() != Long.parseLong(oldRowCount)
|| newTableStats.getTotalSize() != Long.parseLong(oldTotalSize)
|| newTableStats.getFileCount() != Integer.parseInt(oldNumFiles)
|| newTableStats.getRawDataSize() != Long.parseLong(oldRawDataSize);
}

/**
* Update original table statistics parameters.
* @param newTableStats new catalog table statistics.
* @param parameters original hive table statistics parameters.
*/
private void updateStats(CatalogTableStatistics newTableStats, Map<String, String> parameters) {
parameters.put(StatsSetupConst.ROW_COUNT, String.valueOf(newTableStats.getRowCount()));
parameters.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(newTableStats.getTotalSize()));
parameters.put(StatsSetupConst.NUM_FILES, String.valueOf(newTableStats.getFileCount()));
parameters.put(StatsSetupConst.RAW_DATA_SIZE, String.valueOf(newTableStats.getRawDataSize()));
}

private static CatalogTableStatistics createCatalogTableStatistics(Map<String, String> parameters) {
long rowRount = Long.parseLong(parameters.getOrDefault(StatsSetupConst.ROW_COUNT, HiveStatsUtil.DEFAULT_STATS_ZERO_CONST));
long totalSize = Long.parseLong(parameters.getOrDefault(StatsSetupConst.TOTAL_SIZE, HiveStatsUtil.DEFAULT_STATS_ZERO_CONST));
int numFiles = Integer.parseInt(parameters.getOrDefault(StatsSetupConst.NUM_FILES, HiveStatsUtil.DEFAULT_STATS_ZERO_CONST));
long rawDataSize = Long.parseLong(parameters.getOrDefault(StatsSetupConst.RAW_DATA_SIZE, HiveStatsUtil.DEFAULT_STATS_ZERO_CONST));
return new CatalogTableStatistics(rowRount, numFiles, totalSize, rawDataSize);
long rowCount = Long.parseLong(parameters.getOrDefault(StatsSetupConst.ROW_COUNT, HiveStatsUtil.DEFAULT_UNKNOWN_STATS_CONST));
long totalSize = Long.parseLong(parameters.getOrDefault(StatsSetupConst.TOTAL_SIZE, HiveStatsUtil.DEFAULT_UNKNOWN_STATS_CONST));
int numFiles = Integer.parseInt(parameters.getOrDefault(StatsSetupConst.NUM_FILES, HiveStatsUtil.DEFAULT_UNKNOWN_STATS_CONST));
long rawDataSize = Long.parseLong(parameters.getOrDefault(StatsSetupConst.RAW_DATA_SIZE, HiveStatsUtil.DEFAULT_UNKNOWN_STATS_CONST));
return new CatalogTableStatistics(rowCount, numFiles, totalSize, rawDataSize);
}

@Override
public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
try {
Partition hivePartition = getHivePartition(tablePath, partitionSpec);
// Set table stats
if (compareAndUpdateStatisticsProperties(partitionStatistics, hivePartition.getParameters())) {
if (statsChanged(partitionStatistics, hivePartition.getParameters())) {
updateStats(partitionStatistics, hivePartition.getParameters());
client.alter_partition(tablePath.getDatabaseName(), tablePath.getObjectName(), hivePartition);
}
} catch (TableNotExistException | PartitionSpecInvalidException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
public class HiveStatsUtil {
private static final Logger LOG = LoggerFactory.getLogger(HiveStatsUtil.class);

public static final String DEFAULT_STATS_ZERO_CONST = "0";
public static final String DEFAULT_UNKNOWN_STATS_CONST = "-1";

private HiveStatsUtil() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* Statistics for a non-partitioned table or a partition of a partitioned table.
*/
public class CatalogTableStatistics {
public static final CatalogTableStatistics UNKNOWN = new CatalogTableStatistics(0, 0, 0, 0);
public static final CatalogTableStatistics UNKNOWN = new CatalogTableStatistics(-1, -1, -1, -1);

/**
* The number of rows in the table or partition.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1075,10 +1075,10 @@ public void testGetPartitionStats() throws Exception{
catalog.createTable(path1, createPartitionedTable(), false);
catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
CatalogTableStatistics tableStatistics = catalog.getPartitionStatistics(path1, createPartitionSpec());
assertEquals(0, tableStatistics.getFileCount());
assertEquals(0, tableStatistics.getRawDataSize());
assertEquals(0, tableStatistics.getTotalSize());
assertEquals(0, tableStatistics.getRowCount());
assertEquals(-1, tableStatistics.getFileCount());
assertEquals(-1, tableStatistics.getRawDataSize());
assertEquals(-1, tableStatistics.getTotalSize());
assertEquals(-1, tableStatistics.getRowCount());
}

@Test
Expand Down

0 comments on commit ed96fea

Please sign in to comment.