Support to manually set job resource by a ScalePlan
workingloong committed Jan 17, 2023
1 parent 22a3f16 commit 522b936
Showing 7 changed files with 27 additions and 10 deletions.
2 changes: 1 addition & 1 deletion dlrover/go/operator/config/manager/kustomization.yaml
@@ -12,5 +12,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
images:
- name: controller
- newName: registry.cn-hangzhou.aliyuncs.com/intell-ai/dlrover
+ newName: easydl/elasticjob-controller
newTag: test
2 changes: 1 addition & 1 deletion dlrover/go/operator/config/manager/manager.yaml
@@ -66,6 +66,6 @@ spec:
requests:
cpu: 10m
memory: 64Mi
- imagePullPolicy: Always
+ imagePullPolicy: IfNotPresent
serviceAccountName: controller-manager
terminationGracePeriodSeconds: 10
@@ -2,8 +2,11 @@ apiVersion: elastic.iml.github.io/v1alpha1
kind: ScalePlan
metadata:
name: scaleplan-sample
+ labels:
+   elasticjob-name: elasticjob-sample
spec:
ownerJob: elasticjob-sample
+ manualScaling: True
replicaResourceSpec:
ps:
replicas: 1
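As a rough illustration (not part of this commit): the sample ScalePlan above reaches the DLRover master as a plain dict from the Kubernetes custom-objects API. The sketch below assumes the field shapes read by the parsing code in dlrover/python/master/watcher/k8s_watcher.py later in this diff; the "resource" values under "ps" are made-up placeholders.

scaler_crd = {
    "apiVersion": "elastic.iml.github.io/v1alpha1",
    "kind": "ScalePlan",
    "metadata": {
        "name": "scaleplan-sample",
        "labels": {"elasticjob-name": "elasticjob-sample"},
    },
    "spec": {
        "ownerJob": "elasticjob-sample",
        "manualScaling": True,
        "replicaResourceSpec": {
            # The "resource" entry is illustrative; the sample above only sets replicas.
            "ps": {"replicas": 1, "resource": {"cpu": "4", "memory": "4096Mi"}},
        },
    },
}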
2 changes: 1 addition & 1 deletion dlrover/go/operator/pkg/controllers/master/master.go
@@ -64,7 +64,7 @@ func (m *Manager) newJobMaster(
container := corev1.Container{
Name: "main",
Image: masterImage,
ImagePullPolicy: "Always",
ImagePullPolicy: "IfNotPresent",
Command: []string{"/bin/bash", "-c", command},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
4 changes: 3 additions & 1 deletion dlrover/go/operator/pkg/controllers/scaleplan_controller.go
@@ -86,11 +86,13 @@ func (r *ScalePlanReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if err != nil {
return ctrl.Result{}, err
}

result, err := r.setScalingOwner(scalePlan, job, defaultPollInterval)
if err != nil {
return result, err
}
+ if scalePlan.Spec.ManualScaling {
+     return ctrl.Result{}, err
+ }
result, err = r.updateJobToScaling(scalePlan, job, defaultPollInterval)
return result, err
}
7 changes: 6 additions & 1 deletion dlrover/python/master/node/node_manager.py
@@ -141,6 +141,11 @@ def start(self):
threading.Thread(
target=self._monitor_nodes, name="node_monitor", daemon=True
).start()
+ threading.Thread(
+     target=self._monitor_scale_plan_crd,
+     name="scaleplan_monitor",
+     daemon=True,
+ ).start()

def _adjust_worker_for_estimator(self):
if (
@@ -240,7 +245,7 @@ def _monitor_nodes(self):
logger.warning(e)
time.sleep(30)

- def _monitor_scaler_crd(self):
+ def _monitor_scale_plan_crd(self):
"""Monitor the Scaler CRD from users to adjust the job resource"""
logger.info("Start to monitor Scaler CRD")
while True:
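For readers unfamiliar with the pattern, the added start() call above amounts to launching a named daemon thread around a long-running monitor loop. A self-contained sketch; the loop body here is only a placeholder for the real _monitor_scale_plan_crd in node_manager.py, which consumes ScalePlan events to adjust the job resource.

import threading
import time

def monitor_scale_plans():
    # Placeholder loop standing in for _monitor_scale_plan_crd.
    while True:
        time.sleep(30)

threading.Thread(
    target=monitor_scale_plans,
    name="scaleplan_monitor",
    daemon=True,
).start()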
17 changes: 12 additions & 5 deletions dlrover/python/master/watcher/k8s_watcher.py
@@ -179,7 +179,10 @@ def watch(self):
try:
stream = watch.Watch().stream(
self._k8s_client.api_instance.list_namespaced_custom_object,
- self._namespace,
+ namespace=self._namespace,
+ group="elastic.iml.github.io",
+ version="v1alpha1",
+ plural="scaleplans",
label_selector=self._job_selector,
resource_version=resource_version,
timeout_seconds=60,
@@ -192,18 +195,22 @@
)
continue
resource_plan = self._get_resoruce_plan_from_event(scaler_crd)
logger.info("Get a manual resource plan %s", resource_plan)
logger.info(
"Get a manual resource plan %s", resource_plan.toJSON()
)
yield resource_plan
except Exception as e:
raise e

def _get_resoruce_plan_from_event(self, scaler_crd) -> ResourcePlan:
resource_plan = ResourcePlan()
- if not scaler_crd["spec"]["manualScaling"]:
+ if not scaler_crd["spec"].get("manualScaling", False):
logger.info("Skip the Scaler which is not manual")
return resource_plan

- for replica, spec in scaler_crd["spec"]["replicaResourceSpec"].items():
+ for replica, spec in (
+     scaler_crd["spec"].get("replicaResourceSpec", {}).items()
+ ):
cpu = float(spec.get("resource", {}).get("cpu", "0"))
memory = NodeResource.convert_memory_to_mb(
spec.get("resource", {}).get("memory", "0Mi")
@@ -212,7 +219,7 @@ def _get_resoruce_plan_from_event(self, scaler_crd) -> ResourcePlan:
spec["replicas"], NodeResource(cpu, memory)
)

- for pod in scaler_crd["spec"]["migratePods"]:
+ for pod in scaler_crd["spec"].get("migratePods", []):
resource_plan.node_resources[pod["name"]] = NodeResource(
float(pod["resource"].get("cpu", "0")),
NodeResource.convert_memory_to_mb(
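For context, a standalone sketch of the custom-object watch that the updated watch() method sets up, written against the stock kubernetes Python client; the namespace and label value are illustrative assumptions, not taken from this commit.

from kubernetes import client, config, watch

config.load_kube_config()  # or config.load_incluster_config() inside a pod
api = client.CustomObjectsApi()

stream = watch.Watch().stream(
    api.list_namespaced_custom_object,
    namespace="dlrover",  # illustrative
    group="elastic.iml.github.io",
    version="v1alpha1",
    plural="scaleplans",
    label_selector="elasticjob-name=elasticjob-sample",  # illustrative
    timeout_seconds=60,
)
for event in stream:
    scaler_crd = event["object"]  # the ScalePlan custom resource as a plain dict
    if scaler_crd["spec"].get("manualScaling", False):
        print("manual plan:", scaler_crd["spec"].get("replicaResourceSpec", {}))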
