Skip to content

Commit

Permalink
[FLINK-20250][table-runtime] Fix NPE when invoking AsyncLookupJoinRun…
Browse files Browse the repository at this point in the history
…ner#close method

This closes apache#14207
  • Loading branch information
SteNicholas authored Dec 8, 2020
1 parent 0998813 commit 5043ca9
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.table.runtime.operators.join.lookup;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
Expand Down Expand Up @@ -153,11 +154,18 @@ public void close() throws Exception {
if (fetcher != null) {
FunctionUtils.closeFunction(fetcher);
}
for (JoinedRowResultFuture rf : allResultFutures) {
rf.close();
if (allResultFutures != null) {
for (JoinedRowResultFuture rf : allResultFutures) {
rf.close();
}
}
}

@VisibleForTesting
public List<JoinedRowResultFuture> getAllResultFutures() {
return allResultFutures;
}

/**
* The {@link JoinedRowResultFuture} is used to combine left {@link RowData} and
* right {@link RowData} into {@link JoinedRowData}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
Expand Down Expand Up @@ -63,6 +64,10 @@

import static org.apache.flink.table.data.StringData.fromString;
import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
import static org.apache.flink.table.runtime.util.StreamRecordUtils.row;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;

/**
* Harness tests for {@link LookupJoinRunner} and {@link LookupJoinWithCalcRunner}.
Expand Down Expand Up @@ -250,6 +255,40 @@ private OneInputStreamOperatorTestHarness<RowData, RowData> createHarness(
inSerializer);
}

@Test
public void testCloseAsyncLookupJoinRunner() throws Exception {
final InternalTypeInfo<RowData> rightRowTypeInfo = InternalTypeInfo.ofFields(
DataTypes.INT().getLogicalType(),
DataTypes.STRING().getLogicalType());
final AsyncLookupJoinRunner joinRunner = new AsyncLookupJoinRunner(
new GeneratedFunctionWrapper(new TestingFetcherFunction()),
new GeneratedResultFutureWrapper<>(new TestingFetcherResultFuture()),
rightRowTypeInfo,
rightRowTypeInfo,
true,
100);
assertNull(joinRunner.getAllResultFutures());
closeAsyncLookupJoinRunner(joinRunner);

joinRunner.setRuntimeContext(new MockStreamingRuntimeContext(false, 1, 0));
joinRunner.open(new Configuration());
assertNotNull(joinRunner.getAllResultFutures());
closeAsyncLookupJoinRunner(joinRunner);

joinRunner.open(new Configuration());
joinRunner.asyncInvoke(row(1, "a"), new TestingFetcherResultFuture());
assertNotNull(joinRunner.getAllResultFutures());
closeAsyncLookupJoinRunner(joinRunner);
}

private void closeAsyncLookupJoinRunner(AsyncLookupJoinRunner joinRunner) throws Exception {
try {
joinRunner.close();
} catch (NullPointerException e) {
fail("Unexpected close to fail with null pointer exception.");
}
}

/**
* Whether this is a inner join or left join.
*/
Expand Down

0 comments on commit 5043ca9

Please sign in to comment.