Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workflow output definition #4784

Merged
merged 54 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
0de777a
Add initial prototype
bentsherman Feb 28, 2024
14beedc
Fix race condition
bentsherman Feb 28, 2024
062b421
Simplify output DSL
bentsherman Feb 28, 2024
06a5872
Support default publish options in path method
bentsherman Feb 29, 2024
eecf619
Rename OutputDsl -> WorkflowPublishDsl
bentsherman Mar 19, 2024
b238a81
Replace process selector with channel/topic selectors, add e2e test
bentsherman Mar 20, 2024
d0fa980
cleanup PublishOp
bentsherman Mar 20, 2024
8279743
Add topic operator (not working)
bentsherman Mar 22, 2024
7681e2d
Merge branch 'master' into 4670-workflow-outputs
pditommaso Mar 26, 2024
88d7ead
Add topic op test
pditommaso Mar 26, 2024
1e3d539
clean up e2e test
bentsherman Mar 27, 2024
925bc67
Fix issue with topic operator
bentsherman Mar 27, 2024
e4608b3
Apply suggestions from review
bentsherman Mar 27, 2024
1e69f3e
Add output directory option to CLI, config, output DSL
bentsherman Mar 27, 2024
01b0570
Validate publish options
bentsherman Mar 27, 2024
c43afad
Update docs
bentsherman Mar 27, 2024
570da27
Add defaults to directory statement
bentsherman Mar 27, 2024
0ad12eb
Update docs
bentsherman Mar 28, 2024
3e0823b
Apply suggestions from review
bentsherman Mar 30, 2024
5a1a7bb
Fix workflow binding
bentsherman Mar 30, 2024
3bdd0fe
Fix dynamic path name
bentsherman Apr 6, 2024
59d83c0
Apply suggestions from review
bentsherman Apr 10, 2024
5beca46
Merge branch 'master' into 4670-workflow-outputs
pditommaso Apr 12, 2024
b67376c
Add `publish:` section to process
bentsherman Apr 12, 2024
65d9111
Update docs
bentsherman Apr 12, 2024
35df924
Add publish options to output DSL
bentsherman Apr 12, 2024
509f271
Change publish op to handle multiple task dirs
bentsherman Apr 12, 2024
6fcbf6f
Fix error when no output block is specified
bentsherman Apr 15, 2024
550766a
Rename output -> publish, rule -> target
bentsherman Apr 15, 2024
72d8a6f
Disallow absolute path in publish target
bentsherman Apr 15, 2024
370aa0b
Add feature flag
bentsherman Apr 15, 2024
76af5c9
Allow multi-channel output to be published
bentsherman Apr 15, 2024
0833e11
Add overwrite modes for deep / lenient / standard hash comparison
bentsherman Apr 16, 2024
4614f26
Merge branch 'master' into 4670-workflow-outputs
pditommaso Apr 17, 2024
02745f7
Factor out HashBuilder from CacheHelper
bentsherman Apr 17, 2024
0583b3d
Add index file definition
bentsherman Apr 18, 2024
e60403d
Fix failing tests
bentsherman Apr 18, 2024
12c2720
Apply suggestions from review
bentsherman Apr 22, 2024
d998878
Don't write index file if no records were published
bentsherman Apr 24, 2024
95a110a
Redirect to `null` to disable publishing
bentsherman Apr 24, 2024
7953e79
Remove ternary hack, require parentheses instead
bentsherman Apr 24, 2024
856209b
Replace publish path option with ability to reroute targets in publis…
bentsherman Apr 29, 2024
50484d9
Apply suggestions from review
bentsherman May 1, 2024
ab9118e
Merge branch 'master' into 4670-workflow-outputs
pditommaso May 3, 2024
84e6382
Merge branch 'master' into 4670-workflow-outputs
pditommaso May 9, 2024
2b80f47
Merge branch 'master' into 4670-workflow-outputs
pditommaso May 12, 2024
d504e45
Minor change [ci skip]
pditommaso May 12, 2024
6185d84
Apply suggestions from review
bentsherman May 13, 2024
79cc68c
Fix typo [ci skip]
pditommaso May 15, 2024
a70af4e
Merge branch 'master' into 4670-workflow-outputs
pditommaso May 15, 2024
ef4305d
Add shorthand for publishing single file to index
bentsherman May 15, 2024
b8cf823
Fold PublishIndexOp into PublishOp, add test for OutputDsl,
bentsherman May 16, 2024
02ba0c7
Update index file default for single file
bentsherman May 16, 2024
0db73c8
Use file base name for default index
bentsherman May 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2013-2024, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.extension

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import groovyx.gpars.dataflow.DataflowReadChannel
import groovyx.gpars.dataflow.DataflowWriteChannel
import groovyx.gpars.dataflow.operator.ChainWithClosure
import groovyx.gpars.dataflow.operator.CopyChannelsClosure
import static nextflow.extension.DataflowHelper.newOperator
/**
 * Implements the {@link OperatorImpl#topic} operator.
 *
 * The operator created by {@link #apply()} reads from the source channel and
 * writes to two channels: a new channel created from the source, and the
 * source channel of the named topic.
 *
 * @author Ben Sherman <[email protected]>
 */
