Skip to content

Commit

Permalink
[FLINK-12234][hive] Support view related operations in HiveCatalog
Browse files Browse the repository at this point in the history
This PR supports view related operations in HiveCatalog and creates HiveCatalogView.

This closes apache#8434.
  • Loading branch information
bowenli86 committed May 14, 2019
1 parent f7ecfdb commit 544903d
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ protected Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) {
hiveTable.setViewExpandedText(view.getExpandedQuery());
hiveTable.setTableType(TableType.VIRTUAL_VIEW.name());
} else {
throw new IllegalArgumentException(
throw new CatalogException(
"GenericHiveMetastoreCatalog only supports CatalogTable and CatalogView");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
Expand All @@ -40,6 +39,7 @@
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
Expand Down Expand Up @@ -98,8 +98,7 @@ protected Database createHiveDatabase(String databaseName, CatalogDatabase catal
@Override
protected void validateCatalogBaseTable(CatalogBaseTable table)
throws CatalogException {
// TODO: validate HiveCatalogView
if (!(table instanceof HiveCatalogTable)) {
if (!(table instanceof HiveCatalogTable) && !(table instanceof HiveCatalogView)) {
throw new CatalogException(
"HiveCatalog can only operate on HiveCatalogTable and HiveCatalogView.");
}
Expand All @@ -126,7 +125,18 @@ protected CatalogBaseTable createCatalogBaseTable(Table hiveTable) {
.collect(Collectors.toList());
}

return new HiveCatalogTable(tableSchema, partitionKeys, properties, comment);
if (TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW) {
return new HiveCatalogView(
hiveTable.getViewOriginalText(),
hiveTable.getViewExpandedText(),
tableSchema,
properties,
comment
);
} else {
return new HiveCatalogTable(
tableSchema, partitionKeys, properties, comment);
}
}

@Override
Expand All @@ -153,7 +163,7 @@ protected Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) {
List<FieldSchema> allColumns = HiveTableUtil.createHiveColumns(table.getSchema());

// Table columns and partition keys
if (table instanceof CatalogTable) {
if (table instanceof HiveCatalogTable) {
HiveCatalogTable catalogTable = (HiveCatalogTable) table;

if (catalogTable.isPartitioned()) {
Expand All @@ -167,8 +177,19 @@ protected Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) {
sd.setCols(allColumns);
hiveTable.setPartitionKeys(new ArrayList<>());
}
} else if (table instanceof HiveCatalogView) {
HiveCatalogView view = (HiveCatalogView) table;

// TODO: [FLINK-12398] Support partitioned view in catalog API
sd.setCols(allColumns);
hiveTable.setPartitionKeys(new ArrayList<>());

hiveTable.setViewOriginalText(view.getOriginalQuery());
hiveTable.setViewExpandedText(view.getExpandedQuery());
hiveTable.setTableType(TableType.VIRTUAL_VIEW.name());
} else {
throw new UnsupportedOperationException("HiveCatalog doesn't support view yet");
throw new CatalogException(
"HiveCatalog only supports HiveCatalogTable and HiveCatalogView");
}

hiveTable.setSd(sd);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.catalog.hive;

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.util.StringUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A Hive catalog view implementation.
*/
public class HiveCatalogView implements CatalogView {
// Original text of the view definition.
private final String originalQuery;

// Expanded text of the original view definition
// This is needed because the context such as current DB is
// lost after the session, in which view is defined, is gone.
// Expanded query text takes care of the this, as an example.
private final String expandedQuery;

// Schema of the view (column names and types)
private final TableSchema tableSchema;
// Properties of the view
private final Map<String, String> properties;
// Comment of the view
private String comment = "This is a hive catalog view.";

public HiveCatalogView(
String originalQuery,
String expandedQuery,
TableSchema tableSchema,
Map<String, String> properties,
String comment) {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(originalQuery), "original query cannot be null or empty");
checkArgument(!StringUtils.isNullOrWhitespaceOnly(expandedQuery), "expanded query cannot be null or empty");

this.originalQuery = originalQuery;
this.expandedQuery = expandedQuery;
this.tableSchema = checkNotNull(tableSchema, "tableSchema cannot be null");
this.properties = checkNotNull(properties, "properties cannot be null");
this.comment = comment;
}

@Override
public String getOriginalQuery() {
return originalQuery;
}

@Override
public String getExpandedQuery() {
return expandedQuery;
}

@Override
public Map<String, String> getProperties() {
return properties;
}

@Override
public TableSchema getSchema() {
return tableSchema;
}

@Override
public String getComment() {
return comment;
}

@Override
public CatalogBaseTable copy() {
return new HiveCatalogView(
originalQuery, expandedQuery, tableSchema.copy(), new HashMap<>(properties), comment);
}

@Override
public Optional<String> getDescription() {
return Optional.ofNullable(comment);
}

@Override
public Optional<String> getDetailedDescription() {
// TODO: return a detailed description
return Optional.ofNullable(comment);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,46 +49,6 @@ public static void init() throws IOException {
public void testCreateTable_Streaming() throws Exception {
}

// =====================
// HiveCatalog doesn't support view operation yet
// Thus, overriding the following tests which involve table operation in CatalogTestBase so they won't run against HiveCatalog
// =====================

// TODO: re-enable these tests once HiveCatalog support view operations

public void testListTables() throws Exception {
}

public void testCreateView() throws Exception {
}

public void testCreateView_DatabaseNotExistException() throws Exception {
}

public void testCreateView_TableAlreadyExistException() throws Exception {
}

public void testCreateView_TableAlreadyExist_ignored() throws Exception {
}

public void testDropView() throws Exception {
}

public void testAlterView() throws Exception {
}

public void testAlterView_TableNotExistException() throws Exception {
}

public void testAlterView_TableNotExist_ignored() throws Exception {
}

public void testListView() throws Exception {
}

public void testRenameView() throws Exception {
}

// ------ utils ------

@Override
Expand Down Expand Up @@ -154,14 +114,22 @@ public CatalogTable createAnotherPartitionedTable() {

@Override
public CatalogView createView() {
// TODO: implement this once HiveCatalog support view operations
return null;
return new HiveCatalogView(
String.format("select * from %s", t1),
String.format("select * from %s.%s", TEST_CATALOG_NAME, path1.getFullName()),
createTableSchema(),
new HashMap<>(),
"This is a hive view");
}

@Override
public CatalogView createAnotherView() {
// TODO: implement this once HiveCatalog support view operations
return null;
return new HiveCatalogView(
String.format("select * from %s", t2),
String.format("select * from %s.%s", TEST_CATALOG_NAME, path2.getFullName()),
createAnotherTableSchema(),
new HashMap<>(),
"This is another hive view");
}

@Override
Expand All @@ -175,4 +143,15 @@ public void checkEquals(CatalogTable t1, CatalogTable t2) {
// thus properties of Hive table is a super set of those in its corresponding Flink table
assertTrue(t2.getProperties().entrySet().containsAll(t1.getProperties().entrySet()));
}

protected void checkEquals(CatalogView v1, CatalogView v2) {
assertEquals(v1.getSchema(), v1.getSchema());
assertEquals(v1.getComment(), v2.getComment());
assertEquals(v1.getOriginalQuery(), v2.getOriginalQuery());
assertEquals(v1.getExpandedQuery(), v2.getExpandedQuery());

// Hive views may have properties created by itself
// thus properties of Hive view is a super set of those in its corresponding Flink view
assertTrue(v2.getProperties().entrySet().containsAll(v1.getProperties().entrySet()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ public TableSchema getSchema() {
return schema;
}

@Override
public String getComment() {
return comment;
}

@Override
public GenericCatalogView copy() {
return new GenericCatalogView(this.originalQuery, this.expandedQuery, schema.copy(),
Expand All @@ -90,13 +95,4 @@ public Optional<String> getDescription() {
public Optional<String> getDetailedDescription() {
return Optional.of("This is a catalog view in an im-memory catalog");
}

public String getComment() {
return this.comment;
}

public void setComment(String comment) {
this.comment = comment;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ public void testCreateView() throws Exception {
catalog.createTable(path1, view, false);

assertTrue(catalog.getTable(path1) instanceof CatalogView);
CatalogTestUtil.checkEquals(view, (CatalogView) catalog.getTable(path1));
checkEquals(view, (CatalogView) catalog.getTable(path1));
}

@Test
Expand Down Expand Up @@ -481,12 +481,12 @@ public void testCreateView_TableAlreadyExist_ignored() throws Exception {
catalog.createTable(path1, view, false);

assertTrue(catalog.getTable(path1) instanceof CatalogView);
CatalogTestUtil.checkEquals(view, (CatalogView) catalog.getTable(path1));
checkEquals(view, (CatalogView) catalog.getTable(path1));

catalog.createTable(path1, createAnotherView(), true);

assertTrue(catalog.getTable(path1) instanceof CatalogView);
CatalogTestUtil.checkEquals(view, (CatalogView) catalog.getTable(path1));
checkEquals(view, (CatalogView) catalog.getTable(path1));
}

@Test
Expand All @@ -508,13 +508,13 @@ public void testAlterView() throws Exception {
CatalogView view = createView();
catalog.createTable(path1, view, false);

CatalogTestUtil.checkEquals(view, (CatalogView) catalog.getTable(path1));
checkEquals(view, (CatalogView) catalog.getTable(path1));

CatalogView newView = createAnotherView();
catalog.alterTable(path1, newView, false);

assertTrue(catalog.getTable(path1) instanceof CatalogView);
CatalogTestUtil.checkEquals(newView, (CatalogView) catalog.getTable(path1));
checkEquals(newView, (CatalogView) catalog.getTable(path1));
}

@Test
Expand Down Expand Up @@ -673,4 +673,12 @@ protected void checkEquals(CatalogTable t1, CatalogTable t2) {
assertEquals(t1.getPartitionKeys(), t2.getPartitionKeys());
assertEquals(t1.isPartitioned(), t2.isPartitioned());
}

protected void checkEquals(CatalogView v1, CatalogView v2) {
assertEquals(v1.getSchema(), v1.getSchema());
assertEquals(v1.getProperties(), v2.getProperties());
assertEquals(v1.getComment(), v2.getComment());
assertEquals(v1.getOriginalQuery(), v2.getOriginalQuery());
assertEquals(v1.getExpandedQuery(), v2.getExpandedQuery());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,6 @@ public static void checkEquals(TableStats ts1, TableStats ts2) {
assertEquals(ts1.getColumnStats().size(), ts2.getColumnStats().size());
}

public static void checkEquals(CatalogView v1, CatalogView v2) {
assertEquals(v1.getSchema(), v1.getSchema());
assertEquals(v1.getProperties(), v2.getProperties());
assertEquals(v1.getComment(), v2.getComment());
assertEquals(v1.getOriginalQuery(), v2.getOriginalQuery());
assertEquals(v1.getExpandedQuery(), v2.getExpandedQuery());
}

public static void checkEquals(CatalogDatabase d1, CatalogDatabase d2) {
assertEquals(d1.getProperties(), d2.getProperties());
}
Expand Down

0 comments on commit 544903d

Please sign in to comment.