forked from arenadata/adcm
-
Notifications
You must be signed in to change notification settings - Fork 0
/
task_runner.py
executable file
·106 lines (84 loc) · 2.99 KB
/
task_runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#!/usr/bin/env python3
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import subprocess
import adcm.init_django # pylint: disable=unused-import
from django.utils import timezone
from django.core.exceptions import ObjectDoesNotExist
from cm.models import TaskLog, JobLog
from cm.logger import log
import cm.config as config
import cm.job
def open_file(root, tag, task_id):
    """Open ``<root>/<task_id>-<tag>.txt`` for writing and return the handle.

    An existing file at that path is truncated (mode ``'w'``). The caller owns
    the returned file object and is responsible for closing it.
    """
    path = '{root}/{task}-{tag}.txt'.format(root=root, task=task_id, tag=tag)
    return open(path, 'w')
def run_job(task_id, job_id, out_file, err_file):
    """Launch ``job_runner.py`` for a single job and wait for it to exit.

    :param task_id: id of the owning task (used only in log messages).
    :param job_id: id of the job; passed to ``job_runner.py`` as its argument.
    :param out_file: open file the child's stdout is redirected to.
    :param err_file: open file the child's stderr is redirected to.
    :returns: the child's exit code (on POSIX a negative value means the child
        was killed by that signal), or 1 if the child could not be started.
    """
    log.debug("run job #%s of task #%s", job_id, task_id)
    cmd = ['{}/job_runner.py'.format(config.BASE_DIR), str(job_id)]
    try:
        proc = subprocess.Popen(cmd, stdout=out_file, stderr=err_file)
    except (OSError, ValueError, subprocess.SubprocessError) as exc:
        # No process was created, so there is nothing to wait on. Report a
        # non-zero code so the caller treats this job as failed.
        log.error("exception running job %s: %s", job_id, exc)
        return 1
    return proc.wait()
def run_task(task_id, args=None):
    """Run the jobs of task ``task_id`` sequentially and record the task result.

    :param task_id: id of the ``TaskLog`` to run, as given on the command line.
    :param args: ``'restart'`` to skip jobs whose status is already
        ``config.Job.SUCCESS``; any other value runs every job.

    If the task does not exist, an error is logged and nothing else happens.
    If it has no jobs, it is finished as ``config.Job.FAILED``. Otherwise jobs
    run in id order, with stdout/stderr of every job going to the
    ``task-out`` / ``task-err`` files opened via ``open_file`` in
    ``config.LOG_DIR``. The first job with a non-zero exit code stops the run
    and the task is finished as FAILED; otherwise it is finished as SUCCESS.
    """
    log.debug("task_runner.py called as: %s", sys.argv)
    try:
        task = TaskLog.objects.get(id=task_id)
    except ObjectDoesNotExist:
        log.error("no task %s", task_id)
        return

    jobs = JobLog.objects.filter(task_id=task.id).order_by('id')
    if not jobs:
        log.error("no jobs for task %s", task.id)
        cm.job.finish_task(task, None, config.Job.FAILED)
        return

    # The with-statement guarantees both log files are closed even if a job
    # launch or the final status update raises.
    with open_file(config.LOG_DIR, 'task-out', task_id) as out_file, \
            open_file(config.LOG_DIR, 'task-err', task_id) as err_file:
        log.info("run task #%s", task_id)
        cm.job.set_task_status(task, config.Job.RUNNING)

        job = None
        # Stays 0 when every job is skipped on restart (all already
        # succeeded): nothing failed, so the task is finished as SUCCESS.
        res = 0
        count = 0  # number of jobs actually launched in this run
        for job in jobs:
            if args == 'restart' and job.status == config.Job.SUCCESS:
                log.info('skip job #%s status "%s" of task #%s', job.id, job.status, task_id)
                continue
            if count:
                # Every job launched after the first one in this run is passed
                # through cm.job.re_prepare_job before it starts.
                cm.job.re_prepare_job(task, job)
            job.start_date = timezone.now()
            job.save()
            res = run_job(task.id, job.id, out_file, err_file)
            count += 1
            if res != 0:
                break

        status = config.Job.SUCCESS if res == 0 else config.Job.FAILED
        cm.job.finish_task(task, job, status)

    log.info("finish task #%s, ret %s", task_id, res)
def do():
    """Command-line entry point: ``task_runner.py task_id [restart]``.

    Prints usage and exits with status 4 when no task id is given. When a
    second argument is present it is forwarded to ``run_task`` as ``args``;
    anything after it is ignored.
    """
    argv = sys.argv
    if len(argv) >= 2:
        # argv[1] is the task id; argv[2], if supplied, is the run mode.
        run_task(*argv[1:3])
        return
    print("\nUsage:\n{} task_id [restart]\n".format(os.path.basename(argv[0])))
    sys.exit(4)
# Start a task only when executed as a script, not when this module is imported.
if __name__ == '__main__':
    do()