Skip to content

Commit

Permalink
[FLINK-14546][formats] Support map type in JSON format
Browse files Browse the repository at this point in the history
This closes apache#10060
  • Loading branch information
libenchao authored and KurtYoung committed Nov 6, 2019
1 parent a970430 commit 2ea1416
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
Expand All @@ -34,6 +35,7 @@
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;

import java.io.IOException;
import java.io.Serializable;
Expand All @@ -47,7 +49,10 @@
import java.time.temporal.TemporalAccessor;
import java.time.temporal.TemporalQueries;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -242,11 +247,31 @@ private Optional<DeserializationRuntimeConverter> createContainerConverter(TypeI
return Optional.of(createObjectArrayConverter(((BasicArrayTypeInfo) typeInfo).getComponentInfo()));
} else if (isPrimitiveByteArray(typeInfo)) {
return Optional.of(createByteArrayConverter());
} else if (typeInfo instanceof MapTypeInfo) {
MapTypeInfo<?, ?> mapTypeInfo = (MapTypeInfo<?, ?>) typeInfo;
return Optional.of(createMapConverter(mapTypeInfo.getKeyTypeInfo(), mapTypeInfo.getValueTypeInfo()));
} else {
return Optional.empty();
}
}

/**
 * Creates a runtime converter that deserializes a JSON object node into a {@link Map}.
 *
 * <p>Each field of the JSON object becomes one map entry: the field name is converted
 * with the key converter and the field value with the value converter.
 *
 * @param keyType type information of the map keys (JSON object keys are strings, so the
 *                key converter receives the raw key wrapped in a {@link TextNode})
 * @param valueType type information of the map values
 * @return converter producing a {@link HashMap} with converted keys and values
 */
private DeserializationRuntimeConverter createMapConverter(TypeInformation<?> keyType, TypeInformation<?> valueType) {
	DeserializationRuntimeConverter keyConverter = createConverter(keyType);
	DeserializationRuntimeConverter valueConverter = createConverter(valueType);

	return (mapper, jsonNode) -> {
		Iterator<Map.Entry<String, JsonNode>> fields = jsonNode.fields();
		Map<Object, Object> result = new HashMap<>();
		while (fields.hasNext()) {
			Map.Entry<String, JsonNode> entry = fields.next();
			// JSON object keys are always plain strings; wrap each one in a TextNode
			// so the key converter can parse it into the declared key type.
			Object key = keyConverter.convert(mapper, TextNode.valueOf(entry.getKey()));
			Object value = valueConverter.convert(mapper, entry.getValue());
			result.put(key, value);
		}
		return result;
	};
}

private DeserializationRuntimeConverter createByteArrayConverter() {
return (mapper, jsonNode) -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import static org.apache.flink.formats.utils.DeserializationSchemaMatcher.whenDeserializedWith;
Expand All @@ -53,6 +55,13 @@ public void testTypeInfoDeserialization() throws Exception {
String name = "asdlkjasjkdla998y1122";
byte[] bytes = new byte[1024];
ThreadLocalRandom.current().nextBytes(bytes);
Map<String, Long> map = new HashMap<>();
map.put("flink", 123L);

Map<String, Map<String, Integer>> nestedMap = new HashMap<>();
Map<String, Integer> innerMap = new HashMap<>();
innerMap.put("key", 234);
nestedMap.put("inner_map", innerMap);

ObjectMapper objectMapper = new ObjectMapper();

Expand All @@ -61,19 +70,24 @@ public void testTypeInfoDeserialization() throws Exception {
root.put("id", id);
root.put("name", name);
root.put("bytes", bytes);
root.putObject("map").put("flink", 123);
root.putObject("map2map").putObject("inner_map").put("key", 234);

byte[] serializedJson = objectMapper.writeValueAsBytes(root);

JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema.Builder(
Types.ROW_NAMED(
new String[]{"id", "name", "bytes"},
Types.LONG, Types.STRING, Types.PRIMITIVE_ARRAY(Types.BYTE))
new String[]{"id", "name", "bytes", "map", "map2map"},
Types.LONG, Types.STRING, Types.PRIMITIVE_ARRAY(Types.BYTE), Types.MAP(Types.STRING, Types.LONG),
Types.MAP(Types.STRING, Types.MAP(Types.STRING, Types.INT)))
).build();

Row row = new Row(3);
Row row = new Row(5);
row.setField(0, id);
row.setField(1, name);
row.setField(2, bytes);
row.setField(3, map);
row.setField(4, nestedMap);

assertThat(serializedJson, whenDeserializedWith(deserializationSchema).equalsTo(row));
}
Expand Down

0 comments on commit 2ea1416

Please sign in to comment.