Skip to content

Commit

Permalink
[FLINK-15375][core] Add factory method for creating MemorySize from m…
Browse files Browse the repository at this point in the history
…ebibytes.

This closes apache#10785.
  • Loading branch information
xintongsong authored and tillrohrmann committed Jan 21, 2020
1 parent 173f2f8 commit 4f55895
Show file tree
Hide file tree
Showing 13 changed files with 29 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ private Object readResolve() {
// ------------------------------------------------------------------------

public static Builder newBuilder(double cpuCores, int taskHeapMemoryMB) {
return new Builder(new CPUResource(cpuCores), MemorySize.parse(taskHeapMemoryMB + "m"));
return new Builder(new CPUResource(cpuCores), MemorySize.ofMebiBytes(taskHeapMemoryMB));
}

/**
Expand Down Expand Up @@ -334,7 +334,7 @@ public Builder setTaskHeapMemory(MemorySize taskHeapMemory) {
}

public Builder setTaskHeapMemoryMB(int taskHeapMemoryMB) {
this.taskHeapMemory = MemorySize.parse(taskHeapMemoryMB + "m");
this.taskHeapMemory = MemorySize.ofMebiBytes(taskHeapMemoryMB);
return this;
}

Expand All @@ -344,7 +344,7 @@ public Builder setTaskOffHeapMemory(MemorySize taskOffHeapMemory) {
}

public Builder setOffTaskHeapMemoryMB(int taskOffHeapMemoryMB) {
this.taskOffHeapMemory = MemorySize.parse(taskOffHeapMemoryMB + "m");
this.taskOffHeapMemory = MemorySize.ofMebiBytes(taskOffHeapMemoryMB);
return this;
}

Expand All @@ -354,7 +354,7 @@ public Builder setManagedMemory(MemorySize managedMemory) {
}

public Builder setManagedMemoryMB(int managedMemoryMB) {
this.managedMemory = MemorySize.parse(managedMemoryMB + "m");
this.managedMemory = MemorySize.ofMebiBytes(managedMemoryMB);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public static MemorySize getJobManagerHeapMemory(Configuration configuration) {
if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key())) {
return MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY));
} else if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB.key())) {
return MemorySize.parse(configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB) + "m");
return MemorySize.ofMebiBytes(configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB));
} else {
//use default value
return MemorySize.parse(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.defaultValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ public MemorySize(long bytes) {
this.bytes = bytes;
}

public static MemorySize ofMebiBytes(long mebiBytes) {
return new MemorySize(mebiBytes << 20);
}

// ------------------------------------------------------------------------

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public void testCommandLineClusterSpecification() throws Exception {
final int slotsPerTaskManager = 30;

configuration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, jobManagerMemory + "m");
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse(taskManagerMemory + "m"));
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(taskManagerMemory));
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slotsPerTaskManager);

final String[] args = {
Expand Down Expand Up @@ -153,7 +153,7 @@ public void testConfigurationClusterSpecification() throws Exception {
final int jobManagerMemory = 1337;
configuration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, jobManagerMemory + "m");
final int taskManagerMemory = 7331;
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse(taskManagerMemory + "m"));
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(taskManagerMemory));
final int slotsPerTaskManager = 42;
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slotsPerTaskManager);

Expand Down Expand Up @@ -257,7 +257,7 @@ private ClusterClientFactory<String> getClusterClientFactory(final Configuration

private KubernetesSessionCli createFlinkKubernetesCustomCliWithTmTotalMemory(int totalMemory) {
Configuration configuration = new Configuration();
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse(totalMemory + "m"));
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(totalMemory));
return new KubernetesSessionCli(configuration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ private static double getCpuCores(final Configuration configuration) {
}

private static MemorySize getTotalProcessMemory(final Configuration configuration) {
MemorySize legacyTotalProcessMemory = MemorySize.parse(configuration.getInteger(MESOS_RM_TASKS_MEMORY_MB) + "m");
MemorySize legacyTotalProcessMemory = MemorySize.ofMebiBytes(configuration.getInteger(MESOS_RM_TASKS_MEMORY_MB));
MemorySize unifiedTotalProcessMemory = configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY);

if (configuration.contains(MESOS_RM_TASKS_MEMORY_MB) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
*/
public class MesosTaskManagerParametersTest extends TestLogger {
private static final int TOTAL_PROCESS_MEMORY_MB = 1280;
private static final MemorySize TOTAL_PROCESS_MEMORY_SIZE = MemorySize.parse(TOTAL_PROCESS_MEMORY_MB + "m");
private static final MemorySize TOTAL_PROCESS_MEMORY_SIZE = MemorySize.ofMebiBytes(TOTAL_PROCESS_MEMORY_MB);

@Test
public void testBuildVolumes() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ public Builder setTaskHeapMemory(MemorySize taskHeapMemory) {
}

public Builder setTaskHeapMemoryMB(int taskHeapMemoryMB) {
this.taskHeapMemory = MemorySize.parse(taskHeapMemoryMB + "m");
this.taskHeapMemory = MemorySize.ofMebiBytes(taskHeapMemoryMB);
return this;
}

Expand All @@ -496,7 +496,7 @@ public Builder setTaskOffHeapMemory(MemorySize taskOffHeapMemory) {
}

public Builder setTaskOffHeapMemoryMB(int taskOffHeapMemoryMB) {
this.taskOffHeapMemory = MemorySize.parse(taskOffHeapMemoryMB + "m");
this.taskOffHeapMemory = MemorySize.ofMebiBytes(taskOffHeapMemoryMB);
return this;
}

Expand All @@ -506,7 +506,7 @@ public Builder setManagedMemory(MemorySize managedMemory) {
}

public Builder setManagedMemoryMB(int managedMemoryMB) {
this.managedMemory = MemorySize.parse(managedMemoryMB + "m");
this.managedMemory = MemorySize.ofMebiBytes(managedMemoryMB);
return this;
}

Expand All @@ -516,7 +516,7 @@ public Builder setNetworkMemory(MemorySize networkMemory) {
}

public Builder setNetworkMemoryMB(int networkMemoryMB) {
this.networkMemory = MemorySize.parse(networkMemoryMB + "m");
this.networkMemory = MemorySize.ofMebiBytes(networkMemoryMB);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public void testConsistencyCheckOfDerivedNetworkMemoryDoesNotMatchLegacyConfigFa
// derive network memory which is bigger than the number of legacy network buffers
final int networkMemorySizeMbToDerive = pageSizeMb * (numberOfNetworkBuffers + 1);
final Configuration configuration = setupConfigWithFlinkAndTaskHeapToDeriveGivenNetworkMem(networkMemorySizeMbToDerive);
configuration.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse(pageSizeMb + "m"));
configuration.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.ofMebiBytes(pageSizeMb));
configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, numberOfNetworkBuffers);
// internal validation should fail
TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
Expand All @@ -251,12 +251,12 @@ private static Configuration setupConfigWithFlinkAndTaskHeapToDeriveGivenNetwork
// adjust total Flink memory size to accommodate for more network memory
final int adjustedTotalFlinkMemoryMb = taskExecutorResourceSpec.getTotalFlinkMemorySize().getMebiBytes() -
derivedNetworkMemorySizeMb + networkMemorySizeToDeriveMb;
conf.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.parse(adjustedTotalFlinkMemoryMb + "m"));
conf.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.ofMebiBytes(adjustedTotalFlinkMemoryMb));
} else if (derivedNetworkMemorySizeMb > networkMemorySizeToDeriveMb) {
// reduce derived network memory by increasing task heap size
final int adjustedTaskHeapMemoryMb = taskExecutorResourceSpec.getTaskHeapSize().getMebiBytes() +
derivedNetworkMemorySizeMb - networkMemorySizeToDeriveMb;
conf.set(TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.parse(adjustedTaskHeapMemoryMb + "m"));
conf.set(TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.ofMebiBytes(adjustedTaskHeapMemoryMb));
}

