-
Notifications
You must be signed in to change notification settings - Fork 3
/
main.nf
537 lines (487 loc) · 20.1 KB
/
main.nf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
#!/usr/bin/env nextflow
nextflow.enable.dsl=2
//import processes for preprocessing
include{
quality_control
quality_control_2
adapter_removal
quality_filter
} from './modules/preprocessing.nf'
//import processes for barcode handling
include{
extract_rnd_barcode
check_barcode_file
split_exp_barcode
get_length_exp_barcode
remove_exp_barcode
merge_preprocessed_reads
generate_barcode_barplot
} from './modules/barcode_handling.nf'
//import processes used for aligments
include{
build_index_bowtie
mapping_bowtie
build_index_STAR
mapping_STAR
filter_empty_bams
collect_experiments_without_alignments
} from './modules/alignment.nf'
//import processes used for deduplication
include{
sort_bam
deduplicate
merge_deduplicated_bam
} from './modules/deduplication.nf'
//import processes used for transcript analysis
if(params.map_to_transcripts == true){
include{
count_hits
get_top_hits
index_alignments
extract_top_alignments
remove_newlines
extract_top_transcript_sequences
} from './modules/transcript_handling.nf'
}
//import processes used to generate peaks
include{
get_chromosome_sizes
pureCLIP_to_wig
calculate_crosslink_sites
split_wig2_for_correlation
calc_wig_correlation
merge_wigs
wig_to_bigWig
bigWig_to_bedgraph
index_for_peak_calling
pureCLIP
split_wig_2_for_peak_height_hist
generate_peak_height_histogram
} from './modules/peak_generation.nf'
//import processes used to determine strand preference
include{
sort_bam_before_strand_pref
determine_strand_preference
visualize_strand_preference
} from './modules/strand_preference.nf'
//import processes used for RNA subtype analysis
if(params.annotation != 'NO_FILE'){
include{
wig_to_bam
feature_counts
get_RNA_subtypes_distribution
generate_RNA_subtypes_barplot
collect_subtype_analysis_errors
} from './modules/RNA_subtype_distribution.nf'
}
//import processes used for motif analysis
include{
sequence_extraction
motif_search
} from './modules/motif_analysis.nf'
//import processes used for peak distance analysis
include{
calculate_peak_distance
plot_peak_distance
} from './modules/peak_distance_analysis.nf'
//import processes used to generate the IGV session
include{
prepare_annotation_for_igv
generate_igv_session
} from './modules/generate_IGV_session.nf'
//import general processes
include{
split_bam_by_chromosome
sort_and_index_alignment
output_reference
multiqc
collect_workflow_metrics
} from './modules/general_processes.nf'
//essential input files
input_reads = Channel.fromPath( params.reads ) //FASTQ file(s) containing reads
reference = Channel.fromPath( params.reference ) //FASTA file containing reference sequence(s)
barcode_file = Channel.fromPath( params.barcodes ) //TSV file containing experiment names and the corresponding experimental barcode sequence
//non essential input files
//annotation channel only exists when a real annotation was supplied ('NO_FILE' is the placeholder default)
if(params.annotation != 'NO_FILE'){
annotation_file = Channel.fromPath( params.annotation )
}
//plain file object for the annotation path; may point at the 'NO_FILE' placeholder
annotation = file(params.annotation)
//percentile used downstream to filter cross-link sites (sequence extraction, peak distance, peak height histogram)
//NOTE(review): when peak calling is NOT omitted the percentile is forced to 0 (i.e. no filtering),
//presumably because pureCLIP already selects significant sites -- confirm this inversion is intended
if(params.omit_peak_calling == false){
percentile = Channel.from(0)
} else {
percentile = Channel.from(params.percentile)
}
//preparation for RNA subtype analysis: one channel element per comma-separated subtype name
rna_subtypes = Channel.from(params.rna_subtypes
.split(','))
//Read preprocessing: QC, adapter trimming and quality filtering.
workflow preprocessing {
    take:
    input_reads

    main:
    //In speed mode every FASTQ is sharded into chunks so the following steps parallelize.
    reads = params.speed ? input_reads.splitFastq(by: params.split_fastq_by, file: true) : input_reads
    //QC on the raw reads, then trim adapters, filter on base quality and QC the cleaned reads again.
    quality_control(reads)
    adapter_removal(reads)
    quality_filter(adapter_removal.out.fastq_trimmed)
    quality_control_2(quality_filter.out.fastq_quality_filtered)

    emit:
    //data for multiqc
    multiqc_quality_control = quality_control.out
    multiqc_quality_control_post_preprocessing = quality_control_2.out
    multiqc_adapter_removal = adapter_removal.out.report_trimming
    multiqc_quality_filter = quality_filter.out.report_quality_filter
    // data for downstream processes
    fastq_reads_quality_filtered = quality_filter.out.fastq_quality_filtered
}
//Barcode handling: extract random barcodes (UMIs), demultiplex by experimental
//barcode, trim the experimental barcode off and merge reads per experiment.
workflow barcode_handling {
    take:
    fastq_reads_quality_filtered
    barcode_file

    main:
    //Validate the barcode TSV and strip the random barcode from every read.
    check_barcode_file(barcode_file)
    extract_rnd_barcode(fastq_reads_quality_filtered)
    //Demultiplex reads according to the experimental barcodes.
    split_input = extract_rnd_barcode.out.fastq_rnd_barcode_extracted
        .combine(check_barcode_file.out)
    split_exp_barcode(split_input)
    //Determine the experimental barcode length from the pattern, then trim it
    //from every non-empty demultiplexed FASTQ.
    get_length_exp_barcode(params.barcode_pattern)
    nonempty_splits = split_exp_barcode.out.fastq_split_experimental_barcode
        .flatten()
        .filter{ it.size() > 0 }
    remove_exp_barcode(nonempty_splits.combine(get_length_exp_barcode.out))
    //Group the trimmed FASTQ chunks by experiment name and merge each group.
    grouped_reads = remove_exp_barcode.out
        .map{file -> tuple(file.name - ~/\.[\w.]+.fastq$/,file)}
        .groupTuple()
    merge_preprocessed_reads(grouped_reads)
    generate_barcode_barplot(split_exp_barcode.out.report_split_experimental_barcode.first()) //TODO: Instead of getting the first emitted file get the input from a specific dir and use all inputs

    emit:
    // reports
    multiqc_rnd_barcode_extraction = extract_rnd_barcode.out.report_rnd_barcode_extraction
    multiqc_exp_barcode_splitting = split_exp_barcode.out.report_split_experimental_barcode
    // data for downstream processes
    fastq_preprocessed_reads = merge_preprocessed_reads.out
}
//Map the preprocessed reads to the reference and drop BAM files without any alignments.
workflow alignment {
    take:
    reference                  // FASTA reference sequence(s)
    fastq_preprocessed_reads   // merged, preprocessed FASTQ reads per experiment
    annotation                 // annotation file (only consumed by the STAR branch)

    main:
    //bowtie for prokaryotes ('pro') and transcriptome mapping; splice-aware STAR for eukaryotes ('eu')
    if( params.domain == 'pro' || params.map_to_transcripts == true ){
        build_index_bowtie(reference)
        //.first() turns the single index into a value channel so it can be reused for every read set
        mapping_bowtie(build_index_bowtie.out.first(),
            fastq_preprocessed_reads)
        alignments = mapping_bowtie.out.bam_alignments
        report_alignments = mapping_bowtie.out.report_alignments
    } else if ( params.domain == 'eu' ) {
        build_index_STAR(reference,
            annotation)
        mapping_STAR(fastq_preprocessed_reads
            .combine(build_index_STAR.out))
        alignments = mapping_STAR.out.bam_alignments
        report_alignments = mapping_STAR.out.report_alignments
    }
    //NOTE(review): if params.domain is neither 'pro' nor 'eu' (with map_to_transcripts false),
    //'alignments' is never assigned and the next line fails -- consider an explicit error message
    filter_empty_bams(alignments)
    collect_experiments_without_alignments(filter_empty_bams.out.report_empty_alignments.flatten().toList())

    emit:
    // reports
    report_empty_alignments = collect_experiments_without_alignments.out
    report_alignments = report_alignments
    // data for downstream processes
    bam_filtered_empty_alignments = filter_empty_bams.out.bam_filtered_empty
}
//Deduplication: sort every BAM, collapse duplicate reads, then regroup the
//per-chunk BAMs by sample name and merge each group into one file.
workflow deduplication {
    take:
    bam_input

    main:
    sort_bam(bam_input.flatten())
    deduplicate(sort_bam.out)
    //Strip the chunk/extension suffix from the file name to recover the sample
    //name, then collect all chunks belonging to the same sample.
    grouped_dedup_bams = deduplicate.out.bam_deduplicated
        .map{file -> tuple(file.name - ~/\.[\w.]+.bam$/,file)}
        .groupTuple()
    merge_deduplicated_bam(grouped_dedup_bams)

    emit:
    // reports
    report_deduplication = deduplicate.out.report_deduplicated
    // data for downstream processes
    deduplicated_alignments = merge_deduplicated_bam.out
}
//Transcript analysis: identify the transcripts with the most alignments and
//restrict both the alignments and the reference to those top transcripts.
workflow transcript_analysis {
    take:
    bam_deduplicated
    reference

    main:
    //Count hits per transcript and determine the top-hit transcripts.
    count_hits(bam_deduplicated)
    get_top_hits(count_hits.out.flatten().toList())
    //Index the alignments and extract those mapping to the top transcripts.
    index_alignments(bam_deduplicated)
    top_alignment_input = index_alignments.out.combine(get_top_hits.out)
    extract_top_alignments(top_alignment_input)
    //Build a FASTA containing only the top transcripts for downstream analyses.
    remove_newlines(reference)
    top_sequence_input = get_top_hits.out.combine(remove_newlines.out)
    extract_top_transcript_sequences(top_sequence_input)

    emit:
    bam_extracted_transcript_alignments = extract_top_alignments.out
    fasta_top_transcripts = extract_top_transcript_sequences.out
}
//Determine and visualize how reads distribute between forward and reverse strand.
workflow strand_preference {
    take:
    bam_collected   // deduplicated alignments (or top-transcript alignments)
    reference       // FASTA the alignments were produced against

    main:
    bam_sorted = sort_bam_before_strand_pref(bam_collected)
    //Derive the experiment name from the BAM file name; when replicates are merged
    //the replicate suffix is stripped so all replicates of one experiment group together.
    if(params.merge_replicates == true){
        bam_sorted
            .map{file -> tuple(file.name - ~/_rep_\d*(_filtered_top)?\d*(.sorted)?.bam$/,file)}
            .groupTuple()
            .set{grouped_bam_to_strand_preference}
    } else {
        bam_sorted
            .map{file -> tuple(file.name - ~/(_filtered_top\d*)?(.sorted)?.bam$/,file)}
            .set{grouped_bam_to_strand_preference}
    }
    determine_strand_preference(grouped_bam_to_strand_preference
        .combine(reference))
    visualize_strand_preference(determine_strand_preference.out)
}
//Central peak workflow: derives cross-link sites from the alignments, optionally
//calls peaks with pureCLIP, optionally merges replicates (with correlation
//analysis), converts all tracks to bigWig/bedgraph and plots peak heights.
workflow peak_generation {
    take:
    bam_collected   // deduplicated alignments per experiment
    reference       // FASTA reference the alignments were produced against

    main:
    get_chromosome_sizes(reference)
    //calculate raw cross-link sites
    calculate_crosslink_sites(bam_collected
        .combine(get_chromosome_sizes.out))
    //accumulator channel of every strand-split wig track to be converted to bigWig later
    collect_cl_sites_to_transform = calculate_crosslink_sites.out.wig_cross_link_sites_split_forward
        .concat(calculate_crosslink_sites.out.wig_cross_link_sites_split_reverse)
    //Handle peak calling when it is not omitted. If pureCLIP is executed all further analyses are performed on the peaks determined by pureCLIP
    if(params.omit_peak_calling == false){
        index_for_peak_calling(bam_collected)
        pureCLIP(index_for_peak_calling.out
            .combine(reference))
        pureCLIP_to_wig(pureCLIP.out.bed_crosslink_sites)
        report_pureCLIP = pureCLIP.out.report_pureCLIP
        bed_pureCLIP_peaks = pureCLIP.out.bed_crosslink_sites
        wig2_cross_link_sites = pureCLIP_to_wig.out.wig2_peak_called_cl_sites
        collect_cl_sites_to_transform = collect_cl_sites_to_transform
            .concat(pureCLIP_to_wig.out.wig_peak_called_cl_sites_forward)
            .concat(pureCLIP_to_wig.out.wig_peak_called_cl_sites_reverse)
    } else {
        //peak calling skipped: downstream analyses run on the raw cross-link sites
        report_pureCLIP = Channel.empty()
        bed_pureCLIP_peaks = Channel.empty()
        wig2_cross_link_sites = calculate_crosslink_sites.out.wig2_cross_link_sites
    }
    if(params.merge_replicates == true){
        //group replicates according to their experiment and merge them
        wig2_cross_link_sites
            .map{file -> tuple(file.name - ~/_rep_\d*(_filtered_top)?\d*.wig2$/,file)}
            .groupTuple()
            .set{wig2_grouped_samples}
        merge_wigs(wig2_grouped_samples)
        if(params.correlation_analysis == true){
            split_wig2_for_correlation(wig2_cross_link_sites)
            // combines replicates for correlation analysis: if both strands are to be
            // checked together they are combined; if the strands are to be checked
            // separately, only the corresponding strands are combined.
            // Code for grouping goes until the next comment
            if(params.combine_strands_correlation == true){
                value_both_strands = Channel.from("both_strands")
                split_wig2_for_correlation.out.wig_split_both_strands
                    .flatten()
                    .map{file -> tuple(file.name - ~/_rep_\d*(_filtered_top)?\d*_(forward|reverse)?.wig$/,file)}
                    .groupTuple()
                    .combine(value_both_strands)
                    .combine(get_chromosome_sizes.out)
                    .set{prepared_input_correlation}
            } else {
                //forward and reverse strands are correlated independently of each other
                value_forward = Channel.from("forward")
                split_wig2_for_correlation.out.wig_split_forward
                    .map{file -> tuple(file.name - ~/_rep_\d*(_filtered_top)?_forward.wig$/,file)}
                    .groupTuple()
                    .combine(value_forward)
                    .set{group_forward}
                value_reverse = Channel.from("reverse")
                split_wig2_for_correlation.out.wig_split_reverse
                    .map{file -> tuple(file.name - ~/_rep_\d*(_filtered_top)?_reverse.wig$/,file)}
                    .groupTuple()
                    .combine(value_reverse)
                    .set{group_reverse}
                group_forward
                    .concat(group_reverse)
                    .combine(get_chromosome_sizes.out)
                    .set{prepared_input_correlation}
            }
            // actual calculation of correlation
            calc_wig_correlation(prepared_input_correlation)
        }
        // Generates output channels
        merge_wigs.out.wig2_merged
            .set{wig2_cross_link_sites_collected}
        collect_cl_sites_to_transform = collect_cl_sites_to_transform
            .concat(merge_wigs.out.wig_merged_cross_link_sites_forward)
            .concat(merge_wigs.out.wig_merged_cross_link_sites_reverse)
    } else {
        wig2_cross_link_sites
            .set{wig2_cross_link_sites_collected}
    }
    //Transformation of peaks into different formats (bigWig and bedgraph)
    wig_to_bigWig(collect_cl_sites_to_transform
        .combine(get_chromosome_sizes.out))
    bigWig_to_bedgraph(wig_to_bigWig.out.bigWig)
    // Get correct bigWigs to display via IGV
    // precedence: merged replicates > pureCLIP peak-called sites > raw sites
    if(params.merge_replicates == true){
        wig_to_bigWig.out.bigWig
            .filter{ it[0] == 'cross-link-sites-merged' }
            .set{collect_bigWig_to_IGV}
    } else if(params.omit_peak_calling == false){
        wig_to_bigWig.out.bigWig
            .filter{ it[0] == 'cross-link-sites-peak-called' }
            .set{collect_bigWig_to_IGV}
    } else {
        wig_to_bigWig.out.bigWig
            .filter{ it[0] == 'cross-link-sites-raw' }
            .set{collect_bigWig_to_IGV}
    }
    //Generate peak height histogram
    split_wig_2_for_peak_height_hist(wig2_cross_link_sites_collected)
    split_wig_2_for_peak_height_hist.out.wig_split_forward
        .mix(split_wig_2_for_peak_height_hist.out.wig_split_reverse)
        .groupTuple()
        .set{combined_wig_for_peak_height_histogram}
    generate_peak_height_histogram(combined_wig_for_peak_height_histogram
        .combine(percentile))

    emit:
    // reports
    report_pureCLIP = report_pureCLIP
    // data for downstream analysis
    wig2_collected = wig2_cross_link_sites_collected
    bigWig_collected = collect_bigWig_to_IGV
    bed_pureCLIP_peaks = bed_pureCLIP_peaks
}
//RNA subtype analysis: count cross-link sites per RNA subtype and plot the distribution.
workflow rna_subtype_analysis {
    take:
    wig2_cross_link_sites
    rna_subtypes
    annotation

    main:
    //Convert the cross-link site tracks to BAM so featureCounts can consume them.
    wig_to_bam(wig2_cross_link_sites)
    counts_input = wig_to_bam.out.combine(rna_subtypes).combine(annotation)
    feature_counts(counts_input)
    //Group the per-subtype count tables by sample name.
    grouped_counts = feature_counts.out
        .map{file -> tuple(file.name - ~/\.[\w.]+.tsv$/,file)}
        .groupTuple()
    get_RNA_subtypes_distribution(rna_subtypes.collect(), grouped_counts)
    generate_RNA_subtypes_barplot(get_RNA_subtypes_distribution.out.tsv_subtype_distribution)
    collect_subtype_analysis_errors(get_RNA_subtypes_distribution.out.report_errors)
}
//Motif analysis: extract sequences around the peaks and search them for motifs.
workflow motif_analysis {
    take:
    peaks
    reference

    main:
    //Pair every peak track with the reference and the global percentile cutoff.
    extraction_input = peaks.combine(reference).combine(percentile)
    sequence_extraction(extraction_input)
    //Only search for motifs when the extracted window is at least as wide as
    //the minimum motif width.
    extracted_width = (2*params.seq_len)+1
    if( extracted_width >= params.min_motif_width ){
        motif_search(sequence_extraction.out.extracted_sequences)
    }
}
//Peak distance analysis: compute distances between peaks and plot their distribution.
workflow peak_distance_analysis {
    take:
    wig_peaks

    main:
    //Pair every peak track with the global percentile cutoff before the calculation.
    distance_input = wig_peaks.combine(percentile)
    calculate_peak_distance(distance_input)
    plot_peak_distance(calculate_peak_distance.out)
}
//Build an IGV session file referencing the bigWig tracks, alignments and annotation.
workflow igv_session {
    take:
    bigWig_tracks
    bam_alignments
    path_track_directory
    reference
    annotation

    main:
    //Without a real annotation the placeholder is passed straight through;
    //otherwise the annotation is first converted for IGV.
    if( params.annotation == 'NO_FILE' ){
        collect_annotation = annotation
    } else {
        prepare_annotation_for_igv(annotation)
        collect_annotation = prepare_annotation_for_igv.out.name_annotation
    }
    generate_igv_session(bigWig_tracks.flatten().toList(),
        bam_alignments.flatten().toList(),
        path_track_directory,
        reference,
        collect_annotation)
}
//Default for the --version flag; a value supplied on the command line overrides this.
params.version = false
if(params.version){
    //--version: print the pipeline version and run no analysis
    println(workflow.manifest.version)
} else {
    //main entry workflow wiring all subworkflows together
    workflow {
        preprocessing(input_reads)
        barcode_handling(preprocessing.out.fastq_reads_quality_filtered,barcode_file)
        alignment(reference,barcode_handling.out.fastq_preprocessed_reads,annotation)
        //speed mode: split the BAMs per chromosome so deduplication parallelizes
        //(skipped in transcript mode)
        if(params.map_to_transcripts == false && params.speed){
            bam_split_alignments = split_bam_by_chromosome(alignment.out.bam_filtered_empty_alignments)
        } else { bam_split_alignments = alignment.out.bam_filtered_empty_alignments }
        deduplication(bam_split_alignments)
        //transcript mode restricts downstream analyses to the most-hit transcripts
        if(params.map_to_transcripts == true){
            transcript_analysis(deduplication.out.deduplicated_alignments,reference)
            bam_collected = transcript_analysis.out.bam_extracted_transcript_alignments
            reference_for_downstream = transcript_analysis.out.fasta_top_transcripts
        } else {
            bam_collected = deduplication.out.deduplicated_alignments
            reference_for_downstream = reference
        }
        strand_preference(bam_collected,reference_for_downstream)
        //NOTE(review): peak_generation receives the full 'reference' even in transcript
        //mode, while strand_preference uses 'reference_for_downstream' -- confirm intended
        peak_generation(bam_collected,reference)
        if(params.annotation != 'NO_FILE'){
            rna_subtype_analysis(peak_generation.out.wig2_collected, rna_subtypes, annotation_file)
        }
        if(params.omit_sequence_extraction == false){
            motif_analysis(peak_generation.out.wig2_collected,reference)
        }
        if(params.omit_peak_distance == false){
            peak_distance_analysis(peak_generation.out.wig2_collected)
        }
        sort_and_index_alignment(deduplication.out.deduplicated_alignments)
        //pick the bigWig tracks (and their output directory) the IGV session references
        if (params.merge_replicates == true){
            track_path_dir = Channel.value('cross-link-sites-merged/bigWig')
            peak_generation.out.bigWig_collected
                .filter{it[0] == 'cross-link-sites-merged'}
                .map{it[1]}.set{collected_bigWig}
        } else {
            track_path_dir = Channel.value('cross-link-sites/bigWig')
            peak_generation.out.bigWig_collected
                .map{it[1]}.set{collected_bigWig}
        }
        igv_session(collected_bigWig.flatten().toList(),
            sort_and_index_alignment.out.bam_sorted_alignments.flatten().toList(),
            track_path_dir,
            reference,
            annotation)
        output_reference(reference)
        //record how the pipeline was invoked
        command_line = Channel.from(workflow.commandLine)
        collect_workflow_metrics(command_line)
        //aggregate all collected reports into a single MultiQC report
        multiqc(preprocessing.out.multiqc_adapter_removal.flatten().toList(),
            preprocessing.out.multiqc_quality_filter.flatten().toList(),
            preprocessing.out.multiqc_quality_control,
            preprocessing.out.multiqc_quality_control_post_preprocessing,
            barcode_handling.out.multiqc_exp_barcode_splitting.flatten().toList(),
            alignment.out.report_alignments.flatten().toList(),
            deduplication.out.report_deduplication.flatten().toList())
    }
}
//Summary printed once the pipeline finishes (regardless of success or failure).
workflow.onComplete{
    println "Pipeline completed at: $workflow.complete"
    println "Execution status: ${ workflow.success ? 'OK' : 'failed' }"
}
//Printed when the pipeline aborts with an error.
workflow.onError {
    println "Something went wrong :("
    println "Pipeline execution stopped with following error message: ${workflow.errorMessage}"
}