From 1ae5983bc2b267ed7725338ef932505477aee7b8 Mon Sep 17 00:00:00 2001
From: jerryjzhang
Date: Sun, 19 Aug 2018 22:27:01 +0800
Subject: [PATCH] [FLINK-10170] [table] Add string representation for all
 Table & SQL API types

Since 1.6 the recommended way of creating source/sink tables is using
connector/format/schema descriptors. This commit adds a string-based
representation for all types supported by the Table & SQL API. We use
a syntax similar to Hive and other SQL projects.

This closes #6578.
---
 docs/dev/table/connect.md                      | 21 ++--
 docs/dev/table/sqlClient.md                    |  2 +-
 .../test-scripts/test_sql_client.sh            |  2 +-
 .../flink/table/descriptors/JsonTest.java      |  2 +-
 .../table/typeutils/TypeStringUtils.scala      | 98 ++++++++++++++-----
 .../flink/table/descriptors/CsvTest.scala      |  4 +-
 .../descriptors/TableDescriptorTest.scala      | 30 +++++-
 .../table/typeutils/TypeStringUtilsTest.scala  | 42 ++++++--
 8 files changed, 152 insertions(+), 49 deletions(-)

diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index 1bfff420d62d4..16649e52ff64a 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -417,13 +417,18 @@ DECIMAL
 DATE
 TIME
 TIMESTAMP
-ROW(fieldtype, ...)              # unnamed row; e.g. ROW(VARCHAR, INT) that is mapped to Flink's RowTypeInfo
-                                 #   with indexed fields names f0, f1, ...
-ROW(fieldname fieldtype, ...)    # named row; e.g., ROW(myField VARCHAR, myOtherField INT) that
-                                 #   is mapped to Flink's RowTypeInfo
-POJO(class)                      # e.g., POJO(org.mycompany.MyPojoClass) that is mapped to Flink's PojoTypeInfo
-ANY(class)                       # e.g., ANY(org.mycompany.MyClass) that is mapped to Flink's GenericTypeInfo
-ANY(class, serialized)           # used for type information that is not supported by Flink's Table & SQL API
+MAP<fieldtype, fieldtype>        # generic map; e.g. MAP<VARCHAR, INT> that is mapped to Flink's MapTypeInfo
+MULTISET<fieldtype>              # multiset; e.g. MULTISET<VARCHAR> that is mapped to Flink's MultisetTypeInfo
+PRIMITIVE_ARRAY<fieldtype>       # primitive array; e.g. PRIMITIVE_ARRAY<INT> that is mapped to Flink's PrimitiveArrayTypeInfo
+OBJECT_ARRAY<fieldtype>          # object array; e.g. OBJECT_ARRAY<POJO(org.mycompany.MyPojoClass)> that is mapped to
+                                 #   Flink's ObjectArrayTypeInfo
+ROW<fieldtype, ...>              # unnamed row; e.g. ROW<VARCHAR, INT> that is mapped to Flink's RowTypeInfo
+                                 #   with indexed fields names f0, f1, ...
+ROW<fieldname fieldtype, ...>    # named row; e.g., ROW<myField VARCHAR, myOtherField INT> that
+                                 #   is mapped to Flink's RowTypeInfo
+POJO<class>                      # e.g., POJO<org.mycompany.MyPojoClass> that is mapped to Flink's PojoTypeInfo
+ANY<class>                       # e.g., ANY<org.mycompany.MyClass> that is mapped to Flink's GenericTypeInfo
+ANY<class, serialized>           # used for type information that is not supported by Flink's Table & SQL API
 {% endhighlight %}
 
 {% top %}
@@ -1046,4 +1051,4 @@ table.writeToSink(sink)
 </div>
 </div>
 
-{% top %}
\ No newline at end of file
+{% top %}
diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index 8c4ba83c6dc9a..296d638590622 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -302,7 +302,7 @@ tables:
     format:
       property-version: 1
       type: json
-      schema: "ROW(rideId LONG, lon FLOAT, lat FLOAT, rideTime TIMESTAMP)"
+      schema: "ROW<rideId LONG, lon FLOAT, lat FLOAT, rideTime TIMESTAMP>"
     schema:
       - name: rideId
         type: LONG
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
index 934f7d43ed408..b5830725db128 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -128,7 +128,7 @@ tables:
       - name: user
         type: VARCHAR
       - name: event
-        type: ROW(type VARCHAR, message VARCHAR)
+        type: ROW<type VARCHAR, message VARCHAR>
     connector:
       type: kafka
       version: "0.10"
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/table/descriptors/JsonTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/table/descriptors/JsonTest.java
index 6e370a02c13ed..ac6ff11c370b0 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/table/descriptors/JsonTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/table/descriptors/JsonTest.java
@@ -106,7 +106,7 @@ public List<Map<String, String>> properties() {
 		final Map<String, String> props3 = new HashMap<>();
 		props3.put("format.type", "json");
 		props3.put("format.property-version", "1");
-		props3.put("format.schema", "ROW(test1 VARCHAR, test2 TIMESTAMP)");
+		props3.put("format.schema", "ROW<test1 VARCHAR, test2 TIMESTAMP>");
 		props3.put("format.fail-on-missing-field", "true");
 
 		final Map<String, String> props4 = new HashMap<>();
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
index afc6506cbef43..9e5f075cd4b4a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
@@ -23,7 +23,7 @@ import java.io.Serializable
 import org.apache.commons.codec.binary.Base64
 import org.apache.commons.lang3.StringEscapeUtils
 import org.apache.flink.api.common.functions.InvalidTypesException
-import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils._
 import org.apache.flink.table.api.{TableException, Types, ValidationException}
@@ -67,6 +67,10 @@
   lazy val ROW: Keyword = Keyword("ROW")
   lazy val ANY: Keyword = Keyword("ANY")
   lazy val POJO: Keyword = Keyword("POJO")
+  lazy val MAP: Keyword = Keyword("MAP")
+  lazy val MULTISET: Keyword = Keyword("MULTISET")
+  lazy val PRIMITIVE_ARRAY: Keyword = Keyword("PRIMITIVE_ARRAY")
+  lazy val OBJECT_ARRAY: Keyword = Keyword("OBJECT_ARRAY")
 
   lazy val qualifiedName: Parser[String] =
"""\p{javaJavaIdentifierStart}[\p{javaJavaIdentifierPart}.]*""".r @@ -74,6 +78,13 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers { lazy val base64Url: Parser[String] = """[A-Za-z0-9_-]*""".r + // keep parenthesis to ensure backward compatibility (this can be dropped after Flink 1.7) + lazy val leftBracket: PackratParser[(String)] = + "(" | "<" + + lazy val rightBracket: PackratParser[(String)] = + ")" | ">" + lazy val atomic: PackratParser[TypeInformation[_]] = (VARCHAR | STRING) ^^ { e => Types.STRING } | BOOLEAN ^^ { e => Types.BOOLEAN } | @@ -101,34 +112,35 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers { } lazy val namedRow: PackratParser[TypeInformation[_]] = - ROW ~ "(" ~> rep1sep(field, ",") <~ ")" ^^ { + ROW ~ leftBracket ~> rep1sep(field, ", ") <~ rightBracket ^^ { fields => Types.ROW(fields.map(_._1).toArray, fields.map(_._2).toArray) } | failure("Named row type expected.") lazy val unnamedRow: PackratParser[TypeInformation[_]] = - ROW ~ "(" ~> rep1sep(typeInfo, ",") <~ ")" ^^ { + ROW ~ leftBracket ~> rep1sep(typeInfo, ", ") <~ rightBracket ^^ { types => Types.ROW(types: _*) } | failure("Unnamed row type expected.") lazy val generic: PackratParser[TypeInformation[_]] = - ANY ~ "(" ~> qualifiedName <~ ")" ^^ { + ANY ~ leftBracket ~> qualifiedName <~ rightBracket ^^ { typeClass => val clazz = loadClass(typeClass) new GenericTypeInfo[AnyRef](clazz.asInstanceOf[Class[AnyRef]]) } - lazy val pojo: PackratParser[TypeInformation[_]] = POJO ~ "(" ~> qualifiedName <~ ")" ^^ { - typeClass => - val clazz = loadClass(typeClass) - val info = TypeExtractor.createTypeInfo(clazz) - if (!info.isInstanceOf[PojoTypeInfo[_]]) { - throw new ValidationException(s"Class '$typeClass'is not a POJO type.") - } - info - } + lazy val pojo: PackratParser[TypeInformation[_]] = + POJO ~ leftBracket ~> qualifiedName <~ rightBracket ^^ { + typeClass => + val clazz = loadClass(typeClass) + val info = TypeExtractor.createTypeInfo(clazz) + if (!info.isInstanceOf[PojoTypeInfo[_]]) { + throw new ValidationException(s"Class '$typeClass'is not a POJO type.") + } + info + } lazy val any: PackratParser[TypeInformation[_]] = - ANY ~ "(" ~ qualifiedName ~ "," ~ base64Url ~ ")" ^^ { + ANY ~ leftBracket ~ qualifiedName ~ "," ~ base64Url ~ rightBracket ^^ { case _ ~ _ ~ typeClass ~ _ ~ serialized ~ _=> val clazz = loadClass(typeClass) val typeInfo = deserialize(serialized) @@ -140,8 +152,38 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers { typeInfo } + lazy val genericMap: PackratParser[TypeInformation[_]] = + MAP ~ leftBracket ~ typeInfo ~ "," ~ typeInfo ~ rightBracket ^^ { + case _ ~ _ ~ keyTypeInfo ~ _ ~ valueTypeInfo ~ _=> + Types.MAP(keyTypeInfo, valueTypeInfo) + } + + lazy val multiSet: PackratParser[TypeInformation[_]] = + MULTISET ~ leftBracket ~ typeInfo ~ rightBracket ^^ { + case _ ~ _ ~ elementTypeInfo ~ _ => + Types.MULTISET(elementTypeInfo) + } + + lazy val primitiveArray: PackratParser[TypeInformation[_]] = + PRIMITIVE_ARRAY ~ leftBracket ~ typeInfo ~ rightBracket ^^ { + case _ ~ _ ~ componentTypeInfo ~ _ => + Types.PRIMITIVE_ARRAY(componentTypeInfo) + } + + lazy val objectArray: PackratParser[TypeInformation[_]] = + OBJECT_ARRAY ~ leftBracket ~ typeInfo ~ rightBracket ^^ { + case _ ~ _ ~ componentTypeInfo ~ _ => + Types.OBJECT_ARRAY(componentTypeInfo) + } + + lazy val map: PackratParser[TypeInformation[_]] = + genericMap | multiSet + + lazy val array: PackratParser[TypeInformation[_]] = + primitiveArray | objectArray + lazy val typeInfo: 
-    namedRow | unnamedRow | any | generic | pojo | atomic | failure("Invalid type.")
+    namedRow | unnamedRow | any | generic | pojo | atomic | map | array | failure("Invalid type.")
 
   def readTypeInfo(typeString: String): TypeInformation[_] = {
     parseAll(typeInfo, typeString) match {
@@ -182,10 +224,10 @@
         s"$name ${normalizeTypeInfo(f._2)}"
       }
 
-      s"${ROW.key}(${normalizedFields.mkString(", ")})"
+      s"${ROW.key}<${normalizedFields.mkString(", ")}>"
 
     case generic: GenericTypeInfo[_] =>
-      s"${ANY.key}(${generic.getTypeClass.getName})"
+      s"${ANY.key}<${generic.getTypeClass.getName}>"
 
     case pojo: PojoTypeInfo[_] =>
       // we only support very simple POJOs that only contain extracted fields
@@ -196,24 +238,28 @@
         case _: InvalidTypesException => None
       }
       extractedInfo match {
-        case Some(ei) if ei == pojo => s"${POJO.key}(${pojo.getTypeClass.getName})"
+        case Some(ei) if ei == pojo => s"${POJO.key}<${pojo.getTypeClass.getName}>"
         case _ => throw new TableException(
           "A string representation for custom POJO types is not supported yet.")
       }
 
-    case _: CompositeType[_] =>
-      throw new TableException("A string representation for composite types is not supported yet.")
+    case array: PrimitiveArrayTypeInfo[_] =>
+      s"${PRIMITIVE_ARRAY.key}<${normalizeTypeInfo(array.getComponentType)}>"
+
+    case array: ObjectArrayTypeInfo[_, _] =>
+      s"${OBJECT_ARRAY.key}<${normalizeTypeInfo(array.getComponentInfo)}>"
 
-    case _: BasicArrayTypeInfo[_, _] | _: ObjectArrayTypeInfo[_, _] |
-         _: PrimitiveArrayTypeInfo[_] =>
-      throw new TableException("A string representation for array types is not supported yet.")
+    case set: MultisetTypeInfo[_] =>
+      s"${MULTISET.key}<${normalizeTypeInfo(set.getElementTypeInfo)}>"
 
-    case _: MapTypeInfo[_, _] | _: MultisetTypeInfo[_] =>
-      throw new TableException("A string representation for map types is not supported yet.")
+    case map: MapTypeInfo[_, _] =>
+      val normalizedKey = normalizeTypeInfo(map.getKeyTypeInfo)
+      val normalizedValue = normalizeTypeInfo(map.getValueTypeInfo)
+      s"${MAP.key}<$normalizedKey, $normalizedValue>"
 
     case any: TypeInformation[_] =>
-      s"${ANY.key}(${any.getTypeClass.getName}, ${serialize(any)})"
+      s"${ANY.key}<${any.getTypeClass.getName}, ${serialize(any)}>"
   }
 
   // ----------------------------------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
index 0c2e8069474fa..8d01b8be3c3f0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
@@ -78,9 +78,9 @@
     "format.fields.1.name" -> "field2",
     "format.fields.1.type" -> "TIMESTAMP",
     "format.fields.2.name" -> "field3",
-    "format.fields.2.type" -> "ANY(java.lang.Class)",
+    "format.fields.2.type" -> "ANY<java.lang.Class>",
    "format.fields.3.name" -> "field4",
-    "format.fields.3.type" -> "ROW(test INT, row VARCHAR)",
+    "format.fields.3.type" -> "ROW<test INT, row VARCHAR>",
     "format.line-delimiter" -> "^")
 
   val props2 = Map(
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
index ccac3170cbd3f..00e3a21c9c2a4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
@@ -18,7 +18,9 @@
 
 package org.apache.flink.table.descriptors
 
+import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.table.api.Types
+import org.apache.flink.table.runtime.utils.CommonTestData.Person
 import org.apache.flink.table.utils.TableTestBase
 import org.junit.Assert.assertEquals
 import org.junit.Test
@@ -45,6 +47,10 @@
     val schema = Schema()
       .field("myfield", Types.STRING)
       .field("myfield2", Types.INT)
+      .field("myfield3", Types.MAP(Types.STRING, Types.INT))
+      .field("myfield4", Types.MULTISET(Types.LONG))
+      .field("myfield5", Types.PRIMITIVE_ARRAY(Types.SHORT))
+      .field("myfield6", Types.OBJECT_ARRAY(TypeExtractor.createTypeInfo(classOf[Person])))
     // CSV table source and sink do not support proctime yet
     //if (isStreaming) {
     //  schema.field("proctime", Types.SQL_TIMESTAMP).proctime()
@@ -56,6 +62,10 @@
     val format = Csv()
       .field("myfield", Types.STRING)
       .field("myfield2", Types.INT)
+      .field("myfield3", Types.MAP(Types.STRING, Types.INT))
+      .field("myfield4", Types.MULTISET(Types.LONG))
+      .field("myfield5", Types.PRIMITIVE_ARRAY(Types.SHORT))
+      .field("myfield6", Types.OBJECT_ARRAY(TypeExtractor.createTypeInfo(classOf[Person])))
       .fieldDelimiter("#")
 
     val descriptor: RegistrableDescriptor = if (isStreaming) {
@@ -84,11 +94,29 @@
       "format.fields.0.type" -> "VARCHAR",
       "format.fields.1.name" -> "myfield2",
       "format.fields.1.type" -> "INT",
+      "format.fields.2.name" -> "myfield3",
+      "format.fields.2.type" -> "MAP<VARCHAR, INT>",
+      "format.fields.3.name" -> "myfield4",
+      "format.fields.3.type" -> "MULTISET<BIGINT>",
+      "format.fields.4.name" -> "myfield5",
+      "format.fields.4.type" -> "PRIMITIVE_ARRAY<SMALLINT>",
+      "format.fields.5.name" -> "myfield6",
+      "format.fields.5.type" ->
+        "OBJECT_ARRAY<POJO<org.apache.flink.table.runtime.utils.CommonTestData$Person>>",
       "format.field-delimiter" -> "#",
       "schema.0.name" -> "myfield",
       "schema.0.type" -> "VARCHAR",
       "schema.1.name" -> "myfield2",
-      "schema.1.type" -> "INT"
+      "schema.1.type" -> "INT",
+      "schema.2.name" -> "myfield3",
+      "schema.2.type" -> "MAP<VARCHAR, INT>",
+      "schema.3.name" -> "myfield4",
+      "schema.3.type" -> "MULTISET<BIGINT>",
+      "schema.4.name" -> "myfield5",
+      "schema.4.type" -> "PRIMITIVE_ARRAY<SMALLINT>",
+      "schema.5.name" -> "myfield6",
+      "schema.5.type" ->
+        "OBJECT_ARRAY<POJO<org.apache.flink.table.runtime.utils.CommonTestData$Person>>"
     )
 
     val expectedProperties = if (isStreaming) {
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
index 9ea8be08620e6..f35b0e01b45ec 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
@@ -18,14 +18,12 @@
 
 package org.apache.flink.table.typeutils
 
-import java.util
-
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.{RowTypeInfo, TypeExtractor}
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.runtime.utils.CommonTestData.{NonPojo, Person}
 import org.junit.Assert.{assertEquals, assertTrue}
-import org.junit.{Assert, Test}
+import org.junit.Test
 
 /**
   * Tests for string-based representation of [[TypeInformation]].
   */
@@ -49,7 +47,7 @@ class TypeStringUtilsTest {
 
     // unsupported type information
     testReadAndWrite(
-      "ANY(java.lang.Void, " +
+      "ANY<java.lang.Void, " +
       BasicTypeInfo.VOID_TYPE_INFO)
   }
 
   @Test
   def testWriteComplexTypes(): Unit = {
     testReadAndWrite(
-      "ROW(f0 DECIMAL, f1 TINYINT)",
+      "ROW<f0 DECIMAL, f1 TINYINT>",
       Types.ROW(Types.DECIMAL, Types.BYTE))
 
     testReadAndWrite(
-      "ROW(hello DECIMAL, world TINYINT)",
+      "ROW<hello DECIMAL, world TINYINT>",
       Types.ROW(
         Array[String]("hello", "world"),
         Array[TypeInformation[_]](Types.DECIMAL, Types.BYTE)))
 
     testReadAndWrite(
-      "POJO(org.apache.flink.table.runtime.utils.CommonTestData$Person)",
+      "POJO<org.apache.flink.table.runtime.utils.CommonTestData$Person>",
       TypeExtractor.createTypeInfo(classOf[Person]))
 
     testReadAndWrite(
-      "ANY(org.apache.flink.table.runtime.utils.CommonTestData$NonPojo)",
+      "ANY<org.apache.flink.table.runtime.utils.CommonTestData$NonPojo>",
       TypeExtractor.createTypeInfo(classOf[NonPojo]))
 
+    testReadAndWrite(
+      "MAP<VARCHAR, ROW<f0 DECIMAL, f1 TINYINT>>",
+      Types.MAP(Types.STRING, Types.ROW(Types.DECIMAL, Types.BYTE))
+    )
+
+    testReadAndWrite(
+      "MULTISET<ROW<f0 DECIMAL, f1 TINYINT>>",
+      Types.MULTISET(Types.ROW(Types.DECIMAL, Types.BYTE))
+    )
+
+    testReadAndWrite(
+      "PRIMITIVE_ARRAY<TINYINT>",
+      Types.PRIMITIVE_ARRAY(Types.BYTE)
+    )
+
+    testReadAndWrite(
+      "OBJECT_ARRAY<POJO<org.apache.flink.table.runtime.utils.CommonTestData$Person>>",
+      Types.OBJECT_ARRAY(TypeExtractor.createTypeInfo(classOf[Person]))
+    )
+
+    // test escaping
+    assertTrue(
+      TypeStringUtils.readTypeInfo("ROW<\"he \\nllo\" DECIMAL, world TINYINT>")
         .asInstanceOf[RowTypeInfo].getFieldNames
         .sameElements(Array[String]("he \nllo", "world")))
+
+    // test backward compatibility with brackets ()
     assertTrue(
       TypeStringUtils.readTypeInfo("ROW(\"he \\nllo\" DECIMAL, world TINYINT)")
        .asInstanceOf[RowTypeInfo].getFieldNames
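
For illustration only, not part of the change itself: a minimal, self-contained sketch of how the new string syntax round-trips through TypeStringUtils.readTypeInfo as added above. The object name TypeStringRoundTrip and the example field names are made up for this sketch; the classes and methods used (TypeStringUtils, Types.MAP, Types.ROW, Types.STRING, Types.INT) all appear in the diff.

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.table.api.Types
    import org.apache.flink.table.typeutils.TypeStringUtils

    object TypeStringRoundTrip {
      def main(args: Array[String]): Unit = {
        // Parse the new angle-bracket syntax into a TypeInformation instance.
        val parsed: TypeInformation[_] =
          TypeStringUtils.readTypeInfo("MAP<VARCHAR, ROW<name VARCHAR, age INT>>")

        // The same type built programmatically via the Types helper class.
        val expected: TypeInformation[_] = Types.MAP(
          Types.STRING,
          Types.ROW(
            Array[String]("name", "age"),
            Array[TypeInformation[_]](Types.STRING, Types.INT)))

        // Both representations describe the same type.
        println(parsed == expected) // true

        // The old parenthesized syntax is still accepted, see the
        // leftBracket/rightBracket parsers introduced in this patch.
        println(TypeStringUtils.readTypeInfo("ROW(name VARCHAR, age INT)") ==
          Types.ROW(
            Array[String]("name", "age"),
            Array[TypeInformation[_]](Types.STRING, Types.INT))) // true
      }
    }

The same strings can be used wherever the descriptor API or the SQL Client YAML expects a type, for example in a schema or format definition as shown in the sqlClient.md and test_sql_client.sh hunks above.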