Skip to content

Commit

Permalink
[FLINK-7968] [core] Move DataOutputSerializer and DataInputDeserializ…
Browse files Browse the repository at this point in the history
…er to 'flink-core'

These core flink utils are independent of any other runtime classes and
are also used both in flink-runtime and in flink-queryable-state (which duplicated
the code).
  • Loading branch information
StephanEwen committed Nov 2, 2017
1 parent 198b74a commit 37df826
Show file tree
Hide file tree
Showing 38 changed files with 66 additions and 807 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.runtime.util.DataInputDeserializer;
import org.apache.flink.runtime.util.DataOutputSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

import java.io.IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.queryablestate.client.state.serialization;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.MemoryUtils;
package org.apache.flink.core.memory;

import java.io.EOFException;
import java.io.IOException;
Expand All @@ -29,8 +26,6 @@

/**
* A simple and efficient deserializer for the {@link java.io.DataInput} interface.
*
* <p><b>THIS WAS COPIED FROM RUNTIME SO THAT WE AVOID THE DEPENDENCY.</b>
*/
public class DataInputDeserializer implements DataInputView, java.io.Serializable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.queryablestate.client.state.serialization;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemoryUtils;
package org.apache.flink.core.memory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -34,8 +30,6 @@

/**
* A simple and efficient serializer for the {@link java.io.DataOutput} interface.
*
* <p><b>THIS WAS COPIED FROM RUNTIME SO THAT WE AVOID THE DEPENDENCY.</b>
*/
public class DataOutputSerializer implements DataOutputView {

Expand Down Expand Up @@ -170,7 +164,6 @@ public void writeChars(String s) throws IOException {
if (this.position >= this.buffer.length - 2 * sLen) {
resize(2 * sLen);
}

for (int i = 0; i < sLen; i++) {
writeChar(s.charAt(i));
}
Expand Down Expand Up @@ -218,7 +211,7 @@ public void writeShort(int v) throws IOException {
resize(2);
}
this.buffer[this.position++] = (byte) ((v >>> 8) & 0xff);
this.buffer[this.position++] = (byte) ((v >>> 0) & 0xff);
this.buffer[this.position++] = (byte) (v & 0xff);
}

@Override
Expand Down Expand Up @@ -250,9 +243,9 @@ else if (this.position > this.buffer.length - utflen - 2) {
int count = this.position;

bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF);
bytearr[count++] = (byte) (utflen & 0xFF);

int i = 0;
int i;
for (i = 0; i < strlen; i++) {
c = str.charAt(i);
if (!((c >= 0x0001) && (c <= 0x007F))) {
Expand All @@ -268,11 +261,11 @@ else if (this.position > this.buffer.length - utflen - 2) {

} else if (c > 0x07FF) {
bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
bytearr[count++] = (byte) (0x80 | (c & 0x3F));
} else {
bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
bytearr[count++] = (byte) (0x80 | (c & 0x3F));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.runtime.util;
package org.apache.flink.core.memory;

import org.junit.Assert;
import org.junit.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,22 @@
* limitations under the License.
*/

package org.apache.flink.runtime.util;
package org.apache.flink.core.memory;

import org.apache.flink.core.memory.MemorySegmentFactory;
import org.junit.Assert;
import org.apache.flink.testutils.serialization.types.SerializationTestType;
import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
import org.apache.flink.testutils.serialization.types.Util;

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestType;
import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestTypeFactory;
import org.apache.flink.runtime.io.network.api.serialization.types.Util;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

/**
* Tests for the combination of {@link DataOutputSerializer} and {@link DataInputDeserializer}.
*/
public class DataInputOutputSerializerTest {

@Test
Expand Down Expand Up @@ -88,7 +89,7 @@ public void testWrapAsByteBuffer() {
@Test
public void testRandomValuesWriteRead() {
final int numElements = 100000;
final ArrayDeque<SerializationTestType> reference = new ArrayDeque<SerializationTestType>();
final ArrayDeque<SerializationTestType> reference = new ArrayDeque<>();

DataOutputSerializer serializer = new DataOutputSerializer(1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/


package org.apache.flink.runtime.io.network.api.serialization.types;
package org.apache.flink.testutils.serialization.types;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.memory.DataInputView;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/


package org.apache.flink.runtime.io.network.api.serialization.types;
package org.apache.flink.testutils.serialization.types;

import java.io.IOException;
import java.util.Random;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/


package org.apache.flink.runtime.io.network.api.serialization.types;
package org.apache.flink.testutils.serialization.types;

import java.io.IOException;
import java.util.Arrays;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/


package org.apache.flink.runtime.io.network.api.serialization.types;
package org.apache.flink.testutils.serialization.types;

import java.io.IOException;
import java.util.Arrays;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/


package org.apache.flink.runtime.io.network.api.serialization.types;
package org.apache.flink.testutils.serialization.types;

import java.io.IOException;
import java.util.Random;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/


package org.apache.flink.runtime.io.network.api.serialization.types;
package org.apache.flink.testutils.serialization.types;

import java.io.IOException;
import java.util.Random;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/


package org.apache.flink.runtime.io.network.api.serialization.types;
package org.apache.flink.testutils.serialization.types;

import java.io.IOException;
import java.util.Random;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/


package org.apache.flink.runtime.io.network.api.serialization.types;
package org.apache.flink.testutils.serialization.types;

import java.io.IOException;
import java.util.Random;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/


package org.apache.flink.runtime.io.network.api.serialization.types;
package org.apache.flink.testutils.serialization.types;

import java.io.IOException;
import java.util.Random;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/


package org.apache.flink.runtime.io.network.api.serialization.types;
package org.apache.flink.testutils.serialization.types;

import java.io.IOException;
import java.util.Random;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/


package org.apache.flink.runtime.io.network.api.serialization.types;
package org.apache.flink.testutils.serialization.types;

import java.util.Random;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/


package org.apache.flink.runtime.io.network.api.serialization.types;
package org.apache.flink.testutils.serialization.types;

public enum SerializationTestTypeFactory {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/


package org.apache.flink.runtime.io.network.api.serialization.types;
package org.apache.flink.testutils.serialization.types;

import java.io.IOException;
import java.util.Random;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/


package org.apache.flink.runtime.io.network.api.serialization.types;
package org.apache.flink.testutils.serialization.types;

import java.io.IOException;
import java.util.Random;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/


package org.apache.flink.runtime.io.network.api.serialization.types;
package org.apache.flink.testutils.serialization.types;

import java.io.IOException;
import java.util.Random;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.runtime.io.network.api.serialization.types;
package org.apache.flink.testutils.serialization.types;

import java.util.Iterator;
import java.util.NoSuchElementException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.util.DataInputDeserializer;
import org.apache.flink.runtime.util.DataOutputSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

import java.io.EOFException;
import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.util.DataInputDeserializer;
import org.apache.flink.runtime.util.DataOutputSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.util.InstantiationUtil;

import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.util.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;

/**
* Record serializer which serializes the complete record to an intermediate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.util.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.util.StringUtils;

import java.io.BufferedInputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@
package org.apache.flink.runtime.metrics.dump;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.util.DataInputDeserializer;
import org.apache.flink.runtime.util.DataOutputSerializer;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
Expand Down
Loading

0 comments on commit 37df826

Please sign in to comment.