From 13576c51f4644c875baafa86277a3ffaba88d1bd Mon Sep 17 00:00:00 2001 From: Roman Khachatryan Date: Mon, 2 Nov 2020 19:05:39 +0100 Subject: [PATCH] [hotfix][config] Add env.isUnalignedCheckpointsEnabled() (java/scala/python) The next commits adds a force option, so this method is needed for consistency. --- .../pyflink/datastream/stream_execution_environment.py | 6 ++++++ .../api/environment/StreamExecutionEnvironment.java | 8 ++++++++ .../streaming/api/scala/StreamExecutionEnvironment.scala | 5 +++++ 3 files changed, 19 insertions(+) diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py index 0148be574aef6..0a2dc349f00db 100644 --- a/flink-python/pyflink/datastream/stream_execution_environment.py +++ b/flink-python/pyflink/datastream/stream_execution_environment.py @@ -771,3 +771,9 @@ def _generate_stream_graph(self, clear_transformations: bool = False, job_name: j_stream_graph.setJobName(job_name) return j_stream_graph + + def is_unaligned_checkpoints_enabled(self): + """ + Returns whether Unaligned Checkpoints are enabled. + """ + return self._j_stream_execution_environment.isUnalignedCheckpointsEnabled() diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index c4824b51c9f79..9f09923b4724c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -524,6 +524,14 @@ public boolean isForceCheckpointing() { return checkpointCfg.isForceCheckpointing(); } + /** + * Returns whether Unaligned Checkpoints are enabled. + */ + @PublicEvolving + public boolean isUnalignedCheckpointsEnabled() { + return checkpointCfg.isUnalignedCheckpointsEnabled(); + } + /** * Returns the checkpointing mode (exactly-once vs. at-least-once). * diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 8c8bcd72a5655..39ef1e5a7a66a 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -881,6 +881,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { def registerCachedFile(filePath: String, name: String, executable: Boolean): Unit = { javaEnv.registerCachedFile(filePath, name, executable) } + + /** + * Returns whether Unaligned Checkpoints are enabled. + */ + def isUnalignedCheckpointsEnabled: Boolean = javaEnv.isUnalignedCheckpointsEnabled } object StreamExecutionEnvironment {