[FLINK-19273][sql-parser] Support METADATA syntax in SQL parser
Updates the parser to accept METADATA syntax for columns and in
the LIKE clause. Reworks the class hierarchy of table columns.
Introduces new keywords and grammar.

This closes apache#13452.
twalthr committed Sep 23, 2020
1 parent c50b027 commit 46f1c44
Showing 16 changed files with 457 additions and 258 deletions.
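For context, the DDL that this commit makes parseable looks roughly like the following. This is an illustrative sketch based on the metadata-column design (FLIP-107); the table names and connector options are placeholders, not part of this commit:

    -- a physical column, a renamed metadata column, and a read-only (VIRTUAL) metadata column
    CREATE TABLE kafka_source (
        id BIGINT,
        record_time TIMESTAMP(3) METADATA FROM 'timestamp',
        part INT METADATA VIRTUAL
    ) WITH ('connector' = 'kafka');

    -- METADATA is also accepted as a merging option in the LIKE clause
    CREATE TABLE derived_sink (
        extra STRING
    ) WITH ('connector' = 'kafka')
    LIKE kafka_source (EXCLUDING METADATA);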
@@ -397,42 +397,52 @@ SqlDrop SqlDropTable(Span s, boolean replace) :
     }
 }
 
-void TableColumn2(List<SqlNode> list) :
+void RegularColumn(List<SqlNode> list) :
 {
     SqlParserPos pos;
     SqlIdentifier name;
     SqlDataTypeSpec type;
-    SqlCharStringLiteral comment = null;
+    SqlNode comment = null;
 }
 {
     name = SimpleIdentifier()
     type = ExtendedDataType()
-    [ <COMMENT> <QUOTED_STRING> {
-        comment = createStringLiteral(token.image, getPos());
-    }]
+    [
+        <COMMENT>
+        comment = StringLiteral()
+    ]
     {
-        SqlTableColumn tableColumn = new SqlTableColumn(name, type, null, comment, getPos());
-        list.add(tableColumn);
+        SqlTableColumn regularColumn = new SqlTableColumn.SqlRegularColumn(
+            getPos(),
+            name,
+            comment,
+            type,
+            null);
+        list.add(regularColumn);
     }
 }
 
 void PartColumnDef(List<SqlNode> list) :
 {
     SqlParserPos pos;
     SqlIdentifier name;
     SqlDataTypeSpec type;
-    SqlCharStringLiteral comment = null;
+    SqlNode comment = null;
 }
 {
     name = SimpleIdentifier()
     type = DataType()
-    [ <COMMENT> <QUOTED_STRING> {
-        comment = createStringLiteral(token.image, getPos());
-    }]
+    [
+        <COMMENT>
+        comment = StringLiteral()
+    ]
     {
         type = type.withNullable(true);
-        SqlTableColumn tableColumn = new SqlTableColumn(name, type, null, comment, getPos());
-        list.add(tableColumn);
+        SqlTableColumn regularColumn = new SqlTableColumn.SqlRegularColumn(
+            getPos(),
+            name,
+            comment,
+            type,
+            null);
+        list.add(regularColumn);
     }
 }

@@ -515,7 +525,12 @@ void TableColumnWithConstraint(HiveTableCreationContext context) :
             context.notNullTraits.add(constraintTrait);
             context.notNullCols.add(name);
         }
-        SqlTableColumn tableColumn = new SqlTableColumn(name, type, null, comment, getPos());
+        SqlTableColumn tableColumn = new SqlTableColumn.SqlRegularColumn(
+            getPos(),
+            name,
+            comment,
+            type,
+            null);
         context.columnList.add(tableColumn);
     }
     [ <COMMENT> <QUOTED_STRING> {
@@ -1237,9 +1252,9 @@ SqlAlterTable SqlAlterHiveTableAddReplaceColumn(SqlParserPos startPos, SqlIdenti
     {
         List<SqlNode> cols = new ArrayList();
     }
-    TableColumn2(cols)
+    RegularColumn(cols)
     (
-        <COMMA> TableColumn2(cols)
+        <COMMA> RegularColumn(cols)
     )*
     <RPAREN>
     [
@@ -1282,13 +1297,21 @@ SqlAlterTable SqlAlterHiveTableChangeColumn(SqlParserPos startPos, SqlIdentifier
     |
         <RESTRICT>
     ]
-    { return new SqlAlterHiveTableChangeColumn(startPos.plus(getPos()),
-        tableIdentifier,
-        cascade,
-        oldName,
-        new SqlTableColumn(newName, newType, null, comment, newName.getParserPosition()),
-        first,
-        after); }
+    {
+        return new SqlAlterHiveTableChangeColumn(
+            startPos.plus(getPos()),
+            tableIdentifier,
+            cascade,
+            oldName,
+            new SqlTableColumn.SqlRegularColumn(
+                newName.getParserPosition(),
+                newName,
+                comment,
+                newType,
+                null),
+            first,
+            after);
+    }
 }
 
 SqlAlterTable SqlAlterHiveTableSerDe(SqlParserPos startPos, SqlIdentifier tableIdentifier, SqlNodeList partitionSpec) :
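The renamed productions accept the same Hive-dialect DDL as before; the statements below (taken from the parser tests later in this commit) exercise RegularColumn and SqlAlterHiveTableChangeColumn:

    ALTER TABLE tbl ADD COLUMNS (a FLOAT, b TIMESTAMP, c BINARY) CASCADE;
    ALTER TABLE tbl CHANGE COLUMN c c DECIMAL(5, 2) COMMENT 'new comment' FIRST CASCADE;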
@@ -20,6 +20,7 @@
 
 import org.apache.flink.sql.parser.SqlProperty;
 import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlRegularColumn;
 import org.apache.flink.sql.parser.ddl.SqlTableOption;
 import org.apache.flink.sql.parser.hive.impl.ParseException;
 import org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec;
@@ -166,13 +167,13 @@ public static SqlTableOption toTableOption(String key, String value, SqlParserPo
 public static void convertDataTypes(SqlNodeList columns) throws ParseException {
     if (columns != null) {
         for (SqlNode node : columns) {
-            convertDataTypes((SqlTableColumn) node);
+            convertDataTypes((SqlRegularColumn) node);
         }
     }
 }
 
 // Check and convert data types to comply with HiveQL, e.g. TIMESTAMP and BINARY
-public static void convertDataTypes(SqlTableColumn column) throws ParseException {
+public static void convertDataTypes(SqlRegularColumn column) throws ParseException {
     column.setType(convertDataTypes(column.getType()));
 }

@@ -311,18 +312,18 @@ public static byte encodeConstraintTrait(SqlHiveConstraintTrait trait) {
 public static SqlNodeList deepCopyColList(SqlNodeList colList) {
     SqlNodeList res = new SqlNodeList(colList.getParserPosition());
     for (SqlNode node : colList) {
-        res.add(deepCopyTableColumn((SqlTableColumn) node));
+        res.add(deepCopyTableColumn((SqlRegularColumn) node));
     }
     return res;
 }
 
-public static SqlTableColumn deepCopyTableColumn(SqlTableColumn column) {
-    return new SqlTableColumn(
-        column.getName(),
-        column.getType(),
-        column.getConstraint().orElse(null),
-        column.getComment().orElse(null),
-        column.getParserPosition()
+public static SqlRegularColumn deepCopyTableColumn(SqlRegularColumn column) {
+    return new SqlTableColumn.SqlRegularColumn(
+        column.getParserPosition(),
+        column.getName(),
+        column.getComment().orElse(null),
+        column.getType(),
+        column.getConstraint().orElse(null)
     );
 }

@@ -19,7 +19,7 @@
 package org.apache.flink.sql.parser.hive.ddl;
 
 import org.apache.flink.sql.parser.ddl.SqlChangeColumn;
-import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlRegularColumn;
 import org.apache.flink.sql.parser.hive.impl.ParseException;
 
 import org.apache.calcite.sql.SqlIdentifier;
@@ -32,11 +32,11 @@
  */
 public class SqlAlterHiveTableChangeColumn extends SqlChangeColumn {
 
-    private final SqlTableColumn origNewColumn;
+    private final SqlRegularColumn origNewColumn;
     private final boolean cascade;
 
     public SqlAlterHiveTableChangeColumn(SqlParserPos pos, SqlIdentifier tableName, boolean cascade,
-            SqlIdentifier oldName, SqlTableColumn newColumn, boolean first, SqlIdentifier after) throws ParseException {
+            SqlIdentifier oldName, SqlRegularColumn newColumn, boolean first, SqlIdentifier after) throws ParseException {
         super(pos, tableName, oldName, newColumn, after, first, new SqlNodeList(pos));
         this.origNewColumn = HiveDDLUtils.deepCopyTableColumn(newColumn);
         HiveDDLUtils.convertDataTypes(newColumn);
@@ -20,6 +20,7 @@
 
 import org.apache.flink.sql.parser.ddl.SqlCreateTable;
 import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlRegularColumn;
 import org.apache.flink.sql.parser.ddl.SqlTableOption;
 import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
 import org.apache.flink.sql.parser.hive.impl.ParseException;
@@ -307,7 +308,7 @@ private void unparseColumns(HiveTableCreationContext context, SqlNodeList column
     int traitIndex = 0;
     for (SqlNode node : columns) {
         printIndent(writer);
-        SqlTableColumn column = (SqlTableColumn) node;
+        SqlRegularColumn column = (SqlRegularColumn) node;
         column.getName().unparse(writer, leftPrec, rightPrec);
         writer.print(" ");
         column.getType().unparse(writer, leftPrec, rightPrec);
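unparseColumns now casts each list element to SqlRegularColumn before printing its name and type. A hypothetical Hive-dialect input whose column list would flow through this loop:

    CREATE TABLE users (
        id BIGINT NOT NULL,
        name STRING COMMENT 'display name'
    ) PARTITIONED BY (dt STRING);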
@@ -340,24 +340,24 @@ public void testAlterFileFormat() {
 @Test
 public void testChangeColumn() {
     sql("alter table tbl change c c1 struct<f0:timestamp,f1:array<char(5)>> restrict")
         .ok("ALTER TABLE `TBL` CHANGE COLUMN `C` `C1` STRUCT< `F0` TIMESTAMP, `F1` ARRAY< CHAR(5) > > RESTRICT");
     sql("alter table tbl change column c c decimal(5,2) comment 'new comment' first cascade")
         .ok("ALTER TABLE `TBL` CHANGE COLUMN `C` `C` DECIMAL(5, 2) COMMENT 'new comment' FIRST CASCADE");
 }
 
 @Test
 public void testAddReplaceColumn() {
     sql("alter table tbl add columns (a float,b timestamp,c binary) cascade")
         .ok("ALTER TABLE `TBL` ADD COLUMNS (\n" +
             " `A` FLOAT,\n" +
             " `B` TIMESTAMP,\n" +
             " `C` BINARY\n" +
             ") CASCADE");
     sql("alter table tbl replace columns (a char(100),b tinyint comment 'tiny comment',c smallint) restrict")
         .ok("ALTER TABLE `TBL` REPLACE COLUMNS (\n" +
             " `A` CHAR(100),\n" +
             " `B` TINYINT COMMENT 'tiny comment',\n" +
             " `C` SMALLINT\n" +
             ") RESTRICT");
 }
4 changes: 4 additions & 0 deletions flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -91,6 +91,7 @@
     "EXTENDED"
     "FUNCTIONS"
     "IF"
+    "METADATA"
     "OVERWRITE"
     "OVERWRITING"
     "PARTITIONED"
@@ -103,6 +104,7 @@
     "TABLES"
     "USE"
     "VIEWS"
+    "VIRTUAL"
     "WATERMARK"
     "WATERMARKS"
 ]
@@ -429,10 +431,12 @@
 # not in core, added in Flink
     "ENFORCED"
     "IF"
+    "METADATA"
     "OVERWRITE"
     "OVERWRITING"
     "PARTITIONED"
     "PARTITIONS"
+    "VIRTUAL"
 ]
 
 # List of non-reserved keywords to remove;
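Both hunks register METADATA and VIRTUAL as non-reserved keywords, so they stay usable as plain identifiers. A hedged illustration (assuming non-reserved status is preserved through code generation):

    -- expected to keep parsing: the new keywords double as column names
    CREATE TABLE t (metadata STRING, virtual BOOLEAN);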
