[FLINK-16161][hive] Statistics zero should be unknown in HiveCatalog
This closes apache#11199
JingsongLi committed Feb 24, 2020
1 parent 9969083 commit fe9f448
Showing 3 changed files with 66 additions and 14 deletions.
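
In short: a Hive statistic of 0 (or a missing or negative value) no longer surfaces as an exact figure; HiveCatalog now reads ROW_COUNT, NUM_FILES, TOTAL_SIZE and RAW_DATA_SIZE through new HiveStatsUtil helpers that map any non-positive value to -1, the commit's "unknown" marker. A minimal sketch of the new behavior (the parameter map and its value below are illustrative, not taken from the commit):

    Map<String, String> parameters = new HashMap<>();
    parameters.put(StatsSetupConst.ROW_COUNT, "0"); // Hive reports zero rows, e.g. stats never gathered
    long rowCount = HiveStatsUtil.parsePositiveLongStat(parameters, StatsSetupConst.ROW_COUNT);
    // rowCount == -1: zero is now treated as "unknown" rather than an exact row count of 0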
@@ -108,6 +108,8 @@
import java.util.stream.Collectors;

import static org.apache.flink.table.catalog.config.CatalogConfig.FLINK_PROPERTY_PREFIX;
import static org.apache.flink.table.catalog.hive.util.HiveStatsUtil.parsePositiveIntStat;
import static org.apache.flink.table.catalog.hive.util.HiveStatsUtil.parsePositiveLongStat;
import static org.apache.flink.table.filesystem.PartitionPathUtils.unescapePathName;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -1241,14 +1243,10 @@ public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatis
* @return whether need to update stats.
*/
private boolean statsChanged(CatalogTableStatistics newTableStats, Map<String, String> parameters) {
String oldRowCount = parameters.getOrDefault(StatsSetupConst.ROW_COUNT, HiveStatsUtil.DEFAULT_UNKNOWN_STATS_CONST);
String oldTotalSize = parameters.getOrDefault(StatsSetupConst.TOTAL_SIZE, HiveStatsUtil.DEFAULT_UNKNOWN_STATS_CONST);
String oldNumFiles = parameters.getOrDefault(StatsSetupConst.NUM_FILES, HiveStatsUtil.DEFAULT_UNKNOWN_STATS_CONST);
String oldRawDataSize = parameters.getOrDefault(StatsSetupConst.RAW_DATA_SIZE, HiveStatsUtil.DEFAULT_UNKNOWN_STATS_CONST);
return newTableStats.getRowCount() != Long.parseLong(oldRowCount)
|| newTableStats.getTotalSize() != Long.parseLong(oldTotalSize)
|| newTableStats.getFileCount() != Integer.parseInt(oldNumFiles)
|| newTableStats.getRawDataSize() != Long.parseLong(oldRawDataSize);
return newTableStats.getRowCount() != parsePositiveLongStat(parameters, StatsSetupConst.ROW_COUNT)
|| newTableStats.getTotalSize() != parsePositiveLongStat(parameters, StatsSetupConst.TOTAL_SIZE)
|| newTableStats.getFileCount() != parsePositiveIntStat(parameters, StatsSetupConst.NUM_FILES)
|| newTableStats.getRawDataSize() != parsePositiveLongStat(parameters, StatsSetupConst.RAW_DATA_SIZE);
}

/**
@@ -1264,11 +1262,11 @@ private void updateStats(CatalogTableStatistics newTableStats, Map<String, Strin
}

private static CatalogTableStatistics createCatalogTableStatistics(Map<String, String> parameters) {
long rowCount = Long.parseLong(parameters.getOrDefault(StatsSetupConst.ROW_COUNT, HiveStatsUtil.DEFAULT_UNKNOWN_STATS_CONST));
long totalSize = Long.parseLong(parameters.getOrDefault(StatsSetupConst.TOTAL_SIZE, HiveStatsUtil.DEFAULT_UNKNOWN_STATS_CONST));
int numFiles = Integer.parseInt(parameters.getOrDefault(StatsSetupConst.NUM_FILES, HiveStatsUtil.DEFAULT_UNKNOWN_STATS_CONST));
long rawDataSize = Long.parseLong(parameters.getOrDefault(StatsSetupConst.RAW_DATA_SIZE, HiveStatsUtil.DEFAULT_UNKNOWN_STATS_CONST));
return new CatalogTableStatistics(rowCount, numFiles, totalSize, rawDataSize);
return new CatalogTableStatistics(
parsePositiveLongStat(parameters, StatsSetupConst.ROW_COUNT),
parsePositiveIntStat(parameters, StatsSetupConst.NUM_FILES),
parsePositiveLongStat(parameters, StatsSetupConst.TOTAL_SIZE),
parsePositiveLongStat(parameters, StatsSetupConst.RAW_DATA_SIZE));
}

@Override
@@ -63,7 +63,7 @@
public class HiveStatsUtil {
private static final Logger LOG = LoggerFactory.getLogger(HiveStatsUtil.class);

public static final String DEFAULT_UNKNOWN_STATS_CONST = "-1";
private static final int DEFAULT_UNKNOWN_STATS_VALUE = -1;

private HiveStatsUtil() {}

@@ -292,4 +292,24 @@ private static ColumnStatisticsData getColumnStatisticsData(DataType colType, Ca
throw new CatalogException(String.format("Flink does not support converting ColumnStats '%s' for Hive column " +
"type '%s' yet", colStat, colType));
}

public static int parsePositiveIntStat(Map<String, String> parameters, String key) {
String value = parameters.get(key);
if (value == null) {
return DEFAULT_UNKNOWN_STATS_VALUE;
} else {
int v = Integer.parseInt(value);
return v > 0 ? v : DEFAULT_UNKNOWN_STATS_VALUE;
}
}

public static long parsePositiveLongStat(Map<String, String> parameters, String key) {
String value = parameters.get(key);
if (value == null) {
return DEFAULT_UNKNOWN_STATS_VALUE;
} else {
long v = Long.parseLong(value);
return v > 0 ? v : DEFAULT_UNKNOWN_STATS_VALUE;
}
}
}
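
For reference, the behavior of the new helpers on a few inputs (a hypothetical parameter map, assuming the methods above):

    Map<String, String> parameters = new HashMap<>();
    parameters.put(StatsSetupConst.NUM_FILES, "3");
    parameters.put(StatsSetupConst.TOTAL_SIZE, "0");

    HiveStatsUtil.parsePositiveIntStat(parameters, StatsSetupConst.NUM_FILES);      // 3  (positive value kept)
    HiveStatsUtil.parsePositiveLongStat(parameters, StatsSetupConst.TOTAL_SIZE);    // -1 (zero treated as unknown)
    HiveStatsUtil.parsePositiveLongStat(parameters, StatsSetupConst.RAW_DATA_SIZE); // -1 (missing key treated as unknown)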
@@ -23,6 +23,7 @@
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.config.CatalogConfig;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
@@ -32,16 +33,19 @@
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDouble;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.catalog.stats.Date;
import org.apache.flink.util.StringUtils;

import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.api.Table;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

/**
@@ -128,6 +132,36 @@ public void testAlterPartitionColumnStatistics() throws Exception {
checkEquals(catalogColumnStatistics, catalog.getPartitionColumnStatistics(path1, partitionSpec));
}

@Test
public void testHiveStatistics() throws Exception {
catalog.createDatabase(db1, createDb(), false);
checkStatistics(0, -1);
checkStatistics(1, 1);
checkStatistics(1000, 1000);
}

private void checkStatistics(int inputStat, int expectStat) throws Exception {
catalog.dropTable(path1, true);

Map<String, String> properties = new HashMap<>();
properties.put(CatalogConfig.IS_GENERIC, "false");
properties.put(StatsSetupConst.ROW_COUNT, String.valueOf(inputStat));
properties.put(StatsSetupConst.NUM_FILES, String.valueOf(inputStat));
properties.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(inputStat));
properties.put(StatsSetupConst.RAW_DATA_SIZE, String.valueOf(inputStat));
CatalogTable catalogTable = new CatalogTableImpl(
TableSchema.builder().field("f0", DataTypes.INT()).build(),
properties,
"");
catalog.createTable(path1, catalogTable, false);

CatalogTableStatistics statistics = catalog.getTableStatistics(path1);
assertEquals(expectStat, statistics.getRowCount());
assertEquals(expectStat, statistics.getFileCount());
assertEquals(expectStat, statistics.getRawDataSize());
assertEquals(expectStat, statistics.getTotalSize());
}

// ------ utils ------

@Override
