Finalize workflow output definition #5103

Open
bentsherman opened this issue Jul 2, 2024 · 31 comments · May be fixed by #5185

@bentsherman
Member

bentsherman commented Jul 2, 2024

Continuation of #4670

Enumerating the proposed changes that we've collected so far:

  • Support additional index file formats (json, yaml); see the sketch after this list

  • Generate output schema. Essentially a list of index file schemas. Should be generated on each run or via some separate command. Should eventually be used with parameter schema for chaining pipelines. See Proposal: DSL2+ nf-core/fetchngs#312 for a concrete example (schema_outputs.yml). -> Generate output schema from output definition #5213

  • Dynamic path mapping. Allow the path option in target definition to be a closure:

    workflow {
      main:
      ch_foo = Channel.of( [ [id: 1], file('foo.fastq') ] )
    
      publish:
      ch_foo >> 'foo'
    }
    
    output {
      'foo' {
        path { meta, fastq -> "foo/${meta.id}" }
      }
    }

    Note that the dynamic path need only define the directory, not the full filename. Since a channel value may contain multiple files, an alternative syntax could be to provide the file and the channel value to the closure, so that it's clear which file is being published:

        path { file, value -> "foo/${value[0].id}" }
  • Move publish options to config. Publish options like directory and mode typically need to be configurable by the user, which currently would require you to define a special param for each option. Therefore it makes more sense for them to be config settings rather than pipeline code:

    // nextflow.config
    workflow {
      output {
        directory = 'results'
        mode = 'copy'
    
        withTarget:'foo' {
          mode = 'link'
        }
      }
    }
    
    // main.nf
    output {
      'foo' {
        index {
          path 'index.csv'
          // ...
        }
      }
    }

    The output block should be used only to define index files (i.e. the output schema). In other words, the pipeline code should define what is published and the config should define how it is published.

    For the output directory, it has also been proposed to provide a CLI option for it e.g. -output-dir and shorter config option e.g. outputDir. The output directory would be available in the pipeline code as part of workflow metadata i.e. workflow.outputDir.

  • Remove publish section from process definition. Still under discussion. The rationale for the process publish section was to provide some sensible defaults which can be overridden; however, I've come to think that it only makes it harder to determine what is being published. Instead of enumerating the publish targets in one place, they are scattered throughout the pipeline code. Also, process definitions are abstracted from any particular pipeline, so it doesn't make much sense to put pipeline-specific details like params and publishing in the process definition.

    A better way to give some sensible defaults for a process would be to write an entry workflow in the process definition that gives an example usage:

    process foo {
      // ...
    }
    
    workflow {
      main:
      params.input = '...'
      foo( params.input )
    
      publish:
      foo.out >> 'foo'
    }

    This workflow will be ignored when importing the process as a module, but it provides a concrete example and can even be used to run the process directly. In fact it could even replace the custom nf-test DSL eventually.

  • Allow publish section only in entry workflow. I am less certain about this one but wanted to lay out the case for it. Building on the previous item, having publish sections potentially spread across several workflows in a pipeline makes it hard to see everything that is being published. Instead, maybe named workflows should only be allowed to emit outputs, and only the entry workflow should be able to publish outputs. As with the previous point, you could write an entry workflow for each named workflow which gives some example publishing (and allows you to run the workflow as a standalone pipeline).

    This idea is inspired by a principle in software engineering that side effects (a.k.a. I/O, publishing) should be pushed to the "boundaries" of the code, to make it more readable and testable, and to make it easier to swap out different I/O strategies (file copy, database insert, API call, etc).

    At the same time, I appreciate that publishing from a named workflow is a convenient shorthand, especially when you consider having to propagate outputs back up through potentially several nested workflows. But I wonder if being more strict here would be better in the long run. The example entry workflow is something that will be written anyway, both for testing and to run workflows as standalone pipelines.

  • Runtime enhancements

    • report target names in the output block that weren't defined in a process or workflow
    • prevent leading or trailing slashes
    • detect published file collisions at runtime
    • detect outputs that aren't published or used by downstream processes
  • Include output targets in inspect command. Similar to how inspect lists all processes with some resolved directives, etc., it could also show the resolved list of output targets. Not as essential if we implement some of the above points, but still useful for things like resolving params.
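Regarding the first item, a sketch of what a json index might look like, assuming (hypothetically) that the format would be inferred from the index file extension in the same way as the existing csv support:

    output {
      'foo' {
        index {
          path 'samplesheet.json'   // or 'samplesheet.yaml'
          // otherwise the same options as a csv index
        }
      }
    }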

@robsyme
Collaborator

robsyme commented Jul 2, 2024

Great suggestions Ben. Do you expect that there will be any changes to the observable events in this new channel-focused model?

A common use-case for plugins is to register the outputs with a third-party service (a LIMS, for example). It would be helpful if plugins could hook into a publish event that carries the source, destination, and other channel objects (i.e. meta) in a single event, so that the outputs can be appropriately tagged/saved.

If we are now "publishing channels", it would be helpful to have a way to include everything, including non-file data in that publishing event.

@pinin4fjords
Contributor

Great to see this coming together after previous discussions Ben!

Really like the output-dir. It always felt odd to me that work-dir was something native while the workflow builders had to deal with the output directory themselves.

The distinction between 'output' and 'publishing' never sat well with me; it made much more sense to me if everything was an 'output', with things marked temporary/retained as appropriate. Plus it is a nightmare searching across multiple config files for the publishing logic for a specific set of files (see the current dev branch of nf-core/rnaseq). So I really like all the places where you're removing publishing logic, and it makes a LOT of sense to me that the publishing/retaining part happens only at the outermost layer. I hope you keep that.

Really like having entry workflows as well, having all components be more easily runnable without as much surrounding boilerplate would be awesome.

Would also be interested to hear more on the non-file output aspects, speaking to Rob's point, and how it could assist with daisy chaining workflows, but overall love the way this is going!

@pditommaso
Member

Points I'd like to discuss:

  • Support additional index file formats: for which file(s)?
  • Dynamic path mapping: syntax clashes with block definition
  • Move publish options to config:
    1. why does it require withTarget?
    2. still should be possible to have in the script
  • Remove publish section from process definition.
  • Allow publish section only in entry workflow.

@bentsherman
Member Author

From @Midnighter in #5130 :

In the last podcast episode, you were debating whether to allow output publishing to be defined at the module/process level.

My clear opinion is that this is a bad idea:

  • It's a mix of concerns. It should not be a process' (function's) concern where its output is stored. You are giving it too much responsibility.
  • Additionally, allowing publishing to only be set at the workflow level ensures the modularity of processes is optimal. The same actually applies to sub-workflows in my mind. I would only consider the highest-level workflow publishing instructions and ignore publishing set in sub-workflows. That way, modules/sub-workflows can easily be used in different pipelines without requiring overrides.
  • In my view, processes should be as close as possible to pure functions, such that you have a reproducible output when given a deterministic environment (container hash), same input (hash of data), and same operations (hash of code/git commit). Changing publishing behavior requires code changes, even though the operation itself is not changing.
  • Flexibility in storage backends: If a module assumes certain output path capabilities that are not supported by my storage solution, I have to redefine all those publishing options.

@bentsherman
Member Author

Thanks @Midnighter, I have basically reached the same conclusions. After discussing with Paolo, I think we will encourage publish: to be used only in the entry workflow, but still allow it in sub-workflows as a convenience, and not allow it in processes.

@FriederikeHanssen

Hey! If I can add something from my wishlist:

It would be great to get logging on what dynamic output paths like "foo/${meta.id}" are resolved to. Either just for WARN/errors (i.e. publishDir path is null) so it shows up somewhere in the error logs, or, if it doesn't get too verbose, in the regular nextflow log.

@nvnieuwk

Any news on when the dynamic path mapping will be in a Nextflow edge release? I'm currently trying to convert my file publishing to the new output definitions, but I need this feature to be able to replicate the old behaviour.

@bentsherman
Member Author

@nvnieuwk Still in progress. I have an implementation in the linked PR but it needs to be reviewed and tested. You're welcome to try it out if you want. I doubt it will make the next edge release, maybe 24.09.0-edge.

@bentsherman
Member Author

I'd like to get some opinions on some of the nuances of the dynamic path mapping. If I have something like this:

workflow {
  main:
  ch_fastq = Channel.of( [ [id: 1], [ file('foo.fastq') ] ] )

  publish:
  ch_fastq >> 'fastq'
}

output {
  directory 'results'

  'fastq' {
    path { /* ... */ }
  }
}

I see a few different ways to design the path closure:

  1. The path closure defines the entire file path relative to the output directory:

    // val == [ meta, fastqs ]
    path { file, val -> "fastq/${val[0].id}/${file.name}" }

    The final published path would be results/fastq/${meta.id}/${file.name}. Since a channel value can contain multiple files, the path closure is invoked for each individual file, passing both the file and the containing channel value.

  2. The path closure only defines the subdirectory:

    path { val -> "fastq/${val[0].id}" }
    // could also do { meta, fastqs -> "fastq/${meta.id}" }

    The final published path is the same as (1), but now the file name is implied and can't be customized in the path closure. All files in a channel value are published to the same subdirectory.

I like (2) because it's simpler and retains the meaning of the path setting, but it's also not as flexible as (1). If by "dynamic paths" we only mean things like "organize samples into subdirectories by id", then (2) is all we need. But if people also want to be able to change the base name of each file (which is supported by saveAs), then (2) is not flexible enough.

Option (1) is essentially the same as saveAs but with the context explicitly given as a second parameter (the channel value), rather than the task inputs being implicitly available.
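To make the difference concrete, with hypothetical values:

// given a channel value:  [ [id: 1], [ file('a.fastq'), file('b.fastq') ] ]
//
// option (1) is invoked once per file:
//   { file('a.fastq'), value } -> 'fastq/1/a.fastq'
//   { file('b.fastq'), value } -> 'fastq/1/b.fastq'
//
// option (2) is invoked once per channel value:
//   { value } -> 'fastq/1'    // both files land there, file names kept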

Let me know what you think. This is the essential question we need to resolve in order to move this feature forward.

@nvnieuwk

I personally like option (1) because that way I don't have to make sure the files emitted by processes have the correct name, since I will set the correct output name here. It's also nice to be able to specify some kind of optional nesting, which option (1) would enable.

@nvnieuwk

During my testing I also noticed a weird error. When an input file that hasn't been staged is present in the channel specified in the publish section, the following error occurs:

  ERROR ~ assert path
         |
         null

It started working again once I filtered out these paths.
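For reference, the workaround looks something like this (a sketch; the channel name and tuple shape are just examples):

publish:
ch_out.filter { meta, f -> f != null } >> 'out'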

@bentsherman
Member Author

Two more syntax ideas, from @robsyme and @pinin4fjords respectively, numbered (3) and (4) here to follow on from the options above:

  3. Closure within a closure. The outer closure is over the channel value and returns a closure that corresponds exactly to the saveAs closure:

    path { meta, fastqs ->
      { file -> "${meta.id}/${file.baseName}" }
    }

    A bit more verbose, but basically just a copy-and-paste from the old way, so it might be easier for people to migrate to.

  4. Closure that only takes two arguments, a metadata map and a single file:

    path { meta, fastq -> "${meta.id}/${fastq.baseName}" }

    Keeps the closure simple. Requires the user to "prepare" output channels in the workflow logic for publishing by making sure they adhere to this structure, e.g.:

    workflow {
      main:
      ch_fastq = Channel.of( [ [:], [ file('1.fastq'), file('2.fastq') ] ] )
    
      publish:
      ch_fastq.transpose() >> 'fastq'
    }

    So probably just a lot of extra transposes. Fits well with the index file model of file + metadata. The big question is whether this pattern is truly generic enough to cover all use cases or if we are optimizing too much around nf-core.

@pinin4fjords
Contributor

Thanks @bentsherman!

I like (4) because, while it forces me to do a transpose from time to time, I know that if I make my channels fit a predefined schema, I'm good. I guess we'd probably just end up doing a lot of:

  publish:
  ch_fastq.transpose() >> 'fastq'

?

(Although, if it's a case of doing that all the time, maybe there could be something implicit somewhere to do that?)

In any case, a predictable structure for publishable channels means that all my output blocks are going to look very similar, which makes them easier to maintain relative to a custom closure every time. Maybe the nf-core meta pragma isn't what we need; I'd be happy with any application of metadata to files.

To take this to the nth degree, if things were predictable enough, maybe there could be some default behaviour so we didn't even need

  'fastq' {
    path { /* ... */ }
  }

... most of the time. There could be a default way in which files were structured in the publish directories based on metadata properties. You'd only need the closure if you wanted to depart from those defaults.
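For example (a purely hypothetical convention, not a concrete proposal): if [ meta, file(s) ] values were published under '<target>/<meta.id>/' by default, the target block could usually be empty:

  output {
    'fastq' {
      // no path closure needed; files would land under 'fastq/<meta.id>/'
    }
  }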

@robsyme
Collaborator

robsyme commented Sep 17, 2024

What if, for people who want an index file, pattern (3) could be extended to include something close to what @pinin4fjords is suggesting? The user could return two elements, where the first element is the metadata and the second element is the destination filename:

path { meta, fastqs ->
  { file -> 
    def destination = "${meta.id}/${file.baseName}"
    [meta.subMap('id', 'patient'), destination]
  }
}

The index could simply use the first element (if provided) to populate the columns in the csv/tsv/whatever.

This would remove the necessity for the channel transforms in the workflow. If the transforms are always going to be simple unwrapping/transposing, I think this approach would be tidier. If we expect transforms to be more involved, then this proposal would not be suitable.
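For illustration, with meta = [id: 1, patient: 'A'] and a published file foo.fastq, the closure above would return [ [id: 1, patient: 'A'], '1/foo' ], and the corresponding index row might look like (hypothetical columns):

// id,patient,destination
// 1,A,1/foo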

@bentsherman
Member Author

bentsherman commented Sep 17, 2024

The samplesheet produced by fetchngs contains a row for each sample, where each sample has potentially four files (fastq pair + md5 pair) associated with it.

This example suggests that the publishing is file-centric whereas the index file is sample-centric, so we can't necessarily couple these things as a shortcut. For the path option I need to be able to define the publish mapping for each individual file, but for the index file I don't necessarily want each file on its own line.

In other words, the published channel needs to correspond exactly to the rows of the index file, which is basically option (3).

I think the appeal of option (4) is to not have so much code in the output block, and to make the path closure uniform instead of dependent on the published channel, which lives in a completely different part of the code. It also makes space for more aggressive shortcuts.

Need to see if there is a way to recover those properties with option (3). But I suspect the "shortcut" is to not use dynamic paths at all.

workflow {
  main:
  ch_fastq = Channel.of( [ [:], file('1.fastq'), file('2.fastq') ] )

  publish:
  ch_fastq >> 'fastq'
}

output {
  fastq {
    // default: publish everything to 'fastq/'
    // no path option needed

    // dynamic path (if you want it)
    path { meta, fastq_1, fastq_2 ->
      { file -> "fastq/${meta.id}/${file.baseName}" }
    }

    // should just work
    index {
      path 'samplesheet.csv'
    }
  }
}

I think the main shortcut I was imagining is that the index file should just work with the conventional meta + file(s) structure as shown above, without any extra closure logic. Bit tricky to infer the column names for the files but still doable. Would become much simpler with record types.

But the path closure seems unavoidable if you want a dynamic path, because there is no guaranteed 1-to-1 mapping of channel value to file.
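For illustration, a guess (not a committed design) at what that inference could produce for a value like [ [id: 1], file('1.fastq'), file('2.fastq') ]:

// samplesheet.csv, with the file columns inferred from position:
//
//   id,file_1,file_2
//   1,results/fastq/1.fastq,results/fastq/2.fastq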

@bentsherman
Member Author

Another nice thing I realized about the double closure is that it could also support option (2) above, by returning a path instead of an inner closure. That would correspond to publishing each file from the channel into a directory without modifying the base file name:

    path { meta, fastq_1, fastq_2 ->
      "fastq/${meta.id}"
    }

So there can be multiple levels of dynamism depending on how specific you want to be.

@pditommaso
Member

The nested closure is very confusing. It's hard to understand when to use more than one argument versus a single one. I think the problem could be reduced to a closure with two arguments, one for the context and a second for the file path. The context could by definition be the first one.

@bentsherman
Member Author

Keep in mind that the double closure is the most extreme form of dynamic path and the least likely to be used. Doing something like { file, context -> ... } would make it simpler, but it would also make the other, more common usages more complicated / less readable. So I think we should optimize for the simpler, more common usages and let the least common one be ugly:

output {
  // many levels of dynamism
  fastq {
    // default: publish to 'fastq/'
    // nothing

    // publish to a different static path
    path 'samples'

    // publish to subdirectories by sample id
    path { meta, fastq_1, fastq_2 ->
      "fastq/${meta.id}"
    }

    // publish each file to its own path (extreme)
    path { meta, fastq_1, fastq_2 ->
      { file -> "fastq/${meta.id}/${file.baseName}" }
    }
  }
}

@bentsherman
Member Author

@robsyme I've been thinking about your point about updating the publish events. Now that I've taken another pass through everything, I see that it would actually be quite simple to do what you suggested.

We could add a new event e.g. onWorkflowPublish() which is triggered whenever a channel value is published. Just as the index file records each channel value as a row in the CSV, the trace observer should be able to listen for these "published values".

We can keep the existing onFilePublish() event as it is, because the two events are complementary:

  • onWorkflowPublish() gives you each published value
  • onFilePublish() gives you each published file and where it was published

So you can listen to either event or both based on what you need.

I was worried at first because I was thinking about trying to attach metadata to each individual file, but I like this separate event much better.
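For plugin authors, a sketch of what listening to both events could look like; onFilePublish follows the existing TraceObserver interface, while the onWorkflowPublish signature is the one proposed here and may still change:

import java.nio.file.Path
import nextflow.trace.TraceObserver

class PublishObserver implements TraceObserver {

    @Override
    void onWorkflowPublish(Object value) {
        // receives each published channel value, e.g. [ meta, fastqs ]
        println "published value: ${value}"
    }

    @Override
    void onFilePublish(Path destination, Path source) {
        // receives each published file and where it was published
        println "published file: ${source} -> ${destination}"
    }
}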

@robsyme
Collaborator

robsyme commented Sep 30, 2024

You superstar. Thanks Ben!!!
That's exactly the result I was hoping for.

@robsyme
Collaborator

robsyme commented Sep 30, 2024

Thanks @bentsherman. Three quick questions:

  1. would the onWorkflowPublish() event only have access to the non-file values in the channel?
  2. would the values in the same "entity" or "row" in the channel be available together?
  3. if the onWorkflowPublish() does not have access to the file/path objects, would it be possible to associate those files with the metadata (and vice versa) ?

@bentsherman
Member Author

bentsherman commented Sep 30, 2024

Yes it should just receive a single argument corresponding to the channel value:

void onWorkflowPublish(Object value) {
  def (meta, fastqs) = [ value[0], value[1] ]
  // ...
}

@bentsherman
Member Author

I guess the tricky part here is that this event hook is receiving every value from every published channel, so you'll have to do some pattern matching if you want to dig deeper into things like metadata

That's actually pretty wild from the plugin's perspective... you're receiving arbitrary values that depend entirely on the pipeline you're running. Of course you could optimize around a few common use cases like "if it's a list, and the first element is a map, etc ..."

We might want to think more critically about this interface
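For example, a sketch of the kind of shape-sniffing a plugin might resort to (the checks are guesses about common pipeline conventions, nothing guaranteed):

import java.nio.file.Path

void onWorkflowPublish(Object value) {
    if( value instanceof List && value[0] instanceof Map ) {
        // looks like the common [ meta, file(s) ] shape
        def meta  = (Map) value[0]
        def files = value.tail().flatten().findAll { it instanceof Path }
        // ... register the files against the metadata
    }
    else {
        // unknown shape: fall back to generic serialization
    }
}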

@robsyme
Collaborator

robsyme commented Oct 1, 2024

You make an excellent point. The lack of specificity would be challenging.

Given the interface

workflow {
  main:
  ch_foo = Channel.of( [ [id: 1], file('foo.fastq') ] )

  publish:
  ch_foo >> 'foo'
}

It would be helpful if the 'foo' tag was made available to at least provide some labelling of the channels.

void onWorkflowPublish(String channelTag, Object value) {
  // ...
}

@bentsherman
Member Author

I had a similar thought. The target name might be nice to have just for record keeping, but it doesn't reveal anything new about the expected structure of the value

It's not entirely hopeless though. If you look at DumpHelper, it's used by the dump operator to pretty print arbitrary objects as JSON, and it's pretty robust. So you could take a similar approach with this event hook.

A plugin basically has two options:

  1. Document the data structures that you know how to handle (e.g. tuple of meta map and files)
  2. Expose some standard record types with different meanings, which pipeline developers can use at the end of their pipeline (e.g. "if you publish a channel with this record type, this plugin will do X with it")

Both seem kinda sketchy

@robsyme
Collaborator

robsyme commented Oct 2, 2024

The unknowability of the exact structure of the objects in channels doesn't worry me too much. Nextflow (and any plugin) should dutifully serialize the object.

We already impose one important limitation on objects in channels (or at least objects that support -resume anyway): they should be serializable by Kryo. A brave suggestion would be to change the serialization requirement so that objects must be CBOR-able or JSON-able rather than Kryo-serializable; then we could guarantee that objects in channels can be serialized by a plugin and deserialized by downstream, non-Nextflow tools.
Kryo's big drawcard is that it is space efficient, but of course we throw away the serialized form after hashing anyway, so I'd argue that space efficiency is moot.
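To make that concrete, a minimal illustration of the JSON case; note that Path objects would have to be rendered as plain strings, which is exactly the kind of constraint this requirement would impose:

import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// a typical channel value, with files already rendered as strings
def value = [ [id: 1, patient: 'A'], [ 'foo_1.fastq', 'foo_2.fastq' ] ]
def json  = JsonOutput.toJson(value)

// any downstream, non-Nextflow tool can read this back
assert new JsonSlurper().parseText(json) == value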

@bentsherman
Member Author

I like the idea of requiring it to be serializable into some kind of format. That would be a good thing to explore with your plugin. I'll try to at least get this workflow event merged in the second preview.

Is CBOR not space efficient compared to Kryo?

@robsyme
Collaborator

robsyme commented Oct 2, 2024

CBOR and Kryo will both produce relatively efficient, compressed forms. JSON serialization will take more space than the other two, but space isn't really a concern because:

  1. the serialized forms are not being transmitted over the wire in an environment that requires low latency
  2. the objects are almost always very small anyway.

@Austin-s-h

Hi! I don't want to hijack this thread, but I did have a couple questions related to #2844 and #5185 and I'm hoping this is an appropriate place.

With the new output style, I really like the creation of an index file that contains file name/output path information. I see this being helpful in a variety of scenarios. I am specifically interested in generating md5sum hashes of my output files; could this be included in the index as an additional per-file column, perhaps enabled via the path directive? I also saw a draft about enabling E-tag-aware files for Azure and AWS. I know that E-tags on AWS can vary based on the chunk/part size during upload, so I am concerned about assuming this value is always the valid hash. From what I saw, it looked like the hash would be calculated first using a standard implementation, but is it validated that this matches the E-tag?

Currently, with Nextflow 24.04, my plan to achieve this behavior using publishDir is to implement the checksum as part of my process and add it as a storage tag (publishing to S3). I would write another process to collect the output files and generate an index similar to what the new output style would do. This might work but isn't very elegant; is there a better pattern that already exists?

@bentsherman
Member Author

A simple solution would be to define a function that computes the md5 of a file:

// Path -> String
def md5(file) {
  // ...
}
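For reference, a minimal sketch of such a function using only JDK classes (an illustration, not a built-in):

def md5(file) {
    def digest = java.security.MessageDigest.getInstance('MD5')
    file.withInputStream { stream ->
        byte[] buffer = new byte[8192]
        int read
        // hash the file in chunks to avoid loading it into memory
        while( (read = stream.read(buffer)) != -1 )
            digest.update(buffer, 0, read)
    }
    digest.digest().encodeHex().toString()
}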

Then you could compute the md5 whenever you want and append it to the channel value:

ch_fastq.map { meta, fastq_1, fastq_2 ->
  [ meta, fastq_1, fastq_2, md5(fastq_1), md5(fastq_2) ]
}

A more efficient solution would be to compute the md5 while you already have the data locally, as it is done here: https://github.com/nf-core/fetchngs/blob/c60d09be3f73376156471d6ed78fcad646736c2f/modules/local/aspera_cli/main.nf#L26-L33

Either way, you have to compute these checksums yourself if you want them to be in the index file, because you are responsible for defining the schema, columns, etc. using the mapper directive.

@bentsherman
Member Author

Note: I think we could provide md5() as a standard library function.
