[FLINK-33387][runtime] Introduce the abstraction and the interface about loading #25174
base: master
Hi @1996fanrui, could you take a look when you have some free time? Thanks a lot~
@@ -59,12 +59,7 @@ public void disableBatchSlotRequestTimeoutCheck() {
public Map<SlotRequestId, CompletableFuture<PhysicalSlotRequest.Result>> allocatePhysicalSlots(
Hi @1996fanrui,
I'm trying to find a way to implement both the slot waiting and the slot matching mechanisms in this class when the balanced scheduling feature is turned on, so that we can avoid the previousLoading implementation in PhysicalSlot.
Unfortunately, to handle the failover scenario, all slot requests issued during failover must be matched against a globally sufficient set of free slots. This requires the PhysicalSlotProvider to know whether all current slot requests have arrived.
Introducing a slot request waiting mechanism in the PhysicalSlotProvider implementation class would solve this problem (as expected), but it would seem to duplicate the logic of the DeclarativeSlotPool's slot-request max interval mechanism. WDYT? Many thanks!
Is it possible to merge PhysicalSlotProvider and DeclarativeSlotPool into one class?
@1996fanrui Sounds great~
I did some analysis of it.
Given the existing structure, wouldn't it be more appropriate to merge PhysicalSlotProvider and SlotPool instead?
From my limited reading, there seem to be a few other issues, marked with the red lines.
Could you share your thoughts on this? Many thanks!
/** The class is used to represent the loading weight abstraction. */
@Internal
public interface LoadingWeight extends Comparable<LoadingWeight>, Serializable {
Why does LoadingWeight need to extend Comparable<LoadingWeight>?
Do we sort them?
Yes, in TasksBalancedRequestSlotMatchingStrategy.
I sort them there to speed up the matching.
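To make the point about sorting concrete, here is a minimal sketch of how a Comparable loading weight could drive a balanced assignment. It is not the code from TasksBalancedRequestSlotMatchingStrategy: the names SimpleLoadingWeight, BalancedMatchingSketch, merge and assign are hypothetical, and the greedy "heaviest request to least-loaded executor" rule is an assumption chosen only to show where the ordering is used.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical stand-in for LoadingWeight; only the natural ordering matters here. */
final class SimpleLoadingWeight implements Comparable<SimpleLoadingWeight> {
    final float loading;

    SimpleLoadingWeight(float loading) {
        this.loading = loading;
    }

    /** Returns a new weight holding the sum of both loadings. */
    SimpleLoadingWeight merge(SimpleLoadingWeight other) {
        return new SimpleLoadingWeight(loading + other.loading);
    }

    @Override
    public int compareTo(SimpleLoadingWeight other) {
        return Float.compare(loading, other.loading);
    }
}

/** Hypothetical greedy matcher, not the strategy class from the PR. */
final class BalancedMatchingSketch {

    /** Assigns every request to the currently least-loaded executor and returns the final loads. */
    static SimpleLoadingWeight[] assign(List<SimpleLoadingWeight> requests, int executors) {
        if (executors <= 0) {
            throw new IllegalArgumentException("At least one executor is required.");
        }
        final SimpleLoadingWeight[] loads = new SimpleLoadingWeight[executors];
        // Executors ordered by current load; ties are broken by index to stay deterministic.
        PriorityQueue<Integer> leastLoadedFirst =
                new PriorityQueue<>(
                        (a, b) -> {
                            int byLoad = loads[a].compareTo(loads[b]);
                            return byLoad != 0 ? byLoad : Integer.compare(a, b);
                        });
        for (int i = 0; i < executors; i++) {
            loads[i] = new SimpleLoadingWeight(0f);
            leastLoadedFirst.add(i);
        }
        // Sorting relies on Comparable: the heaviest requests are placed first.
        List<SimpleLoadingWeight> heaviestFirst = new ArrayList<>(requests);
        heaviestFirst.sort(Comparator.reverseOrder());
        for (SimpleLoadingWeight request : heaviestFirst) {
            // The executor is outside the queue while its load changes, so the heap stays valid.
            int target = leastLoadedFirst.poll();
            loads[target] = loads[target].merge(request);
            leastLoadedFirst.add(target);
        }
        return loads;
    }
}
```

With the ordering in place, each placement costs a heap operation instead of a scan over all executors, which is one plausible reading of "accelerate the matching by sorting".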
 * limitations under the License.
 */

package org.apache.flink.runtime.scheduler.loading;
Why are the Loading-related interfaces placed in the scheduler module?
My original intention was that these abstractions belong to the scheduling category.
Do you have any thoughts on this? Thanks~
/**
 * Set the loading.
 *
 * @param loadingWeight loading weight to set.
 */
void setLoading(LoadingWeight loadingWeight);

/** Reset the loading. */
void resetLoading();
I don't think these two methods should be placed in the SlotInfo interface.
I checked all callers of these two methods, and all of them call them on AllocatedSlot rather than SlotInfo. So adding these two methods to the AllocatedSlot class is enough, right?
It's used through PhysicalSlot in this block (org.apache.flink.runtime.jobmaster.slotpool.PendingRequest):
    boolean fulfill(PhysicalSlot slot) {
        Preconditions.checkState(
                slot.getLoading().equals(getLoading()),
                "Unexpected loading weight, it may be a bug.");
        return slotFuture.complete(slot);
    }
So, what about moving the method declaration into PhysicalSlot?
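One possible way to split the methods, sketched under assumptions: the read accessor lives on the narrower PhysicalSlot-like interface that the fulfill check needs, while the mutators stay on the concrete slot class. All names ending in Sketch are hypothetical, a float stands in for LoadingWeight, and a plain IllegalStateException replaces Preconditions.checkState so that the example is self-contained.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical narrow interface: exposes the loading for reads only. */
interface PhysicalSlotSketch {
    float getLoading();
}

/** Hypothetical concrete slot: the only place where the loading can be changed. */
final class AllocatedSlotSketch implements PhysicalSlotSketch {
    private float loading;

    @Override
    public float getLoading() {
        return loading;
    }

    void setLoading(float loading) {
        this.loading = loading;
    }

    void resetLoading() {
        this.loading = 0f;
    }
}

/** Hypothetical pending request that only accepts a slot carrying the expected loading. */
final class PendingRequestSketch {
    private final float loading;
    private final CompletableFuture<PhysicalSlotSketch> slotFuture = new CompletableFuture<>();

    PendingRequestSketch(float loading) {
        this.loading = loading;
    }

    /** Returns true if this call completed the request, false if it was already completed. */
    boolean fulfill(PhysicalSlotSketch slot) {
        if (Float.compare(slot.getLoading(), loading) != 0) {
            throw new IllegalStateException("Unexpected loading weight, it may be a bug.");
        }
        return slotFuture.complete(slot);
    }
}
```

In this arrangement the SlotInfo-level interface would not need to know about loading at all, which seems to be what both comments above are aiming for.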
this.allocationId = checkNotNull(allocationId);
this.taskManagerLocation = checkNotNull(location);
this.physicalSlotNumber = physicalSlotNumber;
this.resourceProfile = checkNotNull(resourceProfile);
this.taskManagerGateway = checkNotNull(taskManagerGateway);
this.loadingWeight = Preconditions.checkNotNull(loadingWeight);
- this.loadingWeight = Preconditions.checkNotNull(loadingWeight);
+ this.loadingWeight = checkNotNull(loadingWeight);
nit: keep the same style.
 *
 * @return The previous loadable resource profile of the current slot.
 */
Optional<LoadableResourceProfile> getPreviousLoadableResourceProfile();
Temporary comment: why is this method needed?
It just extracts a few trivial lines into a method, which is convenient to call from the relevant places in the previous implementation.
…for Default Scheduler
964f960
to
07bf7e5
Compare
07bf7e5
to
2a34f65
Compare
What is the purpose of the change
Introduce the abstraction and the interface about loading
Support task balancing at the TaskExecutor level for the Default Scheduler, without exposing a public, user-facing interface for enabling the feature.
Brief change log
[FLINK-33388][runtime] Support tasks balancing at TaskExecutor level for Default Scheduler
[FLINK-33387][runtime] Introduce the abstraction and the interface about loading
Verifying this change
Added corresponding test cases for the core logic.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation