Skip to content

Commit

Permalink
[FLINK-13544][connectors] Set parallelism of table sink operator to i…
Browse files Browse the repository at this point in the history
…nput transformation parallelism

This closes apache#9332
  • Loading branch information
wuchong committed Aug 2, 2019
1 parent cb04628 commit 5254126
Show file tree
Hide file tree
Showing 8 changed files with 17 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {

return dataStream
.addSink(sink)
.setParallelism(dataStream.getParallelism())
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), fieldNames));

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> data
sinkOptions,
upsertFunction);
return dataStream.addSink(sinkFunction)
.setParallelism(dataStream.getParallelism())
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
properties,
serializationSchema,
partitioner);
return dataStream.addSink(kafkaProducer).name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
return dataStream
.addSink(kafkaProducer)
.setParallelism(dataStream.getParallelism())
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public static JDBCAppendTableSinkBuilder builder() {
public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
return dataStream
.addSink(new JDBCSinkFunction(outputFormat))
.setParallelism(dataStream.getParallelism())
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), fieldNames));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ private JDBCUpsertOutputFormat newFormat() {
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
return dataStream
.addSink(new JDBCUpsertSinkFunction(newFormat()))
.setParallelism(dataStream.getParallelism())
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), schema.getFieldNames()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
if (numFiles > 0) {
csvRows.setParallelism(numFiles);
sink.setParallelism(numFiles);
} else {
// if file number is not set, use input parallelism to make it chained.
csvRows.setParallelism(dataStream.getParallelism());
sink.setParallelism(dataStream.getParallelism());
}

sink.name(TableConnectorUtils.generateRuntimeName(CsvTableSink.class, fieldNames));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public final void emitDataStream(DataStream<T> dataStream) {

@Override
public final DataStreamSink<T> consumeDataStream(DataStream<T> dataStream) {
return dataStream.writeUsingOutputFormat(getOutputFormat());
return dataStream
.writeUsingOutputFormat(getOutputFormat())
.setParallelism(dataStream.getParallelism());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public void emitDataSet(DataSet<Row> dataSet) {
public void emitDataStream(DataStream<Row> dataStream) {
dataStream
.map(SpendReportTableSink::format)
.writeUsingOutputFormat(new LoggerOutputFormat());
.writeUsingOutputFormat(new LoggerOutputFormat())
.setParallelism(dataStream.getParallelism());
}

@Override
Expand Down

0 comments on commit 5254126

Please sign in to comment.