Skip to content

Commit

Permalink
reduce code complexity in extract.py
Browse files Browse the repository at this point in the history
These changes mostly cover populating PDPAmpliconCollection objects
  • Loading branch information
widdowquinn committed Jun 6, 2019
1 parent 8138b14 commit f73226e
Showing 1 changed file with 98 additions and 45 deletions.
143 changes: 98 additions & 45 deletions diagnostic_primers/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
from Bio import SeqIO
from Bio.Phylo.TreeConstruction import DistanceCalculator

from diagnostic_primers import load_primers
from diagnostic_primers.primersearch import parse_output

# Convenience struct for combination of primer object, primersearch result, amplimer
Expand Down Expand Up @@ -206,6 +205,20 @@ def primer_amplicons(self):
return self._primer_indexed


# Convenience struct for handling PDPCollection data reindexed by primer name, when
# extracting amplicons
IndexedPrimerData = namedtuple("IndexedPrimerData", "namedict genomepaths")

# Convenience struct for handling caches when extracting amplicons
# seq_cache of Bio.Seq.Seq objects: target genomes that may be amplified by the primer
# psoutput_cache of PrimerSearch outputs for the primer
EACaches = namedtuple("EACaches", "seq_cache psoutput_cache")

# Convenience struct for handling combinations of amplicon collection, single primer,
# and min/max size limits
EAData = namedtuple("EAData", "amplicons primer limits")


def extract_amplicons(name, primer, pdpcoll, limits, seq_cache=None):
"""Return PDPAmpliconCollection corresponding to primers in the passed file
Expand All @@ -218,8 +231,9 @@ def extract_amplicons(name, primer, pdpcoll, limits, seq_cache=None):
this is cached locally to save on file IO
"""
# Make dictionaries of each config entry and source genome path by name
namedict = {_.name: _ for _ in pdpcoll.data}
genomepaths = {_.name: _.seqfile for _ in pdpcoll.data}
indexed_data = IndexedPrimerData(
{_.name: _ for _ in pdpcoll.data}, {_.name: _.seqfile for _ in pdpcoll.data}
)

# For each primer, we identify the corresponding config file entry
# We're going to extract each primer amplicon individually. We know
Expand All @@ -233,23 +247,41 @@ def extract_amplicons(name, primer, pdpcoll, limits, seq_cache=None):
# for reuse and reduced fileIO.
#
# The complete primer list for each source genome is also cached.
psoutput_cache = {} # primersearch output
if seq_cache is None: # genomes (may be reused)
seq_cache = {}
sourceprimer_cache = {} # source genome primers
if seq_cache: # cached genome sequences (may be reused)
caches = EACaches(seq_cache, {})
else:
caches = EACaches({}, {})
amplicons = PDPAmpliconCollection(name)

stem = primer.name.split("_primer_")[0]
source_data = namedict[primer.sourcename] # the source sequence for the primers
seq_cache[source_data.name] = SeqIO.read(source_data.seqfile, "fasta")
# Get primersearch output for each primer as we encounter it
populate_amplicon_collection(
EAData(amplicons, primer, limits), caches, indexed_data
)

# Cache the source genome primer information
if stem not in sourceprimer_cache:
sourceprimer_cache[stem] = {
_.name: _ for _ in load_primers(source_data.primers, fmt="json")
}
return amplicons, seq_cache


def populate_amplicon_collection(eadata, caches, indexed_data):
"""Populate amplicon collection in eadata with single primer results
:param eadata: EAData namedtuple
holds amplicon collection, primer, and amplicon size limits
:param caches: EACaches namedtuple
holds caches of target genome sequences and PrimerSearch output
:param indexed_data:
Identifies source genome for primer sequence and adds to cache if not already
present. For each target genome amplified by the primer set, identifies
corresponding PrimerSearch results, and caches them in-memory (psoutput_cache).
Cache genome sequence for target (if not already cached), and if this primer
is found in the results, add the corresponding amplicon to the collection.
"""
# Add genome from which primer set derives to seq_cache, if it's not already
# present
source_data = indexed_data.namedict[eadata.primer.sourcename]
if source_data.name not in caches.seq_cache.keys():
caches.seq_cache[source_data.name] = SeqIO.read(source_data.seqfile, "fasta")

