From ca2bc35f0da5610c7153f40ca86d46d007d83845 Mon Sep 17 00:00:00 2001 From: Dian Fu Date: Sun, 16 Feb 2020 20:59:59 +0800 Subject: [PATCH] [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for reading and writing Arrow format data This closes #11112. --- flink-python/pom.xml | 29 ++ .../table/runtime/arrow/ArrowReader.java | 35 +++ .../flink/table/runtime/arrow/ArrowUtils.java | 279 ++++++++++++++++++ .../table/runtime/arrow/ArrowWriter.java | 85 ++++++ .../arrow/readers/ArrowFieldReader.java | 54 ++++ .../arrow/readers/BigIntFieldReader.java | 39 +++ .../runtime/arrow/readers/IntFieldReader.java | 39 +++ .../runtime/arrow/readers/RowArrowReader.java | 61 ++++ .../arrow/readers/SmallIntFieldReader.java | 39 +++ .../arrow/readers/TinyIntFieldReader.java | 39 +++ .../vectors/ArrowBigIntColumnVector.java | 56 ++++ .../arrow/vectors/ArrowIntColumnVector.java | 53 ++++ .../vectors/ArrowSmallIntColumnVector.java | 53 ++++ .../vectors/ArrowTinyIntColumnVector.java | 53 ++++ .../arrow/vectors/BaseRowArrowReader.java | 63 ++++ .../arrow/writers/ArrowFieldWriter.java | 89 ++++++ .../arrow/writers/BaseRowBigIntWriter.java | 44 +++ .../arrow/writers/BaseRowIntWriter.java | 44 +++ .../arrow/writers/BaseRowSmallIntWriter.java | 44 +++ .../arrow/writers/BaseRowTinyIntWriter.java | 44 +++ .../runtime/arrow/writers/BigIntWriter.java | 44 +++ .../runtime/arrow/writers/IntWriter.java | 44 +++ .../runtime/arrow/writers/SmallIntWriter.java | 44 +++ .../runtime/arrow/writers/TinyIntWriter.java | 44 +++ .../src/main/resources/META-INF/NOTICE | 6 + .../arrow/ArrowReaderWriterTestBase.java | 89 ++++++ .../table/runtime/arrow/ArrowUtilsTest.java | 151 ++++++++++ .../arrow/BaseRowArrowReaderWriterTest.java | 125 ++++++++ .../arrow/RowArrowReaderWriterTest.java | 92 ++++++ .../table/runtime/util/StreamRecordUtils.java | 4 + pom.xml | 1 + 31 files changed, 1886 insertions(+) create mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowReader.java create 
mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java create mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowWriter.java create mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/ArrowFieldReader.java create mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/BigIntFieldReader.java create mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/IntFieldReader.java create mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/RowArrowReader.java create mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/SmallIntFieldReader.java create mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TinyIntFieldReader.java create mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/vectors/ArrowBigIntColumnVector.java create mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/vectors/ArrowIntColumnVector.java create mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/vectors/ArrowSmallIntColumnVector.java create mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/vectors/ArrowTinyIntColumnVector.java create mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/vectors/BaseRowArrowReader.java create mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/ArrowFieldWriter.java create mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/BaseRowBigIntWriter.java create mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/BaseRowIntWriter.java create mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/BaseRowSmallIntWriter.java create mode 100644 
flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/BaseRowTinyIntWriter.java create mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/BigIntWriter.java create mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/IntWriter.java create mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/SmallIntWriter.java create mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/TinyIntWriter.java create mode 100644 flink-python/src/test/java/org/apache/flink/table/runtime/arrow/ArrowReaderWriterTestBase.java create mode 100644 flink-python/src/test/java/org/apache/flink/table/runtime/arrow/ArrowUtilsTest.java create mode 100644 flink-python/src/test/java/org/apache/flink/table/runtime/arrow/BaseRowArrowReaderWriterTest.java create mode 100644 flink-python/src/test/java/org/apache/flink/table/runtime/arrow/RowArrowReaderWriterTest.java diff --git a/flink-python/pom.xml b/flink-python/pom.xml index 9f63bcbc1b558..d1cd2af2fd60f 100644 --- a/flink-python/pom.xml +++ b/flink-python/pom.xml @@ -113,6 +113,20 @@ under the License. + + + + org.apache.arrow + arrow-vector + ${arrow.version} + + + commons-codec + commons-codec + + + + @@ -281,6 +295,9 @@ under the License. com.fasterxml.jackson.core:* joda-time:* com.google.protobuf:* + org.apache.arrow:* + io.netty:* + com.google.flatbuffers:* @@ -320,6 +337,18 @@ under the License. 
com.google.protobuf org.apache.flink.api.python.shaded.com.google.protobuf + + org.apache.arrow + org.apache.flink.api.python.shaded.org.apache.arrow + + + io.netty + org.apache.flink.api.python.shaded.io.netty + + + com.google.flatbuffers + org.apache.flink.api.python.shaded.com.google.flatbuffers + + diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowReader.java new file mode 100644 index 0000000000000..a504898ff1c51 --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowReader.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.arrow; + +import org.apache.flink.annotation.Internal; + +/** + * Reader which deserializes the Arrow format data to the Flink rows. + * + * @param <OUT> Type of the deserialized row. + */ +@Internal +public interface ArrowReader { + + /** + * Reads the specified row from underlying Arrow format data. 
+ */ + OUT read(int rowId); +} diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java new file mode 100644 index 0000000000000..72212277d720f --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.arrow; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.vector.ColumnVector; +import org.apache.flink.table.runtime.arrow.readers.ArrowFieldReader; +import org.apache.flink.table.runtime.arrow.readers.BigIntFieldReader; +import org.apache.flink.table.runtime.arrow.readers.IntFieldReader; +import org.apache.flink.table.runtime.arrow.readers.RowArrowReader; +import org.apache.flink.table.runtime.arrow.readers.SmallIntFieldReader; +import org.apache.flink.table.runtime.arrow.readers.TinyIntFieldReader; +import org.apache.flink.table.runtime.arrow.vectors.ArrowBigIntColumnVector; +import org.apache.flink.table.runtime.arrow.vectors.ArrowIntColumnVector; +import org.apache.flink.table.runtime.arrow.vectors.ArrowSmallIntColumnVector; +import org.apache.flink.table.runtime.arrow.vectors.ArrowTinyIntColumnVector; +import org.apache.flink.table.runtime.arrow.vectors.BaseRowArrowReader; +import org.apache.flink.table.runtime.arrow.writers.ArrowFieldWriter; +import org.apache.flink.table.runtime.arrow.writers.BaseRowBigIntWriter; +import org.apache.flink.table.runtime.arrow.writers.BaseRowIntWriter; +import org.apache.flink.table.runtime.arrow.writers.BaseRowSmallIntWriter; +import org.apache.flink.table.runtime.arrow.writers.BaseRowTinyIntWriter; +import org.apache.flink.table.runtime.arrow.writers.BigIntWriter; +import org.apache.flink.table.runtime.arrow.writers.IntWriter; +import org.apache.flink.table.runtime.arrow.writers.SmallIntWriter; +import org.apache.flink.table.runtime.arrow.writers.TinyIntWriter; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; 
+import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor; +import org.apache.flink.types.Row; + +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.SmallIntVector; +import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Utilities for Arrow. + */ +@Internal +public final class ArrowUtils { + + public static final RootAllocator ROOT_ALLOCATOR = new RootAllocator(Long.MAX_VALUE); + + /** + * Returns the Arrow schema of the specified type. + */ + public static Schema toArrowSchema(RowType rowType) { + Collection fields = rowType.getFields().stream() + .map(ArrowUtils::toArrowField) + .collect(Collectors.toCollection(ArrayList::new)); + return new Schema(fields); + } + + private static Field toArrowField(RowType.RowField rowField) { + FieldType fieldType = new FieldType( + rowField.getType().isNullable(), + rowField.getType().accept(LogicalTypeToArrowTypeConverter.INSTANCE), + null); + return new Field(rowField.getName(), fieldType, null); + } + + /** + * Creates an {@link ArrowWriter} for the specified {@link VectorSchemaRoot}. 
+ */ + public static ArrowWriter createRowArrowWriter(VectorSchemaRoot root) { + ArrowFieldWriter[] fieldWriters = new ArrowFieldWriter[root.getFieldVectors().size()]; + List vectors = root.getFieldVectors(); + for (int i = 0; i < vectors.size(); i++) { + FieldVector vector = vectors.get(i); + vector.allocateNew(); + fieldWriters[i] = createRowArrowFieldWriter(vector); + } + + return new ArrowWriter<>(root, fieldWriters); + } + + private static ArrowFieldWriter createRowArrowFieldWriter(FieldVector vector) { + if (vector instanceof TinyIntVector) { + return new TinyIntWriter((TinyIntVector) vector); + } else if (vector instanceof SmallIntVector) { + return new SmallIntWriter((SmallIntVector) vector); + } else if (vector instanceof IntVector) { + return new IntWriter((IntVector) vector); + } else if (vector instanceof BigIntVector) { + return new BigIntWriter((BigIntVector) vector); + } else { + throw new UnsupportedOperationException(String.format( + "Unsupported type %s.", fromArrowFieldToLogicalType(vector.getField()))); + } + } + + /** + * Creates an {@link ArrowWriter} for blink planner for the specified {@link VectorSchemaRoot}. 
+ */ + public static ArrowWriter createBaseRowArrowWriter(VectorSchemaRoot root) { + ArrowFieldWriter[] fieldWriters = new ArrowFieldWriter[root.getFieldVectors().size()]; + List vectors = root.getFieldVectors(); + for (int i = 0; i < vectors.size(); i++) { + FieldVector vector = vectors.get(i); + vector.allocateNew(); + fieldWriters[i] = createBaseRowArrowFieldWriter(vector); + } + + return new ArrowWriter<>(root, fieldWriters); + } + + private static ArrowFieldWriter createBaseRowArrowFieldWriter(FieldVector vector) { + if (vector instanceof TinyIntVector) { + return new BaseRowTinyIntWriter((TinyIntVector) vector); + } else if (vector instanceof SmallIntVector) { + return new BaseRowSmallIntWriter((SmallIntVector) vector); + } else if (vector instanceof IntVector) { + return new BaseRowIntWriter((IntVector) vector); + } else if (vector instanceof BigIntVector) { + return new BaseRowBigIntWriter((BigIntVector) vector); + } else { + throw new UnsupportedOperationException(String.format( + "Unsupported type %s.", fromArrowFieldToLogicalType(vector.getField()))); + } + } + + /** + * Creates an {@link ArrowReader} for the specified {@link VectorSchemaRoot}. 
+ */ + public static RowArrowReader createRowArrowReader(VectorSchemaRoot root) { + List fieldReaders = new ArrayList<>(); + for (FieldVector vector : root.getFieldVectors()) { + fieldReaders.add(createRowArrowFieldReader(vector)); + } + + return new RowArrowReader(fieldReaders.toArray(new ArrowFieldReader[0])); + } + + private static ArrowFieldReader createRowArrowFieldReader(FieldVector vector) { + if (vector instanceof TinyIntVector) { + return new TinyIntFieldReader((TinyIntVector) vector); + } else if (vector instanceof SmallIntVector) { + return new SmallIntFieldReader((SmallIntVector) vector); + } else if (vector instanceof IntVector) { + return new IntFieldReader((IntVector) vector); + } else if (vector instanceof BigIntVector) { + return new BigIntFieldReader((BigIntVector) vector); + } else { + throw new UnsupportedOperationException(String.format( + "Unsupported type %s.", fromArrowFieldToLogicalType(vector.getField()))); + } + } + + /** + * Creates an {@link ArrowReader} for blink planner for the specified {@link VectorSchemaRoot}. 
+ */ + public static BaseRowArrowReader createBaseRowArrowReader(VectorSchemaRoot root) { + List columnVectors = new ArrayList<>(); + for (FieldVector vector : root.getFieldVectors()) { + columnVectors.add(createColumnVector(vector)); + } + + return new BaseRowArrowReader(columnVectors.toArray(new ColumnVector[0])); + } + + private static ColumnVector createColumnVector(FieldVector vector) { + if (vector instanceof TinyIntVector) { + return new ArrowTinyIntColumnVector((TinyIntVector) vector); + } else if (vector instanceof SmallIntVector) { + return new ArrowSmallIntColumnVector((SmallIntVector) vector); + } else if (vector instanceof IntVector) { + return new ArrowIntColumnVector((IntVector) vector); + } else if (vector instanceof BigIntVector) { + return new ArrowBigIntColumnVector((BigIntVector) vector); + } else { + throw new UnsupportedOperationException(String.format( + "Unsupported type %s.", fromArrowFieldToLogicalType(vector.getField()))); + } + } + + /** + * Converts the specified Arrow {@link Field} to the corresponding Flink {@link LogicalType}. + * List, struct and map fields are converted recursively via their children; any other field + * must be a signed integer of 8, 16, 32 or 64 bits, otherwise an + * {@link UnsupportedOperationException} is thrown. 
+ */ + public static LogicalType fromArrowFieldToLogicalType(Field field) { + if (field.getType() == ArrowType.List.INSTANCE) { + LogicalType elementType = fromArrowFieldToLogicalType(field.getChildren().get(0)); + return new ArrayType(field.isNullable(), elementType); + } else if (field.getType() == ArrowType.Struct.INSTANCE) { + List fields = field.getChildren().stream().map(child -> { + LogicalType type = fromArrowFieldToLogicalType(child); + return new RowType.RowField(child.getName(), type, null); + }).collect(Collectors.toList()); + return new RowType(field.isNullable(), fields); + } else if (field.getType() instanceof ArrowType.Map) { + Field elementField = field.getChildren().get(0); + LogicalType keyType = fromArrowFieldToLogicalType(elementField.getChildren().get(0)); + LogicalType valueType = fromArrowFieldToLogicalType(elementField.getChildren().get(1)); + return new MapType(field.isNullable(), keyType, valueType); + } else { + return fromArrowTypeToLogicalType(field.isNullable(), field.getType()); + } 
+ } + + private static LogicalType fromArrowTypeToLogicalType(boolean isNullable, ArrowType arrowType) { + if (arrowType instanceof ArrowType.Int && ((ArrowType.Int) arrowType).getIsSigned()) { + ArrowType.Int intType = (ArrowType.Int) arrowType; + if (intType.getBitWidth() == 8) { + return new TinyIntType(isNullable); + } else if (intType.getBitWidth() == 8 * 2) { + return new SmallIntType(isNullable); + } else if (intType.getBitWidth() == 8 * 4) { + return new IntType(isNullable); + } else if (intType.getBitWidth() == 8 * 8) { + return new BigIntType(isNullable); + } + } + throw new UnsupportedOperationException( + String.format("Unexpected arrow type: %s.", arrowType.toString())); + } + + private static class LogicalTypeToArrowTypeConverter extends LogicalTypeDefaultVisitor { + + private static final LogicalTypeToArrowTypeConverter INSTANCE = new LogicalTypeToArrowTypeConverter(); + + @Override + public ArrowType visit(TinyIntType tinyIntType) { + return new ArrowType.Int(8, true); + } + + @Override + public ArrowType visit(SmallIntType smallIntType) { + return new ArrowType.Int(2 * 8, true); + } + + @Override + public ArrowType visit(IntType intType) { + return new ArrowType.Int(4 * 8, true); + } + + @Override + public ArrowType visit(BigIntType bigIntType) { + return new ArrowType.Int(8 * 8, true); + } + + @Override + protected ArrowType defaultMethod(LogicalType logicalType) { + throw new UnsupportedOperationException(String.format( + "Python vectorized UDF doesn't support logical type %s currently.", logicalType.asSummaryString())); + } + } +} diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowWriter.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowWriter.java new file mode 100644 index 0000000000000..9e969624de15c --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowWriter.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.arrow; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.runtime.arrow.writers.ArrowFieldWriter; +import org.apache.flink.util.Preconditions; + +import org.apache.arrow.vector.VectorSchemaRoot; + +/** + * Writer which serializes the Flink rows to Arrow format. + * + * @param <IN> Type of the row to write. + */ +@Internal +public final class ArrowWriter { + + /** + * Container that holds a set of vectors for the rows to be sent to the Python worker. + */ + private final VectorSchemaRoot root; + + /** + * An array of writers which are responsible for the serialization of each column of the rows. + */ + private final ArrowFieldWriter[] fieldWriters; + + public ArrowWriter(VectorSchemaRoot root, ArrowFieldWriter[] fieldWriters) { + this.root = Preconditions.checkNotNull(root); + this.fieldWriters = Preconditions.checkNotNull(fieldWriters); + } + + /** + * Gets the field writers. + */ + public ArrowFieldWriter[] getFieldWriters() { + return fieldWriters; + } + + /** + * Writes the specified row which is serialized into Arrow format. 
+ */ + public void write(IN row) { + for (int i = 0; i < fieldWriters.length; i++) { + fieldWriters[i].write(row, i); + } + } + + /** + * Finishes the writing of the current row batch. The row count of the root is taken from + * the first field writer, so at least one field writer must be present. + */ + public void finish() { + root.setRowCount(fieldWriters[0].getCount()); + for (ArrowFieldWriter fieldWriter : fieldWriters) { + fieldWriter.finish(); + } + } + + /** + * Resets the state of the writer to write the next batch of rows. + */ + public void reset() { + root.setRowCount(0); + for (ArrowFieldWriter fieldWriter : fieldWriters) { + fieldWriter.reset(); + } + } +} diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/ArrowFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/ArrowFieldReader.java new file mode 100644 index 0000000000000..e3b61ffc5ebc6 --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/ArrowFieldReader.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.arrow.readers; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import org.apache.arrow.vector.ValueVector; + +/** + * Base class for Arrow field readers. + * + * @param <OUT> Type of the value read from the field. + */ +@Internal +public abstract class ArrowFieldReader { + + /** + * Container which is used to store the sequence of values of a column to read. + */ + private final ValueVector valueVector; + + public ArrowFieldReader(ValueVector valueVector) { + this.valueVector = Preconditions.checkNotNull(valueVector); + } + + /** + * Returns the underlying container which stores the sequence of values of a column to read. + */ + public ValueVector getValueVector() { + return valueVector; + } + + /** + * Reads the field value at the specified index. + */ + public abstract OUT read(int index); +} diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/BigIntFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/BigIntFieldReader.java new file mode 100644 index 0000000000000..20e68ae17bf79 --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/BigIntFieldReader.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.arrow.readers; + +import org.apache.flink.annotation.Internal; + +import org.apache.arrow.vector.BigIntVector; + +/** + * {@link ArrowFieldReader} for BigInt, which reads the values of a {@link BigIntVector} as {@link Long}. + */ +@Internal +public final class BigIntFieldReader extends ArrowFieldReader { + + public BigIntFieldReader(BigIntVector bigIntVector) { + super(bigIntVector); + } + + /** + * Reads the value at the specified index of the underlying {@link BigIntVector}. + */ + @Override + public Long read(int index) { + return ((BigIntVector) getValueVector()).getObject(index); + } +} diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/IntFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/IntFieldReader.java new file mode 100644 index 0000000000000..d27b75957943d --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/IntFieldReader.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.arrow.readers; + +import org.apache.flink.annotation.Internal; + +import org.apache.arrow.vector.IntVector; + +/** + * {@link ArrowFieldReader} for Int. 
+ */ +@Internal +public final class IntFieldReader extends ArrowFieldReader { + + public IntFieldReader(IntVector intVector) { + super(intVector); + } + + /** + * Reads the value at the specified index of the underlying {@link IntVector}. + */ + @Override + public Integer read(int index) { + return ((IntVector) getValueVector()).getObject(index); + } +} diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/RowArrowReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/RowArrowReader.java new file mode 100644 index 0000000000000..3cc78de419d89 --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/RowArrowReader.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.arrow.readers; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.runtime.arrow.ArrowReader; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +/** + * {@link ArrowReader} which reads the underlying Arrow format data as {@link Row}. 
+ */ +@Internal +public final class RowArrowReader implements ArrowReader { + + /** + * An array of readers which are responsible for the deserialization of each column of the rows. 
+ */ + private final ArrowFieldReader[] fieldReaders; + + /** + * Reusable row used to hold the deserialized result. The same instance is returned by every + * call to {@link #read(int)}, so its fields are overwritten by the next call. + */ + private final Row reuseRow; + + public RowArrowReader(ArrowFieldReader[] fieldReaders) { + this.fieldReaders = Preconditions.checkNotNull(fieldReaders); + this.reuseRow = new Row(fieldReaders.length); + } + + /** + * Gets the field readers. + */ + public ArrowFieldReader[] getFieldReaders() { + return fieldReaders; + } + + /** + * Fills the reusable row with the value of each field reader at the specified row and returns it. + */ + @Override + public Row read(int rowId) { + for (int i = 0; i < fieldReaders.length; i++) { + reuseRow.setField(i, fieldReaders[i].read(rowId)); + } + return reuseRow; + } +} diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/SmallIntFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/SmallIntFieldReader.java new file mode 100644 index 0000000000000..974c9cb40c515 --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/SmallIntFieldReader.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.arrow.readers; + +import org.apache.flink.annotation.Internal; + +import org.apache.arrow.vector.SmallIntVector; + +/** + * {@link ArrowFieldReader} for SmallInt, which reads the values of a {@link SmallIntVector} as {@link Short}. + */ +@Internal +public final class SmallIntFieldReader extends ArrowFieldReader { + + public SmallIntFieldReader(SmallIntVector smallIntVector) { + super(smallIntVector); + } + + /** + * Reads the value at the specified index of the underlying {@link SmallIntVector}. + */ + @Override + public Short read(int index) { + return ((SmallIntVector) getValueVector()).getObject(index); + } +} diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TinyIntFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TinyIntFieldReader.java new file mode 100644 index 0000000000000..f451a0c6ab0e1 --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TinyIntFieldReader.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.arrow.readers; + +import org.apache.flink.annotation.Internal; + +import org.apache.arrow.vector.TinyIntVector; + +/** + * {@link ArrowFieldReader} for TinyInt. 
+ */ +@Internal +public final class TinyIntFieldReader extends ArrowFieldReader<Byte> { + + public TinyIntFieldReader(TinyIntVector tinyIntVector) { + super(tinyIntVector); + } + + @Override + public Byte read(int index) { + return ((TinyIntVector) getValueVector()).getObject(index); + } +} diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/vectors/ArrowBigIntColumnVector.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/vectors/ArrowBigIntColumnVector.java new file mode 100644 index 0000000000000..3282717ba3c00 --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/vectors/ArrowBigIntColumnVector.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.arrow.vectors; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.dataformat.vector.LongColumnVector; +import org.apache.flink.util.Preconditions; + +import org.apache.arrow.vector.BigIntVector; + +/** + * Arrow column vector for BigInt which exposes an Arrow {@link BigIntVector} as a {@link LongColumnVector}. + */ +@Internal +public final class ArrowBigIntColumnVector implements LongColumnVector { + + /** + * Container which is used to store the sequence of bigint values of a column to read; every method of this class delegates to it. + */ + private final BigIntVector bigIntVector; + + public ArrowBigIntColumnVector(BigIntVector bigIntVector) { + this.bigIntVector = Preconditions.checkNotNull(bigIntVector); + } + + @Override + public long getLong(int i) { + return bigIntVector.get(i); + } + + @Override + public boolean isNullAt(int i) { + return bigIntVector.isNull(i); + } + + @Override + public void reset() { + bigIntVector.reset(); + } +} diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/vectors/ArrowIntColumnVector.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/vectors/ArrowIntColumnVector.java new file mode 100644 index 0000000000000..5673d9f1abf0b --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/vectors/ArrowIntColumnVector.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.arrow.vectors; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.dataformat.vector.IntColumnVector; +import org.apache.flink.util.Preconditions; + +import org.apache.arrow.vector.IntVector; + +/** + * Arrow column vector for Int which exposes an Arrow {@link IntVector} as an {@link IntColumnVector}; every method delegates to the wrapped vector. + */ +@Internal +public final class ArrowIntColumnVector implements IntColumnVector { + + private final IntVector intVector; + + public ArrowIntColumnVector(IntVector intVector) { + this.intVector = Preconditions.checkNotNull(intVector); + } + + @Override + public int getInt(int i) { + return intVector.get(i); + } + + @Override + public boolean isNullAt(int i) { + return intVector.isNull(i); + } + + @Override + public void reset() { + intVector.reset(); + } +} diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/vectors/ArrowSmallIntColumnVector.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/vectors/ArrowSmallIntColumnVector.java new file mode 100644 index 0000000000000..83bcdc119a5dd --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/vectors/ArrowSmallIntColumnVector.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.arrow.vectors; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.dataformat.vector.ShortColumnVector; +import org.apache.flink.util.Preconditions; + +import org.apache.arrow.vector.SmallIntVector; + +/** + * Arrow column vector for SmallInt which exposes an Arrow {@link SmallIntVector} as a {@link ShortColumnVector}; every method delegates to the wrapped vector. + */ +@Internal +public final class ArrowSmallIntColumnVector implements ShortColumnVector { + + private final SmallIntVector smallIntVector; + + public ArrowSmallIntColumnVector(SmallIntVector smallIntVector) { + this.smallIntVector = Preconditions.checkNotNull(smallIntVector); + } + + @Override + public short getShort(int i) { + return smallIntVector.get(i); + } + + @Override + public boolean isNullAt(int i) { + return smallIntVector.isNull(i); + } + + @Override + public void reset() { + smallIntVector.reset(); + } +} diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/vectors/ArrowTinyIntColumnVector.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/vectors/ArrowTinyIntColumnVector.java new file mode 100644 index 0000000000000..11225968ca0d3 --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/vectors/ArrowTinyIntColumnVector.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.arrow.vectors; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.dataformat.vector.ByteColumnVector; +import org.apache.flink.util.Preconditions; + +import org.apache.arrow.vector.TinyIntVector; + +/** + * Arrow column vector for TinyInt which exposes an Arrow {@link TinyIntVector} as a {@link ByteColumnVector}; every method delegates to the wrapped vector. + */ +@Internal +public final class ArrowTinyIntColumnVector implements ByteColumnVector { + + private final TinyIntVector tinyIntVector; + + public ArrowTinyIntColumnVector(TinyIntVector tinyIntVector) { + this.tinyIntVector = Preconditions.checkNotNull(tinyIntVector); + } + + @Override + public byte getByte(int i) { + return tinyIntVector.get(i); + } + + @Override + public boolean isNullAt(int i) { + return tinyIntVector.isNull(i); + } + + @Override + public void reset() { + tinyIntVector.reset(); + } +} diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/vectors/BaseRowArrowReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/vectors/BaseRowArrowReader.java new file mode 100644 index 0000000000000..67dd06b9486a8 --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/vectors/BaseRowArrowReader.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.arrow.vectors; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.ColumnarRow; +import org.apache.flink.table.dataformat.vector.ColumnVector; +import org.apache.flink.table.dataformat.vector.VectorizedColumnBatch; +import org.apache.flink.table.runtime.arrow.ArrowReader; +import org.apache.flink.util.Preconditions; + +/** + * {@link ArrowReader} which reads the underlying Arrow format data as {@link BaseRow}. + */ +@Internal +public final class BaseRowArrowReader implements ArrowReader<BaseRow> { + + /** + * An array of vectors which are responsible for the deserialization of each column of the rows. + */ + private final ColumnVector[] columnVectors; + + /** + * Reusable row used to hold the deserialized result; it is bound to the column batch once and returned by every {@link #read(int)} call. + */ + private final ColumnarRow reuseRow; + + public BaseRowArrowReader(ColumnVector[] columnVectors) { + this.columnVectors = Preconditions.checkNotNull(columnVectors); + this.reuseRow = new ColumnarRow(); + this.reuseRow.setVectorizedColumnBatch(new VectorizedColumnBatch(columnVectors)); + } + + /** + * Gets the column vectors. 
+ */ + public ColumnVector[] getColumnVectors() { + return columnVectors; + } + + @Override + public BaseRow read(int rowId) { + reuseRow.setRowId(rowId); + return reuseRow; + } +} diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/ArrowFieldWriter.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/ArrowFieldWriter.java new file mode 100644 index 0000000000000..f51bbb9364651 --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/ArrowFieldWriter.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.arrow.writers; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import org.apache.arrow.vector.ValueVector; + +/** + * Base class for arrow field writer which is used to convert a field to an Arrow format. + * + * @param <IN> Type of the row to write. + */ +@Internal +public abstract class ArrowFieldWriter<IN> { + + /** + * Container which is used to store the written sequence of values of a column. 
+ */ + private final ValueVector valueVector; + + /** + * The current count of elements written since construction or the last {@link #reset()}. + */ + private int count = 0; + + public ArrowFieldWriter(ValueVector valueVector) { + this.valueVector = Preconditions.checkNotNull(valueVector); + } + + /** + * Returns the underlying container which stores the sequence of values of a column. + */ + public ValueVector getValueVector() { + return valueVector; + } + + /** + * Returns the current count of elements written. + */ + public int getCount() { + return count; + } + + /** + * Sets the field value as the field at the specified ordinal of the specified row. Invoked by {@link #write} before the count is incremented, so {@link #getCount()} is the position of the element being written. + */ + public abstract void doWrite(IN row, int ordinal); + + /** + * Writes the field at the specified ordinal of the specified row and advances the element count by one. + */ + public void write(IN row, int ordinal) { + doWrite(row, ordinal); + count += 1; + } + + /** + * Finishes the writing of the current row batch by setting the value count of the underlying vector to the number of elements written. + */ + public void finish() { + valueVector.setValueCount(count); + } + + /** + * Resets the state of the writer to write the next batch of fields: resets the underlying vector and sets the count back to zero. + */ + public void reset() { + valueVector.reset(); + count = 0; + } +} diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/BaseRowBigIntWriter.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/BaseRowBigIntWriter.java new file mode 100644 index 0000000000000..55bf5e9ce520d --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/BaseRowBigIntWriter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.arrow.writers; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.dataformat.BaseRow; + +import org.apache.arrow.vector.BigIntVector; + +/** + * {@link ArrowFieldWriter} for BigInt which writes a field of a {@link BaseRow} into an Arrow {@link BigIntVector}. + */ +@Internal +public final class BaseRowBigIntWriter extends ArrowFieldWriter<BaseRow> { + + public BaseRowBigIntWriter(BigIntVector bigIntVector) { + super(bigIntVector); + } + + @Override + public void doWrite(BaseRow row, int ordinal) { + if (row.isNullAt(ordinal)) { + ((BigIntVector) getValueVector()).setNull(getCount()); + } else { + ((BigIntVector) getValueVector()).setSafe(getCount(), row.getLong(ordinal)); + } + } +} diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/BaseRowIntWriter.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/BaseRowIntWriter.java new file mode 100644 index 0000000000000..c01f0105c17d4 --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/BaseRowIntWriter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.arrow.writers; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.dataformat.BaseRow; + +import org.apache.arrow.vector.IntVector; + +/** + * {@link ArrowFieldWriter} for Int which writes a field of a {@link BaseRow} into an Arrow {@link IntVector}. + */ +@Internal +public final class BaseRowIntWriter extends ArrowFieldWriter<BaseRow> { + + public BaseRowIntWriter(IntVector intVector) { + super(intVector); + } + + @Override + public void doWrite(BaseRow row, int ordinal) { + if (row.isNullAt(ordinal)) { + ((IntVector) getValueVector()).setNull(getCount()); + } else { + ((IntVector) getValueVector()).setSafe(getCount(), row.getInt(ordinal)); + } + } +} diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/BaseRowSmallIntWriter.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/BaseRowSmallIntWriter.java new file mode 100644 index 0000000000000..9a6a8686e66b2 --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/BaseRowSmallIntWriter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.arrow.writers; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.dataformat.BaseRow; + +import org.apache.arrow.vector.SmallIntVector; + +/** + * {@link ArrowFieldWriter} for SmallInt which writes a field of a {@link BaseRow} into an Arrow {@link SmallIntVector}. + */ +@Internal +public final class BaseRowSmallIntWriter extends ArrowFieldWriter<BaseRow> { + + public BaseRowSmallIntWriter(SmallIntVector smallIntVector) { + super(smallIntVector); + } + + @Override + public void doWrite(BaseRow row, int ordinal) { + if (row.isNullAt(ordinal)) { + ((SmallIntVector) getValueVector()).setNull(getCount()); + } else { + ((SmallIntVector) getValueVector()).setSafe(getCount(), row.getShort(ordinal)); + } + } +} diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/BaseRowTinyIntWriter.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/BaseRowTinyIntWriter.java new file mode 100644 index 0000000000000..c71bd8e4fefa7 --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/BaseRowTinyIntWriter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.arrow.writers; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.dataformat.BaseRow; + +import org.apache.arrow.vector.TinyIntVector; + +/** + * {@link ArrowFieldWriter} for TinyInt which writes a field of a {@link BaseRow} into an Arrow {@link TinyIntVector}. + */ +@Internal +public final class BaseRowTinyIntWriter extends ArrowFieldWriter<BaseRow> { + + public BaseRowTinyIntWriter(TinyIntVector tinyIntVector) { + super(tinyIntVector); + } + + @Override + public void doWrite(BaseRow row, int ordinal) { + if (row.isNullAt(ordinal)) { + ((TinyIntVector) getValueVector()).setNull(getCount()); + } else { + ((TinyIntVector) getValueVector()).setSafe(getCount(), row.getByte(ordinal)); + } + } +} diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/BigIntWriter.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/BigIntWriter.java new file mode 100644 index 0000000000000..398c74b0035a8 --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/BigIntWriter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.arrow.writers; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.types.Row; + +import org.apache.arrow.vector.BigIntVector; + +/** + * {@link ArrowFieldWriter} for BigInt which writes a field of a {@link Row} into an Arrow {@link BigIntVector}. + */ +@Internal +public final class BigIntWriter extends ArrowFieldWriter<Row> { + + public BigIntWriter(BigIntVector bigIntVector) { + super(bigIntVector); + } + + @Override + public void doWrite(Row value, int ordinal) { + if (value.getField(ordinal) == null) { + ((BigIntVector) getValueVector()).setNull(getCount()); + } else { + ((BigIntVector) getValueVector()).setSafe(getCount(), (long) value.getField(ordinal)); + } + } +} diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/IntWriter.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/IntWriter.java new file mode 100644 index 0000000000000..a6ede7df7aa5c --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/IntWriter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.arrow.writers; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.types.Row; + +import org.apache.arrow.vector.IntVector; + +/** + * {@link ArrowFieldWriter} for Int which writes a field of a {@link Row} into an Arrow {@link IntVector}. + */ +@Internal +public final class IntWriter extends ArrowFieldWriter<Row> { + + public IntWriter(IntVector intVector) { + super(intVector); + } + + @Override + public void doWrite(Row value, int ordinal) { + if (value.getField(ordinal) == null) { + ((IntVector) getValueVector()).setNull(getCount()); + } else { + ((IntVector) getValueVector()).setSafe(getCount(), (int) value.getField(ordinal)); + } + } +} diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/SmallIntWriter.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/SmallIntWriter.java new file mode 100644 index 0000000000000..ae38e1594cbd8 --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/SmallIntWriter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.arrow.writers; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.types.Row; + +import org.apache.arrow.vector.SmallIntVector; + +/** + * {@link ArrowFieldWriter} for SmallInt which writes a field of a {@link Row} into an Arrow {@link SmallIntVector}. + */ +@Internal +public final class SmallIntWriter extends ArrowFieldWriter<Row> { + + public SmallIntWriter(SmallIntVector smallIntVector) { + super(smallIntVector); + } + + @Override + public void doWrite(Row value, int ordinal) { + if (value.getField(ordinal) == null) { + ((SmallIntVector) getValueVector()).setNull(getCount()); + } else { + ((SmallIntVector) getValueVector()).setSafe(getCount(), (short) value.getField(ordinal)); + } + } +} diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/TinyIntWriter.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/TinyIntWriter.java new file mode 100644 index 0000000000000..7e4bb216a8063 --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/writers/TinyIntWriter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.arrow.writers; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.types.Row; + +import org.apache.arrow.vector.TinyIntVector; + +/** + * {@link ArrowFieldWriter} for TinyInt which writes a field of a {@link Row} into an Arrow {@link TinyIntVector}. + */ +@Internal +public final class TinyIntWriter extends ArrowFieldWriter<Row> { + + public TinyIntWriter(TinyIntVector tinyIntVector) { + super(tinyIntVector); + } + + @Override + public void doWrite(Row value, int ordinal) { + if (value.getField(ordinal) == null) { + ((TinyIntVector) getValueVector()).setNull(getCount()); + } else { + ((TinyIntVector) getValueVector()).setSafe(getCount(), (byte) value.getField(ordinal)); + } + } +} diff --git a/flink-python/src/main/resources/META-INF/NOTICE b/flink-python/src/main/resources/META-INF/NOTICE index 248c9027df7ce..d32850b7d4c69 100644 --- a/flink-python/src/main/resources/META-INF/NOTICE +++ b/flink-python/src/main/resources/META-INF/NOTICE @@ -11,6 +11,7 @@ This project bundles the following dependencies under the Apache Software Licens - com.fasterxml.jackson.core:jackson-databind:2.10.1 - com.google.api.grpc:proto-google-common-protos:1.12.0 - com.google.code.gson:gson:2.7 +- com.google.flatbuffers:flatbuffers-java:1.9.0 - com.google.guava:guava:26.0-jre - io.grpc:grpc-auth:1.21.0 - io.grpc:grpc-core:1.21.0 @@ -19,11 +20,13 @@ This project bundles the following dependencies under the Apache Software Licens - io.grpc:grpc-protobuf:1.21.0 - io.grpc:grpc-stub:1.21.0 - io.grpc:grpc-testing:1.21.0 +- io.netty:netty-buffer:4.1.27.Final - 
io.netty:netty-buffer:4.1.34.Final - io.netty:netty-codec:4.1.34.Final - io.netty:netty-codec-http:4.1.34.Final - io.netty:netty-codec-http2:4.1.34.Final - io.netty:netty-codec-socks:4.1.34.Final +- io.netty:netty-common:4.1.27.Final - io.netty:netty-common:4.1.34.Final - io.netty:netty-handler:4.1.34.Final - io.netty:netty-handler-proxy:4.1.34.Final @@ -35,6 +38,9 @@ This project bundles the following dependencies under the Apache Software Licens - io.opencensus:opencensus-api:0.21.0 - io.opencensus:opencensus-contrib-grpc-metrics:0.21.0 - joda-time:joda-time:2.5 +- org.apache.arrow:arrow-format:0.16.0 +- org.apache.arrow:arrow-memory:0.16.0 +- org.apache.arrow:arrow-vector:0.16.0 - org.apache.beam:beam-model-fn-execution:2.19.0 - org.apache.beam:beam-model-job-management:2.19.0 - org.apache.beam:beam-model-pipeline:2.19.0 diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/ArrowReaderWriterTestBase.java b/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/ArrowReaderWriterTestBase.java new file mode 100644 index 0000000000000..ec5e610ef488b --- /dev/null +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/ArrowReaderWriterTestBase.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.arrow; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.testutils.CustomEqualityMatcher; +import org.apache.flink.testutils.DeeplyEqualsChecker; +import org.apache.flink.util.Preconditions; + +import org.apache.arrow.vector.ipc.ArrowStreamWriter; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Abstract test base for {@link ArrowReader} and {@link ArrowWriter}: writes the test data with the writer, reads it back with the reader and checks each element for deep equality. + * + * @param <T> the element type. + */ +public abstract class ArrowReaderWriterTestBase<T> { + + private final DeeplyEqualsChecker checker; + + ArrowReaderWriterTestBase() { + this.checker = new DeeplyEqualsChecker(); + } + + ArrowReaderWriterTestBase(DeeplyEqualsChecker checker) { + this.checker = Preconditions.checkNotNull(checker); + } + + @Test + public void testBasicFunctionality() { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + Tuple2<ArrowWriter<T>, ArrowStreamWriter> tuple2 = createArrowWriter(baos); + ArrowWriter<T> arrowWriter = tuple2.f0; + ArrowStreamWriter arrowStreamWriter = tuple2.f1; + + T[] testData = getTestData(); + for (T value : testData) { + arrowWriter.write(value); + } + arrowWriter.finish(); + arrowStreamWriter.writeBatch(); + + ArrowReader<T> arrowReader = createArrowReader(new ByteArrayInputStream(baos.toByteArray())); + for (int i = 0; i < testData.length; i++) { + T deserialized = arrowReader.read(i); + assertThat("Deserialized value is wrong.", + deserialized, CustomEqualityMatcher.deeplyEquals(testData[i]).withChecker(checker)); + } + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Exception in test: " + e.getMessage()); + } 
+ } + + public abstract ArrowReader<T> createArrowReader(InputStream inputStream) throws IOException; + + public abstract Tuple2<ArrowWriter<T>, ArrowStreamWriter> createArrowWriter(OutputStream outputStream) throws IOException; + + public abstract T[] getTestData(); +} diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/ArrowUtilsTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/ArrowUtilsTest.java new file mode 100644 index 0000000000000..b738e68b64b1d --- /dev/null +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/ArrowUtilsTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.arrow; + +import org.apache.flink.api.java.tuple.Tuple7; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.vector.ColumnVector; +import org.apache.flink.table.runtime.arrow.readers.ArrowFieldReader; +import org.apache.flink.table.runtime.arrow.readers.BigIntFieldReader; +import org.apache.flink.table.runtime.arrow.readers.IntFieldReader; +import org.apache.flink.table.runtime.arrow.readers.RowArrowReader; +import org.apache.flink.table.runtime.arrow.readers.SmallIntFieldReader; +import org.apache.flink.table.runtime.arrow.readers.TinyIntFieldReader; +import org.apache.flink.table.runtime.arrow.vectors.ArrowBigIntColumnVector; +import org.apache.flink.table.runtime.arrow.vectors.ArrowIntColumnVector; +import org.apache.flink.table.runtime.arrow.vectors.ArrowSmallIntColumnVector; +import org.apache.flink.table.runtime.arrow.vectors.ArrowTinyIntColumnVector; +import org.apache.flink.table.runtime.arrow.vectors.BaseRowArrowReader; +import org.apache.flink.table.runtime.arrow.writers.ArrowFieldWriter; +import org.apache.flink.table.runtime.arrow.writers.BaseRowBigIntWriter; +import org.apache.flink.table.runtime.arrow.writers.BaseRowIntWriter; +import org.apache.flink.table.runtime.arrow.writers.BaseRowSmallIntWriter; +import org.apache.flink.table.runtime.arrow.writers.BaseRowTinyIntWriter; +import org.apache.flink.table.runtime.arrow.writers.BigIntWriter; +import org.apache.flink.table.runtime.arrow.writers.IntWriter; +import org.apache.flink.table.runtime.arrow.writers.SmallIntWriter; +import org.apache.flink.table.runtime.arrow.writers.TinyIntWriter; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.SmallIntType; +import 
org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.types.Row; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link ArrowUtils}. + */ +public class ArrowUtilsTest { + + private static List, Class, Class, Class>> testFields; + private static RowType rowType; + private static BufferAllocator allocator; + + @BeforeClass + public static void init() { + testFields = new ArrayList<>(); + testFields.add(Tuple7.of( + "f1", new TinyIntType(), new ArrowType.Int(8, true), TinyIntWriter.class, + BaseRowTinyIntWriter.class, TinyIntFieldReader.class, ArrowTinyIntColumnVector.class)); + testFields.add(Tuple7.of("f2", new SmallIntType(), new ArrowType.Int(8 * 2, true), + SmallIntWriter.class, BaseRowSmallIntWriter.class, SmallIntFieldReader.class, ArrowSmallIntColumnVector.class)); + testFields.add(Tuple7.of("f3", new IntType(), new ArrowType.Int(8 * 4, true), + IntWriter.class, BaseRowIntWriter.class, IntFieldReader.class, ArrowIntColumnVector.class)); + testFields.add(Tuple7.of("f4", new BigIntType(), new ArrowType.Int(8 * 8, true), + BigIntWriter.class, BaseRowBigIntWriter.class, BigIntFieldReader.class, ArrowBigIntColumnVector.class)); + + List rowFields = new ArrayList<>(); + for (Tuple7, Class, Class, Class> field : testFields) { + rowFields.add(new RowType.RowField(field.f0, field.f1)); + } + rowType = new RowType(rowFields); + + allocator = ArrowUtils.ROOT_ALLOCATOR.newChildAllocator( + "stdout", 0, Long.MAX_VALUE); + } + + @Test + public void testConvertBetweenLogicalTypeAndArrowType() { + Schema schema = ArrowUtils.toArrowSchema(rowType); + + 
assertEquals(testFields.size(), schema.getFields().size()); + List fields = schema.getFields(); + for (int i = 0; i < schema.getFields().size(); i++) { + // verify convert from RowType to ArrowType + assertEquals(testFields.get(i).f0, fields.get(i).getName()); + assertEquals(testFields.get(i).f2, fields.get(i).getType()); + // verify convert from ArrowType to LogicalType + assertEquals(testFields.get(i).f1, ArrowUtils.fromArrowFieldToLogicalType(fields.get(i))); + } + } + + @Test + public void testCreateRowArrowReader() { + VectorSchemaRoot root = VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator); + RowArrowReader reader = ArrowUtils.createRowArrowReader(root); + ArrowFieldReader[] fieldReaders = reader.getFieldReaders(); + for (int i = 0; i < fieldReaders.length; i++) { + assertEquals(testFields.get(i).f5, fieldReaders[i].getClass()); + } + } + + @Test + public void testCreateBaseRowArrowReader() { + VectorSchemaRoot root = VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator); + BaseRowArrowReader reader = ArrowUtils.createBaseRowArrowReader(root); + ColumnVector[] columnVectors = reader.getColumnVectors(); + for (int i = 0; i < columnVectors.length; i++) { + assertEquals(testFields.get(i).f6, columnVectors[i].getClass()); + } + } + + @Test + public void testCreateRowArrowWriter() { + VectorSchemaRoot root = VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator); + ArrowWriter writer = ArrowUtils.createRowArrowWriter(root); + ArrowFieldWriter[] fieldWriters = writer.getFieldWriters(); + for (int i = 0; i < fieldWriters.length; i++) { + assertEquals(testFields.get(i).f3, fieldWriters[i].getClass()); + } + } + + @Test + public void testCreateBaseRowArrowWriter() { + VectorSchemaRoot root = VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator); + ArrowWriter writer = ArrowUtils.createBaseRowArrowWriter(root); + ArrowFieldWriter[] fieldWriters = writer.getFieldWriters(); + for (int i = 0; i < 
fieldWriters.length; i++) { + assertEquals(testFields.get(i).f4, fieldWriters[i].getClass()); + } + } +} diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/BaseRowArrowReaderWriterTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/BaseRowArrowReaderWriterTest.java new file mode 100644 index 0000000000000..33bbaabfba27e --- /dev/null +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/BaseRowArrowReaderWriterTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.arrow; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.BinaryRow; +import org.apache.flink.table.runtime.typeutils.BaseRowSerializer; +import org.apache.flink.table.runtime.util.StreamRecordUtils; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.testutils.DeeplyEqualsChecker; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.arrow.vector.ipc.ArrowStreamWriter; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Tests for {@link ArrowReader} and {@link ArrowWriter} of BaseRow. 
+ */ +public class BaseRowArrowReaderWriterTest extends ArrowReaderWriterTestBase { + private static List fieldTypes = new ArrayList<>(); + private static RowType rowType; + private static BufferAllocator allocator; + + public BaseRowArrowReaderWriterTest() { + super(new DeeplyEqualsChecker() + .withCustomCheck( + (o1, o2) -> o1 instanceof BaseRow && o2 instanceof BaseRow, + (o1, o2, checker) -> { + BaseRowSerializer serializer = new BaseRowSerializer( + new ExecutionConfig(), fieldTypes.toArray(new LogicalType[0])); + return deepEqualsBaseRow( + (BaseRow) o1, + (BaseRow) o2, + (BaseRowSerializer) serializer.duplicate(), + (BaseRowSerializer) serializer.duplicate()); + })); + } + + private static boolean deepEqualsBaseRow( + BaseRow should, BaseRow is, + BaseRowSerializer serializer1, BaseRowSerializer serializer2) { + if (should.getArity() != is.getArity()) { + return false; + } + BinaryRow row1 = serializer1.toBinaryRow(should); + BinaryRow row2 = serializer2.toBinaryRow(is); + + return Objects.equals(row1, row2); + } + + @BeforeClass + public static void init() { + fieldTypes.add(new TinyIntType()); + fieldTypes.add(new SmallIntType()); + fieldTypes.add(new IntType()); + fieldTypes.add(new BigIntType()); + + List rowFields = new ArrayList<>(); + for (int i = 0; i < fieldTypes.size(); i++) { + rowFields.add(new RowType.RowField("f" + i, fieldTypes.get(i))); + } + rowType = new RowType(rowFields); + allocator = ArrowUtils.ROOT_ALLOCATOR.newChildAllocator( + "stdout", 0, Long.MAX_VALUE); + } + + @Override + public ArrowReader createArrowReader(InputStream inputStream) throws IOException { + ArrowStreamReader reader = new ArrowStreamReader(inputStream, allocator); + reader.loadNextBatch(); + return ArrowUtils.createBaseRowArrowReader(reader.getVectorSchemaRoot()); + } + + @Override + public Tuple2, ArrowStreamWriter> createArrowWriter(OutputStream outputStream) throws IOException { + VectorSchemaRoot root = VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), 
allocator); + ArrowWriter arrowWriter = ArrowUtils.createBaseRowArrowWriter(root); + ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(root, null, outputStream); + arrowStreamWriter.start(); + return Tuple2.of(arrowWriter, arrowStreamWriter); + } + + @Override + public BaseRow[] getTestData() { + BaseRow row1 = StreamRecordUtils.baserow((byte) 1, (short) 2, 3, 4L); + BinaryRow row2 = StreamRecordUtils.binaryrow((byte) 1, (short) 2, 3, 4L); + BaseRow row3 = StreamRecordUtils.baserow(null, (short) 2, 3, 4L); + BinaryRow row4 = StreamRecordUtils.binaryrow((byte) 1, null, 3, 4L); + BaseRow row5 = StreamRecordUtils.baserow(null, null, null, null); + BinaryRow row6 = StreamRecordUtils.binaryrow(null, null, null, null); + return new BaseRow[]{row1, row2, row3, row4, row5, row6}; + } +} diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/RowArrowReaderWriterTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/RowArrowReaderWriterTest.java new file mode 100644 index 0000000000000..1c4a18186bcc3 --- /dev/null +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/RowArrowReaderWriterTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.arrow; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.types.Row; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.arrow.vector.ipc.ArrowStreamWriter; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; + +/** + * Tests for {@link ArrowReader} and {@link ArrowWriter} of Row. 
+ */ +public class RowArrowReaderWriterTest extends ArrowReaderWriterTestBase { + private static RowType rowType; + private static BufferAllocator allocator; + + @BeforeClass + public static void init() { + List fieldTypes = new ArrayList<>(); + fieldTypes.add(new TinyIntType()); + fieldTypes.add(new SmallIntType()); + fieldTypes.add(new IntType()); + fieldTypes.add(new BigIntType()); + + List rowFields = new ArrayList<>(); + for (int i = 0; i < fieldTypes.size(); i++) { + rowFields.add(new RowType.RowField("f" + i, fieldTypes.get(i))); + } + rowType = new RowType(rowFields); + allocator = ArrowUtils.ROOT_ALLOCATOR.newChildAllocator( + "stdout", 0, Long.MAX_VALUE); + } + + @Override + public ArrowReader createArrowReader(InputStream inputStream) throws IOException { + ArrowStreamReader reader = new ArrowStreamReader(inputStream, allocator); + reader.loadNextBatch(); + return ArrowUtils.createRowArrowReader(reader.getVectorSchemaRoot()); + } + + @Override + public Tuple2, ArrowStreamWriter> createArrowWriter(OutputStream outputStream) throws IOException { + VectorSchemaRoot root = VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator); + ArrowWriter arrowWriter = ArrowUtils.createRowArrowWriter(root); + ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(root, null, outputStream); + arrowStreamWriter.start(); + return Tuple2.of(arrowWriter, arrowStreamWriter); + } + + @Override + public Row[] getTestData() { + Row row1 = Row.of((byte) 1, (short) 2, 3, 4L); + Row row2 = Row.of((byte) 1, (short) 2, 3, 4L); + Row row3 = Row.of(null, (short) 2, 3, 4L); + Row row4 = Row.of((byte) 1, null, 3, 4L); + Row row5 = Row.of(null, null, null, null); + Row row6 = Row.of(null, null, null, null); + return new Row[]{row1, row2, row3, row4, row5, row6}; + } +} diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java index 2acbcf0534e70..7e9c455ca39bc 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java @@ -99,6 +99,10 @@ public static BinaryRow binaryrow(Object... fields) { Object value = fields[j]; if (value == null) { writer.setNullAt(j); + } else if (value instanceof Byte) { + writer.writeByte(j, (Byte) value); + } else if (value instanceof Short) { + writer.writeShort(j, (Short) value); } else if (value instanceof Integer) { writer.writeInt(j, (Integer) value); } else if (value instanceof String) { diff --git a/pom.xml b/pom.xml index d45c6fb686009..63df747cd7ca0 100644 --- a/pom.xml +++ b/pom.xml @@ -134,6 +134,7 @@ under the License. 0.10.8.1 2.19.0 3.7.1 + 0.16.0 false validate