Skip to content

Commit

Permalink
[FLINK-21463][table-api] Support to parse 'RESET key' command
Browse files Browse the repository at this point in the history
  • Loading branch information
fsk119 authored and wuchong committed Mar 27, 2021
1 parent 5b33c79 commit 1080a63
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,32 @@

import org.apache.flink.table.operations.Operation;

/** Operation that represent RESET command. */
import javax.annotation.Nullable;

import java.util.Optional;

/**
* Operation to represent RESET command. If {@link #getKey()} is empty, it means reset the session
* configuration.
*/
public class ResetOperation implements Operation {

@Nullable private final String key;

public ResetOperation(@Nullable String key) {
this.key = key;
}

public Optional<String> getKey() {
return Optional.ofNullable(key);
}

@Override
public String asSummaryString() {
return "RESET";
if (key == null) {
return "RESET";
} else {
return String.format("RESET %s", key);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

package org.apache.flink.table.planner.parse;

import org.apache.flink.table.api.TableException;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.command.ResetOperation;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Strategy to parse statement to {@link ResetOperation}. */
Expand All @@ -29,12 +31,23 @@ public class ResetOperationParseStrategy extends AbstractRegexParseStrategy {
static final ResetOperationParseStrategy INSTANCE = new ResetOperationParseStrategy();

private ResetOperationParseStrategy() {
super(Pattern.compile("RESET", DEFAULT_PATTERN_FLAGS));
super(Pattern.compile("RESET(\\s+(?<key>\\S+)\\s*)?", DEFAULT_PATTERN_FLAGS));
}

@Override
public Operation convert(String statement) {
return new ResetOperation();
Matcher matcher = pattern.matcher(statement.trim());
String key;

if (matcher.find()) {
key = matcher.group("key");
} else {
throw new TableException(
String.format(
"Failed to convert the statement to RESET operation: %s.", statement));
}

return new ResetOperation(key);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public class ParserImplTest {
forStatement("qUIT").summary("QUIT"),
forStatement("ExIT").summary("QUIT"),
forStatement("REsEt").summary("RESET"),
forStatement("REsEt execution.runtime-type")
.summary("RESET execution.runtime-type"),
forStatement(" SEt ").summary("SET"),
forStatement("SET execution.runtime-type=batch")
.summary("SET execution.runtime-type=batch"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

package org.apache.flink.table.parse;

import org.apache.flink.table.api.TableException;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.command.ResetOperation;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Strategy to parse statement to {@link ResetOperation}. */
Expand All @@ -29,12 +31,23 @@ public class ResetOperationParseStrategy extends AbstractRegexParseStrategy {
static final ResetOperationParseStrategy INSTANCE = new ResetOperationParseStrategy();

private ResetOperationParseStrategy() {
super(Pattern.compile("RESET", DEFAULT_PATTERN_FLAGS));
super(Pattern.compile("RESET(\\s+(?<key>\\S+)\\s*)?", DEFAULT_PATTERN_FLAGS));
}

@Override
public Operation convert(String statement) {
return new ResetOperation();
Matcher matcher = pattern.matcher(statement.trim());
String key;

if (matcher.find()) {
key = matcher.group("key");
} else {
throw new TableException(
String.format(
"Failed to convert the statement to RESET operation: %s.", statement));
}

return new ResetOperation(key);
}

@Override
Expand Down

0 comments on commit 1080a63

Please sign in to comment.