Skip to content

Commit

Permalink
[FLINK-14405][runtime] Update ResourceSpec to align with FLIP-49 reso…
Browse files Browse the repository at this point in the history
…urce types
  • Loading branch information
xintongsong authored and azagrebin committed Nov 12, 2019
1 parent 1cbee5e commit 2c81c3f
Show file tree
Hide file tree
Showing 9 changed files with 241 additions and 199 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -36,42 +36,42 @@ public class ResourceSpecTest extends TestLogger {

@Test
public void testIsValid() throws Exception {
ResourceSpec rs = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
ResourceSpec rs = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
assertTrue(rs.isValid());

rs = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(1).
build();
assertTrue(rs.isValid());

rs = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(-1).
build();
assertFalse(rs.isValid());
}

@Test
public void testLessThanOrEqual() throws Exception {
ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
assertTrue(rs1.lessThanOrEqual(rs2));
assertTrue(rs2.lessThanOrEqual(rs1));

ResourceSpec rs3 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(1.1).
build();
assertTrue(rs1.lessThanOrEqual(rs3));
assertFalse(rs3.lessThanOrEqual(rs1));

ResourceSpec rs4 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(2.2).
build();
assertFalse(rs4.lessThanOrEqual(rs3));
Expand All @@ -80,52 +80,52 @@ public void testLessThanOrEqual() throws Exception {

@Test
public void testEquals() throws Exception {
ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
assertEquals(rs1, rs2);
assertEquals(rs2, rs1);

ResourceSpec rs3 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(2.2).
build();
ResourceSpec rs4 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(1).
build();
assertNotEquals(rs3, rs4);

ResourceSpec rs5 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(2.2).
build();
assertEquals(rs3, rs5);
}

@Test
public void testHashCode() throws Exception {
ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
assertEquals(rs1.hashCode(), rs2.hashCode());

ResourceSpec rs3 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(2.2).
build();
ResourceSpec rs4 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(1).
build();
assertNotEquals(rs3.hashCode(), rs4.hashCode());

ResourceSpec rs5 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(2.2).
build();
assertEquals(rs3.hashCode(), rs5.hashCode());
Expand All @@ -135,14 +135,14 @@ public void testHashCode() throws Exception {
public void testMerge() throws Exception {
ResourceSpec rs1 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(1.1).
build();
ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();

ResourceSpec rs3 = rs1.merge(rs2);
assertEquals(2.0, rs3.getCpuCores(), 0.000001);
assertEquals(200, rs3.getHeapMemory());
assertEquals(200, rs3.getTaskHeapMemory().getMebiBytes());
assertEquals(1.1, rs3.getGPUResource(), 0.000001);

ResourceSpec rs4 = rs1.merge(rs3);
Expand All @@ -153,7 +153,7 @@ public void testMerge() throws Exception {
public void testSerializable() throws Exception {
ResourceSpec rs1 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(1.1).
build();

Expand All @@ -166,7 +166,7 @@ public void testMergeThisUnknown() throws Exception {
final ResourceSpec spec1 = ResourceSpec.UNKNOWN;
final ResourceSpec spec2 = ResourceSpec.newBuilder()
.setCpuCores(1.0)
.setHeapMemoryInMB(100)
.setTaskHeapMemoryMB(100)
.setGPUResource(1.1)
.build();

Expand All @@ -179,7 +179,7 @@ public void testMergeThisUnknown() throws Exception {
public void testMergeOtherUnknown() throws Exception {
final ResourceSpec spec1 = ResourceSpec.newBuilder()
.setCpuCores(1.0)
.setHeapMemoryInMB(100)
.setTaskHeapMemoryMB(100)
.setGPUResource(1.1)
.build();
final ResourceSpec spec2 = ResourceSpec.UNKNOWN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ public void testConfigurationOfResource() throws Exception{
opMethod.setAccessible(true);

// verify explicit change in resources
ResourceSpec minResources = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
ResourceSpec preferredResources = ResourceSpec.newBuilder().setCpuCores(2.0).setHeapMemoryInMB(200).build();
ResourceSpec minResources = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
ResourceSpec preferredResources = ResourceSpec.newBuilder().setCpuCores(2.0).setTaskHeapMemoryMB(200).build();
opMethod.invoke(operator, minResources, preferredResources);

assertEquals(minResources, operator.getMinResources());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ public class JobGraphGeneratorTest {
*/
@Test
public void testResourcesForChainedOperators() throws Exception {
ResourceSpec resource1 = ResourceSpec.newBuilder().setCpuCores(0.1).setHeapMemoryInMB(100).build();
ResourceSpec resource2 = ResourceSpec.newBuilder().setCpuCores(0.2).setHeapMemoryInMB(200).build();
ResourceSpec resource3 = ResourceSpec.newBuilder().setCpuCores(0.3).setHeapMemoryInMB(300).build();
ResourceSpec resource4 = ResourceSpec.newBuilder().setCpuCores(0.4).setHeapMemoryInMB(400).build();
ResourceSpec resource5 = ResourceSpec.newBuilder().setCpuCores(0.5).setHeapMemoryInMB(500).build();
ResourceSpec resource6 = ResourceSpec.newBuilder().setCpuCores(0.6).setHeapMemoryInMB(600).build();
ResourceSpec resource7 = ResourceSpec.newBuilder().setCpuCores(0.7).setHeapMemoryInMB(700).build();
ResourceSpec resource1 = ResourceSpec.newBuilder().setCpuCores(0.1).setTaskHeapMemoryMB(100).build();
ResourceSpec resource2 = ResourceSpec.newBuilder().setCpuCores(0.2).setTaskHeapMemoryMB(200).build();
ResourceSpec resource3 = ResourceSpec.newBuilder().setCpuCores(0.3).setTaskHeapMemoryMB(300).build();
ResourceSpec resource4 = ResourceSpec.newBuilder().setCpuCores(0.4).setTaskHeapMemoryMB(400).build();
ResourceSpec resource5 = ResourceSpec.newBuilder().setCpuCores(0.5).setTaskHeapMemoryMB(500).build();
ResourceSpec resource6 = ResourceSpec.newBuilder().setCpuCores(0.6).setTaskHeapMemoryMB(600).build();
ResourceSpec resource7 = ResourceSpec.newBuilder().setCpuCores(0.7).setTaskHeapMemoryMB(700).build();

Method opMethod = Operator.class.getDeclaredMethod("setResources", ResourceSpec.class);
opMethod.setAccessible(true);
Expand Down Expand Up @@ -147,12 +147,12 @@ public boolean filter(Long value) throws Exception {
*/
@Test
public void testResourcesForDeltaIteration() throws Exception{
ResourceSpec resource1 = ResourceSpec.newBuilder().setCpuCores(0.1).setHeapMemoryInMB(100).build();
ResourceSpec resource2 = ResourceSpec.newBuilder().setCpuCores(0.2).setHeapMemoryInMB(200).build();
ResourceSpec resource3 = ResourceSpec.newBuilder().setCpuCores(0.3).setHeapMemoryInMB(300).build();
ResourceSpec resource4 = ResourceSpec.newBuilder().setCpuCores(0.4).setHeapMemoryInMB(400).build();
ResourceSpec resource5 = ResourceSpec.newBuilder().setCpuCores(0.5).setHeapMemoryInMB(500).build();
ResourceSpec resource6 = ResourceSpec.newBuilder().setCpuCores(0.6).setHeapMemoryInMB(600).build();
ResourceSpec resource1 = ResourceSpec.newBuilder().setCpuCores(0.1).setTaskHeapMemoryMB(100).build();
ResourceSpec resource2 = ResourceSpec.newBuilder().setCpuCores(0.2).setTaskHeapMemoryMB(200).build();
ResourceSpec resource3 = ResourceSpec.newBuilder().setCpuCores(0.3).setTaskHeapMemoryMB(300).build();
ResourceSpec resource4 = ResourceSpec.newBuilder().setCpuCores(0.4).setTaskHeapMemoryMB(400).build();
ResourceSpec resource5 = ResourceSpec.newBuilder().setCpuCores(0.5).setTaskHeapMemoryMB(500).build();
ResourceSpec resource6 = ResourceSpec.newBuilder().setCpuCores(0.6).setTaskHeapMemoryMB(600).build();

Method opMethod = Operator.class.getDeclaredMethod("setResources", ResourceSpec.class);
opMethod.setAccessible(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
public class ResourceProfileTest {

@Test
public void testMatchRequirement() throws Exception {
public void testMatchRequirement() {
ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 0, 0, Collections.emptyMap());
ResourceProfile rp2 = new ResourceProfile(1.0, 200, 200, 200, 0, 0, Collections.emptyMap());
ResourceProfile rp3 = new ResourceProfile(2.0, 100, 100, 100, 0, 0, Collections.emptyMap());
Expand All @@ -64,12 +64,12 @@ public void testMatchRequirement() throws Exception {

ResourceSpec rs1 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(2.2).
build();
ResourceSpec rs2 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(1.1).
build();

Expand All @@ -84,26 +84,26 @@ public void testUnknownMatchesUnknown() {
}

@Test
public void testEquals() throws Exception {
ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
public void testEquals() {
ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
assertEquals(ResourceProfile.fromResourceSpec(rs1, 0), ResourceProfile.fromResourceSpec(rs2, 0));

ResourceSpec rs3 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(2.2).
build();
ResourceSpec rs4 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(1.1).
build();
assertNotEquals(ResourceProfile.fromResourceSpec(rs3, 0), ResourceProfile.fromResourceSpec(rs4, 0));

ResourceSpec rs5 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(2.2).
build();
assertEquals(ResourceProfile.fromResourceSpec(rs3, 100), ResourceProfile.fromResourceSpec(rs5, 100));
Expand All @@ -127,40 +127,40 @@ public void testEquals() throws Exception {
}

@Test
public void testCompareTo() throws Exception {
ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
public void testCompareTo() {
ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
assertEquals(0, ResourceProfile.fromResourceSpec(rs1, 0).compareTo(ResourceProfile.fromResourceSpec(rs2, 0)));

ResourceSpec rs3 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(2.2).
build();
assertEquals(-1, ResourceProfile.fromResourceSpec(rs1, 0).compareTo(ResourceProfile.fromResourceSpec(rs3, 0)));
assertEquals(1, ResourceProfile.fromResourceSpec(rs3, 0).compareTo(ResourceProfile.fromResourceSpec(rs1, 0)));

ResourceSpec rs4 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(1.1).
build();
assertEquals(1, ResourceProfile.fromResourceSpec(rs3, 0).compareTo(ResourceProfile.fromResourceSpec(rs4, 0)));
assertEquals(-1, ResourceProfile.fromResourceSpec(rs4, 0).compareTo(ResourceProfile.fromResourceSpec(rs3, 0)));

ResourceSpec rs5 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(2.2).
build();
assertEquals(0, ResourceProfile.fromResourceSpec(rs3, 0).compareTo(ResourceProfile.fromResourceSpec(rs5, 0)));
}

@Test
public void testGet() throws Exception {
public void testGet() {
ResourceSpec rs = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(1.6).
build();
ResourceProfile rp = ResourceProfile.fromResourceSpec(rs, 50);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ public void testJobSubmissionWithPartialResourceConfigured() throws Exception {

DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);

ResourceSpec resourceSpec = ResourceSpec.newBuilder().setCpuCores(2).build();
ResourceSpec resourceSpec = ResourceSpec.newBuilder().setCpuCores(2).setTaskHeapMemoryMB(0).build();

final JobVertex firstVertex = new JobVertex("firstVertex");
firstVertex.setInvokableClass(NoOpInvokable.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,26 +547,26 @@ public void invoke(Long value) throws Exception {
public void testResources() throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

ResourceSpec minResource1 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
ResourceSpec preferredResource1 = ResourceSpec.newBuilder().setCpuCores(2.0).setHeapMemoryInMB(200).build();
ResourceSpec minResource1 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
ResourceSpec preferredResource1 = ResourceSpec.newBuilder().setCpuCores(2.0).setTaskHeapMemoryMB(200).build();

ResourceSpec minResource2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(200).build();
ResourceSpec preferredResource2 = ResourceSpec.newBuilder().setCpuCores(2.0).setHeapMemoryInMB(300).build();
ResourceSpec minResource2 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(200).build();
ResourceSpec preferredResource2 = ResourceSpec.newBuilder().setCpuCores(2.0).setTaskHeapMemoryMB(300).build();

ResourceSpec minResource3 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(300).build();
ResourceSpec preferredResource3 = ResourceSpec.newBuilder().setCpuCores(2.0).setHeapMemoryInMB(400).build();
ResourceSpec minResource3 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(300).build();
ResourceSpec preferredResource3 = ResourceSpec.newBuilder().setCpuCores(2.0).setTaskHeapMemoryMB(400).build();

ResourceSpec minResource4 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(400).build();
ResourceSpec preferredResource4 = ResourceSpec.newBuilder().setCpuCores(2.0).setHeapMemoryInMB(500).build();
ResourceSpec minResource4 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(400).build();
ResourceSpec preferredResource4 = ResourceSpec.newBuilder().setCpuCores(2.0).setTaskHeapMemoryMB(500).build();

ResourceSpec minResource5 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(500).build();
ResourceSpec preferredResource5 = ResourceSpec.newBuilder().setCpuCores(2.0).setHeapMemoryInMB(600).build();
ResourceSpec minResource5 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(500).build();
ResourceSpec preferredResource5 = ResourceSpec.newBuilder().setCpuCores(2.0).setTaskHeapMemoryMB(600).build();

ResourceSpec minResource6 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(600).build();
ResourceSpec preferredResource6 = ResourceSpec.newBuilder().setCpuCores(2.0).setHeapMemoryInMB(700).build();
ResourceSpec minResource6 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(600).build();
ResourceSpec preferredResource6 = ResourceSpec.newBuilder().setCpuCores(2.0).setTaskHeapMemoryMB(700).build();

ResourceSpec minResource7 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(700).build();
ResourceSpec preferredResource7 = ResourceSpec.newBuilder().setCpuCores(2.0).setHeapMemoryInMB(800).build();
ResourceSpec minResource7 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(700).build();
ResourceSpec preferredResource7 = ResourceSpec.newBuilder().setCpuCores(2.0).setTaskHeapMemoryMB(800).build();

Method opMethod = SingleOutputStreamOperator.class.getDeclaredMethod("setResources", ResourceSpec.class, ResourceSpec.class);
opMethod.setAccessible(true);
Expand Down
Loading

0 comments on commit 2c81c3f

Please sign in to comment.