Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-33387][runtime] Introduce the abstraction and the interface about loading #25174

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

RocMarshal
Copy link
Contributor

What is the purpose of the change

Introduce the abstraction and the interface about loading
Support tasks balancing at TaskExecutor level for Default Scheduler without public users-oriented interface for enabling the feature.

Brief change log

[FLINK-33388][runtime] Support tasks balancing at TaskExecutor level for Default Scheduler
[FLINK-33387][runtime] Introduce the abstraction and the interface about loading

Verifying this change

Added the corresponding test cases about core logic.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented yet, will do it.)

@flinkbot
Copy link
Collaborator

flinkbot commented Aug 8, 2024

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@1996fanrui 1996fanrui self-assigned this Aug 8, 2024
@RocMarshal RocMarshal marked this pull request as ready for review August 9, 2024 08:10
@RocMarshal
Copy link
Contributor Author

Hi @1996fanrui could you help take a look if you had the free time ? thx a lot~

@@ -59,12 +59,7 @@ public void disableBatchSlotRequestTimeoutCheck() {
public Map<SlotRequestId, CompletableFuture<PhysicalSlotRequest.Result>> allocatePhysicalSlots(
Copy link
Contributor Author

@RocMarshal RocMarshal Oct 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @1996fanrui
I'm trying to find a way to implement both the waiting and matching mechanisms of Slots in this class when the balanced scheduling feature is turned on. To avoid the previousLoading implementation in PhysicalSlot.

Unfortunately, in order to solve the failover scenario, all slot requests during the failover request can be matched against globally sufficient and free slots. This requires the PhysicalSlotProvider to be able to know if all current slot requests have arrived.

Introducing a slot request wait mechanism for the PhysicalSlotProvider's implementation class would solve this problem (as expected), but it would seem to duplicate the logic of the DeclarativeSlotPool's slot-request max interval mechanism. WDYT ? Many thx!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to merge PhysicalSlotProvider and DeclarativeSlotPool into one class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@1996fanrui Sounds great~
I tried to make some analyzing about it.
image

From the existing structure, it seems more appropriate to merge PhysicalSlotProvider and SlotPool together?

In my limited read, there seem to be a few of other issues as the red-lines.
Could you help give some related ideas about it ? Many thanks!


/** The class is used to represent the loading weight abstraction. */
@Internal
public interface LoadingWeight extends Comparable<LoadingWeight>, Serializable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does LoadingWeight need to extend Comparable<LoadingWeight>?

Do we sort them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, in TasksBalancedRequestSlotMatchingStrategy.
I tried to accelerate the matching by sorting.

* limitations under the License.
*/

package org.apache.flink.runtime.scheduler.loading;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are Loading related interfaces placed in the scheduler module?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My original intention is that the abstraction related is belonging to the category of scheduling.
Do you have any ideas about it ? thanks~

Comment on lines +62 to +70
/**
* Set the loading.
*
* @param loadingWeight loading weight to set.
*/
void setLoading(LoadingWeight loadingWeight);

/** Reset the loading. */
void resetLoading();

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think these 2 methods should be placed in the SlotInfo interface.

I checked all callers of these 2 methods, all of them call the AllocatedSlot instead of SlotInfo. So adding these 2 method into AllocatedSlot class is enough, right?

Copy link
Contributor Author

@RocMarshal RocMarshal Oct 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used in the block (org.apache.flink.runtime.jobmaster.slotpool.PendingRequest) by PhysicalSlot


    boolean fulfill(PhysicalSlot slot) {
        Preconditions.checkState(
                slot.getLoading().equals(getLoading()),
                "Unexpected loading weight, it may be a bug.");
        return slotFuture.complete(slot);
    }

So, what about moving the method declaration into PhysicalSlot ?

this.allocationId = checkNotNull(allocationId);
this.taskManagerLocation = checkNotNull(location);
this.physicalSlotNumber = physicalSlotNumber;
this.resourceProfile = checkNotNull(resourceProfile);
this.taskManagerGateway = checkNotNull(taskManagerGateway);
this.loadingWeight = Preconditions.checkNotNull(loadingWeight);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
this.loadingWeight = Preconditions.checkNotNull(loadingWeight);
this.loadingWeight = checkNotNull(loadingWeight);

nit: keep same style.

*
* @return The previous loadable resource profile of the current slot.
*/
Optional<LoadableResourceProfile> getPreviousLoadableResourceProfile();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tmp comment: why need this method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for extracting trivial lines as a method, which is convenient to call in the related points in the previous implementation

@@ -59,12 +59,7 @@ public void disableBatchSlotRequestTimeoutCheck() {
public Map<SlotRequestId, CompletableFuture<PhysicalSlotRequest.Result>> allocatePhysicalSlots(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to merge PhysicalSlotProvider and DeclarativeSlotPool into one class?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants