Skip to content

Commit

Permalink
feat: HiFi reads
Browse files Browse the repository at this point in the history
  • Loading branch information
chaunceyyu authored and chungongyu committed May 10, 2021
1 parent a1079eb commit 2670270
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 110 deletions.
30 changes: 17 additions & 13 deletions src/bigraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,22 @@ Vertex::Vertex(const Id& id, const std::string& seq, bool contained, const std::
c = std::stoi(item.substr(k + 1));
}
}
auto it = _indexTbl.find(barcode);
if (it != _indexTbl.end()) {
it->second += c;
} else {
_indexTbl[barcode] = c;
}
_indexTbl.insert(std::make_pair(barcode, c));
// auto it = _indexTbl.find(barcode);
// if (it != _indexTbl.end()) {
// it->second += c;
// } else {
// _indexTbl[barcode] = c;
// }
}
}
{
std::vector<std::string> vec;
ASQG::tokenize(vec, ext, ',');
for (const auto& item : vec) {
_ext.push_back(item);
if (!item.empty()) {
_ext.push_back(item);
}
}
}
}
Expand Down Expand Up @@ -149,12 +152,13 @@ void Vertex::merge(Edge* edge) {

// Update the index/barcode table of vertex
for (const auto& idx : edge->end()->_indexTbl) {
auto it = _indexTbl.find(idx.first);
if (it != _indexTbl.end()) {
it->second += idx.second;
} else {
_indexTbl[idx.first] = idx.second;
}
_indexTbl.insert(std::make_pair(idx.first, idx.second));
// auto it = _indexTbl.find(idx.first);
// if (it != _indexTbl.end()) {
// it->second += idx.second;
// } else {
// _indexTbl[idx.first] = idx.second;
// }
}

// Update the extension attributes
Expand Down
2 changes: 1 addition & 1 deletion src/bigraph.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ typedef std::vector<Edge *> EdgePtrList;
class Vertex {
public:
typedef std::string Id;
typedef std::unordered_map<std::string, size_t> IndexTable;
typedef std::unordered_multimap<std::string, size_t> IndexTable;
typedef std::vector<std::string> ExtList;

Vertex(const Id& id, const std::string& seq, bool contained=false, const std::string& index="", size_t coverage=1, const std::string& ext="");
Expand Down
74 changes: 46 additions & 28 deletions src/overlap_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,12 @@ std::istream& operator>>(std::istream& stream, Hit& hit) {
//
class OverlapProcess {
public:
OverlapProcess(const OverlapBuilder* builder, size_t minOverlap, std::ostream& stream)
: _builder(builder), _minOverlap(minOverlap), _stream(stream) {
OverlapProcess(const OverlapBuilder* builder, size_t minOverlap, const std::string& hit)
: _builder(builder), _minOverlap(minOverlap), _stream(Utils::ofstream(hit)) {
if (!_stream) {
LOG4CXX_FATAL(logger, boost::format("failed to create hits %s") % hit);
assert(false);
}
}

OverlapResult process(const SequenceProcessFramework::SequenceWorkItem& workItem) {
Expand All @@ -270,15 +274,15 @@ class OverlapProcess {
//
// Write overlap blocks out to a file
//
_stream << hit << '\n';
*_stream << hit << '\n';

return result;
}

private:
const OverlapBuilder* _builder;
size_t _minOverlap;
std::ostream& _stream;
std::shared_ptr<std::ostream> _stream;
};

//
Expand Down Expand Up @@ -422,12 +426,7 @@ bool OverlapBuilder::build(DNASeqReader& reader, size_t minOverlap, std::ostream

if (threads <= 1) { // single thread
std::string hit = _prefix + HITS_EXT + GZIP_EXT;
std::shared_ptr<std::ostream> stream(Utils::ofstream(hit));
if (!stream) {
LOG4CXX_ERROR(logger, boost::format("failed to create hits %s") % hit);
return false;
}
OverlapProcess proc(this, minOverlap, *stream);
OverlapProcess proc(this, minOverlap, hit);
OverlapPostProcess postproc(output);

SequenceProcessFramework::SerialWorker<
Expand All @@ -445,17 +444,10 @@ bool OverlapBuilder::build(DNASeqReader& reader, size_t minOverlap, std::ostream
hits.push_back(hit);
} else { // multi thread
#ifdef _OPENMP
std::vector<std::shared_ptr<std::ostream> > streamlist(threads);
std::vector<OverlapProcess *> proclist(threads);
for (size_t i = 0; i < threads; ++i) {
std::string hit = boost::str(boost::format("%s-thread%d%s%s") % _prefix % i % HITS_EXT % GZIP_EXT);
std::shared_ptr<std::ostream> stream(Utils::ofstream(hit));
if (!stream) {
LOG4CXX_ERROR(logger, boost::format("failed to create hits %s") % hit);
return false;
}
proclist[i] = new OverlapProcess(this, minOverlap, *stream);
streamlist[i] = stream;
proclist[i] = new OverlapProcess(this, minOverlap, hit);
hits.push_back(hit);
}
OverlapPostProcess postproc(output);
Expand Down Expand Up @@ -532,21 +524,26 @@ bool OverlapBuilder::build(const std::string& input, size_t minOverlap, const st
//
class DuplicateRemoveProcess {
public:
DuplicateRemoveProcess(const OverlapBuilder* builder, std::ostream& stream) : _builder(builder), _stream(stream) {
DuplicateRemoveProcess(const OverlapBuilder* builder, const std::string& hit)
: _builder(builder), _stream(Utils::ofstream(hit)) {
if (!_stream) {
LOG4CXX_FATAL(logger, boost::format("failed to create hits %s") % hit);
assert(false);
}
}

OverlapResult process(const SequenceProcessFramework::SequenceWorkItem& workItem) {
Hit hit(workItem.idx);
OverlapResult result = _builder->duplicate(workItem.read, &hit.blocks);
hit.substring = result.substring;
_stream << workItem.read.name << '\t' << workItem.read.seq << '\t' << hit << '\n';
*_stream << workItem.read.name << '\t' << workItem.read.seq << '\t' << hit << '\n';

return result;
}

private:
const OverlapBuilder* _builder;
std::ostream& _stream;
std::shared_ptr<std::ostream> _stream;
};

//
Expand Down Expand Up @@ -628,12 +625,7 @@ bool OverlapBuilder::rmdup(DNASeqReader& reader, std::ostream& output, std::ostr

if (threads <= 1) { // single thread
std::string hit = _prefix + RMDUP_EXT + HITS_EXT + GZIP_EXT;
std::shared_ptr<std::ostream> stream(Utils::ofstream(hit));
if (!stream) {
LOG4CXX_ERROR(logger, boost::format("failed to create hits %s") % hit);
return false;
}
DuplicateRemoveProcess proc(this, *stream);
DuplicateRemoveProcess proc(this, hit);
DuplicateRemovePostProcess postproc;

SequenceProcessFramework::SerialWorker<
Expand All @@ -650,7 +642,33 @@ bool OverlapBuilder::rmdup(DNASeqReader& reader, std::ostream& output, std::ostr

hits.push_back(hit);
} else { // multi thread
assert(false);
#ifdef _OPENMP
std::vector<DuplicateRemoveProcess *> proclist(threads);
for (size_t i = 0; i < threads; ++i) {
std::string hit = boost::str(boost::format("%s-thread%d%s%s") % _prefix % i % HITS_EXT % GZIP_EXT);
proclist[i] = new DuplicateRemoveProcess(this, hit);
hits.push_back(hit);
}
DuplicateRemovePostProcess postproc;

SequenceProcessFramework::ParallelWorker<
SequenceProcessFramework::SequenceWorkItem,
OverlapResult,
SequenceProcessFramework::SequenceWorkItemGenerator<SequenceProcessFramework::SequenceWorkItem>,
DuplicateRemoveProcess,
DuplicateRemovePostProcess
> worker;
size_t num = worker.run(reader, &proclist, &postproc);
if (processed != NULL) {
*processed = num;
}
for (size_t i = 0; i < threads; ++i) {
delete proclist[i];
}
#else
LOG4CXX_ERROR(logger, "failed to load OpenMP");
return false;
#endif // _OPENMP
}

// Convert hits to fasta
Expand Down
4 changes: 3 additions & 1 deletion src/preprocess.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ class Preprocess : public Runner {
if (stats.numBasesRead) {
LOG4CXX_INFO(logger, boost::format("Bases kept:\t%d(%f)") % stats.numBasesKept % ((double)stats.numBasesKept / stats.numBasesRead));
}
LOG4CXX_INFO(logger, boost::format("Number of incorrectly paired reads that were discarded: %d") % stats.numInvalidPE);
if (options.get<int>("pe-mode", 0) == 1) {
LOG4CXX_INFO(logger, boost::format("Number of incorrectly paired reads that were discarded: %d") % stats.numInvalidPE);
}
}
} else {
LOG4CXX_ERROR(logger, "Failed to open output stream");
Expand Down
3 changes: 2 additions & 1 deletion src/rmdup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ class DuplicateRemove : public Runner {
FMIndex fmi, rfmi;
if (FMIndex::load(output + BWT_EXT, fmi) && FMIndex::load(output + RBWT_EXT, rfmi)) {
OverlapBuilder builder(&fmi, &rfmi, output);
if (!builder.rmdup(input, output + RMDUP_EXT + ".fa", output + RMDUP_EXT + ".dups.fa")) {
if (!builder.rmdup(input, output + RMDUP_EXT + ".fa", output + RMDUP_EXT + ".dups.fa",
options.get<size_t>("threads", 1))) {
LOG4CXX_ERROR(logger, boost::format("Failed to remove duplicates from reads %s") % input);
r = -1;
}
Expand Down
5 changes: 3 additions & 2 deletions src/suffix_array.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef suffix_array_h_
#define suffix_array_h_

#include <cstdint>
#include <iostream>
#include <string>
#include <vector>
Expand All @@ -23,8 +24,8 @@ class SuffixArray {
operator bool() const {
return !empty();
}
size_t i;
size_t j;
uint32_t i;
uint32_t j;
};
typedef std::vector<Elem> ElemList;

Expand Down
3 changes: 2 additions & 1 deletion src/suffix_array_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ class SuffixArray;

class SuffixArrayBuilder {
public:
static SuffixArrayBuilder* create(const std::string& algorithm);
virtual ~SuffixArrayBuilder() {}

static SuffixArrayBuilder* create(const std::string& algorithm);
virtual SuffixArray* build(const DNASeqList& sequences, size_t threads = 1) = 0;
};

Expand Down
61 changes: 13 additions & 48 deletions src/utils.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include "utils.h"

#include <cassert>
#include <cstdlib>
#include <ctime>
#include <memory>
Expand All @@ -27,12 +26,14 @@ int rand() {
return ::rand();
}

template <typename Alloc = std::allocator<char> >
struct basic_gzip_decompressor_delegator : boost::iostreams::basic_gzip_decompressor<Alloc> {
// HACK: a delegator to boost gzip & bzip2 decompressor in order to make it
// seekable to the beginning of the stream and then burn data.
template <typename basic_decompressor>
struct basic_decompressor_delegator {
public:
typedef char char_type;
struct category : boost::iostreams::basic_gzip_decompressor<Alloc>::category, boost::iostreams::input_seekable {};
basic_gzip_decompressor_delegator() : impl_(new boost::iostreams::basic_gzip_decompressor<Alloc>()) {
struct category : basic_decompressor::category, boost::iostreams::input_seekable {};
basic_decompressor_delegator() : impl_(new basic_decompressor()) {
}
template <typename Sink>
std::streamsize write(Sink& snk, const char_type* s, std::streamsize n) {
Expand All @@ -53,61 +54,25 @@ struct basic_gzip_decompressor_delegator : boost::iostreams::basic_gzip_decompre
template <typename Source>
std::streampos seek(Source& src, boost::iostreams::stream_offset off, BOOST_IOS::seekdir way,
BOOST_IOS::openmode which = BOOST_IOS::in | BOOST_IOS::out) {
assert(off == 0);
assert(off == 0 && way == BOOST_IOS::beg);
src.pubseekoff(off, way);
impl_.reset(new boost::iostreams::basic_gzip_decompressor<Alloc>());
impl_.reset(new basic_decompressor());
return 0;
}

std::string file_name() const { return impl_->file_name(); }
std::string comment() const { return impl_->comment(); }
bool text() const { return impl_->text(); }
int os() const { return impl_->os(); }
std::time_t mtime() const { return impl_->mtime(); }

private:
std::shared_ptr<boost::iostreams::basic_gzip_decompressor<Alloc> > impl_;
std::shared_ptr<basic_decompressor> impl_;
};

template <typename Alloc = std::allocator<char> >
struct basic_gzip_decompressor_delegator : basic_decompressor_delegator<boost::iostreams::basic_gzip_decompressor<Alloc> > {
};
BOOST_IOSTREAMS_PIPABLE(basic_gzip_decompressor_delegator, 1)
typedef basic_gzip_decompressor_delegator<> gzip_decompressor_delegator;

template <typename Alloc = std::allocator<char> >
struct basic_bzip2_decompressor_delegator : boost::iostreams::basic_bzip2_decompressor<Alloc> {
public:
typedef char char_type;
struct category : boost::iostreams::basic_bzip2_decompressor<Alloc>::category, boost::iostreams::input_seekable {};
basic_bzip2_decompressor_delegator() : impl_(new boost::iostreams::basic_bzip2_decompressor<Alloc>()) {
}
template <typename Sink>
std::streamsize write(Sink& snk, const char_type* s, std::streamsize n) {
return impl_->write(snk, s, n);
}
template <typename Source>
std::streamsize read(Source& src, char_type* s, std::streamsize n) {
return impl_->read(src, s, n);
}
template <typename Source>
void close(Source& src, BOOST_IOS::openmode m) {
impl_->close(src, m);
}
template <typename Source>
void close(Source& src) {
this->close(src, BOOST_IOS::in);
}
template <typename Source>
std::streampos seek(Source& src, boost::iostreams::stream_offset off, BOOST_IOS::seekdir way,
BOOST_IOS::openmode which = BOOST_IOS::in | BOOST_IOS::out) {
assert(off == 0);
src.pubseekoff(off, way);
impl_.reset(new boost::iostreams::basic_bzip2_decompressor<Alloc>());
return 0;
}

private:
std::shared_ptr<boost::iostreams::basic_bzip2_decompressor<Alloc> > impl_;
struct basic_bzip2_decompressor_delegator : basic_decompressor_delegator<boost::iostreams::basic_bzip2_decompressor<Alloc> > {
};

BOOST_IOSTREAMS_PIPABLE(basic_bzip2_decompressor_delegator, 1)
typedef basic_bzip2_decompressor_delegator<> bzip2_decompressor_delegator;

Expand Down
32 changes: 17 additions & 15 deletions src/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,23 @@
#endif

namespace Utils {
// random
void srand();
int rand();
inline int rand(int start, int stop) {
assert(stop > start);
return rand() % (stop - start) + start;
}
inline int rand(int stop) {
return rand(0, stop);
}

// streams
std::istream* ifstream(const std::string& filename);
std::ostream* ofstream(const std::string& filename);
std::string stem(const std::string& filename);

// random
void srand();
int rand();
inline int rand(int start, int stop) {
assert(stop > start);
return rand() % (stop - start) + start;
}
inline int rand(int stop) {
return rand(0, stop);
}

// streams
std::istream* ifstream(const std::string& filename);
std::ostream* ofstream(const std::string& filename);
std::string stem(const std::string& filename);

}; // namespace Utils

#endif // utils_h_

0 comments on commit 2670270

Please sign in to comment.