Skip to content

Commit

Permalink
[FLINK-23280][python] Fix the issue that Python ExplainDetails does n…
Browse files Browse the repository at this point in the history
…ot have JSON_EXECUTION_PLAN option

This closes apache#16407.
  • Loading branch information
SteNicholas authored and HuangXingBo committed Jul 7, 2021
1 parent 4bcdfa2 commit da7c685
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 3 deletions.
3 changes: 3 additions & 0 deletions flink-python/pyflink/table/explain_detail.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,6 @@ class ExplainDetail(object):
# The changelog mode produced by a physical rel node.
# e.g. GroupAggregate(..., changelogMode=[I,UA,D])
CHANGELOG_MODE = 1

# The execution plan in json format of the program.
JSON_EXECUTION_PLAN = 2
9 changes: 9 additions & 0 deletions flink-python/pyflink/table/tests/test_explain.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import json

from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase
from pyflink.table.explain_detail import ExplainDetail
Expand All @@ -29,6 +30,14 @@ def test_explain(self):

assert isinstance(result, str)

result = t.group_by("c").select(t.a.sum, t.c.alias('b')).explain(
ExplainDetail.JSON_EXECUTION_PLAN)
assert isinstance(result, str)
try:
json.loads(result.split('== Physical Execution Plan ==')[1])
except:
self.fail('The execution plan of explain detail is not in json format.')


if __name__ == '__main__':
import unittest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ def test_explain_with_extended(self):
t = t_env.from_elements([], schema)
result = t.select(t.a + 1, t.b, t.c)

actual = result.explain(ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE)
actual = result.explain(ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE,
ExplainDetail.JSON_EXECUTION_PLAN)

assert isinstance(actual, str)

Expand Down Expand Up @@ -347,7 +348,8 @@ def test_explain_with_multi_sinks(self):
stmt_set.add_insert_sql("insert into sink1 select * from %s where a > 100" % source)
stmt_set.add_insert_sql("insert into sink2 select * from %s where a < 100" % source)

actual = stmt_set.explain(ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE)
actual = stmt_set.explain(ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE,
ExplainDetail.JSON_EXECUTION_PLAN)
self.assertIsInstance(actual, str)

def test_register_java_function(self):
Expand Down
4 changes: 3 additions & 1 deletion flink-python/pyflink/util/java_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ def to_j_explain_detail_arr(p_extra_details):
gateway = get_gateway()

def to_j_explain_detail(p_extra_detail):
if p_extra_detail == ExplainDetail.CHANGELOG_MODE:
if p_extra_detail == ExplainDetail.JSON_EXECUTION_PLAN:
return gateway.jvm.org.apache.flink.table.api.ExplainDetail.JSON_EXECUTION_PLAN
elif p_extra_detail == ExplainDetail.CHANGELOG_MODE:
return gateway.jvm.org.apache.flink.table.api.ExplainDetail.CHANGELOG_MODE
else:
return gateway.jvm.org.apache.flink.table.api.ExplainDetail.ESTIMATED_COST
Expand Down

0 comments on commit da7c685

Please sign in to comment.