Skip to content

Commit

Permalink
Merge pull request apache#10247: [BEAM-7274] In preparation for proto…
Browse files Browse the repository at this point in the history
…col-buffer schemas, add OneOf and Enumeration logical types
  • Loading branch information
reuvenlax authored and Jozef Vilcek committed Feb 21, 2020
1 parent 40d2e7c commit 63d4ba2
Show file tree
Hide file tree
Showing 34 changed files with 977 additions and 224 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.util.WindowedValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.schemas.LogicalTypes;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
Expand Down Expand Up @@ -86,7 +86,7 @@ public class CoderTranslationTest {
Field.of("i16", FieldType.INT16),
Field.of("array", FieldType.array(FieldType.STRING)),
Field.of("map", FieldType.map(FieldType.STRING, FieldType.INT32)),
Field.of("bar", FieldType.logicalType(LogicalTypes.FixedBytes.of(123))))))
Field.of("bar", FieldType.logicalType(FixedBytes.of(123))))))
.build();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
import static org.junit.Assert.assertThat;

import org.apache.beam.model.pipeline.v1.SchemaApi;
import org.apache.beam.sdk.schemas.LogicalTypes;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -75,8 +76,7 @@ public static Iterable<Schema> data() {
.add(
Schema.of(
Field.of("decimal", FieldType.DECIMAL), Field.of("datetime", FieldType.DATETIME)))
.add(
Schema.of(Field.of("logical", FieldType.logicalType(LogicalTypes.FixedBytes.of(24)))))
.add(Schema.of(Field.of("logical", FieldType.logicalType(FixedBytes.of(24)))))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@

import java.io.IOException;
import org.apache.beam.model.pipeline.v1.SchemaApi;
import org.apache.beam.runners.core.construction.SchemaTranslation;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.StringUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@
import org.apache.beam.sdk.coders.SetCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.schemas.LogicalTypes;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.join.CoGbkResult.CoGbkResultCoder;
import org.apache.beam.sdk.transforms.join.CoGbkResultSchema;
Expand Down Expand Up @@ -91,7 +91,7 @@ public class CloudObjectsTest {
.addDoubleField("double")
.addStringField("string")
.addArrayField("list_int32", FieldType.INT32)
.addLogicalTypeField("fixed_bytes", LogicalTypes.FixedBytes.of(4))
.addLogicalTypeField("fixed_bytes", FixedBytes.of(4))
.build();

/** Tests that all of the Default Coders are tested. */
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -497,15 +497,18 @@ public interface LogicalType<InputT, BaseT> extends Serializable {
/** The unique identifier for this type. */
String getIdentifier();

/** A schema type representing how to interpret the argument. */
FieldType getArgumentType();

/** An optional argument to configure the type. */
default String getArgument() {
return "";
@SuppressWarnings("TypeParameterUnusedInFormals")
default <T> T getArgument() {
return null;
}

/** The base {@link FieldType} used to store values of this type. */
FieldType getBaseType();

/** Convert the input Java type to one appropriate for the base {@link FieldType}. */
BaseT toBaseType(InputT input);

/** Convert the Java type used by the base {@link FieldType} to the input type. */
Expand Down Expand Up @@ -665,11 +668,7 @@ public static final FieldType row(Schema schema) {
/** Creates a logical type based on a primitive field type. */
public static final <InputT, BaseT> FieldType logicalType(
LogicalType<InputT, BaseT> logicalType) {
return FieldType.forTypeName(TypeName.LOGICAL_TYPE)
.setLogicalType(logicalType)
.build()
.withMetadata(LOGICAL_TYPE_IDENTIFIER, logicalType.getIdentifier())
.withMetadata(LOGICAL_TYPE_ARGUMENT, logicalType.getArgument());
return FieldType.forTypeName(TypeName.LOGICAL_TYPE).setLogicalType(logicalType).build();
}

/** Set the metadata map for the type, overriding any existing metadata.. */
Expand Down Expand Up @@ -720,10 +719,26 @@ public boolean equals(Object o) {
if (!(o instanceof FieldType)) {
return false;
}
// Logical type not included here, since the logical type identifier is included in the
// metadata. The LogicalType class is cached in this object just for convenience.
// TODO: this is wrong, since LogicalTypes have metadata associated.

FieldType other = (FieldType) o;
if (getTypeName().isLogicalType()) {
if (!other.getTypeName().isLogicalType()) {
return false;
}
if (!Objects.equals(
getLogicalType().getIdentifier(), other.getLogicalType().getIdentifier())) {
return false;
}
if (!getLogicalType().getArgumentType().equals(other.getLogicalType().getArgumentType())) {
return false;
}
if (!Row.Equals.deepEquals(
getLogicalType().getArgument(),
other.getLogicalType().getArgument(),
getLogicalType().getArgumentType())) {
return false;
}
}
return Objects.equals(getTypeName(), other.getTypeName())
&& Objects.equals(getNullable(), other.getNullable())
&& Objects.equals(getCollectionElementType(), other.getCollectionElementType())
Expand All @@ -738,6 +753,24 @@ public boolean typesEqual(FieldType other) {
if (!Objects.equals(getTypeName(), other.getTypeName())) {
return false;
}
if (getTypeName().isLogicalType()) {
if (!other.getTypeName().isLogicalType()) {
return false;
}
if (!Objects.equals(
getLogicalType().getIdentifier(), other.getLogicalType().getIdentifier())) {
return false;
}
if (!getLogicalType().getArgumentType().equals(other.getLogicalType().getArgumentType())) {
return false;
}
if (!Row.Equals.deepEquals(
getLogicalType().getArgument(),
other.getLogicalType().getArgument(),
getLogicalType().getArgumentType())) {
return false;
}
}
if (!Objects.equals(getMetadata(), other.getMetadata())) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.construction;
package org.apache.beam.sdk.schemas;

import java.util.Map;
import java.util.UUID;
import org.apache.beam.model.pipeline.v1.SchemaApi;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.LogicalType;
Expand Down
Loading

0 comments on commit 63d4ba2

Please sign in to comment.