[FLINK-10170] [table] Add string representation for all Table & SQL API types

Since 1.6, the recommended way of creating source/sink tables has been to use
connector/format/schema descriptors. This commit adds a string-based
representation for all types supported by the Table & SQL API.

We use a syntax similar to that of Hive and other SQL projects.

This closes apache#6578.
jerryjzhang authored and twalthr committed Sep 11, 2018
1 parent a97e2f7 commit 1ae5983
Showing 8 changed files with 152 additions and 49 deletions.
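For a quick feel of the feature, here is a minimal sketch (not part of the diff below; it assumes `TypeStringUtils` lives in `org.apache.flink.table.typeutils`, as the test package further down suggests) that parses a type string back into Flink `TypeInformation`:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.typeutils.TypeStringUtils

object TypeStringExample extends App {
  // "ROW<name VARCHAR, age INT>" maps to a RowTypeInfo with fields "name" and "age".
  val rowType: TypeInformation[_] =
    TypeStringUtils.readTypeInfo("ROW<name VARCHAR, age INT>")

  // The container types added by this commit are parsed the same way.
  val mapType: TypeInformation[_] =
    TypeStringUtils.readTypeInfo("MAP<VARCHAR, ROW<f0 DECIMAL, f1 TINYINT>>")

  println(rowType) // e.g. a Row type with fields name: String, age: Integer
  println(mapType)
}
```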
21 changes: 13 additions & 8 deletions docs/dev/table/connect.md
@@ -417,13 +417,18 @@ DECIMAL
DATE
TIME
TIMESTAMP
ROW(fieldtype, ...) # unnamed row; e.g. ROW(VARCHAR, INT) that is mapped to Flink's RowTypeInfo
# with indexed fields names f0, f1, ...
ROW(fieldname fieldtype, ...) # named row; e.g., ROW(myField VARCHAR, myOtherField INT) that
# is mapped to Flink's RowTypeInfo
POJO(class) # e.g., POJO(org.mycompany.MyPojoClass) that is mapped to Flink's PojoTypeInfo
ANY(class) # e.g., ANY(org.mycompany.MyClass) that is mapped to Flink's GenericTypeInfo
ANY(class, serialized) # used for type information that is not supported by Flink's Table & SQL API
MAP<fieldtype, fieldtype> # generic map; e.g. MAP<VARCHAR, INT> that is mapped to Flink's MapTypeInfo
MULTISET<fieldtype> # multiset; e.g. MULTISET<VARCHAR> that is mapped to Flink's MultisetTypeInfo
PRIMITIVE_ARRAY<fieldtype> # primitive array; e.g. PRIMITIVE_ARRAY<INT> that is mapped to Flink's PrimitiveArrayTypeInfo
OBJECT_ARRAY<fieldtype> # object array; e.g. OBJECT_ARRAY<POJO(org.mycompany.MyPojoClass)> that is mapped to
# Flink's ObjectArrayTypeInfo
ROW<fieldtype, ...> # unnamed row; e.g. ROW<VARCHAR, INT> that is mapped to Flink's RowTypeInfo
# with indexed field names f0, f1, ...
ROW<fieldname fieldtype, ...> # named row; e.g., ROW<myField VARCHAR, myOtherField INT> that
# is mapped to Flink's RowTypeInfo
POJO<class> # e.g., POJO<org.mycompany.MyPojoClass> that is mapped to Flink's PojoTypeInfo
ANY<class> # e.g., ANY<org.mycompany.MyClass> that is mapped to Flink's GenericTypeInfo
ANY<class, serialized> # used for type information that is not supported by Flink's Table & SQL API
{% endhighlight %}

{% top %}
@@ -1046,4 +1051,4 @@ table.writeToSink(sink)
</div>
</div>

{% top %}
{% top %}
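The type strings documented above are what the connector/format/schema descriptors write into properties. A rough sketch of the programmatic side (mirroring the TableDescriptorTest changes further down in this commit):

```scala
import org.apache.flink.table.api.Types
import org.apache.flink.table.descriptors.{Csv, Schema}

object DescriptorTypeStrings {
  // Field types given as TypeInformation end up serialized to the string
  // representation above, e.g. "MAP<VARCHAR, INT>" or "MULTISET<BIGINT>".
  val format = Csv()
    .field("user", Types.STRING)
    .field("scores", Types.MAP(Types.STRING, Types.INT))

  val schema = Schema()
    .field("user", Types.STRING)
    .field("counts", Types.MULTISET(Types.LONG))
}
```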
2 changes: 1 addition & 1 deletion docs/dev/table/sqlClient.md
@@ -302,7 +302,7 @@ tables:
format:
property-version: 1
type: json
schema: "ROW(rideId LONG, lon FLOAT, lat FLOAT, rideTime TIMESTAMP)"
schema: "ROW<rideId LONG, lon FLOAT, lat FLOAT, rideTime TIMESTAMP>"
schema:
- name: rideId
type: LONG
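For reference, the quoted format schema above should be equivalent to the following `TypeInformation` (a sketch, assuming the TypeStringUtils parser changed later in this commit is what consumes the SQL Client's schema strings):

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.table.typeutils.TypeStringUtils

object RideSchemaExample extends App {
  val parsed = TypeStringUtils.readTypeInfo(
    "ROW<rideId LONG, lon FLOAT, lat FLOAT, rideTime TIMESTAMP>")

  val expected = Types.ROW(
    Array("rideId", "lon", "lat", "rideTime"),
    Array[TypeInformation[_]](Types.LONG, Types.FLOAT, Types.FLOAT, Types.SQL_TIMESTAMP))

  println(parsed == expected) // expected: true
}
```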
2 changes: 1 addition & 1 deletion flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -128,7 +128,7 @@ tables:
- name: user
type: VARCHAR
- name: event
type: ROW(type VARCHAR, message VARCHAR)
type: ROW<type VARCHAR, message VARCHAR>
connector:
type: kafka
version: "0.10"
@@ -106,7 +106,7 @@ public List<Map<String, String>> properties() {
final Map<String, String> props3 = new HashMap<>();
props3.put("format.type", "json");
props3.put("format.property-version", "1");
props3.put("format.schema", "ROW(test1 VARCHAR, test2 TIMESTAMP)");
props3.put("format.schema", "ROW<test1 VARCHAR, test2 TIMESTAMP>");
props3.put("format.fail-on-missing-field", "true");

final Map<String, String> props4 = new HashMap<>();
@@ -23,7 +23,7 @@ import java.io.Serializable
import org.apache.commons.codec.binary.Base64
import org.apache.commons.lang3.StringEscapeUtils
import org.apache.flink.api.common.functions.InvalidTypesException
import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils._
import org.apache.flink.table.api.{TableException, Types, ValidationException}
@@ -67,13 +67,24 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers {
lazy val ROW: Keyword = Keyword("ROW")
lazy val ANY: Keyword = Keyword("ANY")
lazy val POJO: Keyword = Keyword("POJO")
lazy val MAP: Keyword = Keyword("MAP")
lazy val MULTISET: Keyword = Keyword("MULTISET")
lazy val PRIMITIVE_ARRAY: Keyword = Keyword("PRIMITIVE_ARRAY")
lazy val OBJECT_ARRAY: Keyword = Keyword("OBJECT_ARRAY")

lazy val qualifiedName: Parser[String] =
"""\p{javaJavaIdentifierStart}[\p{javaJavaIdentifierPart}.]*""".r

lazy val base64Url: Parser[String] =
"""[A-Za-z0-9_-]*""".r

// keep parenthesis to ensure backward compatibility (this can be dropped after Flink 1.7)
lazy val leftBracket: PackratParser[(String)] =
"(" | "<"

lazy val rightBracket: PackratParser[(String)] =
")" | ">"

lazy val atomic: PackratParser[TypeInformation[_]] =
(VARCHAR | STRING) ^^ { e => Types.STRING } |
BOOLEAN ^^ { e => Types.BOOLEAN } |
@@ -101,34 +112,35 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers {
}

lazy val namedRow: PackratParser[TypeInformation[_]] =
ROW ~ "(" ~> rep1sep(field, ",") <~ ")" ^^ {
ROW ~ leftBracket ~> rep1sep(field, ", ") <~ rightBracket ^^ {
fields => Types.ROW(fields.map(_._1).toArray, fields.map(_._2).toArray)
} | failure("Named row type expected.")

lazy val unnamedRow: PackratParser[TypeInformation[_]] =
ROW ~ "(" ~> rep1sep(typeInfo, ",") <~ ")" ^^ {
ROW ~ leftBracket ~> rep1sep(typeInfo, ", ") <~ rightBracket ^^ {
types => Types.ROW(types: _*)
} | failure("Unnamed row type expected.")

lazy val generic: PackratParser[TypeInformation[_]] =
ANY ~ "(" ~> qualifiedName <~ ")" ^^ {
ANY ~ leftBracket ~> qualifiedName <~ rightBracket ^^ {
typeClass =>
val clazz = loadClass(typeClass)
new GenericTypeInfo[AnyRef](clazz.asInstanceOf[Class[AnyRef]])
}

lazy val pojo: PackratParser[TypeInformation[_]] = POJO ~ "(" ~> qualifiedName <~ ")" ^^ {
typeClass =>
val clazz = loadClass(typeClass)
val info = TypeExtractor.createTypeInfo(clazz)
if (!info.isInstanceOf[PojoTypeInfo[_]]) {
throw new ValidationException(s"Class '$typeClass'is not a POJO type.")
}
info
}
lazy val pojo: PackratParser[TypeInformation[_]] =
POJO ~ leftBracket ~> qualifiedName <~ rightBracket ^^ {
typeClass =>
val clazz = loadClass(typeClass)
val info = TypeExtractor.createTypeInfo(clazz)
if (!info.isInstanceOf[PojoTypeInfo[_]]) {
throw new ValidationException(s"Class '$typeClass' is not a POJO type.")
}
info
}

lazy val any: PackratParser[TypeInformation[_]] =
ANY ~ "(" ~ qualifiedName ~ "," ~ base64Url ~ ")" ^^ {
ANY ~ leftBracket ~ qualifiedName ~ "," ~ base64Url ~ rightBracket ^^ {
case _ ~ _ ~ typeClass ~ _ ~ serialized ~ _ =>
val clazz = loadClass(typeClass)
val typeInfo = deserialize(serialized)
@@ -140,8 +152,38 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers {
typeInfo
}

lazy val genericMap: PackratParser[TypeInformation[_]] =
MAP ~ leftBracket ~ typeInfo ~ "," ~ typeInfo ~ rightBracket ^^ {
case _ ~ _ ~ keyTypeInfo ~ _ ~ valueTypeInfo ~ _ =>
Types.MAP(keyTypeInfo, valueTypeInfo)
}

lazy val multiSet: PackratParser[TypeInformation[_]] =
MULTISET ~ leftBracket ~ typeInfo ~ rightBracket ^^ {
case _ ~ _ ~ elementTypeInfo ~ _ =>
Types.MULTISET(elementTypeInfo)
}

lazy val primitiveArray: PackratParser[TypeInformation[_]] =
PRIMITIVE_ARRAY ~ leftBracket ~ typeInfo ~ rightBracket ^^ {
case _ ~ _ ~ componentTypeInfo ~ _ =>
Types.PRIMITIVE_ARRAY(componentTypeInfo)
}

lazy val objectArray: PackratParser[TypeInformation[_]] =
OBJECT_ARRAY ~ leftBracket ~ typeInfo ~ rightBracket ^^ {
case _ ~ _ ~ componentTypeInfo ~ _ =>
Types.OBJECT_ARRAY(componentTypeInfo)
}

lazy val map: PackratParser[TypeInformation[_]] =
genericMap | multiSet

lazy val array: PackratParser[TypeInformation[_]] =
primitiveArray | objectArray

lazy val typeInfo: PackratParser[TypeInformation[_]] =
namedRow | unnamedRow | any | generic | pojo | atomic | failure("Invalid type.")
namedRow | unnamedRow | any | generic | pojo | atomic | map | array | failure("Invalid type.")

def readTypeInfo(typeString: String): TypeInformation[_] = {
parseAll(typeInfo, typeString) match {
@@ -182,10 +224,10 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers {

s"$name ${normalizeTypeInfo(f._2)}"
}
s"${ROW.key}(${normalizedFields.mkString(", ")})"
s"${ROW.key}<${normalizedFields.mkString(", ")}>"

case generic: GenericTypeInfo[_] =>
s"${ANY.key}(${generic.getTypeClass.getName})"
s"${ANY.key}<${generic.getTypeClass.getName}>"

case pojo: PojoTypeInfo[_] =>
// we only support very simple POJOs that only contain extracted fields
@@ -196,24 +238,28 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers {
case _: InvalidTypesException => None
}
extractedInfo match {
case Some(ei) if ei == pojo => s"${POJO.key}(${pojo.getTypeClass.getName})"
case Some(ei) if ei == pojo => s"${POJO.key}<${pojo.getTypeClass.getName}>"
case _ =>
throw new TableException(
"A string representation for custom POJO types is not supported yet.")
}

case _: CompositeType[_] =>
throw new TableException("A string representation for composite types is not supported yet.")
case array: PrimitiveArrayTypeInfo[_] =>
s"${PRIMITIVE_ARRAY.key}<${normalizeTypeInfo(array.getComponentType)}>"

case array: ObjectArrayTypeInfo[_, _] =>
s"${OBJECT_ARRAY.key}<${normalizeTypeInfo(array.getComponentInfo)}>"

case _: BasicArrayTypeInfo[_, _] | _: ObjectArrayTypeInfo[_, _] |
_: PrimitiveArrayTypeInfo[_] =>
throw new TableException("A string representation for array types is not supported yet.")
case set: MultisetTypeInfo[_] =>
s"${MULTISET.key}<${normalizeTypeInfo(set.getElementTypeInfo)}>"

case _: MapTypeInfo[_, _] | _: MultisetTypeInfo[_] =>
throw new TableException("A string representation for map types is not supported yet.")
case map: MapTypeInfo[_, _] =>
val normalizedKey = normalizeTypeInfo(map.getKeyTypeInfo)
val normalizedValue = normalizeTypeInfo(map.getValueTypeInfo)
s"${MAP.key}<$normalizedKey, $normalizedValue>"

case any: TypeInformation[_] =>
s"${ANY.key}(${any.getTypeClass.getName}, ${serialize(any)})"
s"${ANY.key}<${any.getTypeClass.getName}, ${serialize(any)}>"
}

// ----------------------------------------------------------------------------------------------
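A sketch of what the backward compatibility handled by the leftBracket/rightBracket parsers above amounts to (illustrative only; the parenthesis form is slated to be dropped after Flink 1.7, and writing now always uses angle brackets):

```scala
import org.apache.flink.table.typeutils.TypeStringUtils

object BracketCompatibility extends App {
  // The legacy parenthesis syntax and the new angle-bracket syntax
  // parse to the same TypeInformation.
  val legacy  = TypeStringUtils.readTypeInfo("ROW(name VARCHAR, age INT)")
  val current = TypeStringUtils.readTypeInfo("ROW<name VARCHAR, age INT>")
  println(legacy == current) // expected: true
}
```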
@@ -78,9 +78,9 @@ class CsvTest extends DescriptorTestBase {
"format.fields.1.name" -> "field2",
"format.fields.1.type" -> "TIMESTAMP",
"format.fields.2.name" -> "field3",
"format.fields.2.type" -> "ANY(java.lang.Class)",
"format.fields.2.type" -> "ANY<java.lang.Class>",
"format.fields.3.name" -> "field4",
"format.fields.3.type" -> "ROW(test INT, row VARCHAR)",
"format.fields.3.type" -> "ROW<test INT, row VARCHAR>",
"format.line-delimiter" -> "^")

val props2 = Map(
@@ -18,7 +18,9 @@

package org.apache.flink.table.descriptors

import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.table.api.Types
import org.apache.flink.table.runtime.utils.CommonTestData.Person
import org.apache.flink.table.utils.TableTestBase
import org.junit.Assert.assertEquals
import org.junit.Test
@@ -45,6 +47,10 @@ class TableDescriptorTest extends TableTestBase {
val schema = Schema()
.field("myfield", Types.STRING)
.field("myfield2", Types.INT)
.field("myfield3", Types.MAP(Types.STRING, Types.INT))
.field("myfield4", Types.MULTISET(Types.LONG))
.field("myfield5", Types.PRIMITIVE_ARRAY(Types.SHORT))
.field("myfield6", Types.OBJECT_ARRAY(TypeExtractor.createTypeInfo(classOf[Person])))
// CSV table source and sink do not support proctime yet
//if (isStreaming) {
// schema.field("proctime", Types.SQL_TIMESTAMP).proctime()
@@ -56,6 +62,10 @@
val format = Csv()
.field("myfield", Types.STRING)
.field("myfield2", Types.INT)
.field("myfield3", Types.MAP(Types.STRING, Types.INT))
.field("myfield4", Types.MULTISET(Types.LONG))
.field("myfield5", Types.PRIMITIVE_ARRAY(Types.SHORT))
.field("myfield6", Types.OBJECT_ARRAY(TypeExtractor.createTypeInfo(classOf[Person])))
.fieldDelimiter("#")

val descriptor: RegistrableDescriptor = if (isStreaming) {
@@ -84,11 +94,29 @@ class TableDescriptorTest extends TableTestBase {
"format.fields.0.type" -> "VARCHAR",
"format.fields.1.name" -> "myfield2",
"format.fields.1.type" -> "INT",
"format.fields.2.name" -> "myfield3",
"format.fields.2.type" -> "MAP<VARCHAR, INT>",
"format.fields.3.name" -> "myfield4",
"format.fields.3.type" -> "MULTISET<BIGINT>",
"format.fields.4.name" -> "myfield5",
"format.fields.4.type" -> "PRIMITIVE_ARRAY<SMALLINT>",
"format.fields.5.name" -> "myfield6",
"format.fields.5.type" ->
"OBJECT_ARRAY<POJO<org.apache.flink.table.runtime.utils.CommonTestData$Person>>",
"format.field-delimiter" -> "#",
"schema.0.name" -> "myfield",
"schema.0.type" -> "VARCHAR",
"schema.1.name" -> "myfield2",
"schema.1.type" -> "INT"
"schema.1.type" -> "INT",
"schema.2.name" -> "myfield3",
"schema.2.type" -> "MAP<VARCHAR, INT>",
"schema.3.name" -> "myfield4",
"schema.3.type" -> "MULTISET<BIGINT>",
"schema.4.name" -> "myfield5",
"schema.4.type" -> "PRIMITIVE_ARRAY<SMALLINT>",
"schema.5.name" -> "myfield6",
"schema.5.type" ->
"OBJECT_ARRAY<POJO<org.apache.flink.table.runtime.utils.CommonTestData$Person>>"
)

val expectedProperties = if (isStreaming) {
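The expected property values above come from serializing the declared `TypeInformation` back into strings. A rough sketch of that direction (assuming TypeStringUtils also exposes a writeTypeInfo counterpart to readTypeInfo, as the testReadAndWrite helper in the next file suggests):

```scala
import org.apache.flink.table.api.Types
import org.apache.flink.table.typeutils.TypeStringUtils

object WriteTypeStrings extends App {
  // Assumed API: writeTypeInfo renders a TypeInformation as its string form.
  println(TypeStringUtils.writeTypeInfo(Types.MAP(Types.STRING, Types.INT)))
  // expected: MAP<VARCHAR, INT>
  println(TypeStringUtils.writeTypeInfo(Types.MULTISET(Types.LONG)))
  // expected: MULTISET<BIGINT>
}
```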
@@ -18,14 +18,12 @@

package org.apache.flink.table.typeutils

import java.util

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.{RowTypeInfo, TypeExtractor}
import org.apache.flink.table.api.Types
import org.apache.flink.table.runtime.utils.CommonTestData.{NonPojo, Person}
import org.junit.Assert.{assertEquals, assertTrue}
import org.junit.{Assert, Test}
import org.junit.Test

/**
* Tests for string-based representation of [[TypeInformation]].
@@ -49,7 +47,7 @@ class TypeStringUtilsTest {

// unsupported type information
testReadAndWrite(
"ANY(java.lang.Void, " +
"ANY<java.lang.Void, " +
"rO0ABXNyADJvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZWluZm8uQmFzaWNUeXBlSW5mb_oE8IKl" +
"ad0GAgAETAAFY2xhenp0ABFMamF2YS9sYW5nL0NsYXNzO0wAD2NvbXBhcmF0b3JDbGFzc3EAfgABWwAXcG9z" +
"c2libGVDYXN0VGFyZ2V0VHlwZXN0ABJbTGphdmEvbGFuZy9DbGFzcztMAApzZXJpYWxpemVydAA2TG9yZy9h" +
@@ -59,31 +57,57 @@ class TypeStringUtilsTest {
"cgA5b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlZvaWRTZXJpYWxpemVyAAAA" +
"AAAAAAECAAB4cgBCb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlR5cGVTZXJp" +
"YWxpemVyU2luZ2xldG9ueamHqscud0UCAAB4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1" +
"dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHA)",
"dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHA>",
BasicTypeInfo.VOID_TYPE_INFO)
}

@Test
def testWriteComplexTypes(): Unit = {
testReadAndWrite(
"ROW(f0 DECIMAL, f1 TINYINT)",
"ROW<f0 DECIMAL, f1 TINYINT>",
Types.ROW(Types.DECIMAL, Types.BYTE))

testReadAndWrite(
"ROW(hello DECIMAL, world TINYINT)",
"ROW<hello DECIMAL, world TINYINT>",
Types.ROW(
Array[String]("hello", "world"),
Array[TypeInformation[_]](Types.DECIMAL, Types.BYTE)))

testReadAndWrite(
"POJO(org.apache.flink.table.runtime.utils.CommonTestData$Person)",
"POJO<org.apache.flink.table.runtime.utils.CommonTestData$Person>",
TypeExtractor.createTypeInfo(classOf[Person]))

testReadAndWrite(
"ANY(org.apache.flink.table.runtime.utils.CommonTestData$NonPojo)",
"ANY<org.apache.flink.table.runtime.utils.CommonTestData$NonPojo>",
TypeExtractor.createTypeInfo(classOf[NonPojo]))

testReadAndWrite(
"MAP<VARCHAR, ROW<f0 DECIMAL, f1 TINYINT>>",
Types.MAP(Types.STRING, Types.ROW(Types.DECIMAL, Types.BYTE))
)

testReadAndWrite(
"MULTISET<ROW<f0 DECIMAL, f1 TINYINT>>",
Types.MULTISET(Types.ROW(Types.DECIMAL, Types.BYTE))
)

testReadAndWrite(
"PRIMITIVE_ARRAY<TINYINT>",
Types.PRIMITIVE_ARRAY(Types.BYTE)
)

testReadAndWrite(
"OBJECT_ARRAY<POJO<org.apache.flink.table.runtime.utils.CommonTestData$Person>>",
Types.OBJECT_ARRAY(TypeExtractor.createTypeInfo(classOf[Person]))
)

// test escaping
assertTrue(
TypeStringUtils.readTypeInfo("ROW<\"he \\nllo\" DECIMAL, world TINYINT>")
.asInstanceOf[RowTypeInfo].getFieldNames
.sameElements(Array[String]("he \nllo", "world")))

// test backward compatibility with brackets ()
assertTrue(
TypeStringUtils.readTypeInfo("ROW(\"he \\nllo\" DECIMAL, world TINYINT)")
.asInstanceOf[RowTypeInfo].getFieldNames
