Skip to content

Commit

Permalink
Merge pull request apache#10356: [BEAM-7274] Infer a Beam Schema from…
Browse files Browse the repository at this point in the history
… a protocol buffer class.
  • Loading branch information
reuvenlax authored and Jozef Vilcek committed Feb 21, 2020
1 parent 699d232 commit 1d84715
Show file tree
Hide file tree
Showing 14 changed files with 1,296 additions and 38 deletions.
52 changes: 52 additions & 0 deletions model/pipeline/src/main/proto/schema.proto
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,56 @@ message LogicalType {
string urn = 1;
bytes payload = 2;
FieldType representation = 3;
FieldType argument_type = 4;
FieldValue argument = 5;
}

// The value of a row: the values of its fields, one FieldValue per field.
message Row {
repeated FieldValue values = 1;
}

// A single value. The oneof selects which kind of value is carried; at most
// one of the alternatives is set.
message FieldValue {
oneof field_value {
// A scalar value (see AtomicTypeValue for the supported scalars).
AtomicTypeValue atomic_value = 1;
ArrayTypeValue array_value = 2;
IterableTypeValue iterable_value = 3;
MapTypeValue map_value = 4;
// A nested row.
Row row_value = 5;
LogicalTypeValue logical_type_value = 6;
}
}

// A scalar value. Exactly which scalar is carried is selected by the oneof.
message AtomicTypeValue {
oneof value {
// byte and int16 are carried as int32: protobuf has no 8- or 16-bit
// integer scalar types.
int32 byte = 1;
int32 int16 = 2;
int32 int32 = 3;
int64 int64 = 4;
float float = 5;
double double = 6;
string string = 7;
bool boolean = 8;
bytes bytes = 9;
}
}

// The value of an array-typed field: its elements, each encoded as a FieldValue.
message ArrayTypeValue {
repeated FieldValue element = 1;
}

// The value of an iterable-typed field: its elements, each encoded as a
// FieldValue. Structurally identical to ArrayTypeValue.
message IterableTypeValue {
repeated FieldValue element = 1;
}

// The value of a map-typed field, represented as a list of key/value entries.
message MapTypeValue {
repeated MapTypeEntry entries = 1;
}

// One key/value pair of a MapTypeValue.
message MapTypeEntry {
FieldValue key = 1;
FieldValue value = 2;
}

