Skip to content

Commit

Permalink
Remove schema.#.from in jdbc table factory and update property api
Browse files Browse the repository at this point in the history
  • Loading branch information
tsreaper committed Jul 10, 2019
1 parent feb0262 commit c1af055
Show file tree
Hide file tree
Showing 7 changed files with 581 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,25 @@
public class JDBCScanOptions implements Serializable {

private final String partitionColumnName;
private final long partitionSize;
private final long partitionLowerBound;
private final long partitionUpperBound;
private final long numPartitions;

private JDBCScanOptions(
String partitionColumnName,
long partitionSize,
long partitionLowerBound,
long partitionUpperBound) {
long partitionUpperBound,
long numPartitions) {
this.partitionColumnName = partitionColumnName;
this.partitionSize = partitionSize;
this.partitionLowerBound = partitionLowerBound;
this.partitionUpperBound = partitionUpperBound;
this.numPartitions = numPartitions;
}

public String getPartitionColumnName() {
return partitionColumnName;
}

public long getPartitionSize() {
return partitionSize;
}

public long getPartitionLowerBound() {
return partitionLowerBound;
}
Expand All @@ -59,6 +55,10 @@ public long getPartitionUpperBound() {
return partitionUpperBound;
}

public long getNumPartitions() {
return numPartitions;
}

public static Builder builder() {
return new Builder();
}
Expand All @@ -68,9 +68,9 @@ public static Builder builder() {
*/
public static class Builder {
private String partitionColumnName = null;
private long partitionSize = -1L;
private long partitionLowerBound = Long.MAX_VALUE;
private long partitionUpperBound = Long.MIN_VALUE;
private long numPartitions = -1L;

/**
* optional, name of the column used for partitioning the input.
Expand All @@ -80,14 +80,6 @@ public Builder setPartitionColumnName(String partitionColumnName) {
return this;
}

/**
* optional, size of each input partition (except for the last partition which will be possibly smaller).
*/
public Builder setPartitionSize(long partitionSize) {
this.partitionSize = partitionSize;
return this;
}

/**
* optional, the smallest value of the first partition.
*/
Expand All @@ -104,13 +96,21 @@ public Builder setPartitionUpperBound(long partitionUpperBound) {
return this;
}

/**
* optional, the maximum number of partitions that can be used for parallelism in table reading.
*/
public Builder setNumPartitions(long numPartitions) {
this.numPartitions = numPartitions;
return this;
}

public JDBCScanOptions build() {
Preconditions.checkArgument(
partitionSize > 0, "Partition size must be positive");
numPartitions > 0, "Number of partitions must be positive");
Preconditions.checkArgument(
partitionLowerBound <= partitionUpperBound,
"Min partition value must not be larger than max partition value");
return new JDBCScanOptions(partitionColumnName, partitionSize, partitionLowerBound, partitionUpperBound);
return new JDBCScanOptions(partitionColumnName, partitionLowerBound, partitionUpperBound, numPartitions);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,12 @@
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.sources.DefinedFieldMapping;
import org.apache.flink.table.sources.LookupableTableSource;
import org.apache.flink.table.sources.ProjectableTableSource;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;

import javax.annotation.Nullable;

import java.util.Map;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand All @@ -45,38 +40,39 @@
public class JDBCTableSource implements
StreamTableSource<Row>,
ProjectableTableSource<Row>,
LookupableTableSource<Row>,
DefinedFieldMapping {
LookupableTableSource<Row> {

private final JDBCOptions options;
private final JDBCScanOptions scanOptions;
private final JDBCLookupOptions lookupOptions;
private final TableSchema schema;
private final Map<String, String> fieldMapping;

private RowTypeInfo returnType;
// index of fields selected, null means that all fields are selected
private final int[] selectFields;

private JDBCTableSource(
JDBCOptions options, JDBCScanOptions scanOptions, JDBCLookupOptions lookupOptions,
TableSchema schema, Map<String, String> fieldMapping) {
this(options, scanOptions, lookupOptions, schema, fieldMapping, null);
JDBCOptions options, JDBCScanOptions scanOptions, JDBCLookupOptions lookupOptions, TableSchema schema) {
this(options, scanOptions, lookupOptions, schema, null);
}

private JDBCTableSource(
JDBCOptions options, JDBCScanOptions scanOptions, JDBCLookupOptions lookupOptions,
TableSchema schema, Map<String, String> fieldMapping, int[] selectFields) {
TableSchema schema, int[] selectFields) {
this.options = options;
this.scanOptions = scanOptions;
this.lookupOptions = lookupOptions;
this.schema = schema;
this.fieldMapping = fieldMapping;

calculateReturnType();
this.selectFields = selectFields;
}

@Override
public boolean isBounded() {
return true;
}

@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
return execEnv.createInput(getInputFormat(), getReturnType()).name(explainSource());
Expand All @@ -100,7 +96,7 @@ public TypeInformation<Row> getReturnType() {

@Override
public TableSource<Row> projectFields(int[] fields) {
return new JDBCTableSource(options, scanOptions, lookupOptions, schema, fieldMapping, fields);
return new JDBCTableSource(options, scanOptions, lookupOptions, schema, fields);
}

@Override
Expand All @@ -118,12 +114,6 @@ public TableSchema getTableSchema() {
return schema;
}

@Nullable
@Override
public Map<String, String> getFieldMapping() {
return fieldMapping;
}

public static Builder builder() {
return new Builder();
}
Expand All @@ -139,10 +129,14 @@ private JDBCInputFormat getInputFormat() {
String query = options.getDialect().getSelectFromStatement(
options.getTableName(), returnType.getFieldNames(), new String[0]);
if (scanOptions != null) {
long lowerBound = scanOptions.getPartitionLowerBound();
long upperBound = scanOptions.getPartitionUpperBound();
long numPartitions = scanOptions.getNumPartitions();
// partitionSize = Math.ceil(upperBound - lowerBound + 1) / numPartitions;
// the following is equivalent
long partitionSize = (upperBound - lowerBound + numPartitions) / numPartitions;
builder = builder.setParametersProvider(new NumericBetweenParametersProvider(
scanOptions.getPartitionSize(),
scanOptions.getPartitionLowerBound(),
scanOptions.getPartitionUpperBound()));
partitionSize, lowerBound, upperBound));
query += " WHERE " + scanOptions.getPartitionColumnName() + " >= ? " + " AND " +
scanOptions.getPartitionColumnName() + " <= ?";
}
Expand All @@ -154,25 +148,16 @@ private void calculateReturnType() {
final TypeInformation<?>[] schemaTypeInfos = schema.getFieldTypes();
final String[] schemaFieldNames = schema.getFieldNames();

String[] mappedFieldNames = new String[schemaFieldNames.length];
for (int i = 0; i < schemaFieldNames.length; i++) {
if (fieldMapping != null) {
mappedFieldNames[i] = fieldMapping.get(schemaFieldNames[i]);
} else {
mappedFieldNames[i] = schemaFieldNames[i];
}
}

if (selectFields != null) {
TypeInformation<?>[] typeInfos = new TypeInformation[selectFields.length];
String[] typeNames = new String[selectFields.length];
for (int i = 0; i < selectFields.length; i++) {
typeInfos[i] = schemaTypeInfos[selectFields[i]];
typeNames[i] = mappedFieldNames[selectFields[i]];
typeNames[i] = schemaFieldNames[selectFields[i]];
}
this.returnType = new RowTypeInfo(typeInfos, typeNames);
} else {
this.returnType = new RowTypeInfo(schemaTypeInfos, mappedFieldNames);
this.returnType = new RowTypeInfo(schemaTypeInfos, schemaFieldNames);
}
}

Expand All @@ -185,7 +170,6 @@ public static class Builder {
private JDBCScanOptions scanOptions;
private JDBCLookupOptions lookupOptions;
private TableSchema schema;
private Map<String, String> fieldMapping;

/**
* required, jdbc options.
Expand Down Expand Up @@ -221,14 +205,6 @@ public Builder setSchema(TableSchema schema) {
return this;
}

/**
* optional, map field names in the schema to the field names in the produced type.
*/
public Builder setFieldMapping(Map<String, String> fieldMapping) {
this.fieldMapping = fieldMapping;
return this;
}

/**
* Finalizes the configuration and checks validity.
*
Expand All @@ -237,7 +213,7 @@ public Builder setFieldMapping(Map<String, String> fieldMapping) {
public JDBCTableSource build() {
checkNotNull(options, "No options supplied.");
checkNotNull(schema, "No schema supplied.");
return new JDBCTableSource(options, scanOptions, lookupOptions, schema, fieldMapping);
return new JDBCTableSource(options, scanOptions, lookupOptions, schema);
}
}
}
Loading

0 comments on commit c1af055

Please sign in to comment.