# Get primersearch output for each primer as we encounter it
with open(source_data.primersearch, "r") as ifh:
psdata = json.load(ifh)
targets = [_ for _ in psdata.keys() if _ not in ("primers", "query")]
Expand All @@ -258,45 +290,66 @@ def extract_amplicons(name, primer, pdpcoll, limits, seq_cache=None):
for target in targets:

# Cache primersearch output for the target
if psdata[target] not in psoutput_cache:
psoutput_cache[psdata[target]] = {
_.name: _ for _ in parse_output(psdata[target], genomepaths[target])
if psdata[target] not in caches.psoutput_cache:
caches.psoutput_cache[psdata[target]] = {
_.name: _
for _ in parse_output(
psdata[target], indexed_data.genomepaths[target]
)
}
# TODO: turn output from parse_output into an indexable object,
# so we can use primer names to get results, rather than
# hacking that, as we do above.

# Cache genome data for the target (read each file once, saves time)
if target not in seq_cache:
seq_cache[target] = SeqIO.read(namedict[target].seqfile, "fasta")
target_genome = seq_cache[target]

if target not in caches.seq_cache:
caches.seq_cache[target] = SeqIO.read(
indexed_data.namedict[target].seqfile, "fasta"
)
# psresult holds the primersearch result - we create an
# amplicon for each amplimer in the psresult
# If this primer isn't in the set that amplifies the target,
# continue to the next target
if primer.name not in psoutput_cache[psdata[target]]:
continue
psresult = psoutput_cache[psdata[target]][primer.name]
for ampidx, amplimer in enumerate(psresult.amplimers):
coords = (amplimer.fwd.start, amplimer.rev.end)
# Extract the genome sequence
# We have to account here for forward/reverse primer
# amplification wrt target genome sequence. We want
# all the amplimer sequences to be identically-stranded
# for downstream alignments so, if the forward/reverse
# primer sequences don't match between the primer sets and
# the PrimerSearch results, we flip the sequence here.
seq = target_genome[min(coords) - 1 : max(coords)]
if primer.forward_seq != amplimer.fwd.seq:
seq = seq.reverse_complement()
if limits[0] < len(seq) < limits[1]:
amplicons.new_amplicon(
"_".join([primer.name, target, str(ampidx + 1)]),
PSResultAmplimer(psresult, primer, amplimer, seq),
)

return amplicons, seq_cache
if eadata.primer.name in caches.psoutput_cache[psdata[target]]:
psresult = caches.psoutput_cache[psdata[target]][eadata.primer.name]
# Add amplicons for this primer to the current collection
add_amplicons(eadata, target, psresult, caches)


def add_amplicons(eadata, target, psresult, caches):
"""Add amplicons in place to a collection in passed EAData namedtuple
:param eadata: EAData namedtuple
holds amplicon collection, primer, and amplicon size limits
:param target: target genome for amplication by primer
:param psresult: PrimerSearch result
describes amplication of target genome by primer
:param caches: EACaches namedtuple
holds caches of target genome sequences and PrimerSearch output
Loops over all amplimers in the psresult and, if the amplimer meets
size limit criteria, adds a new amplicon (in-place) to the collection
held in eadata
"""
target_genome = caches.seq_cache[target]
for ampidx, amplimer in enumerate(psresult.amplimers):
print(ampidx, amplimer)
coords = (amplimer.fwd.start, amplimer.rev.end)
# Extract the genome sequence
# We have to account here for forward/reverse primer
# amplification wrt target genome sequence. We want
# all the amplimer sequences to be identically-stranded
# for downstream alignments so, if the forward/reverse
# primer sequences don't match between the primer sets and
# the PrimerSearch results, we flip the sequence here.
seq = target_genome[min(coords) - 1 : max(coords)]
if eadata.primer.forward_seq != amplimer.fwd.seq:
seq = seq.reverse_complement()
if eadata.limits[0] < len(seq) < eadata.limits[1]:
eadata.amplicons.new_amplicon(
"_".join([eadata.primer.name, target, str(ampidx + 1)]),
PSResultAmplimer(psresult, eadata.primer, amplimer, seq),
)


def calculate_distance(aln, calculator="identity"):
Expand Down

0 comments on commit f73226e

Please sign in to comment.