diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
index 474b633b24d2a..510d77723a1c7 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
@@ -248,14 +248,10 @@ public void open(ReadableConfig config) throws Exception {
 
         Struct pipelineOptions = PipelineOptionsTranslation.toProto(portableOptions);
 
-        if (memoryManager != null && config.get(USE_MANAGED_MEMORY)) {
-            Preconditions.checkArgument(
-                    managedMemoryFraction > 0 && managedMemoryFraction <= 1.0,
-                    String.format(
-                            "The configured managed memory fraction for Python worker process must be within (0, 1], was: %s. "
-                                    + "It may be because the consumer type \"Python\" was missing or set to 0 for the config option \"taskmanager.memory.managed.consumer-weights\".",
-                            managedMemoryFraction));
-
+        if (memoryManager != null
+                && config.get(USE_MANAGED_MEMORY)
+                && managedMemoryFraction > 0
+                && managedMemoryFraction <= 1.0) {
             final LongFunctionWithException<PythonSharedResources, Exception> initializer =
                     (size) ->
                             new PythonSharedResources(
@@ -274,6 +270,15 @@ public void open(ReadableConfig config) throws Exception {
                     sharedResources.getResourceHandle().getEnvironment();
             stageBundleFactory = createStageBundleFactory(jobBundleFactory, environment);
         } else {
+            if (memoryManager != null
+                    && config.get(USE_MANAGED_MEMORY)
+                    && (managedMemoryFraction <= 0 || managedMemoryFraction > 1.0)) {
+                LOG.warn(
+                        String.format(
+                                "The configured managed memory fraction for Python worker process must be within (0, 1], was: %s, use off-heap memory instead. "
+                                        + "Please see config option \"taskmanager.memory.managed.consumer-weights\" for more details.",
+                                managedMemoryFraction));
+            }
             // there is no way to access the MemoryManager for the batch job of old planner,
             // fallback to the way that spawning a Python process for each Python operator
             jobBundleFactory = createJobBundleFactory(pipelineOptions);