Skip to content

Commit

Permalink
[FLINK-17544][jdbc] Fix NPE and resource leak problem in JdbcOutputFo…
Browse files Browse the repository at this point in the history
…rmat

This closes apache#12712
  • Loading branch information
fsk119 authored Jun 22, 2020
1 parent a5527e3 commit 9936a4d
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,6 @@ public synchronized void close() {
if (!closed) {
closed = true;

checkFlushException();

if (this.scheduledFuture != null) {
scheduledFuture.cancel(false);
this.scheduler.shutdown();
Expand All @@ -220,7 +218,7 @@ public synchronized void close() {
try {
flush();
} catch (Exception e) {
throw new RuntimeException("Writing records to JDBC failed.", e);
LOG.warn("Writing records to JDBC failed.", e);
}
}

Expand All @@ -233,6 +231,7 @@ public synchronized void close() {
}
}
super.close();
checkFlushException();
}

public static Builder builder() {
Expand Down Expand Up @@ -348,5 +347,4 @@ static JdbcBatchStatementExecutor<Row> createSimpleRowExecutor(String sql, int[]
static JdbcStatementBuilder<Row> createRowJdbcStatementBuilder(int[] types) {
return (st, record) -> setRecordToStatement(st, types, record);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ public synchronized void close() {
super.close();
} finally {
try {
deleteExecutor.closeStatements();
if (deleteExecutor != null){
deleteExecutor.closeStatements();
}
} catch (SQLException e) {
LOG.warn("unable to close delete statement runner", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.jdbc.JdbcTestFixture;
import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
Expand All @@ -43,6 +44,7 @@
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.function.Function;
Expand All @@ -57,8 +59,10 @@
import static org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement;
import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;

/**
* Tests using both {@link JdbcInputFormat} and {@link JdbcBatchingOutputFormat}.
Expand Down Expand Up @@ -104,6 +108,49 @@ public void testEnrichedClassCastException() {
}
}

@Test
public void testJdbcBatchingOutputFormatCloseDuringRuntime() throws Exception {
	// Regression test for FLINK-17544: close() must release the connection and
	// surface the flush failure without NPE / resource leak when a batch write fails.
	JdbcOptions options = JdbcOptions.builder()
		.setDBUrl(getDbMetadata().getUrl())
		.setTableName(OUTPUT_TABLE)
		.build();
	// Use a very large flush interval so flushing is triggered by batch size only,
	// keeping the failure point deterministic (scheduled flusher never fires).
	JdbcExecutionOptions jdbcExecutionOptions = JdbcExecutionOptions.builder()
		.withBatchIntervalMs(1000_000L)
		.withBatchSize(2)
		.withMaxRetries(1)
		.build();
	ExecutionConfig executionConfig = new ExecutionConfig();

	RuntimeContext context = Mockito.mock(RuntimeContext.class);
	@SuppressWarnings("unchecked") // Mockito.mock cannot produce a parameterized mock directly
	JdbcBatchStatementExecutor<Row> executor = Mockito.mock(JdbcBatchStatementExecutor.class);

	doReturn(executionConfig).when(context).getExecutionConfig();
	// Always throw on executeBatch() so writeRecord() fails once the batch size
	// threshold is reached, which drives the code path under test into close().
	doThrow(SQLException.class).when(executor).executeBatch();

	JdbcBatchingOutputFormat<Tuple2<Boolean, Row>, Row, JdbcBatchStatementExecutor<Row>> format =
		new JdbcBatchingOutputFormat<>(
			new SimpleJdbcConnectionProvider(options),
			jdbcExecutionOptions,
			(ctx) -> executor,
			(tuple2) -> tuple2.f1);

	format.setRuntimeContext(context);
	format.open(0, 1);

	try {
		for (JdbcTestFixture.TestEntry entry : TEST_DATA) {
			format.writeRecord(Tuple2.of(true, toRow(entry)));
		}
	} catch (Exception e) {
		// artificial failure injected via the mocked executor; closing here must
		// still tear down resources cleanly
		format.close();
	} finally {
		// the connection must have been released by close()
		assertNull(format.getConnection());
	}
}

private void runTest(boolean exploitParallelism) throws Exception {
ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
JdbcInputFormat.JdbcInputFormatBuilder inputBuilder = JdbcInputFormat.buildJdbcInputFormat()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,20 @@ public void setup() {
keyFields = new String[]{"id"};
}

@Test
public void testUpsertFormatCloseBeforeOpen() throws Exception {
	// Regression test for FLINK-17544: calling close() on a format that was never
	// opened must not NPE (e.g. on the not-yet-initialized delete executor).
	JdbcOptions options = JdbcOptions.builder()
		.setDBUrl(getDbMetadata().getUrl())
		.setTableName(OUTPUT_TABLE)
		.build();
	JdbcDmlOptions dmlOptions = JdbcDmlOptions.builder()
		.withTableName(options.getTableName()).withDialect(options.getDialect())
		.withFieldNames(fieldNames).withKeyFields(keyFields).build();
	format = new TableJdbcUpsertOutputFormat(new SimpleJdbcConnectionProvider(options), dmlOptions, JdbcExecutionOptions.defaults());
	// FLINK-17544: There should be no NPE thrown from this method
	format.close();
}

@Test
public void testJdbcOutputFormat() throws Exception {
JdbcOptions options = JdbcOptions.builder()
Expand Down

0 comments on commit 9936a4d

Please sign in to comment.