Skip to content

Commit

Permalink
Merge pull request intelligent-machine-learning#192 from intelligent-…
Browse files Browse the repository at this point in the history
…machine-learning/fix-elasticjob-scaler

ElasticJob Scaler generates ScalePlan Crd by a scale plan
  • Loading branch information
workingloong committed Jan 19, 2023
2 parents 2910f00 + 7c6ac59 commit b328f3d
Show file tree
Hide file tree
Showing 15 changed files with 203 additions and 90 deletions.
4 changes: 2 additions & 2 deletions dlrover/go/operator/api/v1alpha1/scaleplan_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type ScaleSpec struct {
// "PS": ReplicaResourceSpec,
// "worker": ReplicaResourceSpec,
// }
ReplicaResourceSpecs map[commonv1.ReplicaType]ReplicaResourceSpec `json:"replicaResourceSpec,omitempty"`
ReplicaResourceSpecs map[commonv1.ReplicaType]ReplicaResourceSpec `json:"replicaResourceSpecs,omitempty"`

// CreatePods are Pods to be created.
CreatePods []PodMeta `json:"createPods,omitempty"`
Expand Down Expand Up @@ -78,7 +78,7 @@ type PodMeta struct {
Type commonv1.ReplicaType `json:"type,omitempty"`

// RankIndex is the index of the Pod
RankIndex int `json:"randIndex,omitempty"`
RankIndex int `json:"rankIndex,omitempty"`

// Service is the service whose endpoint is the Pod.
Service string `json:"service,omitempty"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,17 @@ spec:
name:
description: Name is the name of the Pod
type: string
randIndex:
rankIndex:
description: RankIndex is the index of the Pod
type: integer
resource:
additionalProperties:
anyOf:
- type: integer
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: Resource defines the resource of each replica
properties:
cpu:
description: CPU is the requested CPU cores of a replica
type: string
gpu:
description: GPU is the requested GPU of a replica
type: string
memory:
description: Memory is the requested memory (MB) of a replica
type: string
type: object
service:
description: Service is the service whose endpoint is the Pod.
Expand All @@ -89,21 +85,17 @@ spec:
name:
description: Name is the name of the Pod
type: string
randIndex:
rankIndex:
description: RankIndex is the index of the Pod
type: integer
resource:
additionalProperties:
anyOf:
- type: integer
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: Resource defines the resource of each replica
properties:
cpu:
description: CPU is the requested CPU cores of a replica
type: string
gpu:
description: GPU is the requested GPU of a replica
type: string
memory:
description: Memory is the requested memory (MB) of a replica
type: string
type: object
service:
description: Service is the service whose endpoint is the Pod.
Expand All @@ -124,9 +116,35 @@ spec:
removePods:
description: RemovePods are Pods to be removed
items:
type: string
description: PodMeta specifies the meta of a Pod.
properties:
id:
description: Id is the identity of the Pod
type: integer
name:
description: Name is the name of the Pod
type: string
rankIndex:
description: RankIndex is the index of the Pod
type: integer
resource:
additionalProperties:
anyOf:
- type: integer
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: Resource defines the resource of each replica
type: object
service:
description: Service is the service whose endpoint is the Pod.
type: string
type:
description: Type is the type of the Pod
type: string
type: object
type: array
replicaResourceSpec:
replicaResourceSpecs:
additionalProperties:
description: ReplicaResourceSpec specifies the number and resources
of replica.
Expand All @@ -135,17 +153,13 @@ spec:
description: Replicas is the number of replica
type: integer
resource:
additionalProperties:
anyOf:
- type: integer
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: Resource defines the resource of each replica
properties:
cpu:
description: CPU is the requested CPU cores of a replica
type: string
gpu:
description: GPU is the requested GPU of a replica
type: string
memory:
description: Memory is the requested memory (MB) of a replica
type: string
type: object
type: object
description: 'A map of ReplicaType (type) to ReplicaSpec (value).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ metadata:
spec:
ownerJob: elasticjob-sample
manualScaling: True
replicaResourceSpec:
replicaResourceSpecs:
ps:
replicas: 1
resource:
Expand Down
2 changes: 1 addition & 1 deletion dlrover/python/master/node/node_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ def remove_training_nodes(self):
node.is_released = True
node.status = NodeStatus.DELETED
logger.info("Remove node %s", node.name)
plan.remove_nodes.append(node.name)
plan.remove_nodes.append(node)
self._scaler.scale(plan)

def start_auto_scale(self):
Expand Down
4 changes: 2 additions & 2 deletions dlrover/python/master/node/ps.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def process_after_ps_cluster_ready(self):
node.is_released = True
if node.id in self._migrated_ps_nodes:
self._migrated_ps_nodes.pop(node.id)
plan.remove_nodes.append(node.name)
plan.remove_nodes.append(node)
return plan

def _get_alive_ps(self) -> List[Node]:
Expand Down Expand Up @@ -273,7 +273,7 @@ def delete_running_ps(self):
)
node.is_released = True
node.status = NodeStatus.DELETED
plan.remove_nodes.append(node.name)
plan.remove_nodes.append(node)
return plan

def migrate_parameter_servers(self, ps_nodes: Dict[str, NodeResource]):
Expand Down
4 changes: 2 additions & 2 deletions dlrover/python/master/node/training_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def remove_node(self, node_id):
logger.error("Unknown deletable worker id: %s" % node_id)
return
worker.is_released = True
plan.remove_nodes.append(worker.name)
plan.remove_nodes.append(worker)
return plan

def relaunch_node(self, node: Node):
Expand All @@ -191,7 +191,7 @@ def relaunch_node(self, node: Node):
service_addr=node.service_addr,
)
)
plan.remove_nodes.append(node.name)
plan.remove_nodes.append(node)
return plan

def cut_pending_node_cpu(self):
Expand Down
14 changes: 8 additions & 6 deletions dlrover/python/master/node/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def delete_exited_workers(self):
and not worker.is_released
):
worker.is_released = True
plan.remove_nodes.append(worker.name)
plan.remove_nodes.append(worker)
return plan

def delete_running_workers(self):
Expand All @@ -207,7 +207,7 @@ def delete_running_workers(self):
worker.name,
)
worker.is_released = True
plan.remove_nodes.append(worker.name)
plan.remove_nodes.append(worker)
return plan

def remove_noncritical_worker(self, worker_id):
Expand All @@ -221,16 +221,18 @@ def migrate_workers(self, workers: Dict[str, NodeResource]):
plan = ScalePlan()
for name, resource in workers.items():
old_node_id = int(name.split("-")[-1])
old_node = self._nodes[old_node_id]
node_id = next(self._node_id_iter)
task_id = self._nodes[old_node_id].rank_index
self._nodes[node_id] = Node(
task_id = old_node.rank_index
new_node = Node(
NodeType.WORKER,
node_id,
config_resource=resource,
status=NodeStatus.INITIAL,
rank_index=task_id,
name=self._new_node_name_fn(NodeType.WORKER, node_id),
)
plan.launch_nodes.append(self._nodes[node_id])
plan.remove_nodes.append(name)
self._nodes[node_id] = new_node
plan.launch_nodes.append(new_node)
plan.remove_nodes.append(old_node)
return plan
2 changes: 1 addition & 1 deletion dlrover/python/master/scaler/base_scaler.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class ScalePlan(JsonSerializable):
def __init__(self):
self.node_group_resources: Dict[str, NodeGroupResource] = {}
self.launch_nodes: List[Node] = []
self.remove_nodes: List[str] = []
self.remove_nodes: List[Node] = []
self.ps_addrs: List[str] = []

def empty(self):
Expand Down
Loading

0 comments on commit b328f3d

Please sign in to comment.