From 44e75ecdf9ca15f8a028f77efd6cc7da5a7e4549 Mon Sep 17 00:00:00 2001 From: Yangze Guo Date: Thu, 21 Oct 2021 15:46:33 +0800 Subject: [PATCH] [FLINK-24541][runtime] Quoting the external resource list in generating dynamic config string This closes #17533 --- .../TaskExecutorProcessUtils.java | 5 ++- .../TaskExecutorProcessUtilsTest.java | 34 +++++++++++++------ 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java index afe8f8bd1537d..6fb0321f2cb5f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java @@ -128,7 +128,10 @@ public static String generateDynamicConfigsStr( if (!taskExecutorProcessSpec.getExtendedResources().isEmpty()) { configs.put( ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key(), - String.join(";", taskExecutorProcessSpec.getExtendedResources().keySet())); + '"' + + String.join( + ";", taskExecutorProcessSpec.getExtendedResources().keySet()) + + '"'); taskExecutorProcessSpec .getExtendedResources() .forEach( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java index 5a751cf91e62c..3ea74f6a47f8a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java @@ -33,7 +33,7 @@ import org.junit.Test; -import java.util.Collections; +import java.util.Arrays; import java.util.Map; import java.util.function.Consumer; @@ -55,7 +55,8 @@ public class TaskExecutorProcessUtilsTest private static final MemorySize MANAGED_MEM_SIZE = MemorySize.parse("200m"); private static final MemorySize TOTAL_FLINK_MEM_SIZE = MemorySize.parse("1280m"); private static final MemorySize TOTAL_PROCESS_MEM_SIZE = MemorySize.parse("1536m"); - private static final String EXTERNAL_RESOURCE_NAME = "gpu"; + private static final String EXTERNAL_RESOURCE_NAME_1 = "gpu"; + private static final String EXTERNAL_RESOURCE_NAME_2 = "custom"; private static final TaskExecutorProcessSpec TM_RESOURCE_SPEC = new TaskExecutorProcessSpec( @@ -68,7 +69,9 @@ public class TaskExecutorProcessUtilsTest MemorySize.parse("6m"), MemorySize.parse("7m"), MemorySize.parse("8m"), - Collections.singleton(new ExternalResource(EXTERNAL_RESOURCE_NAME, 1))); + Arrays.asList( + new ExternalResource(EXTERNAL_RESOURCE_NAME_1, 1), + new ExternalResource(EXTERNAL_RESOURCE_NAME_2, 2))); public TaskExecutorProcessUtilsTest() { super( @@ -122,16 +125,27 @@ public void testGenerateDynamicConfigurations() { is(TM_RESOURCE_SPEC.getNumSlots())); assertThat( configs.get(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key()), - is(EXTERNAL_RESOURCE_NAME)); + is('"' + String.join(";", TM_RESOURCE_SPEC.getExtendedResources().keySet()) + '"')); assertThat( configs.get( ExternalResourceOptions.getAmountConfigOptionForResource( - EXTERNAL_RESOURCE_NAME)), + EXTERNAL_RESOURCE_NAME_1)), is( String.valueOf( TM_RESOURCE_SPEC .getExtendedResources() - .get(EXTERNAL_RESOURCE_NAME) + .get(EXTERNAL_RESOURCE_NAME_1) + .getValue() + .longValue()))); + assertThat( + configs.get( + ExternalResourceOptions.getAmountConfigOptionForResource( + EXTERNAL_RESOURCE_NAME_2)), + is( + String.valueOf( + TM_RESOURCE_SPEC + .getExtendedResources() + .get(EXTERNAL_RESOURCE_NAME_2) .getValue() .longValue()))); } @@ -146,7 +160,7 @@ public void testProcessSpecFromWorkerResourceSpec() { .setNetworkMemoryMB(300) .setManagedMemoryMB(400) .setNumSlots(5) - .setExtendedResource(new ExternalResource(EXTERNAL_RESOURCE_NAME, 1)) + .setExtendedResource(new ExternalResource(EXTERNAL_RESOURCE_NAME_1, 1)) .build(); final TaskExecutorProcessSpec taskExecutorProcessSpec = TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec( @@ -642,9 +656,9 @@ public void testConfigNumSlots() { public void testProcessSpecFromConfigWithExternalResource() { final Configuration config = new Configuration(); config.setString( - ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key(), EXTERNAL_RESOURCE_NAME); + ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key(), EXTERNAL_RESOURCE_NAME_1); config.setLong( - ExternalResourceOptions.getAmountConfigOptionForResource(EXTERNAL_RESOURCE_NAME), + ExternalResourceOptions.getAmountConfigOptionForResource(EXTERNAL_RESOURCE_NAME_1), 1); config.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(4096)); final TaskExecutorProcessSpec taskExecutorProcessSpec = @@ -653,7 +667,7 @@ public void testProcessSpecFromConfigWithExternalResource() { assertThat( taskExecutorProcessSpec .getExtendedResources() - .get(EXTERNAL_RESOURCE_NAME) + .get(EXTERNAL_RESOURCE_NAME_1) .getValue() .longValue(), is(1L));