@Slf4j
@CompileStatic
class IntoTopicOp {

    /** Channel read by the operator */
    private DataflowReadChannel source

    /** Topic name handed to {@code CH.createTopicSource} */
    private String name

    /** Channels written by the operator; assigned by {@link #apply()}, unset before that call */
    private List<DataflowWriteChannel> outputs

    /**
     * @param source channel read by the operator
     * @param name name of the topic that receives the values
     */
    IntoTopicOp( DataflowReadChannel source, String name ) {
        this.name = name
        this.source = source
    }

    /**
     * Create the two output channels and the operator that connects the
     * source channel to both of them.
     *
     * @return the channel created from the source (not the topic channel)
     */
    DataflowWriteChannel apply() {
        final downstream = CH.createBy(source)
        final topicChannel = CH.createTopicSource(name)
        final copyToAll = new ChainWithClosure(new CopyChannelsClosure())
        // keep both channels so that the caller can retrieve them via getOutputs()
        this.outputs = [downstream, topicChannel]
        newOperator([source], outputs, copyToAll)
        return downstream
    }

    /**
     * @return the channels written by the operator, as assigned by {@link #apply()}
     */
    List<DataflowWriteChannel> getOutputs() {
        return this.outputs
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import nextflow.splitter.FastaSplitter
import nextflow.splitter.FastqSplitter
import nextflow.splitter.JsonSplitter
import nextflow.splitter.TextSplitter
import org.codehaus.groovy.runtime.InvokerHelper
import org.codehaus.groovy.runtime.callsite.BooleanReturningMethodInvoker
import org.codehaus.groovy.runtime.typehandling.DefaultTypeTransformation
/**
Expand Down Expand Up @@ -1239,4 +1240,12 @@ class OperatorImpl {
.getOutput()
}

/**
 * Send the values of the source channel to the given topic by means
 * of {@link IntoTopicOp}.
 *
 * @param source channel to read from
 * @param name topic name
 * @return the channel returned by {@code IntoTopicOp#apply()}
 * @throws MissingMethodException when {@code NF.topicChannelEnabled} is not set
 */
DataflowWriteChannel topic(DataflowReadChannel source, String name) {
    // without the feature flag the operator is reported as if it did not exist
    if( !NF.topicChannelEnabled ) {
        throw new MissingMethodException('topic', OperatorImpl.class, InvokerHelper.EMPTY_ARGS)
    }

    final topicOp = new IntoTopicOp(source, name)
    final result = topicOp.apply()
    // add every channel written by the operator to the outputs of the current operator call
    OpCall.current.get().outputs.addAll( topicOp.getOutputs() )
    return result
}
bentsherman marked this conversation as resolved.
Show resolved Hide resolved

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,29 +40,17 @@ class PublishOp {

private Path sourceDir

private volatile boolean complete

private Session getSession() { Global.session as Session }

PublishOp(DataflowReadChannel source, Map opts) {
this.source = source
this.opts = opts ? new LinkedHashMap(opts) : Collections.emptyMap()

// adapt `to` option
if( this.opts.containsKey('to') ) {
this.opts.path = this.opts.to
this.opts.remove('to')
}

this.publisher = PublishDir.create(this.opts)
}

protected boolean getComplete() { complete }

PublishOp apply() {
final events = new HashMap(2)
events.onNext = this.&publish0
events.onComplete = this.&done0
DataflowHelper.subscribeImpl(source, events)
return this
}
Expand All @@ -76,11 +64,6 @@ class PublishOp {
publisher.apply(result, sourceDir)
}

protected void done0(nope) {
log.debug "Publish operator complete"
this.complete = true
}

protected void collectFiles(entry, Collection<Path> result) {
if( entry instanceof Path ) {
result.add(entry)
Expand All @@ -103,16 +86,18 @@ class PublishOp {
* @return
*/
protected Path getTaskDir(Path path) {
if( path==null )
if( path == null )
return null
def result = getTaskDir0(path, session.workDir)
def result = getTaskDir0(path, session.workDir.resolve('tmp'))
if( result == null )
result = getTaskDir0(path, session.workDir)
if( result == null )
result = getTaskDir0(path, session.bucketDir)
return result
}

private Path getTaskDir0(Path file, Path base) {
if( base==null )
if( base == null )
return null
if( base.fileSystem != file.fileSystem )
return null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,15 @@ abstract class BaseScript extends Script implements ExecutionContext {
meta.addDefinition(workflow)
}

/**
 * Register the closure given to the {@code output} block as the
 * publisher of the entry workflow.
 *
 * @param closure the body of the {@code output} block
 * @throws IllegalStateException when no entry workflow is available yet,
 *      or when invoked during the execution of a workflow
 */
protected output(Closure closure) {
    // the publish definition is attached to the entry workflow, which must already exist
    if( !entryFlow ) {
        throw new IllegalStateException("Publish definition must be defined after the anonymous workflow")
    }

    if( ExecutionStack.withinWorkflow() ) {
        throw new IllegalStateException("Publish definition is not allowed within a workflow")
    }

    entryFlow.publisher = closure
}

protected IncludeDef include( IncludeDef include ) {
if(ExecutionStack.withinWorkflow())
throw new IllegalStateException("Include statement is not allowed within a workflow definition")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@

package nextflow.script

import java.nio.file.Path

import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
import groovyx.gpars.dataflow.DataflowWriteChannel
import nextflow.Channel
import nextflow.exception.MissingProcessException
import nextflow.exception.MissingValueException
import nextflow.exception.ScriptRuntimeException
import nextflow.extension.CH
import nextflow.extension.PublishOp
/**
* Models a script workflow component
*
Expand Down Expand Up @@ -52,6 +56,8 @@ class WorkflowDef extends BindableDef implements ChainableDef, IterableDef, Exec

private WorkflowBinding binding

private Closure publisher

WorkflowDef(BaseScript owner, Closure<BodyDef> rawBody, String name=null) {
this.owner = owner
this.name = name
Expand All @@ -70,6 +76,10 @@ class WorkflowDef extends BindableDef implements ChainableDef, IterableDef, Exec
/* ONLY FOR TESTING PURPOSE */
protected WorkflowDef() {}

/**
 * Set the closure that defines how the outputs of this workflow are published.
 *
 * @param publisher the publish definition closure
 */
void setPublisher(Closure publisher) {
this.publisher = publisher
}

WorkflowDef clone() {
final copy = (WorkflowDef)super.clone()
copy.@body = body.clone()
Expand Down Expand Up @@ -204,6 +214,14 @@ class WorkflowDef extends BindableDef implements ChainableDef, IterableDef, Exec
closure.call()
// collect the workflow outputs
output = collectOutputs(declaredOutputs)
// publish the workflow outputs
if( publisher ) {
final dsl = new WorkflowPublishDsl(binding)
final cl = (Closure)publisher.clone()
cl.setResolveStrategy(Closure.DELEGATE_FIRST)
cl.setDelegate(dsl)
cl.call()
}
return output
}

Expand Down Expand Up @@ -254,3 +272,69 @@ class WorkflowParamsResolver {
return opts
}
}

/**
 * Implements the DSL for publishing workflow outputs.
 *
 * Properties referenced in the DSL body are looked up in the given
 * binding first. Publish targets are declared with nested {@code path}
 * blocks, and channels are attached to a target with {@code select}
 * or {@code topic}.
 *
 * @author Ben Sherman <[email protected]>
 */
@CompileStatic
class WorkflowPublishDsl {

    /** Binding consulted first when a property is resolved */
    private Binding binding

    WorkflowPublishDsl(Binding binding) {
        this.binding = binding
    }

    /**
     * Resolve a property against the binding, falling back to the
     * default property lookup when the binding does not define it.
     */
    @Override
    Object getProperty(String name) {
        try {
            return binding.getProperty(name)
        }
        catch( MissingPropertyException ignored ) {
            return super.getProperty(name)
        }
    }

    /**
     * Declare a top-level publish path and evaluate its body.
     *
     * @param opts default publish options for everything declared in the body
     * @param path the publish path
     * @param closure the body, evaluated with a {@link PathDsl} as delegate
     */
    void path(Map opts=[:], String path, Closure closure) {
        final root = new PathDsl(Path.of(path), opts)
        // work on a copy so that the caller's closure is left untouched
        final body = (Closure)closure.clone()
        body.setDelegate(root)
        body.setResolveStrategy(Closure.DELEGATE_FIRST)
        body.call()
    }

    /**
     * DSL available inside a {@code path} block.
     */
    class PathDsl {

        /** Publish path of this block */
        private Path path

        /** Publish options inherited by nested blocks and selections */
        private Map defaults

        PathDsl(Path path, Map defaults) {
            this.path = path
            this.defaults = defaults
        }

        /**
         * Declare a nested publish path, resolved against the path of
         * this block, and evaluate its body.
         *
         * @param opts options merged over the defaults of this block
         * @param subpath path resolved against the path of this block
         * @param closure the body, evaluated with a nested {@link PathDsl} as delegate
         */
        void path(Map opts=[:], String subpath, Closure closure) {
            final nestedPath = path.resolve(subpath)
            final Map nestedDefaults = defaults + opts
            final nested = new PathDsl(nestedPath, nestedDefaults)
            final body = (Closure)closure.clone()
            body.setDelegate(nested)
            body.setResolveStrategy(Closure.DELEGATE_FIRST)
            body.call()
        }

        /**
         * Publish the given channel to the path of this block.
         *
         * @param opts options merged over the defaults of this block
         * @param source the channel to publish
         */
        void select(Map opts=[:], DataflowWriteChannel source) {
            final readChannel = CH.getReadChannel(source)
            // the `path` entry is added last, hence it overrides any `path` given in the options
            final Map publishOpts = defaults + opts + [path: path]
            new PublishOp(readChannel, publishOpts).apply()
        }

        /**
         * Publish a process or workflow output made of exactly one channel.
         *
         * @param opts options merged over the defaults of this block
         * @param out the output to publish
         * @throws IllegalArgumentException when the output does not hold exactly one channel
         */
        void select(Map opts=[:], ChannelOut out) {
            final channelCount = out.size()
            if( channelCount != 1 )
                throw new IllegalArgumentException("Cannot publish a multi-channel output")
            select(opts, out[0])
        }

        /**
         * Publish the topic channel with the given name.
         *
         * @param opts options merged over the defaults of this block
         * @param name the topic name
         */
        void topic(Map opts=[:], String name) {
            final topicChannel = Channel.topic(name)
            select(opts, topicChannel)
        }
    }
}
82 changes: 82 additions & 0 deletions tests/publish-dsl.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#!/usr/bin/env nextflow
/*
* Copyright 2013-2024, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


// Test fixture: for each input value `x` declares two outputs, the files
// matching `*.bam` and the file `${x}.bai`.
process align {
input:
val(x)

output:
path("*.bam")
path("${x}.bai")

// writes `x` to `${x}.bam` and the output of `rev` applied to `x` to `${x}.bai`
"""
echo ${x} > ${x}.bam
echo ${x} | rev > ${x}.bai
"""
}

// Test fixture: concatenates the given `bamfile` and `baifile` inputs,
// in that order, into a single `result.txt` output.
process my_combine {
input:
path(bamfile)
path(baifile)

output:
path 'result.txt'

// `>` creates result.txt from the first input, `>>` appends the second one
"""
cat $bamfile > result.txt
cat $baifile >> result.txt
"""
}

// Test fixture: takes no input and declares `xxx` as its output; the script
// creates `xxx` as a directory holding the empty files A, B and C.
process foo {
output:
file 'xxx'

'''
mkdir xxx
touch xxx/A
touch xxx/B
touch xxx/C
'''
}

// Entry workflow: runs `align` on three values, feeds its two outputs
// to `my_combine`, and runs `foo` independently.
workflow {
def input = Channel.of('alpha','beta','delta')
align(input)

// gather each `align` output into a single list sorted by file name
def bam = align.out[0].toSortedList { it.name }
def bai = align.out[1].toSortedList { it.name }
my_combine( bam, bai )
// print the text content of the combined result
my_combine.out.view{ it.text }

foo()
}

// Publish definition: selects the outputs of the processes above into
// the `data` and `more/data` paths.
output {
// `align` and `foo` outputs are selected with an explicit `mode`,
// `my_combine.out` is selected here without any option
path('data') {
select align.out[0], mode: 'copy'
bentsherman marked this conversation as resolved.
Show resolved Hide resolved
select align.out[1], mode: 'copy'
select my_combine.out
select foo.out, mode: 'link'
}

// the same `my_combine` output is selected a second time, under another path
path('more/data') {
select my_combine.out, mode: 'copy'
}
}
Loading