// The value of a logical-type field, wrapped as a FieldValue.
// NOTE(review): presumably the wrapped value is expressed in the logical
// type's representation type -- confirm against the producers/consumers.
message LogicalTypeValue {
FieldValue value = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public List<? extends Coder<?>> getComponents(RowCoder from) {

@Override
public byte[] getPayload(RowCoder from) {
return SchemaTranslation.schemaToProto(from.getSchema()).toByteArray();
return SchemaTranslation.schemaToProto(from.getSchema(), true).toByteArray();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public static Iterable<Schema> data() {

@Test
public void toAndFromProto() throws Exception {
SchemaApi.Schema schemaProto = SchemaTranslation.schemaToProto(schema);
SchemaApi.Schema schemaProto = SchemaTranslation.schemaToProto(schema, true);

Schema decodedSchema = SchemaTranslation.fromProto(schemaProto);
assertThat(decodedSchema, equalTo(schema));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public CloudObject toCloudObject(SchemaCoder target, SdkComponents sdkComponents
base,
SCHEMA,
StringUtils.byteArrayToJsonString(
SchemaTranslation.schemaToProto(target.getSchema()).toByteArray()));
SchemaTranslation.schemaToProto(target.getSchema(), true).toByteArray()));
return base;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,8 +559,14 @@ public abstract static class FieldType implements Serializable {

abstract FieldType.Builder toBuilder();

/**
 * Returns whether this field type is a logical type whose identifier equals {@code
 * logicalTypeIdentifier}.
 *
 * @param logicalTypeIdentifier the logical type identifier to compare against
 * @return {@code true} only if this is a logical type and its identifier matches
 */
public boolean isLogicalType(String logicalTypeIdentifier) {
  // Only consult the logical type once we know this field type actually is one.
  if (!getTypeName().isLogicalType()) {
    return false;
  }
  return getLogicalType().getIdentifier().equals(logicalTypeIdentifier);
}

/** Helper function for retrieving the concrete logical type subclass. */
public <LogicalTypeT> LogicalTypeT getLogicalType(Class<LogicalTypeT> logicalTypeClass) {
public <LogicalTypeT extends LogicalType> LogicalTypeT getLogicalType(
Class<LogicalTypeT> logicalTypeClass) {
return logicalTypeClass.cast(getLogicalType());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,19 @@
import java.util.Map;
import java.util.UUID;
import org.apache.beam.model.pipeline.v1.SchemaApi;
import org.apache.beam.model.pipeline.v1.SchemaApi.ArrayTypeValue;
import org.apache.beam.model.pipeline.v1.SchemaApi.FieldValue;
import org.apache.beam.model.pipeline.v1.SchemaApi.IterableTypeValue;
import org.apache.beam.model.pipeline.v1.SchemaApi.MapTypeEntry;
import org.apache.beam.model.pipeline.v1.SchemaApi.MapTypeValue;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.LogicalType;
import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;

/** Utility methods for translating schemas. */
Expand All @@ -35,85 +42,99 @@ public class SchemaTranslation {
private static final String URN_BEAM_LOGICAL_DECIMAL = "beam:logical_type:decimal:v1";
private static final String URN_BEAM_LOGICAL_JAVASDK = "beam:logical_type:javasdk:v1";

public static SchemaApi.Schema schemaToProto(Schema schema) {
public static SchemaApi.Schema schemaToProto(Schema schema, boolean serializeLogicalType) {
String uuid = schema.getUUID() != null ? schema.getUUID().toString() : "";
SchemaApi.Schema.Builder builder = SchemaApi.Schema.newBuilder().setId(uuid);
for (Field field : schema.getFields()) {
SchemaApi.Field protoField =
fieldToProto(
field,
schema.indexOf(field.getName()),
schema.getEncodingPositions().get(field.getName()));
schema.getEncodingPositions().get(field.getName()),
serializeLogicalType);
builder.addFields(protoField);
}
return builder.build();
}

private static SchemaApi.Field fieldToProto(Field field, int fieldId, int position) {
private static SchemaApi.Field fieldToProto(
Field field, int fieldId, int position, boolean serializeLogicalType) {
return SchemaApi.Field.newBuilder()
.setName(field.getName())
.setDescription(field.getDescription())
.setType(fieldTypeToProto(field.getType()))
.setType(fieldTypeToProto(field.getType(), serializeLogicalType))
.setId(fieldId)
.setEncodingPosition(position)
.build();
}

private static SchemaApi.FieldType fieldTypeToProto(FieldType fieldType) {
private static SchemaApi.FieldType fieldTypeToProto(
FieldType fieldType, boolean serializeLogicalType) {
SchemaApi.FieldType.Builder builder = SchemaApi.FieldType.newBuilder();
switch (fieldType.getTypeName()) {
case ROW:
builder.setRowType(
SchemaApi.RowType.newBuilder().setSchema(schemaToProto(fieldType.getRowSchema())));
SchemaApi.RowType.newBuilder()
.setSchema(schemaToProto(fieldType.getRowSchema(), serializeLogicalType)));
break;

case ARRAY:
builder.setArrayType(
SchemaApi.ArrayType.newBuilder()
.setElementType(fieldTypeToProto(fieldType.getCollectionElementType())));
.setElementType(
fieldTypeToProto(fieldType.getCollectionElementType(), serializeLogicalType)));
break;

case ITERABLE:
builder.setIterableType(
SchemaApi.IterableType.newBuilder()
.setElementType(fieldTypeToProto(fieldType.getCollectionElementType())));
.setElementType(
fieldTypeToProto(fieldType.getCollectionElementType(), serializeLogicalType)));
break;

case MAP:
builder.setMapType(
SchemaApi.MapType.newBuilder()
.setKeyType(fieldTypeToProto(fieldType.getMapKeyType()))
.setValueType(fieldTypeToProto(fieldType.getMapValueType()))
.setKeyType(fieldTypeToProto(fieldType.getMapKeyType(), serializeLogicalType))
.setValueType(fieldTypeToProto(fieldType.getMapValueType(), serializeLogicalType))
.build());
break;

case LOGICAL_TYPE:
LogicalType logicalType = fieldType.getLogicalType();
builder.setLogicalType(
SchemaApi.LogicalType.Builder logicalTypeBuilder =
SchemaApi.LogicalType.newBuilder()
.setArgumentType(
fieldTypeToProto(logicalType.getArgumentType(), serializeLogicalType))
.setArgument(
rowFieldToProto(logicalType.getArgumentType(), logicalType.getArgument()))
.setRepresentation(
fieldTypeToProto(logicalType.getBaseType(), serializeLogicalType))
// TODO(BEAM-7855): "javasdk" types should only be a last resort. Types defined in
// Beam should have their own URN, and there should be a mechanism for users to
// register their own types by URN.
.setUrn(URN_BEAM_LOGICAL_JAVASDK)
.setPayload(
ByteString.copyFrom(SerializableUtils.serializeToByteArray(logicalType)))
.setRepresentation(fieldTypeToProto(logicalType.getBaseType()))
.build());
.setUrn(URN_BEAM_LOGICAL_JAVASDK);
if (serializeLogicalType) {
logicalTypeBuilder =
logicalTypeBuilder.setPayload(
ByteString.copyFrom(SerializableUtils.serializeToByteArray(logicalType)));
}
builder.setLogicalType(logicalTypeBuilder.build());
break;
// Special-case for DATETIME and DECIMAL which are logical types in portable representation,
// but not yet in Java. (BEAM-7554)
case DATETIME:
builder.setLogicalType(
SchemaApi.LogicalType.newBuilder()
.setUrn(URN_BEAM_LOGICAL_DATETIME)
.setRepresentation(fieldTypeToProto(FieldType.INT64))
.setRepresentation(fieldTypeToProto(FieldType.INT64, serializeLogicalType))
.build());
break;
case DECIMAL:
builder.setLogicalType(
SchemaApi.LogicalType.newBuilder()
.setUrn(URN_BEAM_LOGICAL_DECIMAL)
.setRepresentation(fieldTypeToProto(FieldType.BYTES))
.setRepresentation(fieldTypeToProto(FieldType.BYTES, serializeLogicalType))
.build());
break;
case BYTE:
Expand Down Expand Up @@ -240,4 +261,94 @@ private static FieldType fieldTypeFromProtoWithoutNullable(SchemaApi.FieldType p
"Unexpected type_info: " + protoFieldType.getTypeInfoCase());
}
}

/**
 * Converts a {@link Row} to its proto representation.
 *
 * <p>Fields are visited by index; each value is encoded according to the type declared for that
 * field in the row's schema and appended to the proto row in the same order.
 *
 * @param row the row to convert
 * @return a proto row holding one value per field of {@code row}
 */
public static SchemaApi.Row rowToProto(Row row) {
  SchemaApi.Row.Builder protoRow = SchemaApi.Row.newBuilder();
  for (int index = 0; index < row.getFieldCount(); index++) {
    FieldType declaredType = row.getSchema().getField(index).getType();
    Object fieldValue = row.getValue(index);
    protoRow.addValues(rowFieldToProto(declaredType, fieldValue));
  }
  return protoRow.build();
}

/**
 * Encodes a single value of the given field type as a proto {@code FieldValue}.
 *
 * <p>ARRAY, ITERABLE, MAP and ROW values are encoded by their dedicated converters. Every other
 * type name, including LOGICAL_TYPE, is handed to {@code primitiveRowFieldToProto}, which
 * rejects type names it has no case for.
 *
 * @param fieldType the declared type of {@code value}
 * @param value the value to encode; cast to the Java type matching {@code fieldType}
 * @return the encoded value
 */
private static SchemaApi.FieldValue rowFieldToProto(FieldType fieldType, Object value) {
  FieldValue.Builder protoValue = FieldValue.newBuilder();
  switch (fieldType.getTypeName()) {
    case ARRAY:
      protoValue.setArrayValue(
          arrayValueToProto(fieldType.getCollectionElementType(), (Iterable) value));
      break;
    case ITERABLE:
      protoValue.setIterableValue(
          iterableValueToProto(fieldType.getCollectionElementType(), (Iterable) value));
      break;
    case MAP:
      protoValue.setMapValue(
          mapToProto(fieldType.getMapKeyType(), fieldType.getMapValueType(), (Map) value));
      break;
    case ROW:
      protoValue.setRowValue(rowToProto((Row) value));
      break;
    case LOGICAL_TYPE:
      // Deliberately shares the atomic path below, exactly like any unlisted type name.
    default:
      protoValue.setAtomicValue(primitiveRowFieldToProto(fieldType, value));
      break;
  }
  return protoValue.build();
}

/**
 * Encodes the elements of an array-typed value, in iteration order.
 *
 * @param elementType the declared type of every element
 * @param values the elements to encode
 * @return a proto array value with one encoded entry per element
 */
private static SchemaApi.ArrayTypeValue arrayValueToProto(
    FieldType elementType, Iterable values) {
  ArrayTypeValue.Builder protoArray = ArrayTypeValue.newBuilder();
  for (Object item : values) {
    protoArray.addElement(rowFieldToProto(elementType, item));
  }
  return protoArray.build();
}

/**
 * Encodes the elements of an iterable-typed value, in iteration order.
 *
 * @param elementType the declared type of every element
 * @param values the elements to encode
 * @return a proto iterable value with one encoded entry per element
 */
private static SchemaApi.IterableTypeValue iterableValueToProto(
    FieldType elementType, Iterable values) {
  IterableTypeValue.Builder protoIterable = IterableTypeValue.newBuilder();
  for (Object item : values) {
    protoIterable.addElement(rowFieldToProto(elementType, item));
  }
  return protoIterable.build();
}

/**
 * Encodes a map-typed value as a list of key/value entries, in the map's iteration order.
 *
 * @param keyType the declared type of every key
 * @param valueType the declared type of every value
 * @param map the map to encode
 * @return a proto map value with one entry per mapping in {@code map}
 */
private static SchemaApi.MapTypeValue mapToProto(
    FieldType keyType, FieldType valueType, Map<Object, Object> map) {
  MapTypeValue.Builder protoMap = MapTypeValue.newBuilder();
  map.forEach(
      (key, value) ->
          protoMap.addEntries(
              MapTypeEntry.newBuilder()
                  .setKey(rowFieldToProto(keyType, key))
                  .setValue(rowFieldToProto(valueType, value))
                  .build()));
  return protoMap.build();
}

/**
 * Encodes a scalar value as a proto {@code AtomicTypeValue}.
 *
 * @param fieldType the declared type of {@code value}; must be one of BYTE, INT16, INT32, INT64,
 *     FLOAT, DOUBLE, STRING, BOOLEAN or BYTES
 * @param value the boxed value to encode
 * @return the encoded scalar
 * @throws IllegalArgumentException if {@code fieldType} is not one of the scalar types above
 * @throws ClassCastException if {@code value} is not of the Java type matching {@code fieldType}
 */
private static SchemaApi.AtomicTypeValue primitiveRowFieldToProto(
    FieldType fieldType, Object value) {
  SchemaApi.AtomicTypeValue.Builder atomic = SchemaApi.AtomicTypeValue.newBuilder();
  switch (fieldType.getTypeName()) {
    case BYTE:
      // The proto carries bytes as int32. Widen through Number: casting the Object straight to
      // int is a checked cast to Integer and fails for a boxed Byte.
      return atomic.setByte(((Number) value).intValue()).build();
    case INT16:
      // Same as BYTE: a boxed Short cannot be cast to Integer, so widen through Number.
      return atomic.setInt16(((Number) value).intValue()).build();
    case INT32:
      return atomic.setInt32((int) value).build();
    case INT64:
      return atomic.setInt64((long) value).build();
    case FLOAT:
      return atomic.setFloat((float) value).build();
    case DOUBLE:
      return atomic.setDouble((double) value).build();
    case STRING:
      return atomic.setString((String) value).build();
    case BOOLEAN:
      return atomic.setBoolean((boolean) value).build();
    case BYTES:
      return atomic.setBytes(ByteString.copyFrom((byte[]) value)).build();
    default:
      throw new IllegalArgumentException("FieldType unexpected " + fieldType.getTypeName());
  }
}
}
Loading

0 comments on commit 1d84715

Please sign in to comment.