Skip to content

Commit

Permalink
[FLINK-18296][json] Add support for TIMESTAMP_WITH_LOCAL_ZONE type fo…
Browse files Browse the repository at this point in the history
…r Json format

This closes apache#12756
  • Loading branch information
fsk119 authored and wuchong committed Jul 22, 2020
1 parent fe0d001 commit be9be0c
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 18 deletions.
13 changes: 9 additions & 4 deletions docs/dev/table/connectors/formats/json.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,12 @@ Format Options
<td>optional</td>
<td style="word-wrap: break-word;"><code>'SQL'</code></td>
<td>String</td>
<td>Specify the input and output timestamp format. Currently supported values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
<td>Specify the input and output timestamp format for <code>TIMESTAMP</code> and <code>TIMESTAMP WITH LOCAL TIME ZONE</code> type. Currently supported values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
<ul>
<li>Option <code>'SQL'</code> will parse input timestamp in "yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g '2020-12-30 12:13:14.123' and output timestamp in the same format.</li>
<li>Option <code>'ISO-8601'</code>will parse input timestamp in "yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g '2020-12-30T12:13:14.123' and output timestamp in the same format.</li>
<li>Option <code>'SQL'</code> will parse input TIMESTAMP values in "yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g "2020-12-30 12:13:14.123",
parse input TIMESTAMP WITH LOCAL TIME ZONE values in "yyyy-MM-dd HH:mm:ss.s{precision}'Z'" format, e.g "2020-12-30 12:13:14.123Z" and output timestamp in the same format.</li>
<li>Option <code>'ISO-8601'</code>will parse input TIMESTAMP in "yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g "2020-12-30T12:13:14.123"
parse input TIMESTAMP WITH LOCAL TIME ZONE in "yyyy-MM-ddTHH:mm:ss.s{precision}'Z'" format, e.g "2020-12-30T12:13:14.123Z" and output timestamp in the same format.</li>
</ul>
</td>
</tr>
Expand Down Expand Up @@ -187,6 +189,10 @@ The following table lists the type mapping from Flink type to JSON type.
<td><code>TIMESTAMP</code></td>
<td><code>string with format: date-time</code></td>
</tr>
<tr>
<td><code>TIMESTAMP_WITH_LOCAL_TIME_ZONE</code></td>
<td><code>string with format: date-time (with UTC time zone)</code></td>
</tr>
<tr>
<td><code>INTERVAL</code></td>
<td><code>number</code></td>
Expand All @@ -209,4 +215,3 @@ The following table lists the type mapping from Flink type to JSON type.




12 changes: 9 additions & 3 deletions docs/dev/table/connectors/formats/json.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,12 @@ Format 参数
<td>可选</td>
<td style="word-wrap: break-word;"><code>'SQL'</code></td>
<td>String</td>
<td>声明输入和输出的时间戳格式。当前支持的格式为<code>'SQL'</code> 以及 <code>'ISO-8601'</code>:
<td>声明输入和输出的 <code>TIMESTAMP</code> 和 <code>TIMESTAMP WITH LOCAL TIME ZONE</code> 的格式。当前支持的格式为<code>'SQL'</code> 以及 <code>'ISO-8601'</code>:
<ul>
<li>可选参数 <code>'SQL'</code> 将会以 "yyyy-MM-dd HH:mm:ss.s{precision}" 的格式解析时间戳, 例如 '2020-12-30 12:13:14.123',且会以相同的格式输出。</li>
<li>可选参数 <code>'ISO-8601'</code> 将会以 "yyyy-MM-ddTHH:mm:ss.s{precision}" 的格式解析输入时间戳, 例如 '2020-12-30T12:13:14.123' ,且会以相同的格式输出。</li>
<li>可选参数 <code>'SQL'</code> 将会以 "yyyy-MM-dd HH:mm:ss.s{precision}" 的格式解析 TIMESTAMP, 例如 "2020-12-30 12:13:14.123",
以 "yyyy-MM-dd HH:mm:ss.s{precision}'Z'" 的格式解析 TIMESTAMP WITH LOCAL TIME ZONE, 例如 "2020-12-30 12:13:14.123Z" 且会以相同的格式输出。</li>
<li>可选参数 <code>'ISO-8601'</code> 将会以 "yyyy-MM-ddTHH:mm:ss.s{precision}" 的格式解析输入 TIMESTAMP, 例如 "2020-12-30T12:13:14.123" ,
以 "yyyy-MM-ddTHH:mm:ss.s{precision}'Z'" 的格式解析 TIMESTAMP WITH LOCAL TIME ZONE, 例如 "2020-12-30T12:13:14.123Z" 且会以相同的格式输出。</li>
</ul>
</td>
</tr>
Expand Down Expand Up @@ -186,6 +188,10 @@ Format 参数
<td><code>TIMESTAMP</code></td>
<td><code>string with format: date-time</code></td>
</tr>
<tr>
<td><code>TIMESTAMP_WITH_LOCAL_TIME_ZONE</code></td>
<td><code>string with format: date-time (with UTC time zone)</code></td>
</tr>
<tr>
<td><code>INTERVAL</code></td>
<td><code>number</code></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.time.temporal.TemporalAccessor;
import java.time.temporal.TemporalQueries;
import java.util.HashMap;
Expand All @@ -61,7 +62,9 @@
import static java.lang.String.format;
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
import static org.apache.flink.formats.json.TimeFormats.ISO8601_TIMESTAMP_FORMAT;
import static org.apache.flink.formats.json.TimeFormats.ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
import static org.apache.flink.formats.json.TimeFormats.SQL_TIMESTAMP_FORMAT;
import static org.apache.flink.formats.json.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
import static org.apache.flink.formats.json.TimeFormats.SQL_TIME_FORMAT;
import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -202,6 +205,8 @@ private DeserializationRuntimeConverter createNotNullConverter(LogicalType type)
return this::convertToTime;
case TIMESTAMP_WITHOUT_TIME_ZONE:
return this::convertToTimestamp;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return this::convertToTimestampWithLocalZone;
case FLOAT:
return this::convertToFloat;
case DOUBLE:
Expand Down Expand Up @@ -303,6 +308,24 @@ private TimestampData convertToTimestamp(JsonNode jsonNode) {
return TimestampData.fromLocalDateTime(LocalDateTime.of(localDate, localTime));
}

