Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[k8s] Refactored k8s operator to use kopf for controller logic #15787

Merged
merged 44 commits into from
Jun 1, 2021
Merged
Changes from 1 commit
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
8fb61df
Refactored k8s operator to use kopf for controller logic
tgaddair May 13, 2021
1956df6
Added role permissions
tgaddair May 13, 2021
6a2e28d
Fixed TODO
tgaddair May 13, 2021
b3ba4d9
Go back to global queue
tgaddair May 13, 2021
18108ff
Removed unused memo
tgaddair May 13, 2021
cbc24f4
Fix target
tgaddair May 13, 2021
82e117a
Merge branch 'master' into kopf
DmitriGekhtman May 20, 2021
e02046d
logistics
DmitriGekhtman May 20, 2021
31f9f2c
lint
DmitriGekhtman May 20, 2021
9cda7b3
Addressed comments
tgaddair May 21, 2021
3e78c5e
Typing
tgaddair May 21, 2021
d868fb7
Changed to use memo
tgaddair May 21, 2021
2cac877
queue to mp
tgaddair May 21, 2021
5997dea
lint
DmitriGekhtman May 21, 2021
a80e8be
Merge branch 'master' into kopf
DmitriGekhtman May 21, 2021
8c785c9
Merged master
tgaddair May 25, 2021
d1f8aec
Merge branch 'kopf' of https://github.com/tgaddair/ray into kopf
DmitriGekhtman May 25, 2021
fcf6a6b
update test requirments
DmitriGekhtman May 25, 2021
9011889
Skip operator type check
DmitriGekhtman May 25, 2021
fb1e4df
Type fiddling, imports
DmitriGekhtman May 25, 2021
ccfdc3a
No kopf in Ray 1.6
DmitriGekhtman May 26, 2021
93eca5f
Mock kopf in unit test
DmitriGekhtman May 26, 2021
b338026
format
DmitriGekhtman May 26, 2021
5a5e1cc
Remove kopf from requirements.txt
DmitriGekhtman May 26, 2021
5ac79c0
try dockerfile again
DmitriGekhtman May 26, 2021
2a4d53c
Fix requirements.txt
DmitriGekhtman May 26, 2021
d9a29f3
don't prune kopf status info
DmitriGekhtman May 27, 2021
94f0caf
wip
DmitriGekhtman May 27, 2021
c5723c1
wip
DmitriGekhtman May 27, 2021
39a83d9
Simplify code, fix status handling.
DmitriGekhtman May 27, 2021
53f5c1b
Space out retries in tests
DmitriGekhtman May 28, 2021
e256a43
CRD before operator in scale test
DmitriGekhtman May 28, 2021
f0fda95
newline
DmitriGekhtman May 28, 2021
1e0a33c
Update delete instructions. Kopf finalizer warning.
DmitriGekhtman May 28, 2021
a759b01
wip
DmitriGekhtman May 28, 2021
2c2e5fd
Test and doc update
DmitriGekhtman May 28, 2021
ae4310d
Merge branch 'master' into kopf
DmitriGekhtman May 28, 2021
936a269
Better subprocess clean-up
DmitriGekhtman May 29, 2021
2aace4c
Operator in main thread
DmitriGekhtman May 29, 2021
4ef291c
Teardown
DmitriGekhtman May 29, 2021
fb457b0
Wait for service deletion in test
DmitriGekhtman May 29, 2021
709b12c
Fix test, wait for teardown
DmitriGekhtman May 29, 2021
b92054e
Remove main
DmitriGekhtman May 29, 2021
19c7910
Sneak in doc fix + helm chart config consistency.
DmitriGekhtman May 30, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
lint
  • Loading branch information
DmitriGekhtman committed May 21, 2021
commit 5997dea0aa973ea460b3685fbae941b6a82767f4
9 changes: 4 additions & 5 deletions python/ray/ray_operator/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@
logger = logging.getLogger(__name__)

# Queue to process cluster status updates.
cluster_status_q = mp.Queue(
) # type: mp.Queue[Optional[Tuple[str, str, str]]]
cluster_status_q = mp.Queue() # type: mp.Queue[Optional[Tuple[str, str, str]]]


class RayCluster:
Expand Down Expand Up @@ -191,7 +190,7 @@ def stop_background_worker(memo: kopf.Memo, **_):
memo.status_handler.join()


def status_handling_loop(queue: queue.Queue):
def status_handling_loop(queue: mp.Queue):
while True:
item = queue.get()
if item is None:
Expand Down Expand Up @@ -250,7 +249,7 @@ def update_fn(body, old, new, name, namespace, memo: kopf.Memo, **kwargs):
# Update if there's been a change to the spec or if we're attempting
# recovery from autoscaler failure.
if spec_changed or ray_restart_required:
ray_cluster = memo.get('ray_cluster')
ray_cluster = memo.get("ray_cluster")
if ray_cluster is None:
ray_cluster = RayCluster(cluster_config)
memo.ray_cluster = ray_cluster
Expand All @@ -266,7 +265,7 @@ def update_fn(body, old, new, name, namespace, memo: kopf.Memo, **kwargs):

@kopf.on.delete("rayclusters")
def delete_fn(memo: kopf.Memo, **kwargs):
ray_cluster = memo.get('ray_cluster')
ray_cluster = memo.get("ray_cluster")
if ray_cluster is None:
return

Expand Down