Skip to content

Commit

Permalink
Allow users to set queue priority via queue_attributes. Updated worke…
Browse files Browse the repository at this point in the history
…r to pull from worker prior to setting job priority to default priority.
  • Loading branch information
ryannealmes committed Nov 15, 2015
1 parent 3b31ffd commit 3f198bc
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 28 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,15 @@ Delayed::Job.enqueue job, :queue => 'tracking'
handle_asynchronously :tweet_later, :queue => 'tweets'
```

You can configure default priorities for named queues:

```ruby
Delayed::Worker.queue_attributes = [
{ name: :high_priority, priority: -10 },
{ name: :low_priority, priority: 10 }
]
```

Running Jobs
============
`script/delayed_job` can be used to manage a background process which will
Expand Down
2 changes: 1 addition & 1 deletion delayed_job.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Gem::Specification.new do |spec|
spec.description = 'Delayed_job (or DJ) encapsulates the common pattern of asynchronously executing longer tasks in the background. It is a direct extraction from Shopify where the job table is responsible for a multitude of core tasks.'
spec.email = ['[email protected]']
spec.files = %w[CHANGELOG.md CONTRIBUTING.md LICENSE.md README.md Rakefile delayed_job.gemspec]
spec.files += Dir.glob('{contrib,lib,recipes,spec}/**/*')
spec.files += Dir.glob('{contrib,lib,recipes,spec}/**/*') # rubocop:disable SpaceAroundOperators
spec.homepage = 'https://github.com/collectiveidea/delayed_job'
spec.licenses = ['MIT']
spec.name = 'delayed_job'
Expand Down
28 changes: 6 additions & 22 deletions lib/delayed/backend/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,12 @@ def self.included(base)

module ClassMethods
# Add a job to the queue
def enqueue(*args) # rubocop:disable CyclomaticComplexity
options = args.extract_options!
options[:payload_object] ||= args.shift
options[:priority] ||= Delayed::Worker.default_priority

if options[:queue].nil?
if options[:payload_object].respond_to?(:queue_name)
options[:queue] = options[:payload_object].queue_name
end
options[:queue] ||= Delayed::Worker.default_queue_name
end

if args.size > 0
warn '[DEPRECATION] Passing multiple arguments to `#enqueue` is deprecated. Pass a hash with :priority and :run_at.'
options[:priority] = args.first || options[:priority]
options[:run_at] = args[1]
end

unless options[:payload_object].respond_to?(:perform)
raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
end
def enqueue(*args)
job_options = Delayed::Backend::JobPreparer.new(*args).prepare
enqueue_job(job_options)
end

def enqueue_job(options)
new(options).tap do |job|
Delayed::Worker.lifecycle.run_callbacks(:enqueue, job) do
job.hook(:enqueue)
Expand Down Expand Up @@ -74,7 +58,7 @@ def failed?
end
alias_method :failed, :failed?

ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/ # rubocop:disable ConstantName
ParseObjectFromYaml = %r{\!ruby/\w+\:([^\s]+)} # rubocop:disable ConstantName

def name
@name ||= payload_object.respond_to?(:display_name) ? payload_object.display_name : payload_object.class.name
Expand Down
51 changes: 51 additions & 0 deletions lib/delayed/backend/job_preparer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
module Delayed
module Backend
class JobPreparer
attr_reader :options, :args

def initialize(*args)
@options = args.extract_options!
@args = args
end

def prepare
set_payload
set_queue_name
set_priority
handle_deprecation
options
end

private

def set_payload
options[:payload_object] ||= args.shift
end

def set_queue_name
if options[:queue].nil? && options[:payload_object].respond_to?(:queue_name)
options[:queue] = options[:payload_object].queue_name
else
options[:queue] ||= Delayed::Worker.default_queue_name
end
end

def set_priority
options[:priority] ||= Delayed::Worker.default_priority
queue_attributes = Delayed::Worker.queue_attributes.select { |queue| queue[:name].to_s == options[:queue] }
options[:priority] = queue_attributes.first[:priority] if queue_attributes.any?
end

def handle_deprecation
if args.size > 0
warn '[DEPRECATION] Passing multiple arguments to `#enqueue` is deprecated. Pass a hash with :priority and :run_at.'
options[:priority] = args.first || options[:priority]
options[:run_at] = args[1]
end

