Skip to content

Commit

Permalink
[FLINK-6781] Make statement fetch size configurable in JDBCInputFormat.
Browse files Browse the repository at this point in the history
This closes apache#4036.
  • Loading branch information
mbode authored and zentol committed Jun 2, 2017
1 parent c041dd8 commit 5605107
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.api.java.io.jdbc;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.RichInputFormat;
Expand All @@ -30,6 +31,7 @@
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -113,6 +115,7 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
private transient Connection dbConn;
private transient PreparedStatement statement;
private transient ResultSet resultSet;
private int fetchSize;

private boolean hasNext;
private Object[][] parameterValues;
Expand Down Expand Up @@ -141,6 +144,9 @@ public void openInputFormat() {
dbConn = DriverManager.getConnection(dbURL, username, password);
}
statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
if (fetchSize > 0) {
statement.setFetchSize(fetchSize);
}
} catch (SQLException se) {
throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
} catch (ClassNotFoundException cnfe) {
Expand Down Expand Up @@ -312,6 +318,11 @@ public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
return new DefaultInputSplitAssigner(inputSplits);
}

@VisibleForTesting
PreparedStatement getStatement() {
return statement;
}

/**
* A builder used to set parameters to the output format's configuration in a fluent way.
* @return builder
Expand Down Expand Up @@ -378,6 +389,12 @@ public JDBCInputFormatBuilder setRowTypeInfo(RowTypeInfo rowTypeInfo) {
return this;
}

public JDBCInputFormatBuilder setFetchSize(int fetchSize) {
Preconditions.checkArgument(fetchSize > 0, "Illegal value %s for fetchSize, has to be positive.", fetchSize);
format.fetchSize = fetchSize;
return this;
}

public JDBCInputFormat finish() {
if (format.username == null) {
LOG.info("Username was not supplied separately.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@

import java.io.IOException;
import java.io.Serializable;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
* Tests for the {@link JDBCInputFormat}.
Expand Down Expand Up @@ -100,6 +102,47 @@ public void testIncompleteConfiguration() throws IOException {
.finish();
}

@Test(expected = IllegalArgumentException.class)
public void testInvalidFetchSize() {
jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername(DRIVER_CLASS)
.setDBUrl(DB_URL)
.setQuery(SELECT_ALL_BOOKS)
.setRowTypeInfo(ROW_TYPE_INFO)
.setFetchSize(-7)
.finish();
}

@Test
public void testDefaultFetchSizeIsUsedIfNotConfiguredOtherwise() throws SQLException, ClassNotFoundException {
jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername(DRIVER_CLASS)
.setDBUrl(DB_URL)
.setQuery(SELECT_ALL_BOOKS)
.setRowTypeInfo(ROW_TYPE_INFO)
.finish();
jdbcInputFormat.openInputFormat();

Class.forName(DRIVER_CLASS);
final int defaultFetchSize = DriverManager.getConnection(DB_URL).createStatement().getFetchSize();

Assert.assertEquals(defaultFetchSize, jdbcInputFormat.getStatement().getFetchSize());
}

@Test
public void testFetchSizeCanBeConfigured() throws SQLException {
final int desiredFetchSize = 10_000;
jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername(DRIVER_CLASS)
.setDBUrl(DB_URL)
.setQuery(SELECT_ALL_BOOKS)
.setRowTypeInfo(ROW_TYPE_INFO)
.setFetchSize(desiredFetchSize)
.finish();
jdbcInputFormat.openInputFormat();
Assert.assertEquals(desiredFetchSize, jdbcInputFormat.getStatement().getFetchSize());
}

@Test
public void testJDBCInputFormatWithoutParallelism() throws IOException {
jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
Expand Down

0 comments on commit 5605107

Please sign in to comment.