This repository has been archived by the owner on Sep 20, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 47
/
job_type.go
166 lines (151 loc) · 5.72 KB
/
job_type.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
// Copyright 2015 Alex Browne. All rights reserved.
// Use of this source code is governed by the MIT
// license, which can be found in the LICENSE file.
package jobs
import (
"fmt"
"reflect"
"time"
)
// Types is map of job type names to *Type
var Types = map[string]*Type{}
// Type represents a type of job that can be executed by workers
type Type struct {
name string
handler interface{}
retries uint
dataType reflect.Type
}
// ErrorNameAlreadyRegistered is returned whenever RegisterType is called
// with a name that has already been registered.
type ErrorNameAlreadyRegistered struct {
name string
}
// Error satisfies the error interface.
func (e ErrorNameAlreadyRegistered) Error() string {
return fmt.Sprintf("jobs: Cannot register job type because job type with name %s already exists", e.name)
}
// newErrorNameAlreadyRegistered returns an ErrorNameAlreadyRegistered with the given name.
func newErrorNameAlreadyRegistered(name string) ErrorNameAlreadyRegistered {
return ErrorNameAlreadyRegistered{name: name}
}
// A HandlerFunc is a function which accepts ether zero or one arguments and returns an error.
// The function will be executed by a worker. If the function returns a non-nil error or causes
// a panic, the worker will capture and log the error, and if applicable the job may be queued
// for retry.
type HandlerFunc interface{}
// RegisterType registers a new type of job that can be executed by workers.
// name should be a unique string identifier for the job.
// retries is the number of times this type of job should be retried if it fails.
// handler is a function that a worker will call in order to execute the job.
// handler should be a function which accepts either 0 or 1 arguments of any type,
// corresponding to the data for a job of this type. All jobs of this type must have
// data with the same type as the first argument to handler, or nil if the handler
// accepts no arguments.
func RegisterType(name string, retries uint, handler HandlerFunc) (*Type, error) {
// Make sure name is unique
if _, found := Types[name]; found {
return Types[name], newErrorNameAlreadyRegistered(name)
}
// Make sure handler is a function
handlerType := reflect.TypeOf(handler)
if handlerType.Kind() != reflect.Func {
return nil, fmt.Errorf("jobs: in RegisterNewType, handler must be a function. Got %T", handler)
}
if handlerType.NumIn() > 1 {
return nil, fmt.Errorf("jobs: in RegisterNewType, handler must accept 0 or 1 arguments. Got %d.", handlerType.NumIn())
}
if handlerType.NumOut() != 1 {
return nil, fmt.Errorf("jobs: in RegisterNewType, handler must have exactly one return value. Got %d.", handlerType.NumOut())
}
if !typeIsError(handlerType.Out(0)) {
return nil, fmt.Errorf("jobs: in RegisterNewType, handler must return an error. Got return value of type %s.", handlerType.Out(0).String())
}
Type := &Type{
name: name,
handler: handler,
retries: retries,
}
if handlerType.NumIn() == 1 {
Type.dataType = handlerType.In(0)
}
Types[name] = Type
return Type, nil
}
var errorType = reflect.TypeOf(make([]error, 1)).Elem()
func typeIsError(typ reflect.Type) bool {
return typ.Implements(errorType)
}
// String satisfies the Stringer interface and returns the name of the Type.
func (jt *Type) String() string {
return jt.name
}
// Schedule schedules a on-off job of the given type with the given parameters.
// Jobs with a higher priority will be executed first. The job will not be
// executed until after time. data is the data associated with this particular
// job and should have the same type as the first argument to the handler for this
// Type.
func (jt *Type) Schedule(priority int, time time.Time, data interface{}) (*Job, error) {
// Encode the data
encodedData, err := jt.encodeData(data)
if err != nil {
return nil, err
}
// Create and save the job
job := &Job{
data: encodedData,
typ: jt,
time: time.UTC().UnixNano(),
retries: jt.retries,
priority: priority,
}
// Set the job's status to queued and save it in the database
job.status = StatusQueued
if err := job.save(); err != nil {
return nil, err
}
return job, nil
}
// ScheduleRecurring schedules a recurring job of the given type with the given parameters.
// Jobs with a higher priority will be executed first. The job will not be executed until after
// time. After time, the job will be executed with a frequency specified by freq. data is the
// data associated with this particular job and should have the same type as the first argument
// to the handler for this Type. Every recurring execution of the job will use the
// same data.
func (jt *Type) ScheduleRecurring(priority int, time time.Time, freq time.Duration, data interface{}) (*Job, error) {
// Encode the data
encodedData, err := jt.encodeData(data)
if err != nil {
return nil, err
}
// Create and save the job
job := &Job{
data: encodedData,
typ: jt,
time: time.UTC().UnixNano(),
retries: jt.retries,
freq: freq.Nanoseconds(),
priority: priority,
}
// Set the job's status to queued and save it in the database
job.status = StatusQueued
if err := job.save(); err != nil {
return nil, err
}
return job, nil
}
// encodeData checks that the type of data is what we expect based on the handler for the Type.
// If it is, it encodes the data into a slice of bytes.
func (jt *Type) encodeData(data interface{}) ([]byte, error) {
// Check the type of data
dataType := reflect.TypeOf(data)
if dataType != jt.dataType {
return nil, fmt.Errorf("jobs: provided data was not of the correct type.\nExpected %s for Type %s, but got %s", jt.dataType, jt, dataType)
}
// Encode the data
encodedData, err := encode(data)
if err != nil {
return nil, fmt.Errorf("jobs: error encoding data: %s", err.Error())
}
return encodedData, nil
}