Skip to content

Commit

Permalink
[FLINK-20295][table][fs-connector] Table File Source lost data when r…
Browse files Browse the repository at this point in the history
…eading from directories with JSON format

This closes apache#14192
  • Loading branch information
JingsongLi committed Nov 25, 2020
1 parent c41051f commit 07a449e
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,6 @@ private class HiveReader implements BulkFormat.Reader<RowData> {

private final HiveMapredSplitReader hiveMapredSplitReader;
private final RowDataSerializer serializer;
private final ArrayResultIterator<RowData> iterator = new ArrayResultIterator<>();
private final int[] selectedFields;
private long numRead = 0;

Expand All @@ -287,6 +286,8 @@ public RecordIterator<RowData> readBatch() throws IOException {
if (num == 0) {
return null;
}

ArrayResultIterator<RowData> iterator = new ArrayResultIterator<>();
iterator.set(records, num, NO_OFFSET, skipCount);
return iterator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@

package org.apache.flink.formats.json;

import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.planner.runtime.batch.sql.BatchFileSystemITCaseBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.FileUtils;

import org.junit.Assert;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -59,4 +63,52 @@ public void testParseError() throws Exception {
Row.of("x5,5,1,1"),
Row.of("x5,5,1,1")));
}

@Test
public void bigDataTest() throws IOException {
int numRecords = 1000;
File dir = generateTestData(numRecords);

env().setParallelism(1);

String sql = String.format(
"CREATE TABLE bigdata_source ( " +
" id INT, " +
" content STRING" +
") PARTITIONED by (id) WITH (" +
" 'connector' = 'filesystem'," +
" 'path' = '%s'," +
" 'format' = 'json'" +
")", dir);
tEnv().executeSql(sql);
TableResult result = tEnv().executeSql("select * from bigdata_source");
List<String> elements = new ArrayList<>();
result.collect().forEachRemaining(r -> elements.add((String) r.getField(1)));
Assert.assertEquals(numRecords, elements.size());
elements.sort(String::compareTo);

List<String> expected = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
expected.add(String.valueOf(i));
}
expected.sort(String::compareTo);

Assert.assertEquals(expected, elements);
}

private static File generateTestData(int numRecords) throws IOException {
File tempDir = TEMPORARY_FOLDER.newFolder();

File root = new File(tempDir, "id=0");
root.mkdir();

File dataFile = new File(root, "testdata");
try (PrintWriter writer = new PrintWriter(dataFile)) {
for (int i = 0; i < numRecords; ++i) {
writer.println(String.format("{\"content\":\"%s\"}", i));
}
}

return tempDir;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ public TypeInformation<RowData> getProducedType() {
private class Reader implements BulkFormat.Reader<RowData> {

private final LineBytesInputFormat inputFormat;
private final ArrayResultIterator<RowData> iterator = new ArrayResultIterator<>();
private long numRead = 0;

private Reader(Configuration config, FileSourceSplit split) throws IOException {
Expand All @@ -173,7 +172,7 @@ private Reader(Configuration config, FileSourceSplit split) throws IOException {
@Nullable
@Override
public RecordIterator<RowData> readBatch() throws IOException {
Object[] records = new Object[DEFAULT_SIZE];
RowData[] records = new RowData[DEFAULT_SIZE];
int num = 0;
final long skipCount = numRead;
for (int i = 0; i < BATCH_SIZE; i++) {
Expand All @@ -187,7 +186,9 @@ public RecordIterator<RowData> readBatch() throws IOException {
return null;
}
numRead += num;
((ArrayResultIterator) iterator).set(records, num, NO_OFFSET, skipCount);

ArrayResultIterator<RowData> iterator = new ArrayResultIterator<>();
iterator.set(records, num, NO_OFFSET, skipCount);
return iterator;
}

Expand Down

0 comments on commit 07a449e

Please sign in to comment.