Skip to content

Commit

Permalink
[FLINK-4347][cluster management] Implement SlotManager core
Browse files Browse the repository at this point in the history
This closes apache#2388
  • Loading branch information
KurtYoung authored and StephanEwen committed Dec 23, 2016
1 parent 1b70a25 commit 39035ba
Show file tree
Hide file tree
Showing 8 changed files with 1,377 additions and 8 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,7 @@ public final int hashCode() {

@Override
public String toString() {
return "ResourceID{" +
"resourceId='" + resourceId + '\'' +
'}';
return resourceId;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.clusterframework.types;

import java.io.Serializable;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A ResourceSlot represents a slot located in TaskManager from ResourceManager's view. It has a unique
* identification and resource profile which we can compare to the resource request.
*/
public class ResourceSlot implements ResourceIDRetrievable, Serializable {

private static final long serialVersionUID = -5853720153136840674L;

/** The unique identification of this slot */
private final SlotID slotId;

/** The resource profile of this slot */
private final ResourceProfile resourceProfile;

public ResourceSlot(SlotID slotId, ResourceProfile resourceProfile) {
this.slotId = checkNotNull(slotId);
this.resourceProfile = checkNotNull(resourceProfile);
}

@Override
public ResourceID getResourceID() {
return slotId.getResourceID();
}

public SlotID getSlotId() {
return slotId;
}

public ResourceProfile getResourceProfile() {
return resourceProfile;
}

/**
* Check whether required resource profile can be matched by this slot.
*
* @param required The required resource profile
* @return true if requirement can be matched
*/
public boolean isMatchingRequirement(ResourceProfile required) {
return resourceProfile.isMatching(required);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,15 @@ public int hashCode() {

@Override
public String toString() {
return "SlotID{" +
"resourceId=" + resourceId +
", slotId=" + slotId +
'}';
return resourceId + "_" + slotId;
}

/**
* Generate a random slot id.
*
* @return A random slot id.
*/
public static SlotID generate() {
return new SlotID(ResourceID.generate(), 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,57 @@

package org.apache.flink.runtime.rpc.resourcemanager;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;

import java.io.Serializable;

public class SlotRequest implements Serializable{
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* This describes the requirement of the slot, mainly used by JobManager requesting slot from ResourceManager.
*/
public class SlotRequest implements Serializable {

private static final long serialVersionUID = -6586877187990445986L;

/** The JobID of the slot requested for */
private final JobID jobId;

/** The unique identification of this request */
private final AllocationID allocationId;

/** The resource profile of the required slot */
private final ResourceProfile resourceProfile;

public SlotRequest(JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile) {
this.jobId = checkNotNull(jobId);
this.allocationId = checkNotNull(allocationId);
this.resourceProfile = checkNotNull(resourceProfile);
}

/**
* Get the JobID of the slot requested for.
* @return The job id
*/
public JobID getJobId() {
return jobId;
}

/**
* Get the unique identification of this request
* @return the allocation id
*/
public AllocationID getAllocationId() {
return allocationId;
}

/**
* Get the resource profile of the desired slot
* @return The resource profile
*/
public ResourceProfile getResourceProfile() {
return resourceProfile;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rpc.taskexecutor;

import org.apache.flink.runtime.clusterframework.types.ResourceID;

import java.io.Serializable;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A report about the current status of all slots of the TaskExecutor, describing
* which slots are available and allocated, and what jobs (JobManagers) the allocated slots
* have been allocated to.
*/
public class SlotReport implements Serializable {

private static final long serialVersionUID = -3150175198722481689L;

/** The slots status of the TaskManager */
private final List<SlotStatus> slotsStatus;

/** The resource id which identifies the TaskManager */
private final ResourceID resourceID;

public SlotReport(final List<SlotStatus> slotsStatus, final ResourceID resourceID) {
this.slotsStatus = checkNotNull(slotsStatus);
this.resourceID = checkNotNull(resourceID);
}

public List<SlotStatus> getSlotsStatus() {
return slotsStatus;
}

public ResourceID getResourceID() {
return resourceID;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rpc.taskexecutor;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;

import java.io.Serializable;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* This describes the slot current status which located in TaskManager.
*/
public class SlotStatus implements Serializable {

private static final long serialVersionUID = 5099191707339664493L;

/** slotID to identify a slot */
private final SlotID slotID;

/** the resource profile of the slot */
private final ResourceProfile profiler;

/** if the slot is allocated, allocationId identify its allocation; else, allocationId is null */
private final AllocationID allocationID;

/** if the slot is allocated, jobId identify which job this slot is allocated to; else, jobId is null */
private final JobID jobID;

public SlotStatus(SlotID slotID, ResourceProfile profiler) {
this(slotID, profiler, null, null);
}

public SlotStatus(SlotID slotID, ResourceProfile profiler, AllocationID allocationID, JobID jobID) {
this.slotID = checkNotNull(slotID, "slotID cannot be null");
this.profiler = checkNotNull(profiler, "profile cannot be null");
this.allocationID = allocationID;
this.jobID = jobID;
}

/**
* Get the unique identification of this slot
*
* @return The slot id
*/
public SlotID getSlotID() {
return slotID;
}

/**
* Get the resource profile of this slot
*
* @return The resource profile
*/
public ResourceProfile getProfiler() {
return profiler;
}

/**
* Get the allocation id of this slot
*
* @return The allocation id if this slot is allocated, otherwise null
*/
public AllocationID getAllocationID() {
return allocationID;
}

/**
* Get the job id of the slot allocated for
*
* @return The job id if this slot is allocated, otherwise null
*/
public JobID getJobID() {
return jobID;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

SlotStatus that = (SlotStatus) o;

if (!slotID.equals(that.slotID)) {
return false;
}
if (!profiler.equals(that.profiler)) {
return false;
}
if (allocationID != null ? !allocationID.equals(that.allocationID) : that.allocationID != null) {
return false;
}
return jobID != null ? jobID.equals(that.jobID) : that.jobID == null;

}

@Override
public int hashCode() {
int result = slotID.hashCode();
result = 31 * result + profiler.hashCode();
result = 31 * result + (allocationID != null ? allocationID.hashCode() : 0);
result = 31 * result + (jobID != null ? jobID.hashCode() : 0);
return result;
}

}
Loading

0 comments on commit 39035ba

Please sign in to comment.