-
Notifications
You must be signed in to change notification settings - Fork 128
/
compute_kernel.py
124 lines (96 loc) · 4.15 KB
/
compute_kernel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
from abc import ABC, abstractmethod
from typing import Optional
import logging
import asyncio
from asyncio import Queue
from .agent import AgentPrompt
from .compute_node import ComputeNode
from .compute_task import ComputeTask, ComputeTaskState, ComputeTaskResult
logger = logging.getLogger(__name__)
# How to dispatch different computing tasks (some tasks may contain a large amount of state for correct execution)
# to suitable computing nodes, achieving a balance of speed, cost, and power consumption,
# is the CORE GOAL of the entire computing task schedule system (aios_kernel).
class ComputeKernel:
    """Singleton scheduler that dispatches ComputeTasks to registered ComputeNodes.

    Callers enqueue tasks via run(); a background asyncio loop pops each task
    and pushes it to the first node whose is_support() accepts it.

    NOTE: start() uses asyncio.create_task, so the first ComputeKernel()
    construction must happen inside a running event loop.
    """
    _instance = None

    def __new__(cls):
        # Classic singleton: create the instance once and mark it un-started
        # so __init__ knows whether initialization has already happened.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.is_start = False
        return cls._instance

    def __init__(self) -> None:
        # __init__ runs on every ComputeKernel() call; bail out if the
        # singleton is already initialized. (start() owns the is_start flag —
        # the original's set-True-then-set-False flip-flop here was dead code.)
        if self.is_start is True:
            return
        self.task_queue = Queue()
        # node_id -> ComputeNode
        self.compute_nodes = {}
        self.start()

    def run(self, task: ComputeTask) -> None:
        """Enqueue a task for asynchronous dispatch to a compute node."""
        # Reject tasks no node can handle before they enter the queue.
        if self.is_task_support(task) is False:
            logger.error(
                f"task {task.display()} is not supported by any compute node")
            return
        self.task_queue.put_nowait(task)

    def start(self):
        """Start the background dispatch loop. Idempotent."""
        if self.is_start is True:
            logger.warning("compute_kernel is already started")
            return
        self.is_start = True

        async def _run_task_loop():
            while True:
                task = await self.task_queue.get()
                logger.info(f"compute_kernel get task: {task.display()}")
                c_node: Optional[ComputeNode] = self._schedule(task)
                if c_node is None:
                    # No node supports this task: fail it explicitly instead
                    # of crashing the loop with an AttributeError on None.
                    task.state = ComputeTaskState.ERROR
                    continue
                await c_node.push_task(task)

        asyncio.create_task(_run_task_loop())

    def _schedule(self, task) -> Optional[ComputeNode]:
        """Return the first registered node that supports the task, or None."""
        for node in self.compute_nodes.values():
            if node.is_support(task) is True:
                return node
        logger.warning(
            f"task {task.display()} is not supported by any compute node")
        return None

    def add_compute_node(self, node: ComputeNode):
        """Register a compute node; duplicate node_ids are ignored with a warning."""
        if self.compute_nodes.get(node.node_id) is not None:
            logger.warning(
                f"compute_node {node.display()} already in compute_kernel")
            return
        self.compute_nodes[node.node_id] = node
        logger.info(f"add compute_node {node.display()} to compute_kernel")

    def disable_compute_node(self, node_id: str):
        """Mark a registered node disabled so callers can take it out of rotation."""
        node = self.compute_nodes.get(node_id)
        if node is None:
            logger.warning(f"compute_node {node_id} not in compute_kernel")
            return
        node.enable = False

    def is_task_support(self, task: ComputeTask) -> bool:
        # TODO: actually consult registered nodes; currently accepts every
        # task, so unsupported tasks are only caught later in _schedule().
        return True

    # friendly interface for use:
    def llm_completion(self, prompt: AgentPrompt, mode_name: Optional[str] = None, max_token: int = 0, inner_functions=None):
        """Create an LLM completion task, enqueue it, and return it (non-blocking).

        The returned ComputeTask's state/result are filled in asynchronously
        by the dispatch loop.
        """
        task_req = ComputeTask()
        task_req.set_llm_params(prompt, mode_name, max_token, inner_functions)
        self.run(task_req)
        return task_req

    async def do_llm_completion(self, prompt: AgentPrompt, mode_name: Optional[str] = None, max_token: int = 0, inner_functions=None) -> str:
        """Enqueue an LLM completion and await its result.

        Polls the task every 0.5s; after 20 checks (~10s) the task is marked
        ERROR. Returns the task result on DONE, otherwise the string "error!".
        """
        task_req = self.llm_completion(prompt, mode_name, max_token, inner_functions)
        check_times = 0
        # Inlined poll loop — the original wrapped an inner coroutine in
        # asyncio.create_task() only to await it immediately, which is
        # equivalent to awaiting the coroutine directly.
        while task_req.state not in (ComputeTaskState.DONE, ComputeTaskState.ERROR):
            if check_times >= 20:
                task_req.state = ComputeTaskState.ERROR
                break
            await asyncio.sleep(0.5)
            check_times += 1
        if task_req.state == ComputeTaskState.DONE:
            return task_req.result
        return "error!"