forked from rails/mission_control-jobs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
resque_ext.rb
304 lines (245 loc) · 8.16 KB
/
resque_ext.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
module ActiveJob::QueueAdapters::ResqueExt
include MissionControl::Jobs::Adapter
# @param redis [Redis] connection this adapter issues commands against;
#   defaults to Resque's globally configured connection.
def initialize(redis = Resque.redis)
  super()
  @redis = redis
end

# Runs +block+ with this adapter's connection installed as Resque's
# per-thread redis override.
def activating(&block)
  Resque.with_per_thread_redis_override(@redis, &block)
end
# Lists every Resque queue with its name, pause status and size.
#
# All per-queue reads are issued inside one MULTI transaction so a single
# round-trip collects everything; the block form returns futures whose
# +value+ can only be read after the transaction commits, which is why the
# results are collected first and resolved in the second loop.
def queues
queues = queue_names
active_statuses = []
counts = []
redis.multi do |multi|
queues.each do |queue_name|
# Pause marker keys: per-queue and global. Presumably written by the
# resque-pause plugin (see ResquePauseHelper above) — verify.
active_statuses << multi.mget("pause:queue:#{queue_name}", "pause:all")
counts << multi.llen("queue:#{queue_name}")
end
end
queues.collect.with_index do |queue_name, index|
# Active when neither pause key exists: MGET returned only nils.
{ name: queue_name, active: active_statuses[index].value.compact.empty?, size: counts[index].value }
end
end
# Number of pending jobs in the given queue.
def queue_size(queue_name)
  Resque.size(queue_name)
end

# Removes the queue and every job still in it.
def clear_queue(queue_name)
  Resque.remove_queue(queue_name)
end

# Pause/resume/status delegate to the resque-pause plugin helper.
def pause_queue(queue_name)
  ResquePauseHelper.pause(queue_name)
end

def resume_queue(queue_name)
  ResquePauseHelper.unpause(queue_name)
end

def queue_paused?(queue_name)
  ResquePauseHelper.paused?(queue_name)
end
# Filters this adapter can apply natively: pending jobs can be narrowed by
# queue name; other statuses support no filters.
def supported_job_filters(jobs_relation)
  jobs_relation.pending? ? [ :queue_name ] : []
end
# Each operation below builds a ResqueJobs query object for the relation
# and delegates the actual work to it.

def jobs_count(jobs_relation)
  query = resque_jobs_for(jobs_relation)
  query.count
end

def fetch_jobs(jobs_relation)
  query = resque_jobs_for(jobs_relation)
  query.all
end

def retry_all_jobs(jobs_relation)
  query = resque_jobs_for(jobs_relation)
  query.retry_all
end

def retry_job(job, jobs_relation)
  query = resque_jobs_for(jobs_relation)
  query.retry_job(job)
end

def discard_all_jobs(jobs_relation)
  query = resque_jobs_for(jobs_relation)
  query.discard_all
end

def discard_job(job, jobs_relation)
  query = resque_jobs_for(jobs_relation)
  query.discard(job)
end

def find_job(job_id, jobs_relation)
  query = resque_jobs_for(jobs_relation)
  query.find_job(job_id)
end
# Whether queue pause/resume is available, i.e. the resque-pause helper
# constant is loaded.
#
# Returns a real boolean. The previous bare +defined?+ leaked "constant"
# (a String) or nil out of a predicate method, which is fine in a
# conditional but surprising for anything that serializes or compares the
# value.
def supports_queue_pausing?
  !defined?(ResquePauseHelper).nil?
end
private
attr_reader :redis
def queue_names
Resque.queues
end
def resque_jobs_for(jobs_relation)
ResqueJobs.new(jobs_relation, redis: redis)
end
# Internal query object translating a jobs relation (presumably an
# ActiveJob/MissionControl JobsRelation — verify against callers) into
# Resque/redis operations: counting, fetching, retrying and discarding.
class ResqueJobs
attr_reader :jobs_relation
# Pagination concerns are answered by the relation itself.
delegate :default_page_size, :paginated?, :limit_value_provided?, to: :jobs_relation
# @param jobs_relation [Object] scope of jobs to operate on
# @param redis [Redis] connection all commands are issued against
def initialize(jobs_relation, redis:)
@jobs_relation = jobs_relation
@redis = redis
end
# Number of jobs in the relation. A paginated relation has no direct redis
# counter, so we count the page we actually fetched; otherwise redis can
# answer directly.
def count
  paginated? ? count_fetched_jobs : direct_jobs_count
