[hotfix][table-common] Improve source and sink provider docs
twalthr committed Aug 31, 2021
1 parent 4726878 commit 7d8af8f
Showing 8 changed files with 43 additions and 22 deletions.
DataStreamSinkProvider.java
@@ -28,8 +28,9 @@
  * DynamicTableSink}.
  *
  * <p>Note: This provider is only meant for advanced connector developers. Usually, a sink should
- * consist of a single entity expressed via {@link OutputFormatProvider} or {@link
- * SinkFunctionProvider}, or {@link SinkProvider}.
+ * consist of a single entity expressed via {@link SinkProvider}, {@link SinkFunctionProvider}, or
+ * {@link OutputFormatProvider}. When using a {@link DataStream} an implementer needs to pay
+ * attention to how changes are shuffled to not mess up the changelog per parallel subtask.
  */
 @PublicEvolving
 public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider {
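The new note on shuffling is easiest to see in code. A minimal sketch, not part of this commit, assuming a hypothetical MySinkFunction and a single string primary key in column 0 (the one-argument consumeDataStream signature is the one in this revision):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSink;
    import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
    import org.apache.flink.table.data.RowData;

    public class KeyedChangelogSinkProvider implements DataStreamSinkProvider {
        @Override
        public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
            return dataStream
                    // Route all changes for the same key to one parallel subtask
                    // so the per-key changelog order is preserved.
                    .keyBy(row -> row.getString(0).toString())
                    .addSink(new MySinkFunction()); // hypothetical SinkFunction<RowData>
        }
    }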
SinkFunctionProvider.java
@@ -23,6 +23,8 @@
 import org.apache.flink.table.connector.ParallelismProvider;
 import org.apache.flink.table.data.RowData;
 
+import javax.annotation.Nullable;
+
 import java.util.Optional;
 
 /**
@@ -39,7 +41,8 @@ static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction) {
     }
 
     /** Helper method for creating a SinkFunction provider with a provided sink parallelism. */
-    static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction, Integer sinkParallelism) {
+    static SinkFunctionProvider of(
+            SinkFunction<RowData> sinkFunction, @Nullable Integer sinkParallelism) {
         return new SinkFunctionProvider() {
 
             @Override
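The @Nullable annotation documents that the parallelism was always optional here; a usage sketch under that assumption (MySinkFunction is again hypothetical):

    import java.util.Optional;

    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import org.apache.flink.table.connector.sink.SinkFunctionProvider;
    import org.apache.flink.table.data.RowData;

    SinkFunction<RowData> fn = new MySinkFunction(); // hypothetical
    // null now explicitly means "let the planner pick the parallelism".
    SinkFunctionProvider inherited = SinkFunctionProvider.of(fn, null);
    SinkFunctionProvider fixed = SinkFunctionProvider.of(fn, 4);
    // getParallelism() is inherited from ParallelismProvider:
    inherited.getParallelism(); // Optional.empty()
    fixed.getParallelism();     // Optional.of(4)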
DataStreamScanProvider.java
@@ -28,8 +28,8 @@
  * ScanTableSource}.
  *
  * <p>Note: This provider is only meant for advanced connector developers. Usually, a source should
- * consist of a single entity expressed via {@link InputFormatProvider}, {@link
- * SourceFunctionProvider}, or {@link SourceProvider}.
+ * consist of a single entity expressed via {@link SourceProvider}, {@link SourceFunctionProvider},
+ * or {@link InputFormatProvider}.
  */
 @PublicEvolving
 public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvider {
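A matching source-side sketch, again only for illustration; MySourceFunction is hypothetical, and an explicit TypeInformation is passed on the assumption that RowData cannot be extracted automatically:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.connector.source.DataStreamScanProvider;
    import org.apache.flink.table.data.RowData;

    public class MyDataStreamScanProvider implements DataStreamScanProvider {

        private final TypeInformation<RowData> producedTypeInfo;

        public MyDataStreamScanProvider(TypeInformation<RowData> producedTypeInfo) {
            this.producedTypeInfo = producedTypeInfo; // e.g. from ScanContext#createTypeInformation
        }

        @Override
        public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
            return execEnv.addSource(new MySourceFunction(), producedTypeInfo);
        }

        @Override
        public boolean isBounded() {
            return false; // unbounded in this sketch
        }
    }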
DynamicTableSink.java
@@ -22,7 +22,6 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.ParallelismProvider;
 import org.apache.flink.table.connector.RuntimeConverter;
 import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
@@ -101,10 +100,11 @@ public interface DynamicTableSink {
      * <p>The given {@link Context} offers utilities by the planner for creating runtime
      * implementation with minimal dependencies to internal data structures.
      *
-     * <p>See {@code org.apache.flink.table.connector.sink.SinkFunctionProvider} in {@code
-     * flink-table-api-java-bridge}.
+     * <p>{@link SinkProvider} is the recommended core interface. {@code SinkFunctionProvider} in
+     * {@code flink-table-api-java-bridge} and {@link OutputFormatProvider} are available for
+     * backwards compatibility.
      *
-     * @see ParallelismProvider
+     * @see SinkProvider
      */
     SinkRuntimeProvider getSinkRuntimeProvider(Context context);
 
@@ -186,8 +186,11 @@ interface DataStructureConverter extends RuntimeConverter {
      * SinkRuntimeProvider} serves as the base interface. Concrete {@link SinkRuntimeProvider}
      * interfaces might be located in other Flink modules.
      *
-     * <p>See {@code org.apache.flink.table.connector.sink.SinkFunctionProvider} in {@code
-     * flink-table-api-java-bridge}.
+     * <p>{@link SinkProvider} is the recommended core interface. {@code SinkFunctionProvider} in
+     * {@code flink-table-api-java-bridge} and {@link OutputFormatProvider} are available for
+     * backwards compatibility.
+     *
+     * @see SinkProvider
      */
     interface SinkRuntimeProvider {
         // marker interface
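From a connector author's perspective, the reworded recommendation amounts to the following sketch (MyUnifiedSink is a hypothetical Sink<RowData, ?, ?, ?>):

    import org.apache.flink.table.connector.ChangelogMode;
    import org.apache.flink.table.connector.sink.DynamicTableSink;
    import org.apache.flink.table.connector.sink.SinkProvider;

    public class MyDynamicTableSink implements DynamicTableSink {

        @Override
        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
            return ChangelogMode.insertOnly();
        }

        @Override
        public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
            // Preferred: wrap a unified Sink; SinkFunctionProvider and
            // OutputFormatProvider remain for backwards compatibility.
            return SinkProvider.of(new MyUnifiedSink());
        }

        @Override
        public DynamicTableSink copy() {
            return new MyDynamicTableSink();
        }

        @Override
        public String asSummaryString() {
            return "MySink";
        }
    }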
SinkProvider.java
@@ -23,9 +23,16 @@
 import org.apache.flink.table.connector.ParallelismProvider;
 import org.apache.flink.table.data.RowData;
 
+import javax.annotation.Nullable;
+
 import java.util.Optional;
 
-/** Provider of a {@link Sink} instance as a runtime implementation for {@link DynamicTableSink}. */
+/**
+ * Provider of a {@link Sink} instance as a runtime implementation for {@link DynamicTableSink}.
+ *
+ * <p>{@code DataStreamSinkProvider} in {@code flink-table-api-java-bridge} is available for
+ * advanced connector developers.
+ */
 @PublicEvolving
 public interface SinkProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {
 
@@ -35,7 +42,7 @@ static SinkProvider of(Sink<RowData, ?, ?, ?> sink) {
     }
 
     /** Helper method for creating a Sink provider with a provided sink parallelism. */
-    static SinkProvider of(Sink<RowData, ?, ?, ?> sink, Integer sinkParallelism) {
+    static SinkProvider of(Sink<RowData, ?, ?, ?> sink, @Nullable Integer sinkParallelism) {
         return new SinkProvider() {
 
             @Override
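As with SinkFunctionProvider, the annotation makes explicit that null defers to the planner; a two-line sketch with the same hypothetical MyUnifiedSink:

    SinkProvider fixed = SinkProvider.of(new MyUnifiedSink(), 4);        // getParallelism() -> Optional.of(4)
    SinkProvider inherited = SinkProvider.of(new MyUnifiedSink(), null); // getParallelism() -> Optional.empty()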
ScanTableSource.java
@@ -85,8 +85,11 @@ public interface ScanTableSource extends DynamicTableSource {
      * <p>The given {@link ScanContext} offers utilities by the planner for creating runtime
      * implementation with minimal dependencies to internal data structures.
      *
-     * <p>See {@code org.apache.flink.table.connector.source.SourceFunctionProvider} in {@code
-     * flink-table-api-java-bridge}.
+     * <p>{@link SourceProvider} is the recommended core interface. {@code SourceFunctionProvider}
+     * in {@code flink-table-api-java-bridge} and {@link InputFormatProvider} are available for
+     * backwards compatibility.
+     *
+     * @see SourceProvider
      */
     ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext);
 
@@ -115,8 +118,9 @@ interface ScanContext extends DynamicTableSource.Context {
      * ScanRuntimeProvider} serves as the base interface. Concrete {@link ScanRuntimeProvider}
      * interfaces might be located in other Flink modules.
      *
-     * <p>See {@code org.apache.flink.table.connector.source.SourceFunctionProvider} in {@code
-     * flink-table-api-java-bridge}.
+     * <p>{@link SourceProvider} is the recommended core interface. {@code SourceFunctionProvider}
+     * in {@code flink-table-api-java-bridge} and {@link InputFormatProvider} are available for
+     * backwards compatibility.
      */
     interface ScanRuntimeProvider {
 
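The source-side counterpart of the sink sketch above, assuming a hypothetical FLIP-27 style MyFlip27Source (a Source<RowData, ?, ?>):

    import org.apache.flink.table.connector.ChangelogMode;
    import org.apache.flink.table.connector.source.DynamicTableSource;
    import org.apache.flink.table.connector.source.ScanTableSource;
    import org.apache.flink.table.connector.source.SourceProvider;

    public class MyScanTableSource implements ScanTableSource {

        @Override
        public ChangelogMode getChangelogMode() {
            return ChangelogMode.insertOnly();
        }

        @Override
        public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
            // Preferred: wrap a unified Source; SourceFunctionProvider and
            // InputFormatProvider remain for backwards compatibility.
            return SourceProvider.of(new MyFlip27Source());
        }

        @Override
        public DynamicTableSource copy() {
            return new MyScanTableSource();
        }

        @Override
        public String asSummaryString() {
            return "MySource";
        }
    }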
SourceProvider.java
@@ -25,6 +25,9 @@
 
 /**
  * Provider of a {@link Source} instance as a runtime implementation for {@link ScanTableSource}.
+ *
+ * <p>{@code DataStreamScanProvider} in {@code flink-table-api-java-bridge} is available for
+ * advanced connector developers.
+ */
 @PublicEvolving
 public interface SourceProvider extends ScanTableSource.ScanRuntimeProvider {
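One design point, sketched with the same hypothetical source: unlike SourceFunctionProvider.of(sourceFunction, isBounded), a SourceProvider can derive boundedness from the wrapped source itself, so no extra flag is needed:

    SourceProvider provider = SourceProvider.of(new MyFlip27Source());
    boolean bounded = provider.isBounded(); // follows the source's own Boundedness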
SupportsReadingMetadata.java
@@ -20,7 +20,7 @@
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.data.RowData;
@@ -120,10 +120,10 @@ public interface SupportsReadingMetadata {
      * <p>Implementations of this method must be idempotent. The planner might call this method
      * multiple times.
      *
-     * <p>Note: Use the passed data type instead of {@link TableSchema#toPhysicalRowDataType()} for
-     * describing the final output data type when creating {@link TypeInformation}. If the source
-     * implements {@link SupportsProjectionPushDown}, the projection is already considered in the
-     * given output data type.
+     * <p>Note: Use the passed data type instead of {@link ResolvedSchema#toPhysicalRowDataType()}
+     * for describing the final output data type when creating {@link TypeInformation}. If the
+     * source implements {@link SupportsProjectionPushDown}, the projection is already considered in
+     * the given output data type.
      *
      * @param metadataKeys a subset of the keys returned by {@link #listReadableMetadata()}, ordered
      *     by the iteration order of returned map
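To make the corrected note concrete: store the data type passed by the planner and derive TypeInformation from it, rather than recomputing it from the schema. A sketch in which everything except the SupportsReadingMetadata methods is hypothetical; the remaining ScanTableSource methods are left abstract:

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.connector.source.ScanTableSource;
    import org.apache.flink.table.connector.source.SourceFunctionProvider;
    import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.DataType;

    public abstract class MyMetadataSource implements ScanTableSource, SupportsReadingMetadata {

        // Set by the planner; already reflects projection push-down and metadata.
        private DataType producedDataType;

        @Override
        public Map<String, DataType> listReadableMetadata() {
            return Collections.singletonMap("timestamp", DataTypes.TIMESTAMP_LTZ(3));
        }

        @Override
        public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
            // Keep the passed type; do not recompute it from the resolved schema.
            this.producedDataType = producedDataType;
        }

        @Override
        public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
            TypeInformation<RowData> typeInfo = context.createTypeInformation(producedDataType);
            return SourceFunctionProvider.of(new MySourceFunction(typeInfo), false);
        }
    }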
