Skip to content

Commit

Permalink
[FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC tabl…
Browse files Browse the repository at this point in the history
…e source with streaming table source (apache#9029)
  • Loading branch information
tsreaper authored and wuchong committed Jul 11, 2019
1 parent 01031ad commit a91d951
Show file tree
Hide file tree
Showing 15 changed files with 1,122 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void open(FunctionContext context) throws Exception {
try {
establishConnection();
statement = dbConn.prepareStatement(query);
this.cache = cacheMaxSize == -1 ? null : CacheBuilder.newBuilder()
this.cache = cacheMaxSize == -1 || cacheExpireMs == -1 ? null : CacheBuilder.newBuilder()
.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
.maximumSize(cacheMaxSize)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.api.java.io.jdbc;

import java.io.Serializable;
import java.util.Objects;

import static org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.DEFAULT_MAX_RETRY_TIMES;

Expand Down Expand Up @@ -53,8 +54,20 @@ public static Builder builder() {
return new Builder();
}

@Override
public boolean equals(Object o) {
if (o instanceof JDBCLookupOptions) {
JDBCLookupOptions options = (JDBCLookupOptions) o;
return Objects.equals(cacheMaxSize, options.cacheMaxSize) &&
Objects.equals(cacheExpireMs, options.cacheExpireMs) &&
Objects.equals(maxRetryTimes, options.maxRetryTimes);
} else {
return false;
}
}

/**
* Builder of {@link JDBCOptions}.
* Builder of {@link JDBCLookupOptions}.
*/
public static class Builder {
private long cacheMaxSize = -1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@
import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;

import java.util.Objects;
import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Options for the JDBC connector.
* Common options of {@link JDBCScanOptions} and {@link JDBCLookupOptions} for the JDBC connector.
*/
public class JDBCOptions {

Expand Down Expand Up @@ -75,6 +76,21 @@ public static Builder builder() {
return new Builder();
}

@Override
public boolean equals(Object o) {
if (o instanceof JDBCOptions) {
JDBCOptions options = (JDBCOptions) o;
return Objects.equals(dbURL, options.dbURL) &&
Objects.equals(tableName, options.tableName) &&
Objects.equals(driverName, options.driverName) &&
Objects.equals(username, options.username) &&
Objects.equals(password, options.password) &&
Objects.equals(dialect.getClass().getName(), options.dialect.getClass().getName());
} else {
return false;
}
}

/**
* Builder of {@link JDBCOptions}.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.java.io.jdbc;

import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;

/**
* Options for the JDBC scan.
*/
public class JDBCReadOptions implements Serializable {

private final String partitionColumnName;
private final Long partitionLowerBound;
private final Long partitionUpperBound;
private final Integer numPartitions;

private final int fetchSize;

private JDBCReadOptions(
String partitionColumnName,
Long partitionLowerBound,
Long partitionUpperBound,
Integer numPartitions,
int fetchSize) {
this.partitionColumnName = partitionColumnName;
this.partitionLowerBound = partitionLowerBound;
this.partitionUpperBound = partitionUpperBound;
this.numPartitions = numPartitions;

this.fetchSize = fetchSize;
}

public Optional<String> getPartitionColumnName() {
return Optional.ofNullable(partitionColumnName);
}

public Optional<Long> getPartitionLowerBound() {
return Optional.ofNullable(partitionLowerBound);
}

public Optional<Long> getPartitionUpperBound() {
return Optional.ofNullable(partitionUpperBound);
}

public Optional<Integer> getNumPartitions() {
return Optional.ofNullable(numPartitions);
}

public int getFetchSize() {
return fetchSize;
}

public static Builder builder() {
return new Builder();
}

@Override
public boolean equals(Object o) {
if (o instanceof JDBCReadOptions) {
JDBCReadOptions options = (JDBCReadOptions) o;
return Objects.equals(partitionColumnName, options.partitionColumnName) &&
Objects.equals(partitionLowerBound, options.partitionLowerBound) &&
Objects.equals(partitionUpperBound, options.partitionUpperBound) &&
Objects.equals(numPartitions, options.numPartitions) &&
Objects.equals(fetchSize, options.fetchSize);
} else {
return false;
}
}

/**
* Builder of {@link JDBCReadOptions}.
*/
public static class Builder {
private String partitionColumnName;
private Long partitionLowerBound;
private Long partitionUpperBound;
private Integer numPartitions;

private int fetchSize = 0;

/**
* optional, name of the column used for partitioning the input.
*/
public Builder setPartitionColumnName(String partitionColumnName) {
this.partitionColumnName = partitionColumnName;
return this;
}

/**
* optional, the smallest value of the first partition.
*/
public Builder setPartitionLowerBound(long partitionLowerBound) {
this.partitionLowerBound = partitionLowerBound;
return this;
}

/**
* optional, the largest value of the last partition.
*/
public Builder setPartitionUpperBound(long partitionUpperBound) {
this.partitionUpperBound = partitionUpperBound;
return this;
}

/**
* optional, the maximum number of partitions that can be used for parallelism in table reading.
*/
public Builder setNumPartitions(int numPartitions) {
this.numPartitions = numPartitions;
return this;
}

/**
* optional, the number of rows to fetch per round trip.
* default value is 0, according to the jdbc api, 0 means that fetchSize hint will be ignored.
*/
public Builder setFetchSize(int fetchSize) {
this.fetchSize = fetchSize;
return this;
}

public JDBCReadOptions build() {
return new JDBCReadOptions(
partitionColumnName, partitionLowerBound, partitionUpperBound, numPartitions, fetchSize);
}
}
}
Loading

0 comments on commit a91d951

Please sign in to comment.