Skip to content

Commit

Permalink
[OPS-53] Fix thread deadlocks
Browse files Browse the repository at this point in the history
If exceptions occur in the discovery worker threads,
these exceptions need to be pushed back to the
outgoing queue so the parent thread knows that the
fetch jobs failed; otherwise we end up with all
threads deadlocked on queue.pop calls
  • Loading branch information
jgitlin-p21 committed May 12, 2020
1 parent 717de28 commit 958f707
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 5 deletions.
9 changes: 7 additions & 2 deletions lib/artifactory/cleaner/controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,20 @@ def discover_artifacts_from_search(artifact_list, threads: 4)
timing[:dequeue] = Benchmark.measure do
until @discovery_queues.incoming.empty? and @discovery_queues.outgoing.empty? and not @workers.any? &:working?
begin
#debuglog("[DEBUG] Pop from outgoing queue; incoming.len=#{@discovery_queues.incoming.length}, outgoing.len=#{@discovery_queues.outgoing.length}")
item = @discovery_queues.outgoing.pop
if item.kind_of? Artifactory::Resource::Artifact
result << item
#debuglog "[DEBUG] Discovered #{item} from a child thread"
elsif item.kind_of? Artifactory::Error::ArtifactoryError
STDERR.puts "[ERROR] Artifactory Error from artifact fetch: #{item}"
STDERR.puts item.full_message
STDERR.puts "Caused by #{item.cause.full_message}" if item.cause
elsif item.kind_of? Error
STDERR.puts "[ERROR] Error from artifact fetch: #{item}"
STDERR.puts item.full_message
STDERR.puts "Caused by #{item.cause.full_message}" if item.cause
elsif !artifact.nil?
elsif !item.nil?
STDERR.puts "[ERROR] Got #{item} back from the discovery queue, expected an Artifactory::Resource::Artifact"
end
rescue => processing_ex
Expand Down Expand Up @@ -450,7 +455,7 @@ def spawn_threads
# given artifact data, add it to the queue for processing and make sure we have workers to process it
def queue_discovery_of_artifact(artifact_data)
@discovery_queues.incoming.push(artifact_data)
#debuglog "[DEBUG] Queued #{artifact_data['uri']} for discovery"
debuglog "[DEBUG] Queued #{artifact_data['uri']} for discovery"
spawn_threads
end

Expand Down
9 changes: 6 additions & 3 deletions lib/artifactory/cleaner/discovery_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ def log(msg)
def process(payload)
@working = true
begin
@queues.outgoing.push discover_artifact_from payload
@queues.outgoing.push(discover_artifact_from(payload))
rescue => ex
return ex # TODO: Should this be pushed to the outgoing queue instead?
STDERR.puts "[Error] Exception in thread: #{ex.full_message}"
@queues.outgoing.push(ex)
ensure
@working = false
end
Expand All @@ -98,9 +99,11 @@ def discover_artifact_from(artifact_info, retries = 10)
artifact = nil
while retries > 0
begin
#STDERR.puts "[DEBUG] thread discover_artifact_from #{artifact_info["uri"]} start"
retries -= 1
artifact = Artifactory::Cleaner::DiscoveredArtifact.from_url(artifact_info["uri"], client: @artifactory_client)
artifact.last_downloaded = Time.parse(artifact_info["lastDownloaded"]) unless artifact_info["lastDownloaded"].to_s.empty?
#STDERR.puts "[DEBUG] thread discover_artifact_from #{artifact_info["uri"]} end"
return artifact
rescue Net::OpenTimeout, Artifactory::Error::ConnectionError => err
artifact = err
Expand All @@ -111,7 +114,7 @@ def discover_artifact_from(artifact_info, retries = 10)
rescue Artifactory::Error::HTTPError => err
artifact = err
if err.code == 404
log "[WARN] HTTP 404 Not Found fetching: #{artifact["uri"]}"
log "[WARN] HTTP 404 Not Found fetching: #{artifact_info["uri"]}"
return nil
else
retries = min(retries, 1)
Expand Down

0 comments on commit 958f707

Please sign in to comment.