[FLINK-2314] Make Streaming File Sources Persistent
This commit is a combination of several commits/changes. It combines
changes to the file input formats and the streaming file read operator
and integrates them into the API.

These are the messages of the other two commits:

[FLINK-3717] Make FileInputFormat checkpointable

This adds a new interface called CheckpointableInputFormat
which describes input formats whose state is queryable,
i.e. getCurrentState() returns where the reader is
in the underlying source, and which can resume reading
from a user-specified position.

This functionality is not yet leveraged by current readers.
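
For reference, here is a minimal sketch of the interface as it is
used in the AvroInputFormat diff below; the exact generic bounds
are an assumption, with S the split type and T the state type:

	public interface CheckpointableInputFormat<S extends InputSplit, T extends Serializable> {

		/** Returns where the reader currently is in the underlying split. */
		T getCurrentState() throws IOException;

		/** Re-opens the given split and resumes reading from the given checkpointed state. */
		void reopen(S split, T state) throws IOException;
	}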

[FLINK-3889] Refactor File Monitoring Source

This is meant to replace the different file
reading sources in Flink streaming. Now there is
a single monitoring source with DOP 1 that monitors
a directory and assigns input splits to downstream
readers.

In addition, it makes the new features added by
FLINK-3717 work together with the aforementioned entities
(the monitor and the readers) in order to provide
fault-tolerant file sources with exactly-once guarantees.
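
As a rough sketch, the topology described above would be wired up
as follows (class and method names here are illustrative
placeholders, not necessarily the ones introduced by this commit):

	// a single, non-parallel source monitors the directory and emits splits
	DataStream<FileInputSplit> splits = env
		.addSource(new FileMonitoringSource(path, scanIntervalMillis))
		.setParallelism(1);

	// parallel downstream readers pick up the assigned splits and read the records
	DataStream<MyRecord> records = splits
		.transform("FileSplitReader", recordTypeInfo, new FileSplitReadOperator<>(inputFormat));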

This does not replace the old API calls. That
will be done in a future commit.
kl0u authored and aljoscha committed Jun 14, 2016
1 parent bc19486 commit d353895
Showing 31 changed files with 2,745 additions and 173 deletions.
AvroInputFormat.java
@@ -22,13 +22,15 @@
import java.io.IOException;

import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.avro.FSDataInputStreamWrapper;
@@ -42,15 +44,16 @@

/**
 * Provides a {@link FileInputFormat} for Avro records.
 *
 * @param <E>
 *            the type of the result Avro record. If you specify
 *            {@link GenericRecord} then the result will be returned as a
 *            {@link GenericRecord}, so you do not have to know the schema ahead
 *            of time.
 */
-public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E> {
+public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E>,
+		CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {

	private static final long serialVersionUID = 1L;

	private static final Logger LOG = LoggerFactory.getLogger(AvroInputFormat.class);
@@ -59,16 +62,19 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType

	private boolean reuseAvroValue = true;

-	private transient FileReader<E> dataFileReader;
+	private transient DataFileReader<E> dataFileReader;

	private transient long end;

+	private transient long recordsReadSinceLastSync;
+
+	private transient long lastSync = -1l;

	public AvroInputFormat(Path filePath, Class<E> type) {
		super(filePath);
		this.avroValueType = type;
	}

	/**
	 * Sets the flag whether to reuse the Avro value instance for all records.
	 * By default, the input format reuses the Avro value.
@@ -102,42 +108,59 @@ public TypeInformation<E> getProducedType() {
	@Override
	public void open(FileInputSplit split) throws IOException {
		super.open(split);
+		dataFileReader = initReader(split);
+		dataFileReader.sync(split.getStart());
+		lastSync = dataFileReader.previousSync();
+	}
+
+	private DataFileReader<E> initReader(FileInputSplit split) throws IOException {
		DatumReader<E> datumReader;

		if (org.apache.avro.generic.GenericRecord.class == avroValueType) {
			datumReader = new GenericDatumReader<E>();
		} else {
			datumReader = org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)
				? new SpecificDatumReader<E>(avroValueType) : new ReflectDatumReader<E>(avroValueType);
		}

		if (LOG.isInfoEnabled()) {
			LOG.info("Opening split {}", split);
		}

		SeekableInput in = new FSDataInputStreamWrapper(stream, split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
-		dataFileReader = DataFileReader.openReader(in, datumReader);
+		DataFileReader<E> dataFileReader = (DataFileReader) DataFileReader.openReader(in, datumReader);

		if (LOG.isDebugEnabled()) {
			LOG.debug("Loaded SCHEMA: {}", dataFileReader.getSchema());
		}

-		dataFileReader.sync(split.getStart());
-		this.end = split.getStart() + split.getLength();
+		end = split.getStart() + split.getLength();
+		recordsReadSinceLastSync = 0;
+		return dataFileReader;
	}

	@Override
	public boolean reachedEnd() throws IOException {
		return !dataFileReader.hasNext() || dataFileReader.pastSync(end);
	}

+	public long getRecordsReadFromBlock() {
+		return this.recordsReadSinceLastSync;
+	}
+
	@Override
	public E nextRecord(E reuseValue) throws IOException {
		if (reachedEnd()) {
			return null;
		}

+		// if we start a new block, then register the event, and
+		// restart the counter.
+		if (dataFileReader.previousSync() != lastSync) {
+			lastSync = dataFileReader.previousSync();
+			recordsReadSinceLastSync = 0;
+		}
+		recordsReadSinceLastSync++;

		if (reuseAvroValue) {
			return dataFileReader.next(reuseValue);
		} else {
@@ -148,4 +171,34 @@ public E nextRecord(E reuseValue) throws IOException {
		}
	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Checkpointing
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public Tuple2<Long, Long> getCurrentState() throws IOException {
+		return new Tuple2<>(this.lastSync, this.recordsReadSinceLastSync);
+	}
+
+	@Override
+	public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
+		Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
+		Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
+
+		this.open(split);
+		if (state.f0 != -1) {
+
+			// go to the block where we stopped
+			lastSync = state.f0;
+			dataFileReader.seek(lastSync);
+
+			// read up to the record we were at before the checkpoint and discard the values
+			long recordsToDiscard = state.f1;
+			for (int i = 0; i < recordsToDiscard; i++) {
+				dataFileReader.next(null);
+				recordsReadSinceLastSync++;
+			}
+		}
+	}
}
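
Note on the recovery scheme above: Avro files are laid out in blocks separated by sync markers,
so the checkpointed state is the pair (offset of the last sync marker, records read since that
marker). On reopen(), the format seeks back to the sync marker, then reads and discards that
many records, resuming exactly where the checkpoint was taken. A hypothetical round trip (the
path and split are placeholders; the tests below exercise the same sequence):

	AvroInputFormat<User> format = new AvroInputFormat<>(new Path("/data/users.avro"), User.class);
	format.open(split);
	// ... read some records via nextRecord(), then checkpoint:
	Tuple2<Long, Long> state = format.getCurrentState(); // (lastSync, recordsReadSinceLastSync)

	// after a failure, a fresh instance resumes from the saved state
	AvroInputFormat<User> restored = new AvroInputFormat<>(new Path("/data/users.avro"), User.class);
	restored.reopen(split, state); // seeks to lastSync, then skips state.f1 records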
AvroSplittableInputFormatTest.java
@@ -26,6 +26,7 @@
import org.apache.flink.api.io.avro.generated.Fixed16;
import org.apache.flink.api.io.avro.generated.User;
import org.apache.flink.api.java.io.AvroInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
@@ -166,7 +167,7 @@ public void testSplittedIF() throws IOException {
		Configuration parameters = new Configuration();

		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
-
+		format.configure(parameters);
		FileInputSplit[] splits = format.createInputSplits(4);
		assertEquals(splits.length, 4);
@@ -191,6 +192,98 @@ public void testSplittedIF() throws IOException {
		format.close();
	}

+	@Test
+	public void testAvroRecoveryWithFailureAtStart() throws Exception {
+		final int recordsUntilCheckpoint = 132;
+
+		Configuration parameters = new Configuration();
+
+		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+		format.configure(parameters);
+
+		FileInputSplit[] splits = format.createInputSplits(4);
+		assertEquals(splits.length, 4);
+
+		int elements = 0;
+		int[] elementsPerSplit = new int[4];
+		for (int i = 0; i < splits.length; i++) {
+			format.reopen(splits[i], format.getCurrentState());
+			while (!format.reachedEnd()) {
+				User u = format.nextRecord(null);
+				Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
+				elements++;
+
+				if (format.getRecordsReadFromBlock() == recordsUntilCheckpoint) {
+
+					// do the whole checkpoint-restore procedure and see if we pick up from where we left off.
+					Tuple2<Long, Long> state = format.getCurrentState();
+
+					// this is to make sure that nothing stays from the previous format
+					// (as it is going to be in the normal case)
+					format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
+
+					format.reopen(splits[i], state);
+					assertEquals(format.getRecordsReadFromBlock(), recordsUntilCheckpoint);
+				}
+				elementsPerSplit[i]++;
+			}
+			format.close();
+		}
+
+		Assert.assertEquals(1539, elementsPerSplit[0]);
+		Assert.assertEquals(1026, elementsPerSplit[1]);
+		Assert.assertEquals(1539, elementsPerSplit[2]);
+		Assert.assertEquals(896, elementsPerSplit[3]);
+		Assert.assertEquals(NUM_RECORDS, elements);
+		format.close();
+	}
+
+	@Test
+	public void testAvroRecovery() throws Exception {
+		final int recordsUntilCheckpoint = 132;
+
+		Configuration parameters = new Configuration();
+
+		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+		format.configure(parameters);
+
+		FileInputSplit[] splits = format.createInputSplits(4);
+		assertEquals(splits.length, 4);
+
+		int elements = 0;
+		int[] elementsPerSplit = new int[4];
+		for (int i = 0; i < splits.length; i++) {
+			format.open(splits[i]);
+			while (!format.reachedEnd()) {
+				User u = format.nextRecord(null);
+				Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
+				elements++;
+
+				if (format.getRecordsReadFromBlock() == recordsUntilCheckpoint) {
+
+					// do the whole checkpoint-restore procedure and see if we pick up from where we left off.
+					Tuple2<Long, Long> state = format.getCurrentState();
+
+					// this is to make sure that nothing stays from the previous format
+					// (as it is going to be in the normal case)
+					format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
+
+					format.reopen(splits[i], state);
+					assertEquals(format.getRecordsReadFromBlock(), recordsUntilCheckpoint);
+				}
+				elementsPerSplit[i]++;
+			}
+			format.close();
+		}
+
+		Assert.assertEquals(1539, elementsPerSplit[0]);
+		Assert.assertEquals(1026, elementsPerSplit[1]);
+		Assert.assertEquals(1539, elementsPerSplit[2]);
+		Assert.assertEquals(896, elementsPerSplit[3]);
+		Assert.assertEquals(NUM_RECORDS, elements);
+		format.close();
+	}
	/*
	This test gave the reference values for the test of Flink's IF.
