Skip to content

Commit

Permalink
[FLINK-1300] [core] Remove unused and redundant files DataInputViewSt…
Browse files Browse the repository at this point in the history
…ream and DataOutputViewStream.
  • Loading branch information
StephanEwen committed Jan 6, 2015
1 parent baf81c6 commit 875c6aa
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 97 deletions.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@
import java.io.IOException;
import java.io.InputStream;

public class DataInputViewStream extends InputStream{
/**
* An input stream that draws its data from a {@link DataInputView}.
*/
public class DataInputViewStream extends InputStream {

protected DataInputView inputView;

public DataInputViewStream(DataInputView inputView){
public DataInputViewStream(DataInputView inputView) {
this.inputView = inputView;
}

Expand All @@ -37,26 +41,25 @@ public DataInputView getInputView(){

@Override
public int read() throws IOException {
try{
try {
return inputView.readUnsignedByte();
}catch(EOFException ex){
} catch(EOFException ex) {
return -1;
}
}

@Override
public long skip(long n) throws IOException {
long counter = n;
while(counter > Integer.MAX_VALUE){
while(counter > Integer.MAX_VALUE) {
int skippedBytes = inputView.skipBytes(Integer.MAX_VALUE);

if(skippedBytes == 0){
if (skippedBytes == 0) {
return n - counter;
}

counter -= skippedBytes;
}

return n - counter - inputView.skipBytes((int) counter);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import com.twitter.chill.ScalaKryoInstantiator;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
Expand Down Expand Up @@ -69,22 +69,24 @@ public T createInstance() {
return null;
}

@SuppressWarnings("unchecked")
@Override
public T copy(T from) {
if(from == null) {
if (from == null) {
return null;
}
checkKryoInitialized();
try {
return kryo.copy(from);
} catch(KryoException ke) {
}
catch(KryoException ke) {
// kryo was unable to copy it, so we do it through serialization:
ByteArrayOutputStream baout = new ByteArrayOutputStream();
Output output = new Output(baout);

kryo.writeObject(output, from);

output.flush();
output.close();

ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray());
Input input = new Input(bain);
Expand Down Expand Up @@ -116,6 +118,7 @@ public void serialize(T record, DataOutputView target) throws IOException {
output.flush();
}

@SuppressWarnings("unchecked")
@Override
public T deserialize(DataInputView source) throws IOException {
checkKryoInitialized();
Expand Down

0 comments on commit 875c6aa

Please sign in to comment.