-
Notifications
You must be signed in to change notification settings - Fork 128
/
queue_compute_node.py
69 lines (50 loc) · 1.81 KB
/
queue_compute_node.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import asyncio
from asyncio import Queue
import logging
from abc import abstractmethod
from .compute_task import ComputeTask, ComputeTaskResult, ComputeTaskState, ComputeTaskType
from .compute_node import ComputeNode
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class Queue_ComputeNode(ComputeNode):
    """Compute node that executes tasks pulled from an asyncio FIFO queue.

    Subclasses implement :meth:`execute_task`. Calling :meth:`start` launches
    a background consumer that takes tasks from ``task_queue`` one at a time,
    runs them through :meth:`_run_task`, and attaches the outcome to the task.
    """

    def __init__(self):
        super().__init__()
        self.task_queue = Queue()
        # Strong references to the consumer tasks created by start(). The
        # event loop itself only keeps weak references, so an unreferenced
        # task may be garbage-collected while it is still running.
        self._background_tasks = set()

    @abstractmethod
    async def execute_task(self, task: ComputeTask) -> dict:
        """Execute ``task`` and describe the outcome.

        Implementations must return a dict shaped like::

            {
                "content": str,             # stored as result.result_str
                "message": str,             # stored as result.result_message
                "state": ComputeTaskState,  # becomes task.state
                "error": {                  # read only when state is ERROR
                    "code": int,
                    "message": str,
                },
            }
        """
        pass

    async def push_task(self, task: ComputeTask, proiority: int = 0):
        """Append ``task`` to the queue without waiting.

        Args:
            task: The task to enqueue.
            proiority: Currently ignored; the queue is plain FIFO. The
                misspelled name is kept so existing keyword callers keep
                working.
        """
        logger.info(f"{self.display()} push task: {task.display()}")
        self.task_queue.put_nowait(task)

    async def remove_task(self, task_id: str):
        """Remove a queued task by id. Not implemented yet (no-op)."""
        pass

    async def _run_task(self, task: ComputeTask):
        """Run ``task`` through :meth:`execute_task` and build its result.

        Marks the task RUNNING, awaits the subclass implementation, copies the
        returned state (and error message, on ERROR) onto the task, and
        returns a populated ``ComputeTaskResult``.
        """
        task.state = ComputeTaskState.RUNNING
        resp = await self.execute_task(task)

        result = ComputeTaskResult()
        result.set_from_task(task)

        task.state = resp["state"]
        if task.state == ComputeTaskState.ERROR:
            task.error_str = resp["error"]["message"]

        result.worker_id = self.node_id
        result.result_str = resp["content"]
        result.result_message = resp["message"]
        return result

    def start(self):
        """Launch the background loop that consumes ``task_queue``.

        Must be called while an event loop is running, because it uses
        ``asyncio.create_task``. Each call starts one additional consumer.
        """

        async def _run_task_loop():
            while True:
                task = await self.task_queue.get()
                try:
                    logger.info(f"{self.display()} get task: {task.display()}")
                    result = await self._run_task(task)
                    if result is not None:
                        task.result = result
                except asyncio.CancelledError:
                    # Let cancellation stop the consumer instead of being
                    # treated as a task failure.
                    raise
                except Exception as exc:
                    # One failing task must not kill the consumer: record the
                    # failure on the task and keep serving the queue.
                    logger.exception("%s task failed: %s", self.display(), exc)
                    task.state = ComputeTaskState.ERROR
                    task.error_str = str(exc)
                finally:
                    # Pair every get() with task_done() so that
                    # task_queue.join() can complete.
                    self.task_queue.task_done()

        loop_task = asyncio.create_task(_run_task_loop())
        # Hold a reference until the task finishes (see __init__).
        self._background_tasks.add(loop_task)
        loop_task.add_done_callback(self._background_tasks.discard)

    def get_task_state(self, task_id: str):
        """Return the state of the task with ``task_id``. Not implemented yet."""
        pass