New dataflow syntax
Signed-off-by: Ben Sherman <[email protected]>
bentsherman committed Apr 30, 2024
1 parent b2f563d commit 24b34cf
Showing 6 changed files with 212 additions and 206 deletions.
10 changes: 6 additions & 4 deletions main.nf
@@ -9,7 +9,7 @@
 ----------------------------------------------------------------------------------------
 */

-nextflow.enable.dsl = 2
+nextflow.preview.dsl = 3

 /*
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -33,7 +33,8 @@ include { PIPELINE_COMPLETION } from './subworkflows/local/utils_nfcore_fetc
 workflow NFCORE_FETCHNGS {

     take:
-    ids // channel: database ids read in from --input
+    ids    // Channel<String>
+    params // ParamsMap

     main:
@@ -69,7 +70,7 @@ workflow {
     //
     // SUBWORKFLOW: Run initialisation tasks
     //
-    PIPELINE_INITIALISATION (
+    ids = PIPELINE_INITIALISATION (
         params.version,
         params.help,
         params.validate_params,
@@ -84,7 +85,8 @@ workflow {
     //
     // WORKFLOW: Run primary workflows for the pipeline
     //
     NFCORE_FETCHNGS (
-        PIPELINE_INITIALISATION.out.ids
+        ids,
+        params,
     )

     //
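The main.nf change above is the heart of the new syntax: a workflow call returns its emitted values directly, so the caller writes ids = PIPELINE_INITIALISATION(...) instead of reading PIPELINE_INITIALISATION.out.ids afterwards. A minimal sketch of that call-and-assign style, using made-up names (GREET, greetings) rather than anything from this commit:

workflow GREET {
    take:
    names // Channel<String>

    main:
    greetings = names.map { name -> "Hello, ${name}!" }

    emit:
    greetings // Channel<String>
}

workflow {
    // DSL2 style would be: GREET(names); GREET.out.greetings.view()
    // Under the preview semantics sketched here, the call itself yields the value
    greetings = GREET( Channel.of('alice', 'bob') )
    greetings.view()
}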
46 changes: 24 additions & 22 deletions modules/local/sra_to_samplesheet/main.nf
@@ -1,25 +1,39 @@

 process SRA_TO_SAMPLESHEET {
-    tag "$meta.id"
-
     executor 'local'
     memory 100.MB

     input:
-    Map meta
+    List<Map> sra_metadata
     String pipeline
     String strandedness
     String mapping_fields

     output:
-    Sample samplesheet = new Sample(meta, path("*samplesheet.csv"))
-    Sample mappings = new Sample(meta, path("*mappings.csv"))
+    Path samplesheet = path("samplesheet.csv")
+    Path mappings = path("id_mappings.csv")

     exec:
     //
     // Create samplesheet containing metadata
     //

+    def records = sra_metadata.collect { meta ->
+        getSraRecord(meta, pipeline, strandedness, mapping_fields)
+    }
+
+    def samplesheet = records
+        .collect { pipeline_map, mappings_map -> pipeline_map }
+        .sort { record -> record.sample }
+    mergeCsv(samplesheet, task.workDir.resolve('samplesheet.csv'))
+
+    def mappings = records
+        .collect { pipeline_map, mappings_map -> mappings_map }
+        .sort { record -> record.sample }
+    mergeCsv(mappings, task.workDir.resolve('id_mappings.csv'))
+}
+
+def getSraRecord(Map meta, String pipeline, String strandedness, String mapping_fields) {
     // Remove custom keys needed to download the data
     def meta_clone = meta.clone()
     meta_clone.remove("id")
@@ -30,7 +44,7 @@ process SRA_TO_SAMPLESHEET {
     meta_clone.remove("single_end")

     // Add relevant fields to the beginning of the map
-    pipeline_map = [
+    def pipeline_map = [
         sample  : "${meta.id.split('_')[0..-2].join('_')}",
         fastq_1 : meta.fastq_1,
         fastq_2 : meta.fastq_2
@@ -48,28 +62,16 @@ process SRA_TO_SAMPLESHEET {
     }
     pipeline_map << meta_clone

-    // Create a samplesheet
-    samplesheet = pipeline_map.keySet().collect{ '"' + it + '"'}.join(",") + '\n'
-    samplesheet += pipeline_map.values().collect{ '"' + it + '"'}.join(",")
-
-    // Write samplesheet to file
-    def samplesheet_file = task.workDir.resolve("${meta.id}.samplesheet.csv")
-    samplesheet_file.text = samplesheet
-
-    //
-    // Create sample id mappings file
-    //
-    mappings_map = pipeline_map.clone()
-    def fields = mapping_fields ? ['sample'] + mapping_fields.split(',').collect{ it.trim().toLowerCase() } : []
+    def mappings_map = pipeline_map.clone()
+    def fields = mapping_fields ? ['sample'] + mapping_fields.split(',').collect{ v -> v.trim().toLowerCase() } : []
     if ((mappings_map.keySet() + fields).unique().size() != mappings_map.keySet().size()) {
         error("Invalid option for '--sample_mapping_fields': ${mapping_fields}.\nValid options: ${mappings_map.keySet().join(', ')}")
     }

-    // Create mappings
-    mappings = fields.collect{ '"' + it + '"'}.join(",") + '\n'
-    mappings += mappings_map.subMap(fields).values().collect{ '"' + it + '"'}.join(",")
+    mappings_map = mappings_map.subMap(fields)

-    // Write mappings to file
-    def mappings_file = task.workDir.resolve("${meta.id}.mappings.csv")
-    mappings_file.text = mappings
+    return [ pipeline_map, mappings_map ]
 }
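The rewrite above also turns SRA_TO_SAMPLESHEET from one task per sample into a single fan-in task: it takes the whole List<Map> of run metadata, builds a record per run with getSraRecord, and writes both merged CSVs once via mergeCsv. A hypothetical invocation sketch, assuming a collected channel satisfies the typed List<Map> input under these preview semantics (the metadata literals are illustrative only):

workflow {
    // Illustrative records; the real ones come from the SRA/ENA metadata lookup
    sra_metadata = Channel.of(
        [id: 'SRX001_SRR001', run_accession: 'SRR001', fastq_1: 'SRR001_1.fastq.gz', fastq_2: 'SRR001_2.fastq.gz'],
        [id: 'SRX002_SRR002', run_accession: 'SRR002', fastq_1: 'SRR002_1.fastq.gz', fastq_2: 'SRR002_2.fastq.gz']
    )

    SRA_TO_SAMPLESHEET(
        sra_metadata.collect(), // fan-in: one task sees every record as a List<Map>
        'rnaseq',               // pipeline
        'auto',                 // strandedness
        'run_accession'         // mapping_fields
    )
}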
23 changes: 11 additions & 12 deletions subworkflows/local/utils_nfcore_fetchngs_pipeline/main.nf
@@ -75,23 +75,22 @@ workflow PIPELINE_INITIALISATION {
     //
     // Auto-detect input id type
     //
-    ch_input = file(input)
-    if (isSraId(ch_input)) {
-        sraCheckENAMetadataFields(ena_metadata_fields)
-    } else {
+    input = file(input)
+    if (!isSraId(input))
         error('Ids provided via --input not recognised please make sure they are either SRA / ENA / GEO / DDBJ ids!')
-    }
+    sraCheckENAMetadataFields(ena_metadata_fields)

     // Read in ids from --input file
-    Channel
-        .from(ch_input)
-        .splitCsv(header:false, sep:'', strip:true)
-        .map { it[0] }
-        .unique()
-        .set { ch_ids }
+    input                   // Path
+        |> Channel.of       // Channel<Path>
+        |> flatMap { csv ->
+            splitCsv(csv, header: false, schema: 'assets/schema_input.yml')
+        }                   // Channel<String>
+        |> unique           // Channel<String>
+        |> set { ids }      // Channel<String>

     emit:
-    ids = ch_ids
+    ids // Channel<String>
 }

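In the chain above, |> threads the value on its left into the operator or function on its right, so input |> Channel.of is just Channel.of(input), and the whole pipeline reads top to bottom in dataflow order. A small sketch of that reading with hypothetical contents; the bare-operator style (unique, map, set without a receiver) is taken from this diff, not from released Nextflow:

Channel.of('SRR123', 'SRR456', 'SRR123')   // Channel<String>
    |> unique                              // drops the duplicate SRR123
    |> map { id -> [id: id] }              // Channel<Map>
    |> set { records }                     // binds the result to `records`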

Some generated files are not rendered by default.

22 changes: 15 additions & 7 deletions subworkflows/nf-core/utils_nfcore_pipeline/main.nf

Some generated files are not rendered by default.