return if options[:payload_object].respond_to?(:perform)
raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
end
end
end
end
6 changes: 6 additions & 0 deletions lib/delayed/backend/shared_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,12 @@ def create_job(opts = {})
end
expect(described_class.reserve(worker)).to be_nil
end

it 'sets job priority based on queue_attributes configuration' do
Delayed::Worker.queue_attributes = [{:name => 'job_tracking', :priority => 4}]
job = described_class.enqueue :payload_object => NamedQueueJob.new
expect(job.priority).to eq(4)
end
end

context 'clear_locks!' do
Expand Down
8 changes: 4 additions & 4 deletions lib/delayed/psych_ext.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def visit_Psych_Nodes_Mapping(object) # rubocop:disable CyclomaticComplexity, Me
return revive(Psych.load_tags[object.tag], object) if Psych.load_tags[object.tag]

case object.tag
when /^!ruby\/object/
when %r{^!ruby/object}
result = super
if defined?(ActiveRecord::Base) && result.is_a?(ActiveRecord::Base)
klass = result.class
Expand All @@ -44,7 +44,7 @@ def visit_Psych_Nodes_Mapping(object) # rubocop:disable CyclomaticComplexity, Me
else
result
end
when /^!ruby\/ActiveRecord:(.+)$/
when %r{^!ruby/ActiveRecord:(.+)$}
klass = resolve_class(Regexp.last_match[1])
payload = Hash[*object.children.map { |c| accept c }]
id = payload['attributes'][klass.primary_key]
Expand All @@ -54,7 +54,7 @@ def visit_Psych_Nodes_Mapping(object) # rubocop:disable CyclomaticComplexity, Me
rescue ActiveRecord::RecordNotFound => error
raise Delayed::DeserializationError, "ActiveRecord::RecordNotFound, class: #{klass}, primary key: #{id} (#{error.message})"
end
when /^!ruby\/Mongoid:(.+)$/
when %r{^!ruby/Mongoid:(.+)$}
klass = resolve_class(Regexp.last_match[1])
payload = Hash[*object.children.map { |c| accept c }]
id = payload['attributes']['_id']
Expand All @@ -63,7 +63,7 @@ def visit_Psych_Nodes_Mapping(object) # rubocop:disable CyclomaticComplexity, Me
rescue Mongoid::Errors::DocumentNotFound => error
raise Delayed::DeserializationError, "Mongoid::Errors::DocumentNotFound, class: #{klass}, primary key: #{id} (#{error.message})"
end
when /^!ruby\/DataMapper:(.+)$/
when %r{^!ruby/DataMapper:(.+)$}
klass = resolve_class(Regexp.last_match[1])
payload = Hash[*object.children.map { |c| accept c }]
begin
Expand Down
4 changes: 3 additions & 1 deletion lib/delayed/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ class Worker # rubocop:disable ClassLength
DEFAULT_DEFAULT_PRIORITY = 0
DEFAULT_DELAY_JOBS = true
DEFAULT_QUEUES = []
DEFAULT_QUEUE_ATTRIBUTES = []
DEFAULT_READ_AHEAD = 5

cattr_accessor :min_priority, :max_priority, :max_attempts, :max_run_time,
:default_priority, :sleep_delay, :logger, :delay_jobs, :queues,
:read_ahead, :plugins, :destroy_failed_jobs, :exit_on_complete,
:default_log_level
:default_log_level, :queue_attributes

# Named queue into which jobs are enqueued by default
cattr_accessor :default_queue_name
Expand All @@ -38,6 +39,7 @@ def self.reset
self.default_priority = DEFAULT_DEFAULT_PRIORITY
self.delay_jobs = DEFAULT_DELAY_JOBS
self.queues = DEFAULT_QUEUES
self.queue_attributes = DEFAULT_QUEUE_ATTRIBUTES
self.read_ahead = DEFAULT_READ_AHEAD
@lifecycle = nil
end
Expand Down
1 change: 1 addition & 0 deletions lib/delayed_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
require 'delayed/plugin'
require 'delayed/plugins/clear_locks'
require 'delayed/backend/base'
require 'delayed/backend/job_preparer'
require 'delayed/worker'
require 'delayed/deserialization_error'
require 'delayed/railtie' if defined?(Rails::Railtie)
Expand Down

0 comments on commit 3f198bc

Please sign in to comment.