Skip to content

Commit

Permalink
[FLINK-16440][runtime] Introduce multiplication for ResourceProfile.
Browse files Browse the repository at this point in the history
  • Loading branch information
xintongsong authored and tillrohrmann committed Apr 27, 2020
1 parent 5330e9b commit e7e5e1d
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,32 @@ public ResourceProfile subtract(final ResourceProfile other) {
);
}

@Nonnull
public ResourceProfile multiply(final int multiplier) {
checkArgument(multiplier >= 0, "multiplier must be >= 0");
if (equals(ANY)) {
return ANY;
}

if (this.equals(UNKNOWN)) {
return UNKNOWN;
}

Map<String, Resource> resultExtendedResource = new HashMap<>(extendedResources.size());
for (Map.Entry<String, Resource> entry : extendedResources.entrySet()) {
resultExtendedResource.put(entry.getKey(), entry.getValue().multiply(multiplier));
}

return new ResourceProfile(
cpuCores.multiply(multiplier),
taskHeapMemory.multiply(multiplier),
taskOffHeapMemory.multiply(multiplier),
managedMemory.multiply(multiplier),
networkMemory.multiply(multiplier),
resultExtendedResource
);
}

@Override
public String toString() {
if (this.equals(UNKNOWN)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,39 @@ public void testSubtractWithInfValues() {
rp2.subtract(rp1);
}

@Test
public void testMultiply() {
final int by = 3;
final ResourceProfile rp1 = ResourceProfile.newBuilder()
.setCpuCores(1.0)
.setTaskHeapMemoryMB(100)
.setTaskOffHeapMemoryMB(100)
.setNetworkMemoryMB(100)
.setManagedMemoryMB(100)
.addExtendedResource("gpu", new GPUResource(1.0))
.build();

ResourceProfile rp2 = rp1;
for (int i = 1; i < by; ++i) {
rp2 = rp2.merge(rp1);
}

assertEquals(rp2, rp1.multiply(by));
}

@Test(expected = IllegalArgumentException.class)
public void testMultiplyNegative() {
final ResourceProfile rp = ResourceProfile.newBuilder()
.setCpuCores(1.0)
.setTaskHeapMemoryMB(100)
.setTaskOffHeapMemoryMB(100)
.setNetworkMemoryMB(100)
.setManagedMemoryMB(100)
.addExtendedResource("gpu", new GPUResource(1.0))
.build();
rp.multiply(-2);
}

@Test
public void testFromSpecWithSerializationCopy() throws Exception {
final ResourceSpec copiedSpec = CommonTestUtils.createCopySerializable(ResourceSpec.UNKNOWN);
Expand Down

0 comments on commit e7e5e1d

Please sign in to comment.