end
# Deserialized jobs for the relation, memoized. Raw entries that are not
# hashes (e.g. nils from out-of-range reads) are skipped.
def all
  @all ||= begin
    proxies = fetch_resque_jobs.each_with_index.map do |raw_job, position|
      deserialize_resque_job(raw_job, position) if raw_job.is_a?(Hash)
    end
    proxies.compact
  end
end
# Retries every job in the relation, switching to batch mode for large
# sets. Jobs are walked in reverse, presumably so list positions of the
# remaining entries stay valid while earlier ones are removed — verify.
def retry_all
  return retry_all_in_batches if use_batches?

  retry_jobs(jobs_relation.to_a.reverse)
end
# Retries a single job. Not named just +retry+ because that is a reserved
# Ruby keyword.
def retry_job(job)
  resque_requeue_and_discard(job)
end
# Discards every job in the relation. Wiping the entire failed list is a
# single redis call; any narrower scope must delete entries one by one.
def discard_all
  return clear_failed_queue if jobs_relation.failed? && targeting_all_jobs?

  discard_all_one_by_one
end
# Removes a single job from its backing redis list.
#
# Redis lists cannot delete by index, so the entry at the job's position
# is overwritten with the SENTINEL placeholder and then removed via LREM —
# both inside one MULTI so no other entry can shift position in between.
def discard(job)
redis.multi do |multi|
multi.lset(queue_redis_key, job.position, SENTINEL)
multi.lrem(queue_redis_key, 1, SENTINEL)
end
rescue Redis::CommandError => error
# Translated into JobNotFoundError when redis reports a missing key.
handle_resque_job_error(job, error)
end
# Looks a job up by its id within the fetched set; nil when absent.
def find_job(job_id)
  jobs_by_id.fetch(job_id, nil)
end
private
attr_reader :redis
SENTINEL = "" # See +Resque::Datastore#remove_from_failed_queue+
# Redis transactions severely speed up operations, specially when the network latency is high.
# We limit the transaction size because large batches can result in redis timeout errors.
MAX_REDIS_TRANSACTION_SIZE = 100
# The whole set is in scope only when neither pagination nor filters
# restrict the relation.
def targeting_all_jobs?
  !(paginated? || jobs_relation.filtering_needed?)
end
# Routes the fetch: failed jobs live in the global "failed" list; anything
# else needs a concrete queue, so a blank queue name also falls back to
# the failed list.
def fetch_resque_jobs
  failed_scope = jobs_relation.failed? || jobs_relation.queue_name.blank?
  failed_scope ? fetch_failed_resque_jobs : fetch_queue_resque_jobs
end
# Page of raw failure hashes from Resque's failure backend.
def fetch_failed_resque_jobs
  failures = Resque::Failure.all(jobs_relation.offset_value, jobs_relation.limit_value)
  Array.wrap(failures)
end
# Page of raw pending-job hashes peeked from a specific queue.
# @raise [ActiveJob::Errors::QueryError] when no queue name was provided
def fetch_queue_resque_jobs
  queue_name = jobs_relation.queue_name
  if queue_name.blank?
    raise ActiveJob::Errors::QueryError, "This adapter requires a queue name unless fetching failed jobs"
  end
  Array.wrap(Resque.peek(queue_name, jobs_relation.offset_value, jobs_relation.limit_value))
end
# Builds a job proxy from a raw Resque job hash.
#
# Arguments may sit under "payload" (apparently failure entries) or at the
# top level (apparently queue entries) — both lookups are tried; verify
# against Resque's failure/queue formats.
# @param index [Integer] position within the fetched page
def deserialize_resque_job(resque_job_hash, index)
args_hash = resque_job_hash.dig("payload", "args") || resque_job_hash.dig("args")
ActiveJob::JobProxy.new(args_hash&.first).tap do |job|
job.last_execution_error = execution_error_from_resque_job(resque_job_hash)
job.raw_data = resque_job_hash
# Absolute index in the source redis list = page offset + index in page.
job.position = jobs_relation.offset_value + index
job.failed_at = resque_job_hash["failed_at"]&.to_datetime
# Only failed-list entries carry a "failed_at" timestamp.
job.status = job.failed_at.present? ? :failed : :pending
end
end
# Execution-error value object for a failed job hash, or nil when the hash
# carries no exception information.
def execution_error_from_resque_job(resque_job_hash)
  exception_class = resque_job_hash["exception"]
  return if exception_class.blank?

  ActiveJob::ExecutionError.new(
    error_class: exception_class,
    message: resque_job_hash["error"],
    backtrace: resque_job_hash["backtrace"]
  )
