[FLINK-14027][python][doc] Add documentation for Python User-Defined Scalar function.

This closes apache#9886.
WeiZhong94 authored and hequn8128 committed Oct 20, 2019
1 parent 2b1187d commit fbb9414
Showing 8 changed files with 223 additions and 73 deletions.
21 changes: 21 additions & 0 deletions docs/_includes/generated/python_configuration.html
@@ -0,0 +1,21 @@
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Key</th>
<th class="text-left" style="width: 15%">Default</th>
<th class="text-left" style="width: 65%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>python.fn-execution.bundle.size</h5></td>
<td style="word-wrap: break-word;">1000</td>
<td>The maximum number of elements to include in a bundle for Python user-defined function execution. The elements are processed asynchronously. One bundle of elements is processed before the next bundle is started. A larger value can improve throughput, but at the cost of more memory usage and higher latency.</td>
</tr>
<tr>
<td><h5>python.fn-execution.bundle.time</h5></td>
<td style="word-wrap: break-word;">1000</td>
<td>Sets the waiting timeout (in milliseconds) before processing a bundle for Python user-defined function execution. The timeout defines how long the elements of a bundle will be buffered before being processed. Lower timeouts lead to lower tail latencies, but may affect throughput.</td>
</tr>
</tbody>
</table>
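The interplay of the two options above can be pictured with a small toy model. This is only a sketch of size- and time-bounded bundling, not Flink's implementation: the class `BundleBuffer`, its methods, and the assumption that the timeout is measured from the arrival of the first buffered element are all hypothetical.

```python
import time


class BundleBuffer(object):
    """Toy model of size- and time-bounded bundling (not Flink's code)."""

    def __init__(self, max_size, max_wait_ms, clock=time.time):
        self.max_size = max_size        # plays the role of bundle.size
        self.max_wait_ms = max_wait_ms  # plays the role of bundle.time
        self.clock = clock
        self.elements = []
        self.first_arrival = None
        self.flushed = []               # bundles handed over for processing

    def add(self, element):
        if not self.elements:
            self.first_arrival = self.clock()
        self.elements.append(element)
        if len(self.elements) >= self.max_size:
            self.flush()

    def on_timer(self):
        # Assumption: the wait is counted from the first buffered element.
        if self.elements:
            waited_ms = (self.clock() - self.first_arrival) * 1000.0
            if waited_ms >= self.max_wait_ms:
                self.flush()

    def flush(self):
        self.flushed.append(self.elements)
        self.elements = []
        self.first_arrival = None


def simulate():
    now = [0.0]  # fake clock in seconds, so the run is deterministic
    buf = BundleBuffer(max_size=3, max_wait_ms=1000, clock=lambda: now[0])
    for value in [1, 2, 3, 4]:
        buf.add(value)   # the third element fills the first bundle
    now[0] = 0.5
    buf.on_timer()       # only 500 ms waited: nothing is flushed
    now[0] = 1.0
    buf.on_timer()       # 1000 ms waited: the partial bundle is flushed
    return buf.flushed
```

In this model a larger `max_size` means fewer, bigger hand-offs (better throughput, more buffered memory), while a smaller `max_wait_ms` bounds how long a straggling element such as `4` can sit in the buffer.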
124 changes: 90 additions & 34 deletions docs/dev/table/udfs.md
@@ -44,12 +44,12 @@ Scalar Functions

If a required scalar function is not contained in the built-in functions, it is possible to define custom, user-defined scalar functions for both the Table API and SQL. A user-defined scalar function maps zero, one, or multiple scalar values to a new scalar value.

In order to define a scalar function one has to extend the base class `ScalarFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a scalar function is determined by the evaluation method. An evaluation method must be declared publicly and named `eval`. The parameter types and return type of the evaluation method also determine the parameter and return types of the scalar function. Evaluation methods can also be overloaded by implementing multiple methods named `eval`. Evaluation methods can also support variable arguments, such as `eval(String... strs)`.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
In order to define a scalar function, one has to extend the base class `ScalarFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a scalar function is determined by the evaluation method. An evaluation method must be declared publicly and named `eval`. The parameter types and return type of the evaluation method also determine the parameter and return types of the scalar function. Evaluation methods can also be overloaded by implementing multiple methods named `eval`. Evaluation methods can also support variable arguments, such as `eval(String... strs)`.

The following example shows how to define your own hash code function, register it in the TableEnvironment, and call it in a query. Note that you can configure your scalar function via a constructor before it is registered:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
public class HashCode extends ScalarFunction {
private int factor = 12;
@@ -74,9 +74,29 @@ myTable.select("string, string.hashCode(), hashCode(string)");
// use the function in SQL API
tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable");
{% endhighlight %}

By default the result type of an evaluation method is determined by Flink's type extraction facilities. This is sufficient for basic types or simple POJOs but might be wrong for more complex, custom, or composite types. In these cases `TypeInformation` of the result type can be manually defined by overriding `ScalarFunction#getResultType()`.

The following advanced example takes the internal timestamp representation and also returns the internal timestamp representation as a long value. By overriding `ScalarFunction#getResultType()` we define that the returned long value should be interpreted as a `Types.SQL_TIMESTAMP` by the code generation.

{% highlight java %}
public static class TimestampModifier extends ScalarFunction {
    public long eval(long t) {
        return t % 1000;
    }

    public TypeInformation<?> getResultType(Class<?>[] signature) {
        return Types.SQL_TIMESTAMP;
    }
}
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
In order to define a scalar function, one has to extend the base class `ScalarFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a scalar function is determined by the evaluation method. An evaluation method must be declared publicly and named `eval`. The parameter types and return type of the evaluation method also determine the parameter and return types of the scalar function. Evaluation methods can also be overloaded by implementing multiple methods named `eval`. Evaluation methods can also support variable arguments, such as `@varargs def eval(str: String*)`.

The following example shows how to define your own hash code function, register it in the TableEnvironment, and call it in a query. Note that you can configure your scalar function via a constructor before it is registered:

{% highlight scala %}
// must be defined in static/object context
class HashCode(factor: Int) extends ScalarFunction {
@@ -95,14 +115,34 @@ myTable.select('string, hashCode('string))
tableEnv.registerFunction("hashCode", new HashCode(10))
tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable")
{% endhighlight %}

By default the result type of an evaluation method is determined by Flink's type extraction facilities. This is sufficient for basic types or simple POJOs but might be wrong for more complex, custom, or composite types. In these cases `TypeInformation` of the result type can be manually defined by overriding `ScalarFunction#getResultType()`.

The following advanced example takes the internal timestamp representation and also returns the internal timestamp representation as a long value. By overriding `ScalarFunction#getResultType()` we define that the returned long value should be interpreted as a `Types.TIMESTAMP` by the code generation.

{% highlight scala %}
object TimestampModifier extends ScalarFunction {
  def eval(t: Long): Long = {
    t % 1000
  }

  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
    Types.TIMESTAMP
  }
}
{% endhighlight %}
</div>

<div data-lang="python" markdown="1">
Both Java/Scala scalar functions and Python scalar functions can be used in the Python Table API and SQL. In order to define a Python scalar function, one can extend the base class `ScalarFunction` in `pyflink.table.udf` and implement an evaluation method. The behavior of a Python scalar function is determined by the evaluation method. An evaluation method must be named `eval`. The evaluation method can also support variable arguments, such as `eval(*args)`.

The following example shows how to define your own Java and Python hash code functions, register them in the TableEnvironment, and call them in a query. Note that you can configure your scalar function via a constructor before it is registered:

{% highlight python %}
'''
Java code:

// The java class must have a public no-argument constructor and can be founded in current java classloader.
// The Java class must have a public no-argument constructor and must be found in the current Java classloader.
public class HashCode extends ScalarFunction {
private int factor = 12;

@@ -112,50 +152,64 @@ public class HashCode extends ScalarFunction {
}
'''

class PyHashCode(ScalarFunction):
    def __init__(self):
        self.factor = 12

    def eval(self, s):
        return hash(s) * self.factor

table_env = BatchTableEnvironment.create(env)

# register the java function
# register the Java function
table_env.register_java_function("hashCode", "my.java.function.HashCode")

# register the Python function
table_env.register_function("py_hash_code", udf(PyHashCode(), DataTypes.BIGINT(), DataTypes.BIGINT()))

# use the function in Python Table API
my_table.select("string, string.hashCode(), hashCode(string)")
my_table.select("string, bigint, string.hashCode(), hashCode(string), bigint.py_hash_code(), py_hash_code(bigint)")

# use the function in SQL API
table_env.sql_query("SELECT string, hashCode(string) FROM MyTable")
table_env.sql_query("SELECT string, bigint, hashCode(string), py_hash_code(bigint) FROM MyTable")
{% endhighlight %}
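
As a minimal sketch of the variable-argument form `eval(*args)` mentioned above: `ConcatAll` and its `separator` parameter are hypothetical names invented for illustration, and the fallback base class exists only so the snippet also runs where PyFlink is not installed. How input types are declared when registering such a function is not covered here.

```python
try:
    from pyflink.table.udf import ScalarFunction
except ImportError:
    # Stand-in so the sketch also runs where PyFlink is not installed.
    class ScalarFunction(object):
        pass


class ConcatAll(ScalarFunction):
    """Hypothetical function: joins any number of arguments into one string."""

    def __init__(self, separator="-"):
        # Configuration passed via the constructor before registration.
        self.separator = separator

    def eval(self, *args):
        # None values (SQL NULLs) are skipped rather than printed as "None".
        return self.separator.join(str(a) for a in args if a is not None)


concat = ConcatAll(separator="|")
result = concat.eval("a", 1, None, "b")
```

Calling `eval` directly like this is a convenient way to check the function's logic in plain Python before it is registered in a table environment.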
</div>
</div>

By default the result type of an evaluation method is determined by Flink's type extraction facilities. This is sufficient for basic types or simple POJOs but might be wrong for more complex, custom, or composite types. In these cases `TypeInformation` of the result type can be manually defined by overriding `ScalarFunction#getResultType()`.
There are many ways to define a Python scalar function besides extending the base class `ScalarFunction`. The following example shows the different ways to define a Python scalar function which takes two columns of bigint as input parameters and returns their sum.

The following example shows an advanced example which takes the internal timestamp representation and also returns the internal timestamp representation as a long value. By overriding `ScalarFunction#getResultType()` we define that the returned long value should be interpreted as a `Types.TIMESTAMP` by the code generation.
{% highlight python %}
# option 1: extending the base class `ScalarFunction`
class Add(ScalarFunction):
    def eval(self, i, j):
        return i + j

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
public static class TimestampModifier extends ScalarFunction {
public long eval(long t) {
return t % 1000;
}
add = udf(Add(), [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())

public TypeInformation<?> getResultType(Class<?>[] signature) {
return Types.SQL_TIMESTAMP;
}
}
{% endhighlight %}
</div>
# option 2: Python function
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j

<div data-lang="scala" markdown="1">
{% highlight scala %}
object TimestampModifier extends ScalarFunction {
def eval(t: Long): Long = {
t % 1000
}
# option 3: lambda function
add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())

override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
Types.TIMESTAMP
}
}
# option 4: callable function
class CallableAdd(object):
    def __call__(self, i, j):
        return i + j

add = udf(CallableAdd(), [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())

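# option 5: partial function (requires `import functools`)
def partial_add(i, j, k):
    return i + j + k

# bind `k` so that the two input columns fill `i` and `j` positionally
add = udf(functools.partial(partial_add, k=1), [DataTypes.BIGINT(), DataTypes.BIGINT()],
          DataTypes.BIGINT())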

# register the Python function
table_env.register_function("add", add)
# use the function in Python Table API
my_table.select("add(a, b)")
{% endhighlight %}
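
The callable forms listed above can be compared in plain Python, without a table environment. This is only a sketch: it assumes the two column values are passed to the function positionally, and `Add` here is a plain class standing in for a `ScalarFunction` subclass. Note that the partial form adds the bound constant on top of the two inputs.

```python
import functools


class Add(object):
    # Stands in for a ScalarFunction subclass; only `eval` matters here.
    def eval(self, i, j):
        return i + j


def add(i, j):
    return i + j


add_lambda = lambda i, j: i + j


class CallableAdd(object):
    def __call__(self, i, j):
        return i + j


def partial_add(i, j, k):
    return i + j + k


# Binding the last parameter leaves `i` and `j` free for positional inputs.
add_partial = functools.partial(partial_add, k=1)

results = [
    Add().eval(2, 3),
    add(2, 3),
    add_lambda(2, 3),
    CallableAdd()(2, 3),
    add_partial(2, 3),
]

# Binding `j` by keyword instead clashes with the second positional argument.
try:
    functools.partial(partial_add, j=1)(2, 3)
    clash = None
except TypeError as error:
    clash = str(error)
```

The first four entries of `results` are the plain sum, while the partial form yields the sum plus the bound `k`. The `TypeError` captured in `clash` shows why a keyword-bound parameter in a partial function should come after the parameters that receive column values.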
</div>
</div>
@@ -272,6 +326,8 @@ table_env.register_java_function("split", "my.java.function.Split")
my_table.join_lateral("split(a) as (word, length)").select("a, word, length")
my_table.left_outer_join_lateral("split(a) as (word, length)").select("a, word, length")

# Register the Python function.

# Use the table function in SQL with LATERAL and TABLE keywords.
# CROSS JOIN a table function (equivalent to "join" in Table API).
table_env.sql_query("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)")