private TimestampData convertToTimestampWithLocalZone(JsonNode jsonNode){
TemporalAccessor parsedTimestampWithLocalZone;
switch (timestampFormat){
case SQL:
parsedTimestampWithLocalZone = SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT.parse(jsonNode.asText());
break;
case ISO_8601:
parsedTimestampWithLocalZone = ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT.parse(jsonNode.asText());
break;
default:
throw new TableException(String.format("Unsupported timestamp format '%s'. Validator should have checked that.", timestampFormat));
}
LocalTime localTime = parsedTimestampWithLocalZone.query(TemporalQueries.localTime());
LocalDate localDate = parsedTimestampWithLocalZone.query(TemporalQueries.localDate());

return TimestampData.fromInstant(LocalDateTime.of(localDate, localTime).toInstant(ZoneOffset.UTC));
}

private StringData convertToString(JsonNode jsonNode) {
if (jsonNode.isContainerNode()) {
return StringData.fromString(jsonNode.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,15 @@
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Objects;

import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
import static org.apache.flink.formats.json.TimeFormats.ISO8601_TIMESTAMP_FORMAT;
import static org.apache.flink.formats.json.TimeFormats.ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
import static org.apache.flink.formats.json.TimeFormats.SQL_TIMESTAMP_FORMAT;
import static org.apache.flink.formats.json.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
import static org.apache.flink.formats.json.TimeFormats.SQL_TIME_FORMAT;

/**
Expand Down Expand Up @@ -170,6 +173,8 @@ private SerializationRuntimeConverter createNotNullConverter(LogicalType type) {
return createTimeConverter();
case TIMESTAMP_WITHOUT_TIME_ZONE:
return createTimestampConverter();
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return createTimestampWithLocalZone();
case DECIMAL:
return createDecimalConverter();
case ARRAY:
Expand Down Expand Up @@ -227,6 +232,25 @@ private SerializationRuntimeConverter createTimestampConverter() {
}
}

private SerializationRuntimeConverter createTimestampWithLocalZone() {
switch (timestampFormat){
case ISO_8601:
return (mapper, reuse, value) -> {
TimestampData timestampWithLocalZone = (TimestampData) value;
return mapper.getNodeFactory()
.textNode(ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT.format(timestampWithLocalZone.toInstant().atOffset(ZoneOffset.UTC)));
};
case SQL:
return (mapper, reuse, value) -> {
TimestampData timestampWithLocalZone = (TimestampData) value;
return mapper.getNodeFactory()
.textNode(SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT.format(timestampWithLocalZone.toInstant().atOffset(ZoneOffset.UTC)));
};
default:
throw new TableException("Unsupported timestamp format. Validator should have checked that.");
}
}

private SerializationRuntimeConverter createArrayConverter(ArrayType type) {
final LogicalType elementType = type.getElementType();
final SerializationRuntimeConverter elementConverter = createConverter(elementType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ class TimeFormats {
/** Formatter for ISO8601 string representation of a timestamp value (without UTC timezone). */
static final DateTimeFormatter ISO8601_TIMESTAMP_FORMAT = DateTimeFormatter.ISO_LOCAL_DATE_TIME;

/** Formatter for ISO8601 string representation of a timestamp value (with UTC timezone). */
static final DateTimeFormatter ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT = new DateTimeFormatterBuilder()
.append(DateTimeFormatter.ISO_LOCAL_DATE)
.appendLiteral('T')
.append(DateTimeFormatter.ISO_LOCAL_TIME)
.appendPattern("'Z'")
.toFormatter();

/** Formatter for SQL string representation of a time value. */
static final DateTimeFormatter SQL_TIME_FORMAT = new DateTimeFormatterBuilder()
.appendPattern("HH:mm:ss")
Expand All @@ -57,6 +65,14 @@ class TimeFormats {
.append(SQL_TIME_FORMAT)
.toFormatter();

/** Formatter for SQL string representation of a timestamp value (with UTC timezone). */
static final DateTimeFormatter SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT = new DateTimeFormatterBuilder()
.append(DateTimeFormatter.ISO_LOCAL_DATE)
.appendLiteral(' ')
.append(SQL_TIME_FORMAT)
.appendPattern("'Z'")
.toFormatter();

private TimeFormats() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
*/
@Internal
public enum TimestampFormat {
/** Options to specify timestamp format. It will parse timestamp in "yyyy-MM-dd HH:mm:ss.s{precision}" format
* and output timestamp in the same format*/
/** Options to specify TIMESTAMP/TIMESTAMP_WITH_LOCAL_ZONE format. It will parse TIMESTAMP in "yyyy-MM-dd HH:mm:ss.s{precision}" format,
* TIMESTAMP_WITH_LOCAL_TIMEZONE in "yyyy-MM-dd HH:mm:ss.s{precision}'Z'" and output in the same format.*/
SQL,

/** Options to specify timestamp format. It will parse timestamp in "yyyy-MM-ddTHH:mm:ss.s{precision}" format
* and output timestamp in the same format*/
/** Options to specify TIMESTAMP/TIMESTAMP_WITH_LOCAL_ZONE format. It will pase TIMESTAMP in "yyyy-MM-ddTHH:mm:ss.s{precision}" format,
* TIMESTAMP_WITH_LOCAL_TIMEZONE in "yyyy-MM-ddTHH:mm:ss.s{precision}'Z'" and output in the same format.*/
ISO_8601
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@

import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
Expand All @@ -60,9 +63,11 @@
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.api.DataTypes.TIME;
import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
import static org.apache.flink.table.api.DataTypes.TINYINT;
import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

/**
* Tests for {@link JsonRowDataDeserializationSchema} and {@link JsonRowDataSerializationSchema}.
Expand All @@ -85,6 +90,9 @@ public void testSerDe() throws Exception {
LocalTime time = LocalTime.parse("12:12:43");
Timestamp timestamp3 = Timestamp.valueOf("1990-10-14 12:12:43.123");
Timestamp timestamp9 = Timestamp.valueOf("1990-10-14 12:12:43.123456789");
Instant timestampWithLocalZone =
LocalDateTime.of(1990, 10, 14, 12, 12, 43, 123456789).
atOffset(ZoneOffset.of("Z")).toInstant();

Map<String, Long> map = new HashMap<>();
map.put("flink", 123L);
Expand Down Expand Up @@ -113,6 +121,7 @@ public void testSerDe() throws Exception {
root.put("time", "12:12:43");
root.put("timestamp3", "1990-10-14T12:12:43.123");
root.put("timestamp9", "1990-10-14T12:12:43.123456789");
root.put("timestampWithLocalZone", "1990-10-14T12:12:43.123456789Z");
root.putObject("map").put("flink", 123);
root.putObject("map2map").putObject("inner_map").put("key", 234);

Expand All @@ -133,6 +142,7 @@ public void testSerDe() throws Exception {
FIELD("time", TIME(0)),
FIELD("timestamp3", TIMESTAMP(3)),
FIELD("timestamp9", TIMESTAMP(9)),
FIELD("timestampWithLocalZone", TIMESTAMP_WITH_LOCAL_TIME_ZONE(9)),
FIELD("map", MAP(STRING(), BIGINT())),
FIELD("map2map", MAP(STRING(), MAP(STRING(), INT()))));
RowType schema = (RowType) dataType.getLogicalType();
Expand All @@ -141,7 +151,7 @@ public void testSerDe() throws Exception {
JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
schema, resultTypeInfo, false, false, TimestampFormat.ISO_8601);

Row expected = new Row(16);
Row expected = new Row(17);
expected.setField(0, true);
expected.setField(1, tinyint);
expected.setField(2, smallint);
Expand All @@ -156,8 +166,9 @@ public void testSerDe() throws Exception {
expected.setField(11, time);
expected.setField(12, timestamp3.toLocalDateTime());
expected.setField(13, timestamp9.toLocalDateTime());
expected.setField(14, map);
expected.setField(15, nestedMap);
expected.setField(14, timestampWithLocalZone);
expected.setField(15, map);
expected.setField(16, nestedMap);

RowData rowData = deserializationSchema.deserialize(serializedJson);
Row actual = convertToExternal(rowData, dataType);
Expand Down Expand Up @@ -324,7 +335,7 @@ public void testDeserializationMissingNode() throws Exception {
String errorMessage = "Failed to deserialize JSON '{\"id\":123123123}'.";
try {
deserializationSchema.deserialize(serializedJson);
Assert.fail("expecting exception message: " + errorMessage);
fail("expecting exception message: " + errorMessage);
} catch (Throwable t) {
assertEquals(errorMessage, t.getMessage());
}
Expand All @@ -350,7 +361,9 @@ public void testDeserializationMissingNode() throws Exception {
public void testSerDeSQLTimestampFormat() throws Exception{
RowType rowType = (RowType) ROW(
FIELD("timestamp3", TIMESTAMP(3)),
FIELD("timestamp9", TIMESTAMP(9))
FIELD("timestamp9", TIMESTAMP(9)),
FIELD("timestamp_with_local_timezone3", TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
FIELD("timestamp_with_local_timezone9", TIMESTAMP_WITH_LOCAL_TIME_ZONE(9))
).getLogicalType();

JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
Expand All @@ -362,6 +375,8 @@ public void testSerDeSQLTimestampFormat() throws Exception{
ObjectNode root = objectMapper.createObjectNode();
root.put("timestamp3", "1990-10-14 12:12:43.123");
root.put("timestamp9", "1990-10-14 12:12:43.123456789");
root.put("timestamp_with_local_timezone3", "1990-10-14 12:12:43.123Z");
root.put("timestamp_with_local_timezone9", "1990-10-14 12:12:43.123456789Z");
byte[] serializedJson = objectMapper.writeValueAsBytes(root);
RowData rowData = deserializationSchema.deserialize(serializedJson);
byte[] actual = serializationSchema.serialize(rowData);
Expand Down Expand Up @@ -404,7 +419,7 @@ private void testParseErrors(TestSpec spec) throws Exception {

try {
failingSchema.deserialize(spec.json.getBytes());
Assert.fail("expecting exception " + spec.errorMessage);
fail("expecting exception " + spec.errorMessage);
} catch (Throwable t) {
assertEquals(t.getMessage(), spec.errorMessage);
}
Expand Down Expand Up @@ -538,7 +553,17 @@ private void testParseErrors(TestSpec spec) throws Exception {
.json("{\"map\":{\"key1\":\"123\", \"key2\":\"abc\"}}")
.rowType(ROW(FIELD("map", MAP(STRING(), INT()))))
.expect(Row.of(createHashMap("key1", 123, "key2", null)))
.expectErrorMessage("Failed to deserialize JSON '{\"map\":{\"key1\":\"123\", \"key2\":\"abc\"}}'.")
.expectErrorMessage("Failed to deserialize JSON '{\"map\":{\"key1\":\"123\", \"key2\":\"abc\"}}'."),

TestSpec
.json("{\"id\":\"2019-11-12T18:00:12\"}")
.rowType(ROW(FIELD("id", TIMESTAMP_WITH_LOCAL_TIME_ZONE(0))))
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"2019-11-12T18:00:12\"}'."),

TestSpec
.json("{\"id\":\"2019-11-12T18:00:12+0800\"}")
.rowType(ROW(FIELD("id", TIMESTAMP_WITH_LOCAL_TIME_ZONE(0))))
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"2019-11-12T18:00:12+0800\"}'.")
);

private static Map<String, Integer> createHashMap(String k1, Integer v1, String k2, Integer v2) {
Expand Down

0 comments on commit be9be0c

Please sign in to comment.