end
# Counts via redis directly; failed and pending jobs use different APIs.
def direct_jobs_count
  if jobs_relation.failed?
    failed_jobs_count
  else
    pending_jobs_count
  end
end
# Total size across all queues, honoring the relation's optional queue
# filter (a blank filter counts every queue).
def pending_jobs_count
  Resque.queue_sizes.sum do |queue_name, queue_size|
    counted = jobs_relation.queue_name.blank? || jobs_relation.queue_name == queue_name
    counted ? queue_size : 0
  end
end
# Resque's data store tracks the failed-list length natively.
def failed_jobs_count
  Resque.data_store.num_failed
end
# Counts the page we already materialized — the only option when the
# relation is paginated.
def count_fetched_jobs
  all.length
end
# Redis key of the list backing this relation: the global failed list or
# the relation's queue list.
def queue_redis_key
  if jobs_relation.failed?
    "failed"
  else
    "queue:#{jobs_relation.queue_name}"
  end
end
# Wipes Resque's failure backend ("failed" is its default queue name).
def clear_failed_queue
  Resque::Failure.clear("failed")
end
# Retries the given jobs, grouped into transactional batches.
def retry_jobs(jobs)
  in_transactional_jobs_batches(jobs) do |batch|
    batch.each { |job| retry_job(job) }
  end
end
# Yields +jobs+ in slices of MAX_REDIS_TRANSACTION_SIZE, opening one redis
# MULTI per slice so each batch's commands go out in a single transaction
# (bounded to avoid redis timeouts on very large batches).
#
# NOTE(review): the +multi+ block argument is unused — the yielded code
# calls +redis+ directly. Whether those commands actually join the MULTI
# depends on the redis client's pipelining semantics; verify against the
# redis-rb version in use.
def in_transactional_jobs_batches(jobs)
jobs.each_slice(MAX_REDIS_TRANSACTION_SIZE) do |jobs_batch|
redis.multi do |multi|
yield jobs_batch
end
end
end
# Batch mode applies when the caller gave no explicit limit and the
# relation is larger than one page.
def use_batches?
  return false if jobs_relation.limit_value_provided?

  jobs_relation.count > default_page_size
end
# Delegates batching to the relation; each yielded batch retries itself.
def retry_all_in_batches
  jobs_relation.in_batches(order: :desc) { |batch| batch.retry_all }
end
# Retries a job by re-enqueuing it and then removing the original entry.
# Order matters: +discard+ relies on +job.position+, which still points at
# the same list entry +requeue+ just rewrote.
def resque_requeue_and_discard(job)
requeue(job)
discard(job)
end
# Re-enqueues a job from its raw failure data and stamps the original
# list entry with +retried_at+ (LSET in place at the job's position), so
# the stored entry reflects the retry. The timestamp format presumably
# matches Resque's own failure timestamps — verify.
def requeue(job)
resque_job = job.raw_data
resque_job["retried_at"] = Time.now.strftime("%Y/%m/%d %H:%M:%S")
redis.lset(queue_redis_key, job.position, Resque.encode(resque_job))
# Enqueue a fresh copy of the job from the original payload.
Resque::Job.create(resque_job["queue"], resque_job["payload"]["class"], *resque_job["payload"]["args"])
rescue Redis::CommandError => error
handle_resque_job_error(job, error)
end
# Deletes jobs individually, in batches when the relation is large.
# Reverse order keeps the list positions of yet-undeleted entries stable.
def discard_all_one_by_one
  return discard_all_in_batches if use_batches?

  discard_jobs(jobs_relation.to_a.reverse)
end
# Discards the given jobs, grouped into transactional batches.
def discard_jobs(jobs)
  in_transactional_jobs_batches(jobs) do |batch|
    batch.each { |job| discard(job) }
  end
end
# Delegates batching to the relation; each yielded batch discards itself.
def discard_all_in_batches
  jobs_relation.in_batches(order: :desc) { |batch| batch.discard_all }
end
# Memoized id → job lookup table so repeated find_job calls only scan the
# fetched set once.
def jobs_by_id
  @jobs_by_id ||= all.index_by { |job| job.job_id }
end
# Redis reports operations on vanished entries with a "no such key" error:
# surface those as JobNotFoundError, re-raise anything else untouched.
def handle_resque_job_error(job, error)
  raise error unless error.message.match?(/no such key/i)

  raise ActiveJob::Errors::JobNotFoundError.new(job, jobs_relation)
end
end
end