[FLINK-29126][hive] Fix splitting file optimization doesn't work for orc format

This closes apache#20694
luoyuxia authored and godfreyhe committed Oct 11, 2022
1 parent 2a4da25 commit cf70844
Showing 7 changed files with 154 additions and 26 deletions.
@@ -183,7 +183,12 @@ Flink allows you to flexibly configure the strategy of parallelism inference. You can configure in `TableConfig`
</tbody>
</table>

**Note:** Currently, the above parameters only work for Hive tables stored in ORC format.
{{< hint warning >}}
**Note:**
- To tune the split size, Flink will first calculate the size of all files under all partitions.
  This can be time-consuming when there are many partitions. You can set the job option `table.exec.hive.calculate-partition-size.thread-num` (3 by default) to a larger value and use more threads to speed it up.
- Currently, the above parameters only work for Hive tables stored in ORC format.
{{< /hint >}}

### Load Partition Splits

9 changes: 7 additions & 2 deletions docs/content/docs/connectors/table/hive/hive_read_write.md
@@ -198,8 +198,13 @@ Users can do some performance tuning by tuning the split's size with the follow
</tr>
</tbody>
</table>

**NOTE**: Currently, these two configurations only work for Hive tables stored in ORC format.
{{< hint warning >}}
**NOTE**:
- To tune the split size, Flink will first collect the size of all files for all partitions.
  If there are many partitions, this may be time-consuming. You can then set the job configuration `table.exec.hive.calculate-partition-size.thread-num` (3 by default) to a bigger value and use more threads to speed up the process.
- Currently, the configurations for tuning the split size only work for Hive tables stored in ORC format.
{{< /hint >}}
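For illustration, a minimal sketch of setting these options programmatically, assuming a batch `TableEnvironment`, the `HiveOptions` constants touched by this commit, and that the two size options are `MemorySize`-typed as suggested by the `.getBytes()` calls in the diff; the values are placeholders:

```java
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connectors.hive.HiveOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HiveSplitTuningExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Target split size and per-file open cost used when splitting ORC files
        // (placeholder values; tune them for your own data layout).
        tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_BYTES, MemorySize.parse("128mb"));
        tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_FILE_OPEN_COST, MemorySize.parse("4mb"));

        // Use more threads to sum up file sizes when the table has many partitions.
        tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM, 8);
    }
}
```

The same keys can also be supplied as plain string options (for example via `SET` in the SQL client), since they are ordinary Flink configuration entries.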

### Load Partition Splits

@@ -96,6 +96,12 @@ public class HiveOptions {
+ " When the value is over estimated, Flink will tend to pack Hive's data into less splits, which will be helpful when Hive's table contains many small files."
+ " And vice versa. It only works for the Hive table stored as ORC format.");

public static final ConfigOption<Integer> TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM =
key("table.exec.hive.calculate-partition-size.thread-num")
.intType()
.defaultValue(3)
.withDescription("The thread number to calculate partition's size.");

public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_DYNAMIC_GROUPING_ENABLED =
key("table.exec.hive.sink.sort-by-dynamic-partition.enable")
.booleanType()
@@ -296,15 +296,18 @@ private static void validateScanConfigurations(Map<String, String> configuration
}

private void setFlinkConfigurationToJobConf() {
int splitPartitionThreadNum =
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM);
Preconditions.checkArgument(
splitPartitionThreadNum >= 1,
HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM.key()
+ " cannot be less than 1");
jobConf.set(
HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM.key(),
flinkConf
.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM)
.toString());
String.valueOf(splitPartitionThreadNum));
jobConf.set(
HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key(),
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX).toString());

jobConf.set(
HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_BYTES.key(),
String.valueOf(
@@ -313,6 +316,15 @@
HiveOptions.TABLE_EXEC_HIVE_FILE_OPEN_COST.key(),
String.valueOf(
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FILE_OPEN_COST).getBytes()));
int calPartitionSizeThreadNum =
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM);
Preconditions.checkArgument(
calPartitionSizeThreadNum >= 1,
HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key()
+ " cannot be less than 1");
jobConf.set(
HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key(),
String.valueOf(calPartitionSizeThreadNum));
}

private boolean isStreamingSource() {
@@ -18,6 +18,7 @@

package org.apache.flink.connectors.hive;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
import org.apache.flink.connectors.hive.read.HiveSourceSplit;
@@ -36,6 +37,13 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static org.apache.flink.util.concurrent.Executors.newDirectExecutorService;

/**
* A {@link FileEnumerator} implementation for hive source, which generates splits based on {@link
@@ -81,10 +89,6 @@ public static List<HiveSourceSplit> createInputSplits(
setSplitMaxSize(partitions, jobConf, minNumSplits);
}
int threadNum = getThreadNumToSplitHiveFile(jobConf);
Preconditions.checkArgument(
threadNum >= 1,
HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM.key()
+ " cannot be less than 1");
List<HiveSourceSplit> hiveSplits = new ArrayList<>();
try (MRSplitsGetter splitsGetter = new MRSplitsGetter(threadNum)) {
for (HiveTablePartitionSplits partitionSplits :
@@ -106,8 +110,12 @@ private static boolean supportSetSplitMaxSize(List<HiveTablePartition> partition
// works for orc format
for (HiveTablePartition partition : partitions) {
String serializationLib =
partition.getStorageDescriptor().getSerdeInfo().getSerializationLib();
if (!"orc".equalsIgnoreCase(serializationLib)) {
partition
.getStorageDescriptor()
.getSerdeInfo()
.getSerializationLib()
.toLowerCase();
if (!serializationLib.contains("orc")) {
return false;
}
}
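As context for the change above (my reading of the fix, not text from the commit): the metastore reports the SerDe class name, e.g. `org.apache.hadoop.hive.ql.io.orc.OrcSerde`, rather than the literal string `orc`, so the old exact match never succeeded for ORC tables. A small standalone sketch of the two checks:

```java
public class SerdeCheckSketch {
    public static void main(String[] args) {
        // Typical value of StorageDescriptor#getSerdeInfo()#getSerializationLib()
        // for an ORC table: the OrcSerde class name (lower-cased as in the new code).
        String serializationLib = "org.apache.hadoop.hive.ql.io.orc.OrcSerde".toLowerCase();

        // Old check: exact match against "orc" never fires, so split tuning was skipped.
        System.out.println("orc".equalsIgnoreCase(serializationLib)); // false

        // New check: substring match recognizes the ORC SerDe, so split tuning applies.
        System.out.println(serializationLib.contains("orc"));         // true
    }
}
```

This also matches the updated test below, which now sets `OrcSerde.class.getName()` as the serialization library instead of the bare string `orc`.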
@@ -140,21 +148,38 @@ private static long calculateMaxSplitBytes(
return Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerSplit));
}

private static long calculateFilesSizeWithOpenCost(
@VisibleForTesting
static long calculateFilesSizeWithOpenCost(
List<HiveTablePartition> partitions, JobConf jobConf, long openCost)
throws IOException {
long totalBytesWithWeight = 0;
for (HiveTablePartition partition : partitions) {
StorageDescriptor sd = partition.getStorageDescriptor();
org.apache.hadoop.fs.Path inputPath = new org.apache.hadoop.fs.Path(sd.getLocation());
FileSystem fs = inputPath.getFileSystem(jobConf);
// it's possible a partition exists in metastore but the data has been removed
if (!fs.exists(inputPath)) {
continue;
int calPartitionSizeThreadNum =
Integer.parseInt(
jobConf.get(
HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM
.key()));
ExecutorService executorService = null;
try {
executorService =
calPartitionSizeThreadNum == 1
? newDirectExecutorService()
: Executors.newFixedThreadPool(calPartitionSizeThreadNum);
List<Future<Long>> partitionFilesSizeFutures = new ArrayList<>();
for (HiveTablePartition partition : partitions) {
partitionFilesSizeFutures.add(
executorService.submit(
new PartitionFilesSizeCalculator(partition, openCost, jobConf)));
}
for (FileStatus fileStatus : fs.listStatus(inputPath)) {
long fileByte = fileStatus.getLen();
totalBytesWithWeight += (fileByte + openCost);
for (Future<Long> fileSizeFuture : partitionFilesSizeFutures) {
try {
totalBytesWithWeight += fileSizeFuture.get();
} catch (InterruptedException | ExecutionException e) {
throw new IOException("Fail to calculate total files' size.", e);
}
}
} finally {
if (executorService != null) {
executorService.shutdown();
}
}
return totalBytesWithWeight;
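Putting the pieces together, a hedged arithmetic sketch of how the weighted total computed here could feed the `calculateMaxSplitBytes` context line shown earlier; dividing the total by the desired minimum number of splits is an assumption about the non-visible part of that method, and all numbers are hypothetical:

```java
public class SplitSizeArithmeticSketch {
    public static void main(String[] args) {
        // Hypothetical figures for a partitioned ORC table.
        long totalBytesWithWeight = 1_000L * 1024 * 1024; // ~1 GB of files plus open-cost weight
        int minNumSplits = 20;                            // desired minimum number of splits
        long defaultMaxSplitBytes = 128L * 1024 * 1024;   // HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_BYTES
        long openCostInBytes = 4L * 1024 * 1024;          // HiveOptions.TABLE_EXEC_HIVE_FILE_OPEN_COST

        // Assumed: spread the weighted total evenly over the requested splits.
        long bytesPerSplit = totalBytesWithWeight / minNumSplits; // 50 MB

        // Same expression as the context line above: capped by the configured maximum
        // and floored by the open cost -> 50 MB per split in this example.
        long maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerSplit));
        System.out.println(maxSplitBytes / (1024 * 1024) + " MB per split");
    }
}
```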
@@ -212,4 +237,33 @@ public FileEnumerator create() {
return new HiveSourceFileEnumerator(partitions, jobConfWrapper.conf());
}
}

/** The calculator to calculate the total bytes with weight for a partition. */
public static class PartitionFilesSizeCalculator implements Callable<Long> {
private final HiveTablePartition hiveTablePartition;
private final Long openCost;
private final JobConf jobConf;

public PartitionFilesSizeCalculator(
HiveTablePartition hiveTablePartition, Long openCost, JobConf jobConf) {
this.hiveTablePartition = hiveTablePartition;
this.openCost = openCost;
this.jobConf = jobConf;
}

@Override
public Long call() throws Exception {
long totalBytesWithWeight = 0L;
StorageDescriptor sd = hiveTablePartition.getStorageDescriptor();
org.apache.hadoop.fs.Path inputPath = new org.apache.hadoop.fs.Path(sd.getLocation());
FileSystem fs = inputPath.getFileSystem(jobConf);
if (fs.exists(inputPath)) {
for (FileStatus fileStatus : fs.listStatus(inputPath)) {
long fileByte = fileStatus.getLen();
totalBytesWithWeight += (fileByte + openCost);
}
}
return totalBytesWithWeight;
}
}
}
@@ -23,14 +23,17 @@
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.mapred.JobConf;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -43,17 +46,54 @@ public class HiveSourceFileEnumeratorTest {

@Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();

@Test
public void testCalculateFilesSize() throws Exception {
String baseFilePath =
Objects.requireNonNull(this.getClass().getResource("/orc/test.orc")).getPath();
long fileSize = Paths.get(baseFilePath).toFile().length();
File wareHouse = temporaryFolder.newFolder("testCalculateFilesSize");
int partitionNum = 10;
long openCost = 1;
List<HiveTablePartition> hiveTablePartitions = new ArrayList<>();
for (int i = 0; i < partitionNum; i++) {
// create partition directory
Path partitionPath = Paths.get(wareHouse.getPath(), "p_" + i);
Files.createDirectory(partitionPath);
// copy file to the partition directory
Files.copy(Paths.get(baseFilePath), Paths.get(partitionPath.toString(), "t.orc"));
StorageDescriptor sd = new StorageDescriptor();
sd.setLocation(partitionPath.toString());
hiveTablePartitions.add(new HiveTablePartition(sd, new Properties()));
}
// test calculation with one single thread
JobConf jobConf = new JobConf();
jobConf.set(HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key(), "1");
long totalSize =
HiveSourceFileEnumerator.calculateFilesSizeWithOpenCost(
hiveTablePartitions, jobConf, openCost);
long expectedSize = partitionNum * (fileSize + openCost);
assertThat(totalSize).isEqualTo(expectedSize);

// test calculation with multiple threads
jobConf.set(HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key(), "3");
totalSize =
HiveSourceFileEnumerator.calculateFilesSizeWithOpenCost(
hiveTablePartitions, jobConf, openCost);
assertThat(totalSize).isEqualTo(expectedSize);
}

@Test
public void testCreateInputSplits() throws Exception {
int numSplits = 1000;
// create a jobConf with default configuration
JobConf jobConf = new JobConf();
jobConf.set(HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key(), "1");
File wareHouse = temporaryFolder.newFolder("testCreateInputSplits");
// init the files for the partition
StorageDescriptor sd = new StorageDescriptor();
// set orc format
SerDeInfo serdeInfo = new SerDeInfo();
serdeInfo.setSerializationLib("orc");
serdeInfo.setSerializationLib(OrcSerde.class.getName());
sd.setSerdeInfo(serdeInfo);
sd.setInputFormat(OrcInputFormat.class.getName());
sd.setLocation(wareHouse.toString());
@@ -72,6 +112,7 @@ public void testCreateInputSplits() throws Exception {

// set split max size and verify it works
jobConf = new JobConf();
jobConf.set(HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key(), "1");
jobConf.set(HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_BYTES.key(), "10");
// the splits should be more than the number of files
hiveSourceSplits =
@@ -84,6 +125,7 @@
assertThat(hiveSourceSplits.size()).isEqualTo(2);

jobConf = new JobConf();
jobConf.set(HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key(), "1");
// set open cost and verify it works
jobConf.set(HiveOptions.TABLE_EXEC_HIVE_FILE_OPEN_COST.key(), "1");
hiveSourceSplits =
@@ -29,6 +29,7 @@
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.apache.hadoop.mapred.JobConf;
import org.apache.http.util.Asserts;
import org.junit.Test;
Expand Down Expand Up @@ -89,7 +90,9 @@ private void commitPartitionWithGivenCreateTime(
List<String> partitionValues, Integer createTime) {
StorageDescriptor sd = new StorageDescriptor();
sd.setLocation("/tmp/test");
sd.setSerdeInfo(new SerDeInfo());
SerDeInfo serDeInfo = new SerDeInfo();
serDeInfo.setSerializationLib(ParquetHiveSerDe.class.getName());
sd.setSerdeInfo(serDeInfo);
Partition partition =
new Partition(
partitionValues, "testDb", "testTable", createTime, createTime, sd, null);
@@ -101,6 +104,7 @@ private void preparePartitionMonitor() {
private void preparePartitionMonitor() {
List<List<String>> seenPartitionsSinceOffset = new ArrayList<>();
JobConf jobConf = new JobConf();
jobConf.set(HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key(), "1");
Configuration configuration = new Configuration();

ObjectPath tablePath = new ObjectPath("testDb", "testTable");
