[FLINK-15445][jdbc] Support new type system and check unsupported data types for JDBC table source

This bridges the JDBC table source to the new type system so that type precision and scale are supported. It also adds validation that rejects types and precisions the various dialects do not support, to avoid surprising results.

This closes apache#10745
docete authored Feb 12, 2020
1 parent aedbdc3 commit 4ad2c5f
Showing 8 changed files with 619 additions and 84 deletions.
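
Before the diffs, one note on what the new type system changes in practice: DataType carries precision and scale, which the legacy TypeInformation-based path could not express. Below is a minimal, self-contained sketch (Flink 1.10 table API; the class and field names are illustrative, not part of this commit):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.DataType;

public class PrecisionExample {

    public static void main(String[] args) {
        // The new DataType stack keeps precision and scale, e.g. DECIMAL(10, 4);
        // the JDBC table source can now expose this via getProducedDataType().
        TableSchema schema = TableSchema.builder()
            .field("id", DataTypes.INT())
            .field("price", DataTypes.DECIMAL(10, 4))
            .build();

        DataType producedDataType = schema.toRowDataType();
        // Prints something like: ROW<`id` INT, `price` DECIMAL(10, 4)>
        System.out.println(producedDataType);
    }
}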
10 changes: 9 additions & 1 deletion flink-connectors/flink-jdbc/pom.xml
@@ -75,6 +75,14 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
@@ -84,7 +92,7 @@

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java
@@ -18,7 +18,6 @@

package org.apache.flink.api.java.io.jdbc;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -31,13 +30,15 @@
import org.apache.flink.table.sources.ProjectableTableSource;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.types.Row;

import java.util.Arrays;
import java.util.Objects;

import static org.apache.flink.api.java.io.jdbc.JDBCTypeUtil.normalizeTableSchema;
import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
@@ -55,7 +56,7 @@ public class JDBCTableSource implements

// index of fields selected, null means that all fields are selected
private final int[] selectFields;
private final RowTypeInfo returnType;
private final DataType producedDataType;

private JDBCTableSource(
JDBCOptions options, JDBCReadOptions readOptions, JDBCLookupOptions lookupOptions, TableSchema schema) {
@@ -72,18 +73,19 @@ private JDBCTableSource(

this.selectFields = selectFields;

final TypeInformation<?>[] schemaTypeInfos = schema.getFieldTypes();
final DataType[] schemaDataTypes = schema.getFieldDataTypes();
final String[] schemaFieldNames = schema.getFieldNames();
if (selectFields != null) {
TypeInformation<?>[] typeInfos = new TypeInformation[selectFields.length];
String[] typeNames = new String[selectFields.length];
DataType[] dataTypes = new DataType[selectFields.length];
String[] fieldNames = new String[selectFields.length];
for (int i = 0; i < selectFields.length; i++) {
typeInfos[i] = schemaTypeInfos[selectFields[i]];
typeNames[i] = schemaFieldNames[selectFields[i]];
dataTypes[i] = schemaDataTypes[selectFields[i]];
fieldNames[i] = schemaFieldNames[selectFields[i]];
}
this.returnType = new RowTypeInfo(typeInfos, typeNames);
this.producedDataType =
TableSchema.builder().fields(fieldNames, dataTypes).build().toRowDataType();
} else {
this.returnType = new RowTypeInfo(schemaTypeInfos, schemaFieldNames);
this.producedDataType = schema.toRowDataType();
}
}

@@ -94,23 +96,28 @@ public boolean isBounded() {

@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
return execEnv.createInput(getInputFormat(), getReturnType()).name(explainSource());
return execEnv
.createInput(
getInputFormat(),
(RowTypeInfo) fromDataTypeToLegacyInfo(producedDataType))
.name(explainSource());
}

@Override
public TableFunction<Row> getLookupFunction(String[] lookupKeys) {
final RowTypeInfo rowTypeInfo = (RowTypeInfo) fromDataTypeToLegacyInfo(producedDataType);
return JDBCLookupFunction.builder()
.setOptions(options)
.setLookupOptions(lookupOptions)
.setFieldTypes(returnType.getFieldTypes())
.setFieldNames(returnType.getFieldNames())
.setFieldTypes(rowTypeInfo.getFieldTypes())
.setFieldNames(rowTypeInfo.getFieldNames())
.setKeyNames(lookupKeys)
.build();
}

@Override
public TypeInformation<Row> getReturnType() {
return returnType;
public DataType getProducedDataType() {
return producedDataType;
}

@Override
@@ -135,28 +142,30 @@ public TableSchema getTableSchema() {

@Override
public String explainSource() {
return TableConnectorUtils.generateRuntimeName(getClass(), returnType.getFieldNames());
final RowTypeInfo rowTypeInfo = (RowTypeInfo) fromDataTypeToLegacyInfo(producedDataType);
return TableConnectorUtils.generateRuntimeName(getClass(), rowTypeInfo.getFieldNames());
}

public static Builder builder() {
return new Builder();
}

private JDBCInputFormat getInputFormat() {
final RowTypeInfo rowTypeInfo = (RowTypeInfo) fromDataTypeToLegacyInfo(producedDataType);
JDBCInputFormat.JDBCInputFormatBuilder builder = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername(options.getDriverName())
.setDBUrl(options.getDbURL())
.setUsername(options.getUsername())
.setPassword(options.getPassword())
.setRowTypeInfo(new RowTypeInfo(returnType.getFieldTypes(), returnType.getFieldNames()));
.setRowTypeInfo(new RowTypeInfo(rowTypeInfo.getFieldTypes(), rowTypeInfo.getFieldNames()));

if (readOptions.getFetchSize() != 0) {
builder.setFetchSize(readOptions.getFetchSize());
}

final JDBCDialect dialect = options.getDialect();
String query = dialect.getSelectFromStatement(
options.getTableName(), returnType.getFieldNames(), new String[0]);
options.getTableName(), rowTypeInfo.getFieldNames(), new String[0]);
if (readOptions.getPartitionColumnName().isPresent()) {
long lowerBound = readOptions.getPartitionLowerBound().get();
long upperBound = readOptions.getPartitionUpperBound().get();
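The repeated fromDataTypeToLegacyInfo calls above are the bridge between the new DataType and the legacy RowTypeInfo that the DataStream runtime still expects. A standalone sketch of that conversion, assuming the same Flink 1.10 utilities this file imports (the class name is illustrative):

import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;

public class TypeBridgeExample {

    public static void main(String[] args) {
        DataType producedDataType = DataTypes.ROW(
            DataTypes.FIELD("id", DataTypes.INT()),
            DataTypes.FIELD("name", DataTypes.STRING()));

        // Same bridging call as in getDataStream() and getLookupFunction():
        // a row-typed DataType converts to a RowTypeInfo.
        RowTypeInfo rowTypeInfo = (RowTypeInfo) fromDataTypeToLegacyInfo(producedDataType);
        System.out.println(String.join(", ", rowTypeInfo.getFieldNames()));
    }
}
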
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialect.java
@@ -18,6 +18,9 @@

package org.apache.flink.api.java.io.jdbc.dialect;

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Optional;
@@ -35,6 +38,14 @@ public interface JDBCDialect extends Serializable {
*/
boolean canHandle(String url);

/**
* Checks whether this dialect supports a specific data type in a table schema.
* @param schema the table schema.
* @exception ValidationException in case the table schema contains an unsupported type.
*/
default void validate(TableSchema schema) throws ValidationException {
}

/**
* @return the default driver class name; if the user does not configure a driver
* class name, this one will be used.
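To illustrate the new hook, a dialect overrides validate(TableSchema) to reject whatever it cannot handle. The dialect below is hypothetical (the real per-dialect rules added by this commit live in the dialect implementations, which are not expanded on this page); it assumes the class sits next to JDBCDialect in the same package:

package org.apache.flink.api.java.io.jdbc.dialect;

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

public class StrictDialect implements JDBCDialect {

    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:strict:");
    }

    @Override
    public void validate(TableSchema schema) throws ValidationException {
        for (DataType fieldType : schema.getFieldDataTypes()) {
            LogicalTypeRoot typeRoot = fieldType.getLogicalType().getTypeRoot();
            // Hypothetical rule: this dialect cannot store nested types.
            if (typeRoot == LogicalTypeRoot.ARRAY || typeRoot == LogicalTypeRoot.MAP) {
                throw new ValidationException(
                    "StrictDialect does not support type " + typeRoot + ".");
            }
        }
    }
}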
(The remaining 5 changed files are not shown.)
