Skip to content

Commit

Permalink
[FLINK-21802][table-planner-blink] Fix TimestampType/RowType/MapType…
Browse files Browse the repository at this point in the history
…/ArrayType/MultisetType json ser/de

This closes apache#15219
  • Loading branch information
zjuwangg committed Mar 18, 2021
1 parent 0297fb9 commit 1aec553
Show file tree
Hide file tree
Showing 33 changed files with 2,859 additions and 379 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,24 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.DistinctType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.StructuredType;
import org.apache.flink.table.types.logical.SymbolType;
import org.apache.flink.table.types.logical.TimestampKind;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TypeInformationRawType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.ZonedTimestampType;
import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
import org.apache.flink.table.utils.EncodingUtils;

Expand All @@ -42,24 +50,31 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_ATTRIBUTES;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_COMPARISION;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_DESCRIPTION;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_ELEMENT_TYPE;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_FIELDS;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_FINAL;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_IDENTIFIER;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_IMPLEMENTATION_CLASS;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_INSTANTIABLE;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_KEY_TYPE;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_LENGTH;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_LOGICAL_TYPE;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_NAME;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_NULLABLE;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_PRECISION;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_SOURCE_TYPE;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_SUPPER_TYPE;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_SYMBOL_CLASS;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_TIMESTAMP_KIND;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_TYPE_INFO;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_TYPE_NAME;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_VALUE_TYPE;

/**
* JSON deserializer for {@link LogicalType}. refer to {@link LogicalTypeJsonSerializer} for
Expand Down Expand Up @@ -101,6 +116,20 @@ public LogicalType deserialize(JsonNode logicalTypeNode, SerdeContext serdeCtx)
return deserializeDistinctType(logicalTypeNode, serdeCtx);
case STRUCTURED_TYPE:
return deserializeStructuredType(logicalTypeNode, serdeCtx);
case TIMESTAMP_WITHOUT_TIME_ZONE:
return deserializeTimestampType(logicalTypeNode);
case TIMESTAMP_WITH_TIME_ZONE:
return deserializeZonedTimestampType(logicalTypeNode);
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return deserializeLocalZonedTimestampType(logicalTypeNode);
case ROW:
return deserializeRowType(logicalTypeNode, serdeCtx);
case MAP:
return deserializeMapType(logicalTypeNode, serdeCtx);
case ARRAY:
return deserializeArrayType(logicalTypeNode, serdeCtx);
case MULTISET:
return deserializeMultisetType(logicalTypeNode, serdeCtx);
default:
throw new TableException("Unsupported type name:" + typeName);
}
Expand All @@ -113,6 +142,48 @@ public LogicalType deserialize(JsonNode logicalTypeNode, SerdeContext serdeCtx)
}
}

private RowType deserializeRowType(JsonNode logicalTypeNode, SerdeContext serdeCtx) {
boolean nullable = logicalTypeNode.get(FIELD_NAME_NULLABLE).asBoolean();
List<RowType.RowField> rowFields = new ArrayList<>();
Iterator<JsonNode> elements = logicalTypeNode.get(FIELD_NAME_FIELDS).elements();
while (elements.hasNext()) {
JsonNode node = elements.next();
String filedName = node.fieldNames().next();
LogicalType fieldType = deserialize(node.get(filedName), serdeCtx);
if (node.has(FIELD_NAME_DESCRIPTION)) {
rowFields.add(
new RowType.RowField(
filedName, fieldType, node.get(FIELD_NAME_DESCRIPTION).asText()));
} else {
rowFields.add(new RowType.RowField(filedName, fieldType));
}
}
return new RowType(nullable, rowFields);
}

private MapType deserializeMapType(JsonNode logicalTypeNode, SerdeContext serdeCtx) {
boolean nullable = logicalTypeNode.get(FIELD_NAME_NULLABLE).asBoolean();
LogicalType keyLogicalType =
deserialize(logicalTypeNode.get(FIELD_NAME_KEY_TYPE), serdeCtx);
LogicalType valueLogicalType =
deserialize(logicalTypeNode.get(FIELD_NAME_VALUE_TYPE), serdeCtx);
return new MapType(nullable, keyLogicalType, valueLogicalType);
}

private ArrayType deserializeArrayType(JsonNode logicalTypeNode, SerdeContext serdeCtx) {
boolean nullable = logicalTypeNode.get(FIELD_NAME_NULLABLE).asBoolean();
LogicalType elementType =
deserialize(logicalTypeNode.get(FIELD_NAME_ELEMENT_TYPE), serdeCtx);
return new ArrayType(nullable, elementType);
}

private MultisetType deserializeMultisetType(JsonNode logicalTypeNode, SerdeContext serdeCtx) {
boolean nullable = logicalTypeNode.get(FIELD_NAME_NULLABLE).asBoolean();
LogicalType elementType =
deserialize(logicalTypeNode.get(FIELD_NAME_ELEMENT_TYPE), serdeCtx);
return new MultisetType(nullable, elementType);
}

private CharType deserializeCharType(JsonNode logicalTypeNode) {
boolean nullable = logicalTypeNode.get(FIELD_NAME_NULLABLE).asBoolean();
int length = logicalTypeNode.get(FIELD_NAME_LENGTH).asInt();
Expand Down Expand Up @@ -260,4 +331,28 @@ private DistinctType deserializeDistinctType(JsonNode logicalTypeNode, SerdeCont
}
return builder.build();
}

private TimestampType deserializeTimestampType(JsonNode logicalTypeNode) {
boolean nullable = logicalTypeNode.get(FIELD_NAME_NULLABLE).asBoolean();
int precision = logicalTypeNode.get(FIELD_NAME_PRECISION).asInt();
TimestampKind timestampKind =
TimestampKind.valueOf(logicalTypeNode.get(FIELD_NAME_TIMESTAMP_KIND).asText());
return new TimestampType(nullable, timestampKind, precision);
}

private ZonedTimestampType deserializeZonedTimestampType(JsonNode logicalTypeNode) {
boolean nullable = logicalTypeNode.get(FIELD_NAME_NULLABLE).asBoolean();
int precision = logicalTypeNode.get(FIELD_NAME_PRECISION).asInt();
TimestampKind timestampKind =
TimestampKind.valueOf(logicalTypeNode.get(FIELD_NAME_TIMESTAMP_KIND).asText());
return new ZonedTimestampType(nullable, timestampKind, precision);
}

private LocalZonedTimestampType deserializeLocalZonedTimestampType(JsonNode logicalTypeNode) {
boolean nullable = logicalTypeNode.get(FIELD_NAME_NULLABLE).asBoolean();
int precision = logicalTypeNode.get(FIELD_NAME_PRECISION).asInt();
TimestampKind timestampKind =
TimestampKind.valueOf(logicalTypeNode.get(FIELD_NAME_TIMESTAMP_KIND).asText());
return new LocalZonedTimestampType(nullable, timestampKind, precision);
}
}
Loading

0 comments on commit 1aec553

Please sign in to comment.