Skip to content

Commit

Permalink
[hotfix][python] Use off-heap memory if managed memory fraction is 0
Browse files Browse the repository at this point in the history
  • Loading branch information
dianfu committed Dec 14, 2022
1 parent 4abdf2d commit 9cc55d6
Showing 1 changed file with 13 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -248,14 +248,10 @@ public void open(ReadableConfig config) throws Exception {

Struct pipelineOptions = PipelineOptionsTranslation.toProto(portableOptions);

if (memoryManager != null && config.get(USE_MANAGED_MEMORY)) {
Preconditions.checkArgument(
managedMemoryFraction > 0 && managedMemoryFraction <= 1.0,
String.format(
"The configured managed memory fraction for Python worker process must be within (0, 1], was: %s. "
+ "It may be because the consumer type \"Python\" was missing or set to 0 for the config option \"taskmanager.memory.managed.consumer-weights\".",
managedMemoryFraction));

if (memoryManager != null
&& config.get(USE_MANAGED_MEMORY)
&& managedMemoryFraction > 0
&& managedMemoryFraction <= 1.0) {
final LongFunctionWithException<PythonSharedResources, Exception> initializer =
(size) ->
new PythonSharedResources(
Expand All @@ -274,6 +270,15 @@ public void open(ReadableConfig config) throws Exception {
sharedResources.getResourceHandle().getEnvironment();
stageBundleFactory = createStageBundleFactory(jobBundleFactory, environment);
} else {
if (memoryManager != null
&& config.get(USE_MANAGED_MEMORY)
&& (managedMemoryFraction <= 0 || managedMemoryFraction > 1.0)) {
LOG.warn(
String.format(
"The configured managed memory fraction for Python worker process must be within (0, 1], was: %s, use off-heap memory instead."
+ "Please see config option \"taskmanager.memory.managed.consumer-weights\" for more details.",
managedMemoryFraction));
}
// there is no way to access the MemoryManager for the batch job of old planner,
// fallback to the way that spawning a Python process for each Python operator
jobBundleFactory = createJobBundleFactory(pipelineOptions);
Expand Down

0 comments on commit 9cc55d6

Please sign in to comment.