Skip to content

Commit

Permalink
[FLINK-24541][runtime] Quoting the external resource list in generati…
Browse files Browse the repository at this point in the history
…ng dynamic config string

This closes apache#17533
  • Loading branch information
KarmaGYZ authored and xintongsong committed Oct 27, 2021
1 parent d50d9b3 commit 44e75ec
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@ public static String generateDynamicConfigsStr(
if (!taskExecutorProcessSpec.getExtendedResources().isEmpty()) {
configs.put(
ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key(),
String.join(";", taskExecutorProcessSpec.getExtendedResources().keySet()));
'"'
+ String.join(
";", taskExecutorProcessSpec.getExtendedResources().keySet())
+ '"');
taskExecutorProcessSpec
.getExtendedResources()
.forEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

import org.junit.Test;

import java.util.Collections;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Consumer;

Expand All @@ -55,7 +55,8 @@ public class TaskExecutorProcessUtilsTest
private static final MemorySize MANAGED_MEM_SIZE = MemorySize.parse("200m");
private static final MemorySize TOTAL_FLINK_MEM_SIZE = MemorySize.parse("1280m");
private static final MemorySize TOTAL_PROCESS_MEM_SIZE = MemorySize.parse("1536m");
private static final String EXTERNAL_RESOURCE_NAME = "gpu";
private static final String EXTERNAL_RESOURCE_NAME_1 = "gpu";
private static final String EXTERNAL_RESOURCE_NAME_2 = "custom";

private static final TaskExecutorProcessSpec TM_RESOURCE_SPEC =
new TaskExecutorProcessSpec(
Expand All @@ -68,7 +69,9 @@ public class TaskExecutorProcessUtilsTest
MemorySize.parse("6m"),
MemorySize.parse("7m"),
MemorySize.parse("8m"),
Collections.singleton(new ExternalResource(EXTERNAL_RESOURCE_NAME, 1)));
Arrays.asList(
new ExternalResource(EXTERNAL_RESOURCE_NAME_1, 1),
new ExternalResource(EXTERNAL_RESOURCE_NAME_2, 2)));

public TaskExecutorProcessUtilsTest() {
super(
Expand Down Expand Up @@ -122,16 +125,27 @@ public void testGenerateDynamicConfigurations() {
is(TM_RESOURCE_SPEC.getNumSlots()));
assertThat(
configs.get(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key()),
is(EXTERNAL_RESOURCE_NAME));
is('"' + String.join(";", TM_RESOURCE_SPEC.getExtendedResources().keySet()) + '"'));
assertThat(
configs.get(
ExternalResourceOptions.getAmountConfigOptionForResource(
EXTERNAL_RESOURCE_NAME)),
EXTERNAL_RESOURCE_NAME_1)),
is(
String.valueOf(
TM_RESOURCE_SPEC
.getExtendedResources()
.get(EXTERNAL_RESOURCE_NAME)
.get(EXTERNAL_RESOURCE_NAME_1)
.getValue()
.longValue())));
assertThat(
configs.get(
ExternalResourceOptions.getAmountConfigOptionForResource(
EXTERNAL_RESOURCE_NAME_2)),
is(
String.valueOf(
TM_RESOURCE_SPEC
.getExtendedResources()
.get(EXTERNAL_RESOURCE_NAME_2)
.getValue()
.longValue())));
}
Expand All @@ -146,7 +160,7 @@ public void testProcessSpecFromWorkerResourceSpec() {
.setNetworkMemoryMB(300)
.setManagedMemoryMB(400)
.setNumSlots(5)
.setExtendedResource(new ExternalResource(EXTERNAL_RESOURCE_NAME, 1))
.setExtendedResource(new ExternalResource(EXTERNAL_RESOURCE_NAME_1, 1))
.build();
final TaskExecutorProcessSpec taskExecutorProcessSpec =
TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(
Expand Down Expand Up @@ -642,9 +656,9 @@ public void testConfigNumSlots() {
public void testProcessSpecFromConfigWithExternalResource() {
final Configuration config = new Configuration();
config.setString(
ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key(), EXTERNAL_RESOURCE_NAME);
ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key(), EXTERNAL_RESOURCE_NAME_1);
config.setLong(
ExternalResourceOptions.getAmountConfigOptionForResource(EXTERNAL_RESOURCE_NAME),
ExternalResourceOptions.getAmountConfigOptionForResource(EXTERNAL_RESOURCE_NAME_1),
1);
config.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(4096));
final TaskExecutorProcessSpec taskExecutorProcessSpec =
Expand All @@ -653,7 +667,7 @@ public void testProcessSpecFromConfigWithExternalResource() {
assertThat(
taskExecutorProcessSpec
.getExtendedResources()
.get(EXTERNAL_RESOURCE_NAME)
.get(EXTERNAL_RESOURCE_NAME_1)
.getValue()
.longValue(),
is(1L));
Expand Down

0 comments on commit 44e75ec

Please sign in to comment.