[FLINK-26132][table-planner] Fix serialization of anonymous tables
This closes apache#18770.
slinkydeveloper authored and twalthr committed Mar 1, 2022
1 parent e16ef6b commit dcd2156
Showing 10 changed files with 240 additions and 400 deletions.

@@ -36,7 +36,7 @@
<td><h5>table.plan.compile.catalog-objects</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">ALL</td>
<td><p>Enum</p></td>
<td>Strategy how to persist catalog objects such as tables, functions, or data types into a plan during compilation. It influences the need for catalog metadata to be present during a restore operation and affects the plan size.<br /><br />Possible values:<ul><li>"ALL": All metadata about catalog tables, functions, or data types will be persisted into the plan during compilation. For catalog tables, this includes the table's identifier, schema, and options. For catalog functions, this includes the function's identifier and class. For catalog data types, this includes the identifier and entire type structure. With this strategy, the catalog's metadata doesn't have to be available anymore during a restore operation.</li><li>"SCHEMA": In addition to an identifier, schema information about catalog tables, functions, or data types will be persisted into the plan during compilation. A schema allows for detecting incompatible changes in the catalog during a plan restore operation. However, all other metadata will still be retrieved from the catalog.</li><li>"IDENTIFIER": Only the identifier of catalog tables, functions, or data types will be persisted into the plan during compilation. All metadata will be retrieved from the catalog during a restore operation. With this strategy, plans become less verbose.</li></ul></td>
<td>Strategy how to persist catalog objects such as tables, functions, or data types into a plan during compilation.<br /><br />It influences the need for catalog metadata to be present during a restore operation and affects the plan size.<br /><br />This configuration option does not affect anonymous/inline or temporary objects. Anonymous/inline objects will be persisted entirely (including schema and options) if possible or fail the compilation otherwise. Temporary objects will be persisted only by their identifier and the object needs to be present in the session context during a restore.<br /><br />Possible values:<ul><li>"ALL": All metadata about catalog tables, functions, or data types will be persisted into the plan during compilation. For catalog tables, this includes the table's identifier, schema, and options. For catalog functions, this includes the function's identifier and class. For catalog data types, this includes the identifier and entire type structure. With this strategy, the catalog's metadata doesn't have to be available anymore during a restore operation.</li><li>"SCHEMA": In addition to an identifier, schema information about catalog tables, functions, or data types will be persisted into the plan during compilation. A schema allows for detecting incompatible changes in the catalog during a plan restore operation. However, all other metadata will still be retrieved from the catalog.</li><li>"IDENTIFIER": Only the identifier of catalog tables, functions, or data types will be persisted into the plan during compilation. All metadata will be retrieved from the catalog during a restore operation. With this strategy, plans become less verbose.</li></ul></td>
</tr>
<tr>
<td><h5>table.plan.force-recompile</h5><br> <span class="label label-primary">Streaming</span></td>
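The option description in this hunk can be condensed into a small lookup. The following is a minimal sketch, not Flink code: `PlanPersistenceSketch`, `TableKind`, and `PlanField` are hypothetical names introduced only for illustration, and the sketch covers tables only. It also ignores the documented case where an anonymous table cannot be persisted and compilation fails.

```java
import java.util.EnumSet;
import java.util.Set;

/** Illustrative model (hypothetical, not part of Flink) of what ends up in a compiled plan. */
public class PlanPersistenceSketch {

    /** Mirrors the three documented values of table.plan.compile.catalog-objects. */
    public enum CatalogPlanCompilation { ALL, SCHEMA, IDENTIFIER }

    /** Hypothetical classification of the table being compiled. */
    public enum TableKind { PERMANENT, TEMPORARY, ANONYMOUS }

    /** Hypothetical names for the pieces of table metadata mentioned in the docs. */
    public enum PlanField { IDENTIFIER, SCHEMA, OPTIONS }

    public static Set<PlanField> persistedFields(TableKind kind, CatalogPlanCompilation strategy) {
        switch (kind) {
            case ANONYMOUS:
                // Anonymous/inline tables have no identifier; schema and options are
                // persisted regardless of the configured strategy.
                return EnumSet.of(PlanField.SCHEMA, PlanField.OPTIONS);
            case TEMPORARY:
                // Temporary tables are persisted by identifier only, regardless of strategy.
                return EnumSet.of(PlanField.IDENTIFIER);
            default:
                // Only permanent catalog tables are affected by the option.
                switch (strategy) {
                    case ALL:
                        return EnumSet.allOf(PlanField.class);
                    case SCHEMA:
                        return EnumSet.of(PlanField.IDENTIFIER, PlanField.SCHEMA);
                    default:
                        return EnumSet.of(PlanField.IDENTIFIER);
                }
        }
    }

    public static void main(String[] args) {
        for (TableKind kind : TableKind.values()) {
            for (CatalogPlanCompilation strategy : CatalogPlanCompilation.values()) {
                System.out.println(kind + " / " + strategy + " -> " + persistedFields(kind, strategy));
            }
        }
    }
}
```

The point of the sketch is that the strategy argument only matters in the `PERMANENT` branch, which matches the sentence "This configuration option does not affect anonymous/inline or temporary objects."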

@@ -44,8 +44,9 @@
*
* <p>Depending on the configuration, permanent catalog metadata (such as information about tables
* and functions) will be persisted in the plan as well. Anonymous/inline objects will be persisted
* if possible or fail the compilation otherwise. Temporary objects are never part of a plan and
* need to be present during a restore.
* (including schema and options) if possible or fail the compilation otherwise. For temporary
* objects, only the identifier is part of the plan and the object needs to be present in the
* session context during a restore.
*
* <p>Note: Plan restores assume a stable session context. Configuration, loaded modules and
* catalogs, and temporary objects must not change. Schema evolution and changes of function

@@ -23,6 +23,7 @@
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.DescribedEnum;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.InlineElement;
import org.apache.flink.table.api.PlannerType;
import org.apache.flink.table.api.SqlDialect;
@@ -105,10 +106,26 @@ private TableConfigOptions() {}
.enumType(CatalogPlanCompilation.class)
.defaultValue(CatalogPlanCompilation.ALL)
.withDescription(
"Strategy how to persist catalog objects such as tables, functions, or data "
+ "types into a plan during compilation. It influences the need "
+ "for catalog metadata to be present during a restore operation "
+ "and affects the plan size.");
Description.builder()
.text(
"Strategy how to persist catalog objects such as tables, "
+ "functions, or data types into a plan during compilation.")
.linebreak()
.linebreak()
.text(
"It influences the need for catalog metadata to be present "
+ "during a restore operation and affects the plan size.")
.linebreak()
.linebreak()
.text(
"This configuration option does not affect anonymous/inline "
+ "or temporary objects. Anonymous/inline objects will "
+ "be persisted entirely (including schema and options) "
+ "if possible or fail the compilation otherwise. "
+ "Temporary objects will be persisted only by their "
+ "identifier and the object needs to be present in "
+ "the session context during a restore.")
.build());

@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
public static final ConfigOption<CatalogPlanRestore> PLAN_RESTORE_CATALOG_OBJECTS =
@@ -159,7 +176,15 @@ private TableConfigOptions() {}
// Enum option types
// ------------------------------------------------------------------------------------------

/** Strategy to compile {@link Catalog} objects into a plan. */
/**
* Strategy to compile {@link Catalog} objects into a plan.
*
* <p>Depending on the configuration, permanent catalog metadata (such as information about
* tables and functions) will be persisted in the plan as well. Anonymous/inline objects will be
* persisted (including schema and options) if possible or fail the compilation otherwise. For
* temporary objects, only the identifier is part of the plan and the object needs to be present
* in the session context during a restore.
*/
@PublicEvolving
public enum CatalogPlanCompilation implements DescribedEnum {
ALL(

@@ -19,7 +19,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation;
import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanRestore;
import org.apache.flink.table.catalog.CatalogManager;
@@ -30,6 +30,7 @@
import org.apache.flink.table.catalog.ResolvedSchema;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -39,7 +40,6 @@
import java.util.Objects;
import java.util.Optional;

import static org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanRestore.IDENTIFIER;
import static org.apache.flink.table.api.config.TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS;
import static org.apache.flink.table.api.config.TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.ContextResolvedTableJsonSerializer.FIELD_NAME_CATALOG_TABLE;
@@ -55,6 +55,9 @@
final class ContextResolvedTableJsonDeserializer extends StdDeserializer<ContextResolvedTable> {
private static final long serialVersionUID = 1L;

private static final JsonPointer optionsPointer =
JsonPointer.compile("/" + FIELD_NAME_CATALOG_TABLE + "/" + OPTIONS);

ContextResolvedTableJsonDeserializer() {
super(ContextResolvedTable.class);
}
@@ -77,7 +80,7 @@ public ContextResolvedTable deserialize(JsonParser jsonParser, DeserializationCo
jsonParser.getCodec(),
ctx)
.orElse(null);
ResolvedCatalogTable resolvedCatalogTable =
final ResolvedCatalogTable resolvedCatalogTable =
JsonSerdeUtil.deserializeOptionalField(
objectNode,
FIELD_NAME_CATALOG_TABLE,
@@ -87,47 +90,49 @@ public ContextResolvedTable deserialize(JsonParser jsonParser, DeserializationCo
.orElse(null);

if (identifier == null && resolvedCatalogTable == null) {
throw new ValidationException(
throw new TableException(
String.format(
"The input JSON is invalid because it doesn't contain '%s', nor the '%s'.",
"The input JSON is invalid because it does neither contain '%s' nor '%s'.",
FIELD_NAME_IDENTIFIER, FIELD_NAME_CATALOG_TABLE));
}

if (identifier == null) {
if (isLookupForced(planRestoreOption)) {
throw missingIdentifier();
}
return ContextResolvedTable.anonymous(resolvedCatalogTable);
}

Optional<ContextResolvedTable> contextResolvedTableFromCatalog =
isLookupEnabled(planRestoreOption)
? catalogManager.getTable(identifier)
: Optional.empty();
final Optional<ContextResolvedTable> contextResolvedTableFromCatalog =
catalogManager.getTable(identifier);

// If plan has no catalog table field or no options field,
// the table is permanent in the catalog and the option is plan all enforced, then fail
if ((resolvedCatalogTable == null || objectNode.at(optionsPointer).isMissingNode())
&& isPlanEnforced(planRestoreOption)
&& contextResolvedTableFromCatalog
.map(ContextResolvedTable::isPermanent)
.orElse(false)) {
throw lookupDisabled(identifier);
}

// If we have a schema from the plan and from the catalog, we need to check they match.
if (contextResolvedTableFromCatalog.isPresent() && resolvedCatalogTable != null) {
ResolvedSchema schemaFromPlan = resolvedCatalogTable.getResolvedSchema();
ResolvedSchema schemaFromCatalog =
final ResolvedSchema schemaFromPlan = resolvedCatalogTable.getResolvedSchema();
final ResolvedSchema schemaFromCatalog =
contextResolvedTableFromCatalog.get().getResolvedSchema();
if (!areResolvedSchemasEqual(schemaFromPlan, schemaFromCatalog)) {
throw schemaNotMatching(identifier, schemaFromPlan, schemaFromCatalog);
}
}

// We use what is stored inside the catalog,
if (resolvedCatalogTable == null || isLookupForced(planRestoreOption)) {
if (!isLookupEnabled(planRestoreOption)) {
throw lookupDisabled(identifier);
}
// We use what is stored inside the catalog
return contextResolvedTableFromCatalog.orElseThrow(
() -> missingTableFromCatalog(identifier, isLookupForced(planRestoreOption)));
}

if (contextResolvedTableFromCatalog.isPresent()) {
// If no config map is present, then the ContextResolvedTable was serialized with
// SCHEMA, so we just need to return the catalog query result
if (objectNode.at("/" + FIELD_NAME_CATALOG_TABLE + "/" + OPTIONS).isMissingNode()) {
if (objectNode.at(optionsPointer).isMissingNode()) {
return contextResolvedTableFromCatalog.get();
}

@@ -147,16 +152,16 @@ private boolean areResolvedSchemasEqual(
// * Columns size and order
// * For each column: name, kind (class) and type
// * Check partition keys set equality
List<Column> columnsFromPlan = schemaFromPlan.getColumns();
List<Column> columnsFromCatalog = schemaFromCatalog.getColumns();
final List<Column> columnsFromPlan = schemaFromPlan.getColumns();
final List<Column> columnsFromCatalog = schemaFromCatalog.getColumns();

if (columnsFromPlan.size() != columnsFromCatalog.size()) {
return false;
}

for (int i = 0; i < columnsFromPlan.size(); i++) {
Column columnFromPlan = columnsFromPlan.get(i);
Column columnFromCatalog = columnsFromCatalog.get(i);
final Column columnFromPlan = columnsFromPlan.get(i);
final Column columnFromCatalog = columnsFromCatalog.get(i);
if (!Objects.equals(columnFromPlan.getName(), columnFromCatalog.getName())
|| !Objects.equals(columnFromPlan.getClass(), columnFromCatalog.getClass())
|| !Objects.equals(
@@ -169,75 +174,61 @@ private boolean areResolvedSchemasEqual(
}

private boolean isLookupForced(CatalogPlanRestore planRestoreOption) {
return planRestoreOption == IDENTIFIER;
}

private boolean isLookupEnabled(CatalogPlanRestore planRestoreOption) {
return planRestoreOption != CatalogPlanRestore.ALL_ENFORCED;
return planRestoreOption == CatalogPlanRestore.IDENTIFIER;
}

static ValidationException missingIdentifier() {
return new ValidationException(
String.format(
"The table cannot be deserialized as no identifier is present in the persisted plan."
+ "However, lookup is forced by '%s' = '%s'. "
+ "Either allow restoring the table from the catalog with '%s' = '%s' / '%s' "
+ "or make sure to not use anonymous tables when generating the plan.",
PLAN_RESTORE_CATALOG_OBJECTS.key(),
IDENTIFIER.name(),
PLAN_RESTORE_CATALOG_OBJECTS.key(),
CatalogPlanRestore.ALL.name(),
CatalogPlanRestore.ALL_ENFORCED.name()));
private boolean isPlanEnforced(CatalogPlanRestore planRestoreOption) {
return planRestoreOption == CatalogPlanRestore.ALL_ENFORCED;
}

static ValidationException lookupDisabled(ObjectIdentifier objectIdentifier) {
return new ValidationException(
static TableException lookupDisabled(ObjectIdentifier objectIdentifier) {
return new TableException(
String.format(
"The persisted plan does not include all required catalog metadata for table '%s'. "
+ "However, lookup is disabled because option '%s' = '%s'. "
+ "Either enable the catalog lookup with '%s' = '%s' / '%s' or "
+ "regenerate the plan with '%s' != '%s'. "
+ "regenerate the plan with '%s' = '%s'. "
+ "Make sure the table is not compiled as a temporary table.",
objectIdentifier.asSummaryString(),
PLAN_RESTORE_CATALOG_OBJECTS.key(),
CatalogPlanRestore.ALL_ENFORCED.name(),
PLAN_RESTORE_CATALOG_OBJECTS.key(),
IDENTIFIER.name(),
CatalogPlanRestore.IDENTIFIER.name(),
CatalogPlanRestore.ALL.name(),
PLAN_COMPILE_CATALOG_OBJECTS.key(),
CatalogPlanCompilation.IDENTIFIER.name()));
CatalogPlanCompilation.ALL.name()));
}

static ValidationException schemaNotMatching(
static TableException schemaNotMatching(
ObjectIdentifier objectIdentifier,
ResolvedSchema schemaFromPlan,
ResolvedSchema schemaFromCatalog) {
return new ValidationException(
return new TableException(
String.format(
"The schema of table '%s' from the persisted plan does not match the "
+ "schema loaded from the catalog: '%s' != '%s'. "
+ "Make sure the table schema in the catalog is still identical.",
objectIdentifier.asSummaryString(), schemaFromPlan, schemaFromCatalog));
}

static ValidationException missingTableFromCatalog(
static TableException missingTableFromCatalog(
ObjectIdentifier identifier, boolean forcedLookup) {
String initialReason;
final String initialReason;
if (forcedLookup) {
initialReason =
String.format(
"Cannot resolve table '%s' and catalog lookup is forced because '%s' = '%s'. ",
identifier.asSummaryString(),
PLAN_RESTORE_CATALOG_OBJECTS.key(),
IDENTIFIER);
CatalogPlanRestore.IDENTIFIER.name());
} else {
initialReason =
String.format(
"Cannot resolve table '%s' and the persisted plan does not include "
+ "all required catalog table metadata. ",
identifier.asSummaryString());
}
return new ValidationException(
return new TableException(
initialReason
+ String.format(
"Make sure a registered catalog contains the table when restoring or "
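The restore-time branching in `ContextResolvedTableJsonDeserializer.deserialize` is hard to follow in this view because the +/- markers are missing. The sketch below models the post-commit control flow with plain booleans. It is an interpretation, not Flink code: which lines are additions is inferred from the hunk header `@@ -87,47 +90,49 @@` and from the lines that use `TableException`, `final`, and `isPlanEnforced`. `RestoreDecisionSketch`, `Outcome`, and all parameter names are hypothetical. The hunk is cut off after the missing-options check, so the last outcome only marks where the shown code ends.

```java
/** Illustrative model (hypothetical, not part of Flink) of the restore decision after this commit. */
public class RestoreDecisionSketch {

    /** Mirrors the values of table.plan.restore.catalog-objects referenced in the diff. */
    public enum CatalogPlanRestore { ALL, ALL_ENFORCED, IDENTIFIER }

    /** Hypothetical labels for the branches visible in the hunk. */
    public enum Outcome {
        FAIL_INVALID_JSON,
        USE_ANONYMOUS_TABLE_FROM_PLAN,
        FAIL_LOOKUP_DISABLED,
        FAIL_SCHEMA_NOT_MATCHING,
        USE_CATALOG_TABLE,
        FAIL_MISSING_TABLE_FROM_CATALOG,
        CONTINUES_BEYOND_SHOWN_HUNK
    }

    public static Outcome decide(
            CatalogPlanRestore option,
            boolean planHasIdentifier,
            boolean planHasCatalogTable,
            boolean planHasOptions,
            boolean catalogHasTable,
            boolean catalogTableIsPermanent,
            boolean schemasEqual) {
        if (!planHasIdentifier && !planHasCatalogTable) {
            return Outcome.FAIL_INVALID_JSON;
        }
        if (!planHasIdentifier) {
            // Anonymous tables are restored from the plan under every restore option.
            return Outcome.USE_ANONYMOUS_TABLE_FROM_PLAN;
        }
        // The catalog lookup now always happens; ALL_ENFORCED only rejects
        // incomplete plans for tables that are permanent in the catalog.
        boolean planIncomplete = !planHasCatalogTable || !planHasOptions;
        if (planIncomplete
                && option == CatalogPlanRestore.ALL_ENFORCED
                && catalogHasTable
                && catalogTableIsPermanent) {
            return Outcome.FAIL_LOOKUP_DISABLED;
        }
        if (catalogHasTable && planHasCatalogTable && !schemasEqual) {
            return Outcome.FAIL_SCHEMA_NOT_MATCHING;
        }
        if (!planHasCatalogTable || option == CatalogPlanRestore.IDENTIFIER) {
            return catalogHasTable
                    ? Outcome.USE_CATALOG_TABLE
                    : Outcome.FAIL_MISSING_TABLE_FROM_CATALOG;
        }
        if (catalogHasTable && !planHasOptions) {
            // Plan was compiled with SCHEMA: everything else comes from the catalog.
            return Outcome.USE_CATALOG_TABLE;
        }
        return Outcome.CONTINUES_BEYOND_SHOWN_HUNK;
    }

    public static void main(String[] args) {
        // Temporary table, identifier-only plan, ALL_ENFORCED: resolved from the session context.
        System.out.println(
                decide(CatalogPlanRestore.ALL_ENFORCED, true, false, false, true, false, true));
        // Anonymous table under IDENTIFIER: restored from the plan.
        System.out.println(
                decide(CatalogPlanRestore.IDENTIFIER, false, true, true, false, false, true));
    }
}
```

Under this reading, a temporary table persisted only by its identifier can be restored even with `ALL_ENFORCED`, as long as it is present in the session context, which is consistent with the updated option description.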