final TaskExecutorResourceSpec adjusteedTaskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,6 @@ public void testReleaseFail() {
}

private static ResourceProfile createResourceProfile(double cpus, int memory) {
return ResourceProfile.newBuilder().setCpuCores(cpus).setTaskHeapMemory(MemorySize.parse(memory + "m")).build();
return ResourceProfile.newBuilder().setCpuCores(cpus).setTaskHeapMemory(MemorySize.ofMebiBytes(memory)).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void testEquals() {
ResourceSpec rs5 = ResourceSpec.newBuilder(1.0, 100).
setGPUResource(2.2).
build();
MemorySize networkMemory = MemorySize.parse(100 + "m");
MemorySize networkMemory = MemorySize.ofMebiBytes(100);
assertEquals(ResourceProfile.fromResourceSpec(rs3, networkMemory), ResourceProfile.fromResourceSpec(rs5, networkMemory));

final ResourceProfile rp1 = ResourceProfile.newBuilder()
Expand Down Expand Up @@ -199,7 +199,7 @@ public void testGet() {
ResourceSpec rs = ResourceSpec.newBuilder(1.0, 100).
setGPUResource(1.6).
build();
ResourceProfile rp = ResourceProfile.fromResourceSpec(rs, MemorySize.parse(50 + "m"));
ResourceProfile rp = ResourceProfile.fromResourceSpec(rs, MemorySize.ofMebiBytes(50));

assertEquals(new CPUResource(1.0), rp.getCpuCores());
assertEquals(150, rp.getTotalMemory().getMebiBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ private void checkRootDirsClean(File[] rootDirs) {

private TaskManagerServicesConfiguration createTaskManagerServiceConfiguration(
Configuration config) throws IOException {
config.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.parse(TOTAL_FLINK_MEMORY_MB + "m"));
config.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.ofMebiBytes(TOTAL_FLINK_MEMORY_MB));
TaskExecutorResourceSpec spec = TaskExecutorResourceUtils.resourceSpecFromConfig(config);
return TaskManagerServicesConfiguration.fromConfiguration(
config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void testStartupWhenNetworkStackFailsToInitialize() throws Exception {

private static Configuration createFlinkConfiguration() {
final Configuration config = new Configuration();
config.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.parse(TOTAL_FLINK_MEMORY_MB + "m"));
config.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.ofMebiBytes(TOTAL_FLINK_MEMORY_MB));

return config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public void testCommandLineClusterSpecification() throws Exception {
final int slotsPerTaskManager = 30;

configuration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, jobManagerMemory + "m");
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse(taskManagerMemory + "m"));
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(taskManagerMemory));
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slotsPerTaskManager);

final String[] args = {"-yjm", String.valueOf(jobManagerMemory) + "m", "-ytm", String.valueOf(taskManagerMemory) + "m", "-ys", String.valueOf(slotsPerTaskManager)};
Expand All @@ -300,7 +300,7 @@ public void testConfigurationClusterSpecification() throws Exception {
final int jobManagerMemory = 1337;
configuration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, jobManagerMemory + "m");
final int taskManagerMemory = 7331;
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse(taskManagerMemory + "m"));
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(taskManagerMemory));
final int slotsPerTaskManager = 42;
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slotsPerTaskManager);

Expand Down Expand Up @@ -450,7 +450,7 @@ private FlinkYarnSessionCli createFlinkYarnSessionCli() throws FlinkException {

private FlinkYarnSessionCli createFlinkYarnSessionCliWithTmTotalMemory(int totalMemomory) throws FlinkException {
Configuration configuration = new Configuration();
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse(totalMemomory + "m"));
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(totalMemomory));
return createFlinkYarnSessionCli(configuration);
}

Expand Down

0 comments on commit 4f55895

Please sign in to comment.