Skip to content

Commit

Permalink
Merge pull request #25 from chungongyu/dev-c++11
Browse files Browse the repository at this point in the history
refactor: add .gz support for fasta&fastq file format
  • Loading branch information
chungongyu committed Jul 12, 2020
2 parents c8e30fa + 05b79c3 commit b3c36e0
Show file tree
Hide file tree
Showing 13 changed files with 148 additions and 112 deletions.
1 change: 1 addition & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ libsiga_la_SOURCES=\
suffix_array.h \
suffix_array_builder.cpp \
suffix_array_builder.h \
utils.cpp \
utils.h

bin_PROGRAMS=siga
Expand Down
31 changes: 0 additions & 31 deletions src/asqg.cpp
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
#include "asqg.h"
#include "constant.h"
#include "utils.h"

#include <cassert>

#include <boost/algorithm/string.hpp>
#include <boost/format.hpp>
#include <boost/iostreams/device/file_descriptor.hpp>
#include <boost/iostreams/filtering_streambuf.hpp>
#include <boost/iostreams/filter/gzip.hpp>

#include <log4cxx/logger.h>

Expand Down Expand Up @@ -234,30 +229,4 @@ namespace ASQG {
}
return RT_NONE;
}

std::streambuf* ifstreambuf(const std::string& filename) {
boost::iostreams::filtering_istreambuf* buf = new boost::iostreams::filtering_istreambuf();
try {
if (boost::algorithm::ends_with(filename, GZIP_EXT)) {
buf->push(boost::iostreams::gzip_decompressor());
}
buf->push(boost::iostreams::file_descriptor_source(filename));
} catch (...) {
SAFE_DELETE(buf);
}
return buf;
}

std::streambuf* ofstreambuf(const std::string& filename) {
boost::iostreams::filtering_ostreambuf* buf = new boost::iostreams::filtering_ostreambuf();
try {
if (boost::algorithm::ends_with(filename, GZIP_EXT)) {
buf->push(boost::iostreams::gzip_compressor());
}
buf->push(boost::iostreams::file_descriptor_sink(filename));
} catch (...) {
SAFE_DELETE(buf);
}
return buf;
}
};
2 changes: 0 additions & 2 deletions src/asqg.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,6 @@ namespace ASQG {
};

RecordType recordtype(const std::string& record);
std::streambuf* ifstreambuf(const std::string& filename);
std::streambuf* ofstreambuf(const std::string& filename);
};

#endif // asqg_h_
14 changes: 6 additions & 8 deletions src/bigraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -549,10 +549,9 @@ bool Bigraph::load(std::istream& stream, size_t minOverlap, bool allowContainmen
}

