Skip to content

Commit

Permalink
[FLINK-20658][jdbc] Establish connection through driver with given na…
Browse files Browse the repository at this point in the history
…me (apache#15043)
  • Loading branch information
kezhuw committed Mar 1, 2021
1 parent 24fe5ab commit f2f4973
Show file tree
Hide file tree
Showing 7 changed files with 190 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@

import java.io.Serializable;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Enumeration;
import java.util.Properties;

/** Simple JDBC connection provider. */
@NotThreadSafe
Expand All @@ -39,6 +42,7 @@ public class SimpleJdbcConnectionProvider implements JdbcConnectionProvider, Ser

private final JdbcConnectionOptions jdbcOptions;

private transient Driver cachedDriver;
private transient Connection connection;

static {
Expand Down Expand Up @@ -68,18 +72,45 @@ public boolean isConnectionValid() throws SQLException {
&& connection.isValid(jdbcOptions.getConnectionCheckTimeoutSeconds());
}

private Driver getDriver() throws SQLException, ClassNotFoundException {
if (cachedDriver != null) {
return cachedDriver;
}
String driverName = jdbcOptions.getDriverName();
Enumeration<Driver> drivers = DriverManager.getDrivers();
while (drivers.hasMoreElements()) {
Driver driver = drivers.nextElement();
if (driver.getClass().getName().equals(driverName)) {
cachedDriver = driver;
return driver;
}
}
// We could reach here for reasons:
// * Class loader hell of DriverManager(see JDK-8146872).
// * driver is not installed as a service provider.
Class<?> clazz =
Class.forName(driverName, true, Thread.currentThread().getContextClassLoader());
try {
cachedDriver = (Driver) clazz.newInstance();
return cachedDriver;
} catch (Exception ex) {
throw new SQLException("Fail to create driver of class " + driverName, ex);
}
}

@Override
public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException {
if (connection == null) {
Class.forName(jdbcOptions.getDriverName());
if (jdbcOptions.getUsername().isPresent()) {
connection =
DriverManager.getConnection(
jdbcOptions.getDbURL(),
jdbcOptions.getUsername().get(),
jdbcOptions.getPassword().orElse(null));
} else {
connection = DriverManager.getConnection(jdbcOptions.getDbURL());
Driver driver = getDriver();
Properties info = new Properties();
jdbcOptions.getUsername().ifPresent(user -> info.setProperty("user", user));
jdbcOptions.getPassword().ifPresent(password -> info.setProperty("password", password));
connection = driver.connect(jdbcOptions.getDbURL(), info);
if (connection == null) {
// Throw same exception as DriverManager.getConnection when no driver found to match
// caller expectation.
throw new SQLException(
"No suitable driver found for " + jdbcOptions.getDbURL(), "08001");
}
}
return connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/** Tests for the {@link JdbcOutputFormat}. */
public class JdbcOutputFormatTest extends JdbcDataTestBase {
Expand Down Expand Up @@ -91,6 +92,7 @@ public void testInvalidURL() {
.setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
.finish();
jdbcOutputFormat.open(0, 1);
fail("expect exception");
} catch (Exception e) {
assertTrue(findThrowable(e, IOException.class).isPresent());
assertTrue(findThrowableWithMessage(e, expectedMsg).isPresent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ public class FakeDBUtils {
public static final String URL_PREFIX = "jdbc:fake:";

public static final String TEST_DB_URL = composeDBUrl("test");
public static final String TEST_DB_INVALID_URL = "jdbc:no-existing-driver:test";

public static final String DRIVER1_CLASS_NAME =
"org.apache.flink.connector.jdbc.fakedb.driver.FakeDriver1";
public static final String DRIVER2_CLASS_NAME =
"org.apache.flink.connector.jdbc.fakedb.driver.FakeDriver2";
public static final String DRIVER3_CLASS_NAME =
"org.apache.flink.connector.jdbc.fakedb.driver.FakeDriver3";

public static String composeDBUrl(String db) {
return URL_PREFIX + db;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.jdbc.fakedb.driver;

import java.util.Properties;

/** Sql connection created by {@link FakeDriver3#connect(String, Properties)}. */
public class FakeConnection3 extends FakeConnection {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.jdbc.fakedb.driver;

import org.apache.flink.connector.jdbc.fakedb.FakeDBUtils;

import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverPropertyInfo;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.Properties;
import java.util.logging.Logger;

/** Yet another {@link Driver} for FakeDB. */
public class FakeDriver3 implements Driver {

@Override
public Connection connect(String url, Properties info) throws SQLException {
if (!acceptsURL(url)) {
return null;
}
return new FakeConnection3();
}

@Override
public boolean acceptsURL(String url) throws SQLException {
return FakeDBUtils.acceptsUrl(url);
}

@Override
public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException {
return new DriverPropertyInfo[0];
}

@Override
public int getMajorVersion() {
return 0;
}

@Override
public int getMinorVersion() {
return 0;
}

@Override
public boolean jdbcCompliant() {
return false;
}

@Override
public Logger getParentLogger() throws SQLFeatureNotSupportedException {
throw new SQLFeatureNotSupportedException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,43 @@
import org.apache.flink.connector.jdbc.fakedb.driver.FakeConnection;
import org.apache.flink.connector.jdbc.fakedb.driver.FakeConnection1;
import org.apache.flink.connector.jdbc.fakedb.driver.FakeConnection2;
import org.apache.flink.connector.jdbc.fakedb.driver.FakeConnection3;

import org.junit.Ignore;
import org.junit.Test;

import java.sql.Connection;

import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/** Test for {@link SimpleJdbcConnectionProvider}. */
public class SimpleJdbcConnectionProviderTest {

private static JdbcConnectionProvider newFakeConnectionProviderWithDriverName(
String driverName) {
return newProvider(driverName, FakeDBUtils.TEST_DB_URL);
}

private static JdbcConnectionProvider newProvider(String driverName, String url) {
JdbcConnectionOptions options =
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(FakeDBUtils.TEST_DB_URL)
.withUrl(url)
.withDriverName(driverName)
.build();
return new SimpleJdbcConnectionProvider(options);
Expand Down Expand Up @@ -72,7 +86,6 @@ public void testEstablishConnection() throws Exception {
}

@Test
@Ignore("FLINK-20658")
public void testEstablishDriverConnection() throws Exception {
JdbcConnectionProvider provider1 =
newFakeConnectionProviderWithDriverName(FakeDBUtils.DRIVER1_CLASS_NAME);
Expand All @@ -85,6 +98,37 @@ public void testEstablishDriverConnection() throws Exception {
assertThat(connection2, instanceOf(FakeConnection2.class));
}

@Test
public void testEstablishUnregisteredDriverConnection() throws Exception {
String unregisteredDriverName = FakeDBUtils.DRIVER3_CLASS_NAME;
Set<String> registeredDriverNames =
Collections.list(DriverManager.getDrivers()).stream()
.map(Driver::getClass)
.map(Class::getName)
.collect(Collectors.toSet());
assertThat(registeredDriverNames, not(hasItem(unregisteredDriverName)));

JdbcConnectionProvider provider =
newFakeConnectionProviderWithDriverName(unregisteredDriverName);
Connection connection = provider.getOrEstablishConnection();
assertThat(connection, instanceOf(FakeConnection3.class));
}

@Test
public void testInvalidDriverUrl() throws Exception {
JdbcConnectionProvider provider =
newProvider(FakeDBUtils.DRIVER1_CLASS_NAME, FakeDBUtils.TEST_DB_INVALID_URL);
try {
provider.getOrEstablishConnection();
fail("expect exception");
} catch (SQLException ex) {
assertThat(
ex.getMessage(),
containsString(
"No suitable driver found for " + FakeDBUtils.TEST_DB_INVALID_URL));
}
}

@Test
public void testCloseNullConnection() throws Exception {
JdbcConnectionProvider provider = newFakeConnectionProvider();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@

org.apache.flink.connector.jdbc.fakedb.driver.FakeDriver1
org.apache.flink.connector.jdbc.fakedb.driver.FakeDriver2
# Comment intentionally for unregistered driver
# org.apache.flink.connector.jdbc.fakedb.driver.FakeDriver3

0 comments on commit f2f4973

Please sign in to comment.