Skip to content

Commit

Permalink
[FLINK-17030][table-planner-blink] Add primary key syntax for ALTER TABLE
Browse files Browse the repository at this point in the history

This closes apache#11950
  • Loading branch information
danny0405 authored and wuchong committed May 9, 2020
1 parent 3765fe9 commit 28ab459
Show file tree
Hide file tree
Showing 10 changed files with 506 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ConnectorCatalogTable;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
Expand Down Expand Up @@ -87,6 +89,8 @@
import org.apache.flink.table.operations.UseDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterTableAddConstraintOperation;
import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
import org.apache.flink.table.operations.ddl.AlterTableOperation;
import org.apache.flink.table.operations.ddl.AlterTablePropertiesOperation;
import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
Expand All @@ -106,6 +110,7 @@
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.sources.TableSourceValidation;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;

import java.util.ArrayList;
Expand Down Expand Up @@ -755,6 +760,49 @@ private TableResult executeOperation(Operation operation) {
alterTablePropertiesOp.getTableIdentifier().toObjectPath(),
alterTablePropertiesOp.getCatalogTable(),
false);
} else if (alterTableOperation instanceof AlterTableAddConstraintOperation){
AlterTableAddConstraintOperation addConstraintOP =
(AlterTableAddConstraintOperation) operation;
CatalogTable oriTable = (CatalogTable) catalogManager
.getTable(addConstraintOP.getTableIdentifier())
.get()
.getTable();
TableSchema.Builder builder = TableSchemaUtils
.builderWithGivenSchema(oriTable.getSchema());
if (addConstraintOP.getConstraintName().isPresent()) {
builder.primaryKey(
addConstraintOP.getConstraintName().get(),
addConstraintOP.getColumnNames());
} else {
builder.primaryKey(addConstraintOP.getColumnNames());
}
CatalogTable newTable = new CatalogTableImpl(
builder.build(),
oriTable.getPartitionKeys(),
oriTable.getOptions(),
oriTable.getComment());
catalog.alterTable(
addConstraintOP.getTableIdentifier().toObjectPath(),
newTable,
false);
} else if (alterTableOperation instanceof AlterTableDropConstraintOperation){
AlterTableDropConstraintOperation dropConstraintOperation =
(AlterTableDropConstraintOperation) operation;
CatalogTable oriTable = (CatalogTable) catalogManager
.getTable(dropConstraintOperation.getTableIdentifier())
.get()
.getTable();
CatalogTable newTable = new CatalogTableImpl(
TableSchemaUtils.dropConstraint(
oriTable.getSchema(),
dropConstraintOperation.getConstraintName()),
oriTable.getPartitionKeys(),
oriTable.getOptions(),
oriTable.getComment());
catalog.alterTable(
dropConstraintOperation.getTableIdentifier().toObjectPath(),
newTable,
false);
}
return TableResultImpl.TABLE_RESULT_OK;
} catch (TableAlreadyExistException | TableNotExistException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.operations.ddl;

import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.OperationUtils;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/**
* Operation of "ALTER TABLE ADD [CONSTRAINT constraintName] ..." clause.
*
* <p>Note: only primary key is supported now.
*/
/**
 * Operation of "ALTER TABLE ADD [CONSTRAINT constraintName] ..." clause.
 *
 * <p>Note: only primary key is supported now.
 */
public class AlterTableAddConstraintOperation extends AlterTableOperation {
	// Null when the user did not name the constraint explicitly.
	private final String constraintName;
	private final String[] columnNames;

	public AlterTableAddConstraintOperation(
			ObjectIdentifier tableIdentifier,
			@Nullable String constraintName,
			String[] columnNames) {
		super(tableIdentifier);
		this.constraintName = constraintName;
		this.columnNames = columnNames;
	}

	/** Returns the user-specified constraint name, or empty if none was given. */
	public Optional<String> getConstraintName() {
		return Optional.ofNullable(this.constraintName);
	}

	/** Returns the columns the constraint is declared on. */
	public String[] getColumnNames() {
		return this.columnNames;
	}

	@Override
	public String asSummaryString() {
		// Keep insertion order so the summary always prints the identifier first.
		Map<String, Object> printParams = new LinkedHashMap<>();
		printParams.put("identifier", tableIdentifier);
		getConstraintName().ifPresent(name -> printParams.put("constraintName", name));
		printParams.put("columns", this.columnNames);

		return OperationUtils.formatWithChildren(
			"ALTER TABLE ADD CONSTRAINT",
			printParams,
			Collections.emptyList(),
			Operation::asSummaryString);
	}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.operations.ddl;

import org.apache.flink.table.catalog.ObjectIdentifier;

/** Operation of "ALTER TABLE ADD [CONSTRAINT constraintName] ..." clause. **/
/**
 * Operation of "ALTER TABLE DROP CONSTRAINT constraintName" clause.
 *
 * <p>Note: only primary key is supported now.
 */
public class AlterTableDropConstraintOperation extends AlterTableOperation {
	// Name of the constraint to drop; unlike the ADD variant it is mandatory.
	private final String constraintName;

	public AlterTableDropConstraintOperation(
			ObjectIdentifier tableIdentifier,
			String constraintName) {
		super(tableIdentifier);
		this.constraintName = constraintName;
	}

	/** Returns the name of the constraint to drop. */
	public String getConstraintName() {
		return constraintName;
	}

	@Override
	public String asSummaryString() {
		return String.format("ALTER TABLE %s DROP CONSTRAINT %s",
				tableIdentifier, constraintName);
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -303,11 +303,13 @@ public String toString() {
sb.append(" |-- ").append("WATERMARK FOR ")
.append(watermark.getRowtimeAttribute()).append(" AS ")
.append(watermark.getWatermarkExpr());
sb.append('\n');
}
}

if (primaryKey != null) {
sb.append(" |-- ").append(primaryKey.asSummaryString());
sb.append('\n');
}
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,15 @@
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.WatermarkSpec;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.util.Preconditions;

import java.util.List;
import java.util.Optional;

/**
* Utilities to {@link TableSchema}.
*/
Expand Down Expand Up @@ -71,4 +76,61 @@ public static TableSchema checkNoGeneratedColumns(TableSchema schema) {
}
return schema;
}

/**
* Creates a builder with given table schema.
*
* @param oriSchema Original schema
* @return the builder with all the information from the given schema
*/
/**
 * Creates a builder with given table schema.
 *
 * @param oriSchema Original schema
 * @return the builder with all the information from the given schema
 */
public static TableSchema.Builder builderWithGivenSchema(TableSchema oriSchema) {
	TableSchema.Builder builder = builderWithGivenColumns(oriSchema.getTableColumns());
	// Copy watermark specification.
	for (WatermarkSpec wms : oriSchema.getWatermarkSpecs()) {
		builder.watermark(
			wms.getRowtimeAttribute(),
			wms.getWatermarkExpr(),
			wms.getWatermarkExprOutputType());
	}
	// Copy primary key constraint. Use ifPresent instead of map: the builder call is
	// executed purely for its side effect and the mapped value was being discarded.
	oriSchema.getPrimaryKey().ifPresent(
		pk -> builder.primaryKey(pk.getName(), pk.getColumns().toArray(new String[0])));
	return builder;
}

/**
* Creates a new schema but drop the constraint with given name.
*/
/**
 * Creates a new schema but drop the constraint with given name.
 *
 * @throws ValidationException if no constraint with the given name exists
 */
public static TableSchema dropConstraint(TableSchema oriSchema, String constraintName) {
	// Only the primary key constraint is supported so far, so the requested name
	// must match the existing primary key (if the schema has one at all).
	boolean nameMatchesPrimaryKey = oriSchema.getPrimaryKey()
			.map(pk -> pk.getName().equals(constraintName))
			.orElse(false);
	if (!nameMatchesPrimaryKey) {
		throw new ValidationException(
			String.format("Constraint %s to drop does not exist", constraintName));
	}
	// Rebuild the schema from columns and watermarks, leaving the primary key out.
	TableSchema.Builder schemaBuilder = builderWithGivenColumns(oriSchema.getTableColumns());
	for (WatermarkSpec watermarkSpec : oriSchema.getWatermarkSpecs()) {
		schemaBuilder.watermark(
			watermarkSpec.getRowtimeAttribute(),
			watermarkSpec.getWatermarkExpr(),
			watermarkSpec.getWatermarkExprOutputType());
	}
	return schemaBuilder.build();
}

/** Returns the builder with copied columns info from the given table schema. */
private static TableSchema.Builder builderWithGivenColumns(List<TableColumn> oriColumns) {
TableSchema.Builder builder = TableSchema.builder();
for (TableColumn column : oriColumns) {
if (column.isGenerated()) {
builder.field(column.getName(), column.getType(), column.getExpr().get());
} else {
builder.field(column.getName(), column.getType());
}
}
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void testTableSchema() {
" |-- f1: ROW<`q1` STRING, `q2` TIMESTAMP(3)>\n" +
" |-- f2: STRING\n" +
" |-- f3: BIGINT AS f0 + 1\n" +
" |-- WATERMARK FOR f1.q2 AS now()";
" |-- WATERMARK FOR f1.q2 AS now()\n";
assertEquals(expected, schema.toString());

// test getFieldNames and getFieldDataType
Expand Down Expand Up @@ -213,7 +213,7 @@ public void testPrimaryKeyPrinting() {
" |-- f0: BIGINT NOT NULL\n" +
" |-- f1: STRING NOT NULL\n" +
" |-- f2: DOUBLE NOT NULL\n" +
" |-- CONSTRAINT pk PRIMARY KEY (f0, f2)"
" |-- CONSTRAINT pk PRIMARY KEY (f0, f2)\n"
));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.utils;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import static org.junit.Assert.assertEquals;

/** Tests for TableSchemaUtils. */
/** Tests for TableSchemaUtils. */
public class TableSchemaUtilsTest {
	@Rule
	public ExpectedException exceptionRule = ExpectedException.none();

	/** Copying a schema through the builder must preserve columns, watermark and primary key. */
	@Test
	public void testBuilderWithGivenSchema() {
		TableSchema originalSchema = prototypeSchema();
		TableSchema copiedSchema = TableSchemaUtils.builderWithGivenSchema(originalSchema).build();
		assertEquals(originalSchema, copiedSchema);
	}

	/** Dropping the primary key keeps all columns and the watermark; unknown names are rejected. */
	@Test
	public void testDropConstraint() {
		TableSchema originalSchema = prototypeSchema();
		TableSchema prunedSchema = TableSchemaUtils.dropConstraint(originalSchema, "ct1");
		String expectedSchemaString = "root\n"
				+ " |-- a: INT NOT NULL\n"
				+ " |-- b: STRING\n"
				+ " |-- c: INT AS a + 1\n"
				+ " |-- t: TIMESTAMP(3)\n"
				+ " |-- WATERMARK FOR t AS t\n";
		assertEquals(expectedSchemaString, prunedSchema.toString());
		// Drop non-exist constraint.
		exceptionRule.expect(ValidationException.class);
		exceptionRule.expectMessage("Constraint ct2 to drop does not exist");
		TableSchemaUtils.dropConstraint(originalSchema, "ct2");
	}

	/** Builds the schema shared by both tests: four columns, primary key "ct1", watermark on "t". */
	private static TableSchema prototypeSchema() {
		return TableSchema.builder()
				.field("a", DataTypes.INT().notNull())
				.field("b", DataTypes.STRING())
				.field("c", DataTypes.INT(), "a + 1")
				.field("t", DataTypes.TIMESTAMP(3))
				.primaryKey("ct1", new String[] {"a"})
				.watermark("t", "t", DataTypes.TIMESTAMP(3))
				.build();
	}
}
Loading

0 comments on commit 28ab459

Please sign in to comment.