Move code from main workflow to subworkflows #446

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -27,6 +27,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- [#426](https://github.com/nf-core/mag/pull/426) - Fixed typo in help text for parameters `--host_genome` and `--host_fasta` (by @tillenglert)
- [#434](https://github.com/nf-core/mag/pull/434) - Fix location of samplesheet for AWS full tests (reported by @Lfulcrum, fix by @jfy133)
- [#438](https://github.com/nf-core/mag/pull/438) - Fixed version inconsistency between conda and containers for GTDBTK_CLASSIFYWF (by @jfy133)
- [#446](https://github.com/nf-core/mag/pull/446) - Reorganise main workflow code into subworkflows for increased modularity (by @prototaxites)
- [#445](https://github.com/nf-core/mag/pull/445) - Fix bug in assembly input (by @prototaxites)
- [#447](https://github.com/nf-core/mag/pull/447) - Remove `default: None` from parameter schema (by @drpatelh)
- [#449](https://github.com/nf-core/mag/pull/449) - Fix results file overwriting in Ancient DNA workflow (reported by @alexhbnr, fix by @jfy133)
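The diff below moves the assembly, assembly QC, annotation, bin QC and bin taxonomy logic out of the main workflow into dedicated subworkflows. As a minimal sketch of how a caller can compose them — the include paths, channel contents and meta fields here are illustrative assumptions, not taken from this diff:

```nextflow
// Minimal composition sketch (assumed wiring; the real caller is the main mag workflow).
include { ASSEMBLY    } from './subworkflows/local/assembly'
include { ASSEMBLY_QC } from './subworkflows/local/assembly_qc'

workflow {
    // Placeholder inputs shaped as [ meta map, reads ], as the subworkflows expect.
    ch_short_reads = Channel.of( [ [ id: 'sample1', group: 0, single_end: false ],
                                   [ file('s1_R1.fastq.gz'), file('s1_R2.fastq.gz') ] ] )
    ch_long_reads  = Channel.of( [ [ id: 'sample1', group: 0 ], file('s1_nanopore.fastq.gz') ] )

    ASSEMBLY ( ch_short_reads, ch_long_reads )   // emits: assemblies, versions
    ASSEMBLY_QC ( ASSEMBLY.out.assemblies )      // emits: quast_multiqc, versions

    ch_versions = ASSEMBLY.out.versions.mix( ASSEMBLY_QC.out.versions )
}
```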
45 changes: 45 additions & 0 deletions subworkflows/local/annotation.nf
@@ -0,0 +1,45 @@
include { PROKKA } from '../../modules/nf-core/prokka/main'
include { PRODIGAL } from '../../modules/nf-core/prodigal/main'

workflow ANNOTATION {
    take:
    bins_unbins
    assemblies

    main:
    ch_versions = Channel.empty()

    /*
     * Prodigal: Predict proteins
     */
    if (!params.skip_prodigal){
        PRODIGAL (
            assemblies,
            'gff'
        )
        ch_versions = ch_versions.mix(PRODIGAL.out.versions.first())
    }

    /*
     * Prokka: Genome annotation
     */
    ch_bins_for_prokka = bins_unbins.transpose()
        .map { meta, bin ->
            def meta_new = meta.clone()
            meta_new.id = bin.getBaseName()
            [ meta_new, bin ]
        }

    if (!params.skip_prokka){
        PROKKA (
            ch_bins_for_prokka,
            [],
            []
        )
        ch_versions = ch_versions.mix(PROKKA.out.versions.first())
    }

    emit:
    versions = ch_versions
}
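Prokka runs per bin, so the `transpose()` call above fans each `[ meta, [ bin1, bin2, … ] ]` element out into one element per bin before the meta id is renamed. A standalone sketch of that reshaping, with an assumed input shape:

```nextflow
// Illustration only — the input shape is assumed for the example.
workflow {
    Channel
        .of( [ [ id: 'sample1', assembler: 'MEGAHIT' ],
               [ file('bin.1.fa'), file('bin.2.fa') ] ] )
        .transpose()                          // -> [ meta, bin.1.fa ], [ meta, bin.2.fa ]
        .map { meta, bin ->
            def meta_new = meta.clone()
            meta_new.id = bin.getBaseName()   // per-bin id, e.g. 'bin.1'
            [ meta_new, bin ]
        }
        .view()
}
```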
133 changes: 133 additions & 0 deletions subworkflows/local/assembly.nf
@@ -0,0 +1,133 @@
include { POOL_SINGLE_READS as POOL_SHORT_SINGLE_READS } from '../../modules/local/pool_single_reads'
include { POOL_PAIRED_READS } from '../../modules/local/pool_paired_reads'
include { POOL_SINGLE_READS as POOL_LONG_READS } from '../../modules/local/pool_single_reads'
include { MEGAHIT } from '../../modules/local/megahit'
include { SPADES } from '../../modules/local/spades'
include { SPADESHYBRID } from '../../modules/local/spadeshybrid'

workflow ASSEMBLY {
    take:
    short_reads_assembly
    long_reads

    main:
    ch_versions = Channel.empty()

    // Co-assembly: prepare grouping for MEGAHIT and for pooling for SPAdes
    if (params.coassemble_group) {
        // short reads
        // group and set group as new id
        ch_short_reads_grouped = short_reads_assembly
            .map { meta, reads -> [ meta.group, meta, reads ] }
            .groupTuple(by: 0)
            .map { group, metas, reads ->
                def assemble_as_single = params.single_end || ( params.bbnorm && params.coassemble_group )
                def meta = [:]
                meta.id = "group-$group"
                meta.group = group
                meta.single_end = assemble_as_single
                if ( assemble_as_single ) [ meta, reads.collect { it }, [] ]
                else [ meta, reads.collect { it[0] }, reads.collect { it[1] } ]
            }
        // long reads
        // group and set group as new id
        ch_long_reads_grouped = long_reads
            .map { meta, reads -> [ meta.group, meta, reads ] }
            .groupTuple(by: 0)
            .map { group, metas, reads ->
                def meta = [:]
                meta.id = "group-$group"
                meta.group = group
                [ meta, reads.collect { it } ]
            }
    } else {
        ch_short_reads_grouped = short_reads_assembly
            .filter { it[0].single_end }
            .map { meta, reads -> [ meta, [ reads ], [] ] }
            .mix (
                short_reads_assembly
                    .filter { ! it[0].single_end }
                    .map { meta, reads -> [ meta, [ reads[0] ], [ reads[1] ] ] }
            )
        ch_long_reads_grouped = long_reads
    }

    ch_assemblies = Channel.empty()
    if (!params.skip_megahit){
        MEGAHIT ( ch_short_reads_grouped )
        ch_megahit_assemblies = MEGAHIT.out.assembly
            .map { meta, assembly ->
                def meta_new = meta.clone()
                meta_new.assembler = "MEGAHIT"
                [ meta_new, assembly ]
            }
        ch_assemblies = ch_assemblies.mix(ch_megahit_assemblies)
        ch_versions = ch_versions.mix(MEGAHIT.out.versions.first())
    }

    // Co-assembly: pool reads for SPAdes
    if ( ! params.skip_spades || ! params.skip_spadeshybrid ){
        if ( params.coassemble_group ) {
            if ( params.bbnorm ) {
                ch_short_reads_spades = ch_short_reads_grouped.map { [ it[0], it[1] ] }
            } else {
                POOL_SHORT_SINGLE_READS (
                    ch_short_reads_grouped
                        .filter { it[0].single_end }
                )
                POOL_PAIRED_READS (
                    ch_short_reads_grouped
                        .filter { ! it[0].single_end }
                )
                ch_short_reads_spades = POOL_SHORT_SINGLE_READS.out.reads
                    .mix(POOL_PAIRED_READS.out.reads)
            }
        } else {
            ch_short_reads_spades = short_reads_assembly
        }
        // long reads
        if (!params.single_end && !params.skip_spadeshybrid){
            POOL_LONG_READS ( ch_long_reads_grouped )
            ch_long_reads_spades = POOL_LONG_READS.out.reads
        } else {
            ch_long_reads_spades = Channel.empty()
        }
    } else {
        ch_short_reads_spades = Channel.empty()
        ch_long_reads_spades = Channel.empty()
    }

    if (!params.single_end && !params.skip_spades){
        SPADES ( ch_short_reads_spades )
        ch_spades_assemblies = SPADES.out.assembly
            .map { meta, assembly ->
                def meta_new = meta.clone()
                meta_new.assembler = "SPAdes"
                [ meta_new, assembly ]
            }
        ch_assemblies = ch_assemblies.mix(ch_spades_assemblies)
        ch_versions = ch_versions.mix(SPADES.out.versions.first())
    }

    if (!params.single_end && !params.skip_spadeshybrid){
        ch_short_reads_spades_tmp = ch_short_reads_spades
            .map { meta, reads -> [ meta.id, meta, reads ] }
        ch_reads_spadeshybrid = ch_long_reads_spades
            .map { meta, reads -> [ meta.id, meta, reads ] }
            .combine(ch_short_reads_spades_tmp, by: 0)
            .map { id, meta_long, long_reads, meta_short, short_reads -> [ meta_short, long_reads, short_reads ] }
        SPADESHYBRID ( ch_reads_spadeshybrid )
        ch_spadeshybrid_assemblies = SPADESHYBRID.out.assembly
            .map { meta, assembly ->
                def meta_new = meta.clone()
                meta_new.assembler = "SPAdesHybrid"
                [ meta_new, assembly ]
            }
        ch_assemblies = ch_assemblies.mix(ch_spadeshybrid_assemblies)
        ch_versions = ch_versions.mix(SPADESHYBRID.out.versions.first())
    }

    emit:
    assemblies = ch_assemblies
    versions = ch_versions
}
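The co-assembly branch above is the densest part of this file: it keys each sample on `meta.group`, collects all samples of a group with `groupTuple(by: 0)`, then emits one element per group with the R1 and R2 files gathered into parallel lists. A standalone sketch of that reshaping, with assumed inputs:

```nextflow
// Illustration only — sample metas and file names are assumed.
workflow {
    Channel
        .of( [ [ id: 's1', group: 0, single_end: false ], [ file('s1_R1.fq.gz'), file('s1_R2.fq.gz') ] ],
             [ [ id: 's2', group: 0, single_end: false ], [ file('s2_R1.fq.gz'), file('s2_R2.fq.gz') ] ] )
        .map { meta, reads -> [ meta.group, meta, reads ] }
        .groupTuple(by: 0)  // -> [ 0, [ meta_s1, meta_s2 ], [ [s1_R1, s1_R2], [s2_R1, s2_R2] ] ]
        .map { group, metas, reads ->
            def meta = [ id: "group-$group", group: group, single_end: false ]
            // one co-assembly job per group: all R1s in one list, all R2s in another
            [ meta, reads.collect { it[0] }, reads.collect { it[1] } ]
        }
        .view()
}
```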
20 changes: 20 additions & 0 deletions subworkflows/local/assembly_qc.nf
@@ -0,0 +1,20 @@
include { QUAST } from '../../modules/local/quast'

workflow ASSEMBLY_QC {
    take:
    assemblies

    main:
    ch_versions = Channel.empty()
    ch_quast_multiqc = Channel.empty()

    if (!params.skip_quast){
        QUAST ( assemblies )
        ch_quast_multiqc = QUAST.out.qc
        ch_versions = ch_versions.mix(QUAST.out.versions.first())
    }

    emit:
    quast_multiqc = ch_quast_multiqc
    versions = ch_versions
}
109 changes: 109 additions & 0 deletions subworkflows/local/bin_qc.nf
@@ -0,0 +1,109 @@
include { ARIA2 as ARIA2_UNTAR } from '../../modules/nf-core/aria2/main'
include { BUSCO_QC } from '../../subworkflows/local/busco_qc'
include { CHECKM_QC } from '../../subworkflows/local/checkm_qc'
include { GUNC_QC } from '../../subworkflows/local/gunc_qc'
include { QUAST_BINS } from '../../modules/local/quast_bins'
include { QUAST_BINS_SUMMARY } from '../../modules/local/quast_bins_summary'

workflow BIN_QC {
    take:
    bins_unbins

    main:
    if (params.busco_reference){
        ch_busco_db_file = Channel
            .value(file( "${params.busco_reference}" ))
    } else {
        ch_busco_db_file = Channel.empty()
    }
    if (params.busco_download_path) {
        ch_busco_download_folder = Channel
            .value(file( "${params.busco_download_path}" ))
    } else {
        ch_busco_download_folder = Channel.empty()
    }

    if (params.checkm_db) {
        ch_checkm_db = file(params.checkm_db, checkIfExists: true)
    }

    // Get CheckM database if not supplied
    if ( !params.skip_binqc && params.binqc_tool == 'checkm' && !params.checkm_db ) {
        ARIA2_UNTAR (params.checkm_download_url)
        ch_checkm_db = ARIA2_UNTAR.out.downloaded_file
    }

    if (params.gunc_db) {
        ch_gunc_db = file(params.gunc_db, checkIfExists: true)
    } else {
        ch_gunc_db = Channel.empty()
    }

    ch_versions = Channel.empty()
    ch_busco_multiqc = Channel.empty()
    ch_busco_summary = Channel.empty()
    ch_checkm_summary = Channel.empty()
    ch_quast_bins_summary = Channel.empty()
    ch_busco_failed_bins = Channel.empty()

    bins_unbins_transposed = bins_unbins.transpose()

    if (!params.skip_binqc && params.binqc_tool == 'busco'){
        /*
         * BUSCO subworkflow: Quantitative measures for the assessment of genome assembly
         */
        BUSCO_QC (
            ch_busco_db_file,
            ch_busco_download_folder,
            bins_unbins_transposed
        )
        ch_busco_summary = BUSCO_QC.out.summary
        ch_busco_multiqc = BUSCO_QC.out.multiqc
        ch_versions = ch_versions.mix(BUSCO_QC.out.versions.first())
    }

    if (!params.skip_binqc && params.binqc_tool == 'checkm'){
        /*
         * CheckM subworkflow: Quantitative measures for the assessment of genome assembly
         */
        CHECKM_QC (
            bins_unbins_transposed.groupTuple(),
            ch_checkm_db
        )
        ch_checkm_summary = CHECKM_QC.out.summary

        // TODO custom output parsing? Add to MultiQC?
        ch_versions = ch_versions.mix(CHECKM_QC.out.versions)
    }

    if ( params.run_gunc && params.binqc_tool == 'checkm' ) {
        GUNC_QC ( bins_unbins_transposed, ch_gunc_db, CHECKM_QC.out.checkm_tsv )
        ch_versions = ch_versions.mix( GUNC_QC.out.versions )
    } else if ( params.run_gunc ) {
        GUNC_QC ( bins_unbins_transposed, ch_gunc_db, [] )
        ch_versions = ch_versions.mix( GUNC_QC.out.versions )
    }

    if (!params.skip_quast){
        ch_input_for_quast_bins = bins_unbins
            .groupTuple()
            .map { meta, reads ->
                def new_reads = reads.flatten()
                [ meta, new_reads ]
            }
        QUAST_BINS ( ch_input_for_quast_bins )
        ch_versions = ch_versions.mix(QUAST_BINS.out.versions.first())
        QUAST_BINS_SUMMARY ( QUAST_BINS.out.quast_bin_summaries.collect() )
        ch_quast_bins_summary = QUAST_BINS_SUMMARY.out.summary
    }

    emit:
    busco_summary = ch_busco_summary
    busco_multiqc = ch_busco_multiqc
    busco_failed_bins = ch_busco_failed_bins
    checkm_summary = ch_checkm_summary
    quast_bins_summary = ch_quast_bins_summary
    versions = ch_versions
}
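The QUAST input preparation above regroups bins and unbinned contigs that share a meta back into a single flat file list (`groupTuple()` yields a list of lists, hence the `flatten()`). Sketched with assumed shapes:

```nextflow
// Illustration only — metas and file names are assumed.
workflow {
    Channel
        .of( [ [ id: 's1' ], [ file('bin.1.fa'), file('bin.2.fa') ] ],
             [ [ id: 's1' ], [ file('unbinned.fa') ] ] )
        .groupTuple()                                       // -> [ meta, [ [bins], [unbinned] ] ]
        .map { meta, files -> [ meta, files.flatten() ] }   // -> [ meta, [ bin.1.fa, bin.2.fa, unbinned.fa ] ]
        .view()
}
```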
75 changes: 75 additions & 0 deletions subworkflows/local/bin_taxonomy.nf
@@ -0,0 +1,75 @@
include { CAT_DB } from '../../modules/local/cat_db'
include { CAT_DB_GENERATE } from '../../modules/local/cat_db_generate'
include { CAT } from '../../modules/local/cat'
include { CAT_SUMMARY } from '../../modules/local/cat_summary'
include { GTDBTK } from '../../subworkflows/local/gtdbtk'

workflow BIN_TAXONOMY {
    take:
    bins_unbins
    busco_summary
    checkm_summary

    main:
    ch_versions = Channel.empty()

    if (params.cat_db){
        ch_cat_db_file = Channel
            .value(file( "${params.cat_db}" ))
    } else {
        ch_cat_db_file = Channel.empty()
    }

    gtdb = params.skip_binqc ? false : params.gtdb
    if (gtdb) {
        ch_gtdb = Channel
            .value(file( "${gtdb}" ))
    } else {
        ch_gtdb = Channel.empty()
    }

    ch_cat_db = Channel.empty()
    if (params.cat_db){
        CAT_DB ( ch_cat_db_file )
        ch_cat_db = CAT_DB.out.db
    } else if (params.cat_db_generate){
        CAT_DB_GENERATE ()
        ch_cat_db = CAT_DB_GENERATE.out.db
    }

    bins_unbins_transposed = bins_unbins.transpose()

    /*
     * CAT (Contig Annotation Tool) and BAT (Bin Annotation Tool): taxonomic classification
     * of long DNA sequences and metagenome-assembled genomes (MAGs/bins)
     */
    if (params.cat_db || params.cat_db_generate) {
        CAT (
            bins_unbins_transposed,
            ch_cat_db
        )
        CAT_SUMMARY(
            CAT.out.tax_classification.collect()
        )
        ch_versions = ch_versions.mix(CAT.out.versions.first())
        ch_versions = ch_versions.mix(CAT_SUMMARY.out.versions)
    }

    /*
     * GTDB-Tk: taxonomic classification using the GTDB reference
     */
    ch_gtdbtk_summary = Channel.empty()
    if ( gtdb ){
        GTDBTK (
            bins_unbins_transposed,
            busco_summary,
            checkm_summary,
            ch_gtdb
        )
        ch_versions = ch_versions.mix(GTDBTK.out.versions.first())
        ch_gtdbtk_summary = GTDBTK.out.summary
    }

    emit:
    gtdbtk_summary = ch_gtdbtk_summary
    versions = ch_versions
}
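BIN_TAXONOMY consumes the summaries that BIN_QC emits, so a caller is expected to chain the two. A sketch of that hand-off under assumed wiring (and assuming the relevant `params` are configured):

```nextflow
include { BIN_QC       } from './subworkflows/local/bin_qc'
include { BIN_TAXONOMY } from './subworkflows/local/bin_taxonomy'

workflow {
    // Placeholder bins channel: [ meta, [ bin fastas ] ] (assumed shape).
    ch_bins_unbins = Channel.of( [ [ id: 's1', assembler: 'MEGAHIT' ],
                                   [ file('bin.1.fa'), file('bin.2.fa') ] ] )

    BIN_QC ( ch_bins_unbins )
    BIN_TAXONOMY (
        ch_bins_unbins,
        BIN_QC.out.busco_summary,   // used by GTDB-Tk to filter bins
        BIN_QC.out.checkm_summary
    )
}
```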