diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStream.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStream.java deleted file mode 100644 index a1e9b9eb9d316..0000000000000 --- a/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStream.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.core.memory; - -import java.io.IOException; -import java.io.InputStream; - -public final class DataInputViewStream extends InputStream { - - private final DataInputView inputView; - - - public DataInputViewStream(DataInputView inputView) { - this.inputView = inputView; - } - - - @Override - public int read() throws IOException { - return inputView.readByte(); - } - - public int read(byte[] b, int off, int len) throws IOException { - inputView.readFully(b, off, len); - return len; - } -} diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStream.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStream.java deleted file mode 100644 index 945c9bc7210c8..0000000000000 --- a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStream.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.core.memory; - -import java.io.IOException; -import java.io.OutputStream; - -public final class DataOutputViewStream extends OutputStream { - - private final DataOutputView outputView; - - - public DataOutputViewStream(DataOutputView outputView) { - this.outputView = outputView; - } - - - @Override - public void write(int b) throws IOException { - outputView.write(b); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - outputView.write(b, off, len); - } -} diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStream.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStream.java index 148227c932bbb..be17d64b3f67b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStream.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStream.java @@ -24,10 +24,14 @@ import java.io.IOException; import java.io.InputStream; -public class DataInputViewStream extends InputStream{ +/** + * An input stream that draws its data from a {@link DataInputView}. + */ +public class DataInputViewStream extends InputStream { + protected DataInputView inputView; - public DataInputViewStream(DataInputView inputView){ + public DataInputViewStream(DataInputView inputView) { this.inputView = inputView; } @@ -37,9 +41,9 @@ public DataInputView getInputView(){ @Override public int read() throws IOException { - try{ + try { return inputView.readUnsignedByte(); - }catch(EOFException ex){ + } catch(EOFException ex) { return -1; } } @@ -47,16 +51,15 @@ public int read() throws IOException { @Override public long skip(long n) throws IOException { long counter = n; - while(counter > Integer.MAX_VALUE){ + while(counter > Integer.MAX_VALUE) { int skippedBytes = inputView.skipBytes(Integer.MAX_VALUE); - if(skippedBytes == 0){ + if (skippedBytes == 0) { return n - counter; } counter -= skippedBytes; } - return n - counter - inputView.skipBytes((int) counter); } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java index f2c5848e6c2b3..9e302bfe55e77 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java @@ -22,8 +22,8 @@ import com.esotericsoftware.kryo.KryoException; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; - import com.twitter.chill.ScalaKryoInstantiator; + import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -69,22 +69,24 @@ public T createInstance() { return null; } + @SuppressWarnings("unchecked") @Override public T copy(T from) { - if(from == null) { + if (from == null) { return null; } checkKryoInitialized(); try { return kryo.copy(from); - } catch(KryoException ke) { + } + catch(KryoException ke) { // kryo was unable to copy it, so we do it through serialization: ByteArrayOutputStream baout = new ByteArrayOutputStream(); Output output = new Output(baout); kryo.writeObject(output, from); - output.flush(); + output.close(); ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray()); Input input = new Input(bain); @@ -116,6 +118,7 @@ public void serialize(T record, DataOutputView target) throws IOException { output.flush(); } + @SuppressWarnings("unchecked") @Override public T deserialize(DataInputView source) throws IOException { checkKryoInitialized();