bool Bigraph::load(const std::string& filename, size_t minOverlap, bool allowContainments, size_t maxEdges, Bigraph* g) {
std::shared_ptr<std::streambuf> buf(ASQG::ifstreambuf(filename));
if (buf) {
std::istream stream(buf.get());
return load(stream, minOverlap, allowContainments, maxEdges, g);
std::shared_ptr<std::istream> stream(Utils::ifstream(filename));
if (stream) {
return load(*stream, minOverlap, allowContainments, maxEdges, g);
}
return false;
}
Expand Down Expand Up @@ -602,10 +601,9 @@ bool Bigraph::save(std::ostream& stream, const Bigraph* g) {
}

bool Bigraph::save(const std::string& filename, const Bigraph* g) {
std::shared_ptr<std::streambuf> buf(ASQG::ofstreambuf(filename));
if (buf) {
std::ostream stream(buf.get());
return save(stream, g);
std::shared_ptr<std::ostream> stream(Utils::ofstream(filename));
if (stream) {
return save(*stream, g);
}
return false;
}
Expand Down
16 changes: 12 additions & 4 deletions src/correct_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,13 +316,21 @@ bool CorrectProcessor::process(const FMIndex& index, DNASeqReader& reader, std::
bool CorrectProcessor::process(const FMIndex& index, const std::string& input, const std::string& output, size_t threads, size_t* processed) const {

// DNASeqReader
std::ifstream reads(input);
std::shared_ptr<DNASeqReader> reader(DNASeqReaderFactory::create(reads));
std::shared_ptr<std::istream> reads(Utils::ifstream(input));
if (!reads) {
LOG4CXX_ERROR(logger, boost::format("Failed to read file %s") % input);
return false;
}
std::shared_ptr<DNASeqReader> reader(DNASeqReaderFactory::create(*reads));
if (!reader) {
LOG4CXX_ERROR(logger, boost::format("Failed to create DNASeqReader %s") % input);
return false;
}

std::ofstream out(output);
return process(index, *reader, out, threads, processed);
std::shared_ptr<std::ostream> out(Utils::ofstream(output));
if (!out) {
LOG4CXX_ERROR(logger, boost::format("Failed to create DNASeqWriter %s") % output);
return false;
}
return process(index, *reader, *out, threads, processed);
}
14 changes: 9 additions & 5 deletions src/kseq.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "kseq.h"
#include "utils.h"

#include <fstream>
#include <map>
Expand Down Expand Up @@ -95,13 +96,13 @@ std::ostream& operator << (std::ostream& os, const DNASeq& seq) {
return os;
}

DNASeqReader* DNASeqReaderFactory::create(std::istream& stream, const std::map<std::string, std::string>* attrs) {
DNASeqReader* DNASeqReaderFactory::create(std::istream& stream) {
if (stream) {
int c = stream.peek();
if (c == '@') {
return new FASTQReader(stream, attrs);
return new FASTQReader(stream);
} else if (c == '>') {
return new FASTAReader(stream, attrs);
return new FASTAReader(stream);
}
}
return NULL;
Expand Down Expand Up @@ -218,8 +219,11 @@ bool ReadDNASequences(std::istream& stream, DNASeqList& sequences) {
}

bool ReadDNASequences(const std::string& file, DNASeqList& sequences) {
std::ifstream stream(file.c_str());
return ReadDNASequences(stream, sequences);
std::shared_ptr<std::istream> stream(Utils::ifstream(file));
if (stream) {
return ReadDNASequences(*stream, sequences);
}
return false;
}

bool ReadDNASequences(const std::vector<std::string>& filelist, DNASeqList& sequences) {
Expand Down
16 changes: 8 additions & 8 deletions src/kseq.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,8 @@ bool ReadDNASequences(const std::vector<std::string>& filelist, DNASeqList& sequ

class DNASeqReader {
public:
DNASeqReader(std::istream& stream, const std::map<std::string, std::string>* attrs=NULL) : _stream(stream) {
_pos = stream.tellg();
if (attrs != NULL) {
_attrs = *attrs;
}
DNASeqReader(std::istream& stream) : _stream(stream) {
_pos = 0;//_stream.tellg();
}
virtual ~DNASeqReader() {
}
Expand All @@ -78,6 +75,9 @@ class DNASeqReader {
}
return defval;
}
void setAttr(const std::string& key, const std::string& val) {
_attrs[key] = val;
}
protected:
std::istream& _stream;
size_t _pos;
Expand All @@ -86,7 +86,7 @@ class DNASeqReader {

class DNASeqReaderFactory {
public:
static DNASeqReader* create(std::istream& stream, const std::map<std::string, std::string>* attrs=NULL);
static DNASeqReader* create(std::istream& stream);
};

//
Expand All @@ -113,7 +113,7 @@ class DNASeqReaderFactory {
//
class FASTQReader : public DNASeqReader {
public:
FASTQReader(std::istream& stream, const std::map<std::string, std::string>* attrs=NULL) : DNASeqReader(stream, attrs) {
FASTQReader(std::istream& stream) : DNASeqReader(stream) {
}

bool read(DNASeq& sequence);
Expand All @@ -129,7 +129,7 @@ class FASTQReader : public DNASeqReader {
//
class FASTAReader : public DNASeqReader {
public:
FASTAReader(std::istream& stream, const std::map<std::string, std::string>* attrs=NULL) : DNASeqReader(stream, attrs) {
FASTAReader(std::istream& stream) : DNASeqReader(stream) {
}

void reset() {
Expand Down
23 changes: 16 additions & 7 deletions src/match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "fmindex.h"
#include "kseq.h"
#include "runner.h"
#include "utils.h"

#include <fstream>
#include <iostream>
Expand Down Expand Up @@ -36,13 +37,21 @@ class Match : public Runner {
FMIndex fmi;
if (FMIndex::load(prefix + BWT_EXT, fmi)) {
for (const auto& input : arguments) {
std::ifstream stream(input.c_str());
std::shared_ptr<DNASeqReader> reader(DNASeqReaderFactory::create(stream));
if (reader) {
DNASeq read;
while (reader->read(read)) {
std::cout << boost::format("%s\t%s\t%d\n") % read.name % read.seq % (FMIndex::Interval::occurrences(read.seq, &fmi) + FMIndex::Interval::occurrences(make_reverse_complement_dna_copy(read.seq), &fmi));
}
std::shared_ptr<std::istream> stream(Utils::ifstream(input));
if (!stream) {
LOG4CXX_ERROR(logger, boost::format("Failed to read file %s") % input);
r = -1;
break;
}
std::shared_ptr<DNASeqReader> reader(DNASeqReaderFactory::create(*stream));
if (!reader) {
LOG4CXX_ERROR(logger, boost::format("Failed to create DNASeqReader %s") % input);
r = -1;
break;
}
DNASeq read;
while (reader->read(read)) {
std::cout << boost::format("%s\t%s\t%d\n") % read.name % read.seq % (FMIndex::Interval::occurrences(read.seq, &fmi) + FMIndex::Interval::occurrences(make_reverse_complement_dna_copy(read.seq), &fmi));
}
}
} else {
Expand Down
60 changes: 27 additions & 33 deletions src/overlap_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,13 +348,12 @@ class Hits2ASQGConverter {
}

bool convert(const std::string& hits, std::ostream& asqg) const {
std::shared_ptr<std::streambuf> buf(ASQG::ifstreambuf(hits));
if (!buf) {
std::shared_ptr<std::istream> stream(Utils::ifstream(hits));
if (!stream) {
LOG4CXX_ERROR(logger, boost::format("failed to read hits %s") % hits);
return false;
}
std::istream stream(buf.get());
return convert(stream, asqg);
return convert(*stream, asqg);
}

bool convert(std::istream& hits, std::ostream& asqg) const {
Expand Down Expand Up @@ -399,13 +398,12 @@ bool OverlapBuilder::build(DNASeqReader& reader, size_t minOverlap, std::ostream

if (threads <= 1) { // single thread
std::string hit = _prefix + HITS_EXT + GZIP_EXT;
std::shared_ptr<std::streambuf> buf(ASQG::ofstreambuf(hit));
if (!buf) {
std::shared_ptr<std::ostream> stream(Utils::ofstream(hit));
if (!stream) {
LOG4CXX_ERROR(logger, boost::format("failed to create hits %s") % hit);
return false;
}
std::ostream stream(buf.get());
OverlapProcess proc(this, minOverlap, stream);
OverlapProcess proc(this, minOverlap, *stream);
OverlapPostProcess postproc(output);

SequenceProcessFramework::SerialWorker<
Expand All @@ -423,20 +421,17 @@ bool OverlapBuilder::build(DNASeqReader& reader, size_t minOverlap, std::ostream
hits.push_back(hit);
} else { // multi thread
#ifdef _OPENMP
std::vector<std::shared_ptr<std::streambuf> > buflist(threads);
std::vector<std::shared_ptr<std::ostream> > streamlist(threads);
std::vector<OverlapProcess *> proclist(threads);
for (size_t i = 0; i < threads; ++i) {
std::string hit = boost::str(boost::format("%s-thread%d%s%s") % _prefix % i % HITS_EXT % GZIP_EXT);
std::shared_ptr<std::streambuf> buf(ASQG::ofstreambuf(hit));
if (!buf) {
std::shared_ptr<std::ostream> stream(Utils::ofstream(hit));
if (!stream) {
LOG4CXX_ERROR(logger, boost::format("failed to create hits %s") % hit);
return false;
}
std::shared_ptr<std::ostream> stream(new std::ostream(buf.get()));
proclist[i] = new OverlapProcess(this, minOverlap, *stream);
streamlist[i] = stream;
buflist[i] = buf;
hits.push_back(hit);
}
OverlapPostProcess postproc(output);
Expand Down Expand Up @@ -484,26 +479,27 @@ bool OverlapBuilder::build(DNASeqReader& reader, size_t minOverlap, std::ostream

bool OverlapBuilder::build(const std::string& input, size_t minOverlap, const std::string& output, size_t threads, size_t batch, size_t* processed) const {
// DNASeqReader
std::ifstream reads(input.c_str());
std::map<std::string, std::string> attrs = {
{"infile", input}
};
std::shared_ptr<DNASeqReader> reader(DNASeqReaderFactory::create(reads, &attrs));
std::shared_ptr<std::istream> reads(Utils::ifstream(input));
if (!reads) {
LOG4CXX_ERROR(logger, boost::format("Failed to read file %s") % input);
return false;
}
std::shared_ptr<DNASeqReader> reader(DNASeqReaderFactory::create(*reads));
if (!reader) {
reader->setAttr("infile", input);
LOG4CXX_ERROR(logger, boost::format("Failed to create DNASeqReader %s") % input);
return false;
}

// ASQG
std::shared_ptr<std::streambuf> buf(ASQG::ofstreambuf(output));
if (!buf) {
std::shared_ptr<std::ostream> asqg(Utils::ofstream(output));
if (!asqg) {
LOG4CXX_ERROR(logger, boost::format("Failed to create ASQG %s") % output);
return false;
}
std::ostream asqg(buf.get());

// Build
return build(*reader, minOverlap, asqg, threads, batch, processed);
return build(*reader, minOverlap, *asqg, threads, batch, processed);
}

//
Expand Down Expand Up @@ -546,13 +542,12 @@ class Hits2FastaConverter {
}

bool convert(const std::string& hits, std::ostream& fasta, std::ostream& duplicates) const {
std::shared_ptr<std::streambuf> buf(ASQG::ifstreambuf(hits));
if (!buf) {
std::shared_ptr<std::istream> stream(Utils::ifstream(hits));
if (!stream) {
LOG4CXX_ERROR(logger, boost::format("failed to read hits %s") % hits);
return false;
}
std::istream stream(buf.get());
return convert(stream, fasta, duplicates);
return convert(*stream, fasta, duplicates);
}

bool convert(std::istream& hits, std::ostream& fasta, std::ostream& duplicates) const {
Expand Down Expand Up @@ -607,13 +602,12 @@ bool OverlapBuilder::rmdup(DNASeqReader& reader, std::ostream& output, std::ostr

if (threads <= 1) { // single thread
std::string hit = _prefix + RMDUP_EXT + HITS_EXT + GZIP_EXT;
std::shared_ptr<std::streambuf> buf(ASQG::ofstreambuf(hit));
if (!buf) {
std::shared_ptr<std::ostream> stream(Utils::ofstream(hit));
if (!stream) {
LOG4CXX_ERROR(logger, boost::format("failed to create hits %s") % hit);
return false;
}
std::ostream stream(buf.get());
DuplicateRemoveProcess proc(this, stream);
DuplicateRemoveProcess proc(this, *stream);
DuplicateRemovePostProcess postproc;

SequenceProcessFramework::SerialWorker<
Expand Down Expand Up @@ -664,21 +658,21 @@ bool OverlapBuilder::rmdup(const std::string& input, const std::string& output,
}

// FASTA
std::ofstream fasta(output.c_str());
std::shared_ptr<std::ostream> fasta(Utils::ofstream(output));
if (!fasta) {
LOG4CXX_ERROR(logger, boost::format("Failed to create FASTA %s") % output);
return false;
}

// DUPLICATES
std::ofstream dup(duplicates.c_str());
std::shared_ptr<std::ostream> dup(Utils::ofstream(duplicates));
if (!fasta) {
LOG4CXX_ERROR(logger, boost::format("Failed to create dup FASTA %s") % duplicates);
return false;
}

// Build
return rmdup(*reader, fasta, dup, threads, processed);
return rmdup(*reader, *fasta, *dup, threads, processed);
}

class IrreducibleBlockListExtractor {
Expand Down
Loading

0 comments on commit b3c36e0

Please sign in to comment.