[FLINK-19944][hive] Support sink parallelism configuration for Hive connector

This closes apache#15060
liushouwei authored and wuchong committed Mar 10, 2021
1 parent c4b042f commit e9fc926
Showing 7 changed files with 197 additions and 14 deletions.
4 changes: 2 additions & 2 deletions docs/content.zh/docs/connectors/table/filesystem.md
@@ -372,9 +372,9 @@ public class AnalysisCommitPolicy implements PartitionCommitPolicy {

```

### Sink Parallelism
## Sink Parallelism

The parallelism of writing files into external file system can be configured by the corresponding table option. By default, the parallelism is configured to being the same as the parallelism of its last upstream chained operator. When the parallelism which is different from the parallelism of the upstream parallelism is configured, the operator of writing files and the operator compacting files (if used) will apply the parallelism.
The parallelism of writing files into the external file system (including Hive) can be configured by the corresponding table option and is supported in both streaming mode and batch mode. By default, the parallelism is the same as the parallelism of the last upstream chained operator. When a parallelism different from that of the upstream operator is configured, the file-writing operator and the file-compacting operator (if used) will use the configured parallelism.


<table class="table table-bordered">
6 changes: 3 additions & 3 deletions docs/content/docs/connectors/table/filesystem.md
@@ -372,9 +372,9 @@ public class AnalysisCommitPolicy implements PartitionCommitPolicy {

```

### Sink Parallelism
## Sink Parallelism

The parallelism of writing files into external file system can be configured by the corresponding table option. By default, the parallelism is configured to being the same as the parallelism of its last upstream chained operator. When the parallelism which is different from the parallelism of the upstream parallelism is configured, the operator of writing files and the operator compacting files (if used) will apply the parallelism.
The parallelism of writing files into the external file system (including Hive) can be configured by the corresponding table option and is supported in both streaming mode and batch mode. By default, the parallelism is the same as the parallelism of the last upstream chained operator. When a parallelism different from that of the upstream operator is configured, the file-writing operator and the file-compacting operator (if used) will use the configured parallelism.


<table class="table table-bordered">
@@ -393,7 +393,7 @@ The parallelism of writing files into external file system can be configured by
<td>Integer</td>
<td>Parallelism of writing files into external file system. The value should be greater than zero, otherwise an exception will be thrown.</td>
</tr>

</tbody>
</table>

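To make the documented option concrete, below is a minimal usage sketch (not part of this commit). It assumes a plain filesystem connector table; the table name, path, schema, and format are illustrative, while the `sink.parallelism` option and its semantics come from the documentation change above.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SinkParallelismExample {
    public static void main(String[] args) throws Exception {
        // Batch mode here; the option is supported in streaming mode as well.
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // Hypothetical filesystem sink table: 'sink.parallelism' = '4' makes the
        // file-writing operator (and the compaction operator, if enabled) run with
        // parallelism 4 instead of inheriting the upstream parallelism.
        tEnv.executeSql(
                "CREATE TABLE fs_sink ("
                        + "  id INT,"
                        + "  name STRING"
                        + ") WITH ("
                        + "  'connector' = 'filesystem',"
                        + "  'path' = 'file:///tmp/fs_sink',"
                        + "  'format' = 'csv',"
                        + "  'sink.parallelism' = '4'" // must be greater than zero
                        + ")");

        // Write two rows; the writer runs with the configured parallelism.
        tEnv.executeSql("INSERT INTO fs_sink VALUES (1, 'a'), (2, 'b')").await();
    }
}
```

When the configured parallelism differs from the upstream operator's, the planner inserts a REBALANCE edge in front of the writer, as the execution plans added by this commit further below show.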
@@ -19,6 +19,7 @@
package org.apache.flink.connectors.hive;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.config.CatalogConfig;
@@ -27,6 +28,7 @@
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.filesystem.FileSystemOptions;
import org.apache.flink.util.Preconditions;

import org.apache.hadoop.hive.conf.HiveConf;
@@ -83,11 +85,15 @@ public DynamicTableSink createDynamicTableSink(Context context) {

// temporary table doesn't have the IS_GENERIC flag but we still consider it generic
if (!isGeneric && !context.isTemporary()) {
Integer configuredParallelism =
Configuration.fromMap(context.getCatalogTable().getOptions())
.get(FileSystemOptions.SINK_PARALLELISM);
return new HiveTableSink(
context.getConfiguration(),
new JobConf(hiveConf),
context.getObjectIdentifier(),
context.getCatalogTable());
context.getCatalogTable(),
configuredParallelism);
} else {
return FactoryUtil.createTableSink(
null, // we already in the factory of catalog
@@ -81,6 +81,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
@@ -113,11 +115,14 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su
private boolean overwrite = false;
private boolean dynamicGrouping = false;

@Nullable private final Integer configuredParallelism;

public HiveTableSink(
ReadableConfig flinkConf,
JobConf jobConf,
ObjectIdentifier identifier,
CatalogTable table) {
CatalogTable table,
@Nullable Integer configuredParallelism) {
this.flinkConf = flinkConf;
this.jobConf = jobConf;
this.identifier = identifier;
@@ -128,6 +133,7 @@ public HiveTableSink(
"Hive version is not defined");
hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
tableSchema = TableSchemaUtils.getPhysicalSchema(table.getSchema());
this.configuredParallelism = configuredParallelism;
}

@Override
@@ -172,17 +178,20 @@ private DataStreamSink<?> consume(
.withPartPrefix("part-" + UUID.randomUUID().toString())
.withPartSuffix(extension == null ? "" : extension);

final int parallelism =
Optional.ofNullable(configuredParallelism).orElse(dataStream.getParallelism());
if (isBounded) {
OutputFileConfig fileNaming = fileNamingBuilder.build();
return createBatchSink(dataStream, converter, sd, writerFactory, fileNaming);
return createBatchSink(
dataStream, converter, sd, writerFactory, fileNaming, parallelism);
} else {
if (overwrite) {
throw new IllegalStateException("Streaming mode not support overwrite.");
}

Properties tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, table);
return createStreamSink(
dataStream, sd, tableProps, writerFactory, fileNamingBuilder);
dataStream, sd, tableProps, writerFactory, fileNamingBuilder, parallelism);
}
} catch (TException e) {
throw new CatalogException("Failed to query Hive metaStore", e);
@@ -200,7 +209,8 @@ private DataStreamSink<Row> createBatchSink(
DataStructureConverter converter,
StorageDescriptor sd,
HiveWriterFactory recordWriterFactory,
OutputFileConfig fileNaming)
OutputFileConfig fileNaming,
final int parallelism)
throws IOException {
FileSystemOutputFormat.Builder<Row> builder = new FileSystemOutputFormat.Builder<>();
builder.setPartitionComputer(
@@ -223,15 +233,16 @@
return dataStream
.map((MapFunction<RowData, Row>) value -> (Row) converter.toExternal(value))
.writeUsingOutputFormat(builder.build())
.setParallelism(dataStream.getParallelism());
.setParallelism(parallelism);
}

private DataStreamSink<?> createStreamSink(
DataStream<RowData> dataStream,
StorageDescriptor sd,
Properties tableProps,
HiveWriterFactory recordWriterFactory,
OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder) {
OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder,
final int parallelism) {
org.apache.flink.configuration.Configuration conf =
new org.apache.flink.configuration.Configuration();
catalogTable.getOptions().forEach(conf::setString);
@@ -288,7 +299,6 @@ private DataStreamSink<?> createStreamSink(

long bucketCheckInterval = conf.get(SINK_ROLLING_POLICY_CHECK_INTERVAL).toMillis();

int parallelism = dataStream.getParallelism();
DataStream<PartitionCommitInfo> writerStream;
if (autoCompaction) {
long compactionSize =
@@ -438,7 +448,9 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {

@Override
public DynamicTableSink copy() {
HiveTableSink sink = new HiveTableSink(flinkConf, jobConf, identifier, catalogTable);
HiveTableSink sink =
new HiveTableSink(
flinkConf, jobConf, identifier, catalogTable, configuredParallelism);
sink.staticPartitionSpec = staticPartitionSpec;
sink.overwrite = overwrite;
sink.dynamicGrouping = dynamicGrouping;
@@ -23,6 +23,7 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -51,6 +52,10 @@
import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_DELAY;
import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND;
import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME;
import static org.apache.flink.table.planner.utils.TableTestUtil.readFromResource;
import static org.apache.flink.table.planner.utils.TableTestUtil.replaceStageId;
import static org.apache.flink.table.planner.utils.TableTestUtil.replaceStreamNodeId;
import static org.junit.Assert.assertEquals;

/** Tests {@link HiveTableSink}. */
public class HiveTableSinkITCase {
@@ -70,6 +75,50 @@ public static void closeCatalog() {
}
}

@Test
public void testHiveTableSinkWithParallelismInBatch() {
final TableEnvironment tEnv =
HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
testHiveTableSinkWithParallelismBase(
tEnv, "/explain/testHiveTableSinkWithParallelismInBatch.out");
}

@Test
public void testHiveTableSinkWithParallelismInStreaming() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final TableEnvironment tEnv =
HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env, SqlDialect.HIVE);
testHiveTableSinkWithParallelismBase(
tEnv, "/explain/testHiveTableSinkWithParallelismInStreaming.out");
}

private void testHiveTableSinkWithParallelismBase(
final TableEnvironment tEnv, final String expectedResourceFileName) {
tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tEnv.useCatalog(hiveCatalog.getName());
tEnv.executeSql("create database db1");
tEnv.useDatabase("db1");

tEnv.executeSql(
String.format(
"CREATE TABLE test_table ("
+ " id int,"
+ " real_col int"
+ ") TBLPROPERTIES ("
+ " 'sink.parallelism' = '8'" // set sink parallelism = 8
+ ")"));
final String actual =
tEnv.explainSql(
"insert into test_table select 1, 1", ExplainDetail.JSON_EXECUTION_PLAN);
final String expected = readFromResource(expectedResourceFileName);

assertEquals(
replaceStreamNodeId(replaceStageId(expected)),
replaceStreamNodeId(replaceStageId(actual)));

tEnv.executeSql("drop database db1 cascade");
}

@Test
public void testBatchAppend() throws Exception {
TableEnvironment tEnv =
@@ -0,0 +1,58 @@
== Abstract Syntax Tree ==
LogicalSink(table=[test-catalog.db1.test_table], fields=[EXPR$0, EXPR$1])
+- LogicalProject(EXPR$0=[1], EXPR$1=[1])
+- LogicalValues(tuples=[[{ 0 }]])

== Optimized Physical Plan ==
Sink(table=[test-catalog.db1.test_table], fields=[EXPR$0, EXPR$1])
+- Calc(select=[1 AS EXPR$0, 1 AS EXPR$1])
+- Values(tuples=[[{ 0 }]], values=[ZERO])

== Optimized Execution Plan ==
Sink(table=[test-catalog.db1.test_table], fields=[EXPR$0, EXPR$1])
+- Calc(select=[1 AS EXPR$0, 1 AS EXPR$1])
+- Values(tuples=[[{ 0 }]], values=[ZERO])

== Physical Execution Plan ==
{
"nodes" : [ {
"id" : 1,
"type" : "Source: Values(tuples=[[{ 0 }]], values=[ZERO])",
"pact" : "Data Source",
"contents" : "Source: Values(tuples=[[{ 0 }]], values=[ZERO])",
"parallelism" : 1
}, {
"id" : 2,
"type" : "Calc(select=[1 AS EXPR$0, 1 AS EXPR$1])",
"pact" : "Operator",
"contents" : "Calc(select=[1 AS EXPR$0, 1 AS EXPR$1])",
"parallelism" : 1,
"predecessors" : [ {
"id" : 1,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 3,
"type" : "Map",
"pact" : "Operator",
"contents" : "Map",
"parallelism" : 1,
"predecessors" : [ {
"id" : 2,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 4,
"type" : "Sink: Unnamed",
"pact" : "Data Sink",
"contents" : "Sink: Unnamed",
"parallelism" : 8,
"predecessors" : [ {
"id" : 3,
"ship_strategy" : "REBALANCE",
"side" : "second"
} ]
} ]
}
@@ -0,0 +1,58 @@
== Abstract Syntax Tree ==
LogicalSink(table=[test-catalog.db1.test_table], fields=[EXPR$0, EXPR$1])
+- LogicalProject(EXPR$0=[1], EXPR$1=[1])
+- LogicalValues(tuples=[[{ 0 }]])

== Optimized Physical Plan ==
Sink(table=[test-catalog.db1.test_table], fields=[EXPR$0, EXPR$1])
+- Calc(select=[1 AS EXPR$0, 1 AS EXPR$1])
+- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])

== Optimized Execution Plan ==
Sink(table=[test-catalog.db1.test_table], fields=[EXPR$0, EXPR$1])
+- Calc(select=[1 AS EXPR$0, 1 AS EXPR$1])
+- Values(tuples=[[{ 0 }]])

== Physical Execution Plan ==
{
"nodes" : [ {
"id" : 1,
"type" : "Source: Values(tuples=[[{ 0 }]])",
"pact" : "Data Source",
"contents" : "Source: Values(tuples=[[{ 0 }]])",
"parallelism" : 1
}, {
"id" : 2,
"type" : "Calc(select=[1 AS EXPR$0, 1 AS EXPR$1])",
"pact" : "Operator",
"contents" : "Calc(select=[1 AS EXPR$0, 1 AS EXPR$1])",
"parallelism" : 1,
"predecessors" : [ {
"id" : 1,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 3,
"type" : "StreamingFileWriter",
"pact" : "Operator",
"contents" : "StreamingFileWriter",
"parallelism" : 8,
"predecessors" : [ {
"id" : 2,
"ship_strategy" : "REBALANCE",
"side" : "second"
} ]
}, {
"id" : 4,
"type" : "Sink: end",
"pact" : "Data Sink",
"contents" : "Sink: end",
"parallelism" : 1,
"predecessors" : [ {
"id" : 3,
"ship_strategy" : "REBALANCE",
"side" : "second"
} ]
} ]
}
