From 1080a638382676b85b9e739d41775b4dcab7b439 Mon Sep 17 00:00:00 2001 From: Shengkai <1059623455@qq.com> Date: Fri, 26 Mar 2021 00:15:33 +0800 Subject: [PATCH] [FLINK-21463][table-api] Support to parse 'RESET key' command --- .../operations/command/ResetOperation.java | 26 +++++++++++++++++-- .../parse/ResetOperationParseStrategy.java | 17 ++++++++++-- .../planner/delegation/ParserImplTest.java | 2 ++ .../parse/ResetOperationParseStrategy.java | 17 ++++++++++-- 4 files changed, 56 insertions(+), 6 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ResetOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ResetOperation.java index 8a33b57483b9b..3fc95af8726f0 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ResetOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ResetOperation.java @@ -20,10 +20,32 @@ import org.apache.flink.table.operations.Operation; -/** Operation that represent RESET command. */ +import javax.annotation.Nullable; + +import java.util.Optional; + +/** + * Operation to represent RESET command. If {@link #getKey()} is empty, it means reset the session + * configuration. + */ public class ResetOperation implements Operation { + + @Nullable private final String key; + + public ResetOperation(@Nullable String key) { + this.key = key; + } + + public Optional getKey() { + return Optional.ofNullable(key); + } + @Override public String asSummaryString() { - return "RESET"; + if (key == null) { + return "RESET"; + } else { + return String.format("RESET %s", key); + } } } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/ResetOperationParseStrategy.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/ResetOperationParseStrategy.java index 8a7e7519e1059..708834d3e2686 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/ResetOperationParseStrategy.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/ResetOperationParseStrategy.java @@ -18,9 +18,11 @@ package org.apache.flink.table.planner.parse; +import org.apache.flink.table.api.TableException; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.command.ResetOperation; +import java.util.regex.Matcher; import java.util.regex.Pattern; /** Strategy to parse statement to {@link ResetOperation}. */ @@ -29,12 +31,23 @@ public class ResetOperationParseStrategy extends AbstractRegexParseStrategy { static final ResetOperationParseStrategy INSTANCE = new ResetOperationParseStrategy(); private ResetOperationParseStrategy() { - super(Pattern.compile("RESET", DEFAULT_PATTERN_FLAGS)); + super(Pattern.compile("RESET(\\s+(?\\S+)\\s*)?", DEFAULT_PATTERN_FLAGS)); } @Override public Operation convert(String statement) { - return new ResetOperation(); + Matcher matcher = pattern.matcher(statement.trim()); + String key; + + if (matcher.find()) { + key = matcher.group("key"); + } else { + throw new TableException( + String.format( + "Failed to convert the statement to RESET operation: %s.", statement)); + } + + return new ResetOperation(key); } @Override diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java index 961e7ed871e82..fc70477100d90 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java @@ -87,6 +87,8 @@ public class ParserImplTest { forStatement("qUIT").summary("QUIT"), forStatement("ExIT").summary("QUIT"), forStatement("REsEt").summary("RESET"), + forStatement("REsEt execution.runtime-type") + .summary("RESET execution.runtime-type"), forStatement(" SEt ").summary("SET"), forStatement("SET execution.runtime-type=batch") .summary("SET execution.runtime-type=batch"), diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/ResetOperationParseStrategy.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/ResetOperationParseStrategy.java index 3602a4b23066c..aa67021bb7334 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/ResetOperationParseStrategy.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/ResetOperationParseStrategy.java @@ -18,9 +18,11 @@ package org.apache.flink.table.parse; +import org.apache.flink.table.api.TableException; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.command.ResetOperation; +import java.util.regex.Matcher; import java.util.regex.Pattern; /** Strategy to parse statement to {@link ResetOperation}. */ @@ -29,12 +31,23 @@ public class ResetOperationParseStrategy extends AbstractRegexParseStrategy { static final ResetOperationParseStrategy INSTANCE = new ResetOperationParseStrategy(); private ResetOperationParseStrategy() { - super(Pattern.compile("RESET", DEFAULT_PATTERN_FLAGS)); + super(Pattern.compile("RESET(\\s+(?\\S+)\\s*)?", DEFAULT_PATTERN_FLAGS)); } @Override public Operation convert(String statement) { - return new ResetOperation(); + Matcher matcher = pattern.matcher(statement.trim()); + String key; + + if (matcher.find()) { + key = matcher.group("key"); + } else { + throw new TableException( + String.format( + "Failed to convert the statement to RESET operation: %s.", statement)); + } + + return new ResetOperation(key); } @Override