Skip to content

Commit

Permalink
[FLINK-21635][jdbc] Support optional driverName in jdbc connection es…
Browse files Browse the repository at this point in the history
…tablishment

This closes apache#15104
  • Loading branch information
kezhuw committed Mar 8, 2021
1 parent c70000c commit 51524de
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class JdbcConnectionOptions implements Serializable {
private static final long serialVersionUID = 1L;

protected final String url;
protected final String driverName;
@Nullable protected final String driverName;
protected final int connectionCheckTimeoutSeconds;
@Nullable protected final String username;
@Nullable protected final String password;
Expand All @@ -45,7 +45,7 @@ protected JdbcConnectionOptions(
int connectionCheckTimeoutSeconds) {
Preconditions.checkArgument(connectionCheckTimeoutSeconds > 0);
this.url = Preconditions.checkNotNull(url, "jdbc url is empty");
this.driverName = Preconditions.checkNotNull(driverName, "driver name is empty");
this.driverName = driverName;
this.username = username;
this.password = password;
this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds;
Expand All @@ -55,6 +55,7 @@ public String getDbURL() {
return url;
}

@Nullable
public String getDriverName() {
return driverName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.connector.jdbc.internal.connection;

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -42,7 +43,7 @@ public class SimpleJdbcConnectionProvider implements JdbcConnectionProvider, Ser

private final JdbcConnectionOptions jdbcOptions;

private transient Driver cachedDriver;
private transient Driver loadedDriver;
private transient Connection connection;

static {
Expand Down Expand Up @@ -72,16 +73,13 @@ public boolean isConnectionValid() throws SQLException {
&& connection.isValid(jdbcOptions.getConnectionCheckTimeoutSeconds());
}

private Driver getDriver() throws SQLException, ClassNotFoundException {
if (cachedDriver != null) {
return cachedDriver;
}
String driverName = jdbcOptions.getDriverName();
private static Driver loadDriver(String driverName)
throws SQLException, ClassNotFoundException {
Preconditions.checkNotNull(driverName);
Enumeration<Driver> drivers = DriverManager.getDrivers();
while (drivers.hasMoreElements()) {
Driver driver = drivers.nextElement();
if (driver.getClass().getName().equals(driverName)) {
cachedDriver = driver;
return driver;
}
}
Expand All @@ -91,17 +89,32 @@ private Driver getDriver() throws SQLException, ClassNotFoundException {
Class<?> clazz =
Class.forName(driverName, true, Thread.currentThread().getContextClassLoader());
try {
cachedDriver = (Driver) clazz.newInstance();
return cachedDriver;
return (Driver) clazz.newInstance();
} catch (Exception ex) {
throw new SQLException("Fail to create driver of class " + driverName, ex);
}
}

private Driver getLoadedDriver() throws SQLException, ClassNotFoundException {
if (loadedDriver == null) {
loadedDriver = loadDriver(jdbcOptions.getDriverName());
}
return loadedDriver;
}

@Override
public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException {
if (connection == null) {
Driver driver = getDriver();
if (connection != null) {
return connection;
}
if (jdbcOptions.getDriverName() == null) {
connection =
DriverManager.getConnection(
jdbcOptions.getDbURL(),
jdbcOptions.getUsername().orElse(null),
jdbcOptions.getPassword().orElse(null));
} else {
Driver driver = getLoadedDriver();
Properties info = new Properties();
jdbcOptions.getUsername().ifPresent(user -> info.setProperty("user", user));
jdbcOptions.getPassword().ifPresent(password -> info.setProperty("password", password));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.jdbc;

import org.apache.flink.connector.jdbc.fakedb.FakeDBUtils;

import org.junit.Test;

/** Tests for {@link JdbcConnectionOptions}. */
public class JdbcConnectionOptionsTest {
@Test(expected = NullPointerException.class)
public void testNullUrl() {
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(null)
.withUsername("user")
.withPassword("password")
.withDriverName(FakeDBUtils.DRIVER1_CLASS_NAME)
.build();
}

@Test
public void testNoOptionalOptions() {
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(FakeDBUtils.TEST_DB_URL)
.build();
}

@Test(expected = IllegalArgumentException.class)
public void testInvalidCheckTimeoutSeconds() {
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(FakeDBUtils.TEST_DB_URL)
.withUsername("user")
.withPassword("password")
.withDriverName(FakeDBUtils.DRIVER1_CLASS_NAME)
.withConnectionCheckTimeoutSeconds(0)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ public class SimpleJdbcConnectionProviderTest {

private static JdbcConnectionProvider newFakeConnectionProviderWithDriverName(
String driverName) {
return newProvider(driverName, FakeDBUtils.TEST_DB_URL);
return newProvider(FakeDBUtils.TEST_DB_URL, driverName);
}

private static JdbcConnectionProvider newProvider(String driverName, String url) {
private static JdbcConnectionProvider newProvider(String url, String driverName) {
JdbcConnectionOptions options =
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(url)
Expand Down Expand Up @@ -85,6 +85,24 @@ public void testEstablishConnection() throws Exception {
assertSame(connection, provider.getOrEstablishConnection());
}

@Test
public void testEstablishConnectionWithoutDriverName() throws Exception {
JdbcConnectionProvider provider = newProvider(FakeDBUtils.TEST_DB_URL, null);
assertNull(provider.getConnection());
assertFalse(provider.isConnectionValid());

Connection connection = provider.getOrEstablishConnection();
assertNotNull(connection);
assertFalse(connection.isClosed());
assertTrue(provider.isConnectionValid());
assertThat(connection, instanceOf(FakeConnection.class));
assertThat(connection, not(instanceOf(FakeConnection3.class)));

assertNotNull(provider.getConnection());
assertSame(connection, provider.getConnection());
assertSame(connection, provider.getOrEstablishConnection());
}

@Test
public void testEstablishDriverConnection() throws Exception {
JdbcConnectionProvider provider1 =
Expand Down Expand Up @@ -117,7 +135,7 @@ public void testEstablishUnregisteredDriverConnection() throws Exception {
@Test
public void testInvalidDriverUrl() throws Exception {
JdbcConnectionProvider provider =
newProvider(FakeDBUtils.DRIVER1_CLASS_NAME, FakeDBUtils.TEST_DB_INVALID_URL);
newProvider(FakeDBUtils.TEST_DB_INVALID_URL, FakeDBUtils.DRIVER1_CLASS_NAME);
try {
provider.getOrEstablishConnection();
fail("expect exception");
Expand Down

0 comments on commit 51524de

Please sign in to comment.