Skip to content

Commit

Permalink
[FLINK-1081] Add getPos() method into FSDataInputStream class
Browse files Browse the repository at this point in the history
  • Loading branch information
chiwanpark authored and gyfora committed Jan 25, 2015
1 parent 7ce9a8f commit 7aa9a50
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ public void seek(long desired) throws IOException {
throw new UnsupportedOperationException("Compressed streams do not support the seek operation");
}

@Override
public long getPos() throws IOException {
throw new UnsupportedOperationException("Compressed streams do not support the getPos operation");
}

@Override
public int read() throws IOException {
return inStream.read();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,10 @@ public abstract class FSDataInputStream extends InputStream {
*/
public abstract void seek(long desired) throws IOException;

/**
* Get the current position in the input stream.
*
* @return current position in the input stream
*/
public abstract long getPos() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public void seek(final long desired) throws IOException {
this.fis.getChannel().position(desired);
}

@Override
public long getPos() throws IOException {
return this.fis.getChannel().position();
}


@Override
public int read() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ public synchronized void seek(long desired) throws IOException {
fsDataInputStream.seek(desired);
}

@Override
public long getPos() throws IOException {

return fsDataInputStream.getPos();
}

@Override
public int read() throws IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
/**
* This class implements an {@link FSDataInputStream} that downloads its data from Amazon S3 in the background.
* Essentially, this class is just a wrapper to the Amazon AWS SDK.
*
*/
public class S3DataInputStream extends FSDataInputStream {

Expand All @@ -41,9 +40,19 @@ public class S3DataInputStream extends FSDataInputStream {
*/
private final InputStream inputStream;

/**
* The current position of input stream.
*/
private long position;

/**
* The marked position.
*/
private long marked;

/**
* Constructs a new input stream which reads its data from the specified S3 object.
*
*
* @param s3Client
* the S3 client to connect to Amazon S3.
* @param bucket
Expand All @@ -63,6 +72,8 @@ public class S3DataInputStream extends FSDataInputStream {
}

this.inputStream = s3o.getObjectContent();
this.position = 0;
this.marked = 0;
}


Expand All @@ -84,6 +95,7 @@ public void close() throws IOException {
public void mark(final int readlimit) {

this.inputStream.mark(readlimit);
marked = readlimit;
}


Expand All @@ -97,34 +109,65 @@ public boolean markSupported() {
@Override
public int read() throws IOException {

return this.inputStream.read();
int read = this.inputStream.read();
if (read != -1) {
++position;
}

return read;
}


@Override
public int read(final byte[] b) throws IOException {

return this.inputStream.read(b);
int read = this.inputStream.read(b);
if (read > 0) {
position += read;
}

return read;
}


@Override
public int read(final byte[] b, final int off, final int len) throws IOException {

return this.inputStream.read(b, off, len);
int read = this.inputStream.read(b, off, len);
if (read > 0) {
position += read;
}

return read;
}


@Override
public void reset() throws IOException {

this.inputStream.reset();
position = marked;
}


@Override
public void seek(final long desired) throws IOException {

this.inputStream.skip(desired);
skip(desired);
}

@Override
public long skip(long n) throws IOException {
long skipped = this.inputStream.skip(n);
if (skipped > 0) {
position += skipped;
}

return skipped;
}

@Override
public long getPos() throws IOException {
return position;
}
}

0 comments on commit 7aa9a50

Please sign in to comment.