Skip to content

Commit

Permalink
[FLINK-28298][table][python] Support left and right in Table API (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
a49a committed Jul 4, 2022
1 parent 7974e81 commit da720c3
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 0 deletions.
2 changes: 2 additions & 0 deletions docs/data/sql_functions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,10 @@ string:
- sql: INSTR(string1, string2)
description: Returns the position of the first occurrence of string2 in string1. Returns NULL if any of arguments is NULL.
- sql: LEFT(string, integer)
table: STRING.LEFT(INT)
description: Returns the leftmost integer characters from the string. Returns EMPTY String if integer is negative. Returns NULL if any argument is NULL.
- sql: RIGHT(string, integer)
table: STRING.RIGHT(INT)
description: Returns the rightmost integer characters from the string. Returns EMPTY String if integer is negative. Returns NULL if any argument is NULL.
- sql: LOCATE(string1, string2[, integer])
description: Returns the position of the first occurrence of string1 in string2 after position integer. Returns 0 if not found. Returns NULL if any of arguments is NULL.
Expand Down
2 changes: 2 additions & 0 deletions docs/data/sql_functions_zh.yml
Original file line number Diff line number Diff line change
Expand Up @@ -434,10 +434,12 @@ string:
- sql: INSTR(string1, string2)
description: 返回 string2 在 string1 中第一次出现的位置。如果有任一参数为 `NULL`,则返回 `NULL`。
- sql: LEFT(string, integer)
table: STRING.LEFT(INT)
description: |
返回字符串中最左边的长度为 integer 值的字符串。如果 integer 为负,则返回 `EMPTY` 字符串。如果有任一参数
为 `NULL` 则返回 `NULL`。
- sql: RIGHT(string, integer)
table: STRING.RIGHT(INT)
description: |
返回字符串中最右边的长度为 integer 值的字符串。如果 integer 为负,则返回 `EMPTY` 字符串。如果有任一参数
为 `NULL` 则返回 `NULL`。
Expand Down
12 changes: 12 additions & 0 deletions flink-python/pyflink/table/expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -1212,6 +1212,18 @@ def encode(self, charset: Union[str, 'Expression[str]']) -> 'Expression[bytes]':
"""
return _binary_op("encode")(self, charset)

def left(self, length: Union[int, 'Expression[int]']) -> 'Expression[str]':
"""
Returns the leftmost integer characters from the input string.
"""
return _binary_op("left")(self, length)

def right(self, length: Union[int, 'Expression[int]']) -> 'Expression[str]':
"""
Returns the rightmost integer characters from the input string.
"""
return _binary_op("right")(self, length)

@property
def ltrim(self) -> 'Expression[str]':
"""
Expand Down
2 changes: 2 additions & 0 deletions flink-python/pyflink/table/tests/test_expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ def test_expression(self):
self.assertEqual('chr(a)', str(expr1.chr))
self.assertEqual("decode(a, 'utf-8')", str(expr1.decode('utf-8')))
self.assertEqual("encode(a, 'utf-8')", str(expr1.encode('utf-8')))
self.assertEqual('left(a, 2)', str(expr1.left(2)))
self.assertEqual('right(a, 2)', str(expr1.right(2)))
self.assertEqual('ltrim(a)', str(expr1.ltrim))
self.assertEqual('rtrim(a)', str(expr1.rtrim))
self.assertEqual('repeat(a, 3)', str(expr1.repeat(3)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_QUERY;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_VALUE;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LAST_VALUE;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LEFT;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LESS_THAN;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LIKE;
Expand Down Expand Up @@ -135,6 +136,7 @@
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_REPLACE;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REPEAT;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REPLACE;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.RIGHT;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ROUND;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ROWTIME;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.RPAD;
Expand Down Expand Up @@ -1048,6 +1050,16 @@ public OutType encode(InType charset) {
unresolvedCall(ENCODE, toExpr(), objectToExpression(charset)));
}

/** Returns the leftmost integer characters from the input string. */
public OutType left(InType len) {
return toApiSpecificExpression(unresolvedCall(LEFT, toExpr(), objectToExpression(len)));
}

/** Returns the rightmost integer characters from the input string. */
public OutType right(InType len) {
return toApiSpecificExpression(unresolvedCall(RIGHT, toExpr(), objectToExpression(len)));
}

/** Returns a string that removes the left whitespaces from the given string. */
public OutType ltrim() {
return toApiSpecificExpression(unresolvedCall(LTRIM, toExpr()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,28 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BYTES())))
.build();

public static final BuiltInFunctionDefinition LEFT =
BuiltInFunctionDefinition.newBuilder()
.name("left")
.kind(SCALAR)
.inputTypeStrategy(
sequence(
logical(LogicalTypeFamily.CHARACTER_STRING),
logical(LogicalTypeFamily.INTEGER_NUMERIC)))
.outputTypeStrategy(nullableIfArgs(varyingString(argument(0))))
.build();

public static final BuiltInFunctionDefinition RIGHT =
BuiltInFunctionDefinition.newBuilder()
.name("right")
.kind(SCALAR)
.inputTypeStrategy(
sequence(
logical(LogicalTypeFamily.CHARACTER_STRING),
logical(LogicalTypeFamily.INTEGER_NUMERIC)))
.outputTypeStrategy(nullableIfArgs(varyingString(argument(0))))
.build();

public static final BuiltInFunctionDefinition UUID =
BuiltInFunctionDefinition.newBuilder()
.name("uuid")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ public class DirectConvertRule implements CallExpressionConvertRule {
BuiltInFunctionDefinitions.DECODE, FlinkSqlOperatorTable.DECODE);
DEFINITION_OPERATOR_MAP.put(
BuiltInFunctionDefinitions.ENCODE, FlinkSqlOperatorTable.ENCODE);
DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.LEFT, FlinkSqlOperatorTable.LEFT);
DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.RIGHT, FlinkSqlOperatorTable.RIGHT);
DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.UUID, FlinkSqlOperatorTable.UUID);
DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.LTRIM, FlinkSqlOperatorTable.LTRIM);
DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.RTRIM, FlinkSqlOperatorTable.RTRIM);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,13 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
)
}

@Test
def testLeftAndRight(): Unit = {
val str = "Hello"
testAllApis(str.left(3), s"LEFT('$str', 3)", "Hel")
testAllApis(str.right(3), s"RIGHT('$str', 3)", "llo")
}

@Test
def testInstr(): Unit = {
testSqlApi("instr('Corporate Floor', 'or', 3, 2)", "14")
Expand Down

0 comments on commit da720c3

Please sign in to comment.