Support including existing files within watchPath? #5353

Open
siddharthab opened this issue Oct 2, 2024 · 6 comments

Comments

@siddharthab
Contributor

New feature

There does not seem to be a reliable way to pick up both existing files and any new files under a root path by combining the two channel factories, watchPath and fromPath. Calling them one after the other creates a synchronization gap between the two calls: if watchPath is called first you may get duplicates, and if it is called second you may drop files created between the calls, as sketched below.
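
For illustration (the 'input/*' pattern is just a placeholder), here is the fromPath-first ordering, where a file created between the two calls may never be seen by either channel:

workflow {
    // List whatever already exists...
    existing = Channel.fromPath('input/*')
    // ...and only then start watching; a file created between the two
    // calls may never be emitted by either channel.
    incoming = Channel.watchPath('input/*')
    existing.mix(incoming).view()
}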

Usage scenario

This is useful when a separate pipeline is generating data that is meant to be consumed by a Nextflow workflow. That pipeline may have been started some time ago and already generated some files, so the Nextflow workflow needs to catch up and then stay current.

A workflow from epi2me-labs uses watchPath in a way that can drop files.

Suggested implementation

It seems to me that adding an option to watchPath, something like the sketch below, would be the way to go, unless there is an easy workaround already.
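
Purely hypothetical (no such option exists today): watchPath could accept an extra event type alongside the real 'create', 'modify' and 'delete', so that existing files are emitted through the same channel as new ones:

// Hypothetical 'existing' event type; only 'create', 'modify' and 'delete' exist today.
inputs = Channel.watchPath('input/*', 'create,existing')
    .until { it.name == 'STOP' }
inputs.view()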

Happy to work on this if people want.

@bentsherman
Member

Channel.fromPath is essentially a channel wrapper over the files() function. It might be easier to use that instead to drop duplicates.

@siddharthab
Contributor Author

@bentsherman I don't follow. How would I use the files() function to synchronize its own call with the setup of the watchPath channel?

My current code looks something like this (assuming that asSynchronized() works as I expect it to):

workflow {
    String pattern = 'input/*'
    String stop_filename = 'STOP'
    // Shared set used to de-duplicate between the two channels.
    Set<String> input_names = ([] as Set<String>).asSynchronized()

    // Watch first, so files created from this point on are picked up here.
    new_inputs = Channel
        .watchPath(pattern)
        .until { it.name == stop_filename }
        .filter { input_names.add(it.name) }
    new_inputs.view { "New ${it.name}" }

    // Then list existing files, dropping any already seen by the watcher.
    existing_inputs = Channel
        .fromPath(pattern)
        .filter { input_names.add(it.name) }
    existing_inputs.view { "Existing ${it.name}" }

    inputs = existing_inputs.mix(new_inputs)
    inputs.view()
}

@bentsherman
Member

I was imagining that you would query the set of existing files and use that to filter incoming files from watchPath:

def existing_inputs = files(pattern)
def new_inputs = Channel.watchPath(pattern)
  .filter { file -> file !in existing_inputs }

def inputs = Channel.fromList(existing_inputs).mix(new_inputs)

You should also be able to achieve your example with a concurrent set:

def input_names = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>())
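
A slightly shorter equivalent, if you prefer:

def input_names = java.util.concurrent.ConcurrentHashMap.newKeySet()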

@siddharthab
Contributor Author

I think I understand your point about using files(), but there is some miscommunication: I have the opposite problem.

Consider the following scenarios:

# Scenario 1 - no issues
files created
start watching
list all files
files created

# Scenario 2 - no issues
files created
list all files
start watching
files created

# Scenario 3 - file 'foo' will be missed
files created
list all files
file 'foo' created
start watching
files created

# Scenario 4 - file 'foo' will be counted twice
files created
start watching
file 'foo' created
list all files
files created

I am just looking for a way to list all current and future files matching the pattern without dropping a file or counting a file twice.

@bentsherman
Member

You might be able to modify watchPath to (1) start watching files, (2) query existing files, and (3) emit existing files that weren't already caught by the watcher, roughly as in the sketch below.
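
For what it's worth, a conceptual sketch of that ordering with plain java.nio (not the actual watchPath implementation; the 'input' directory and println calls are placeholders):

import java.nio.file.*

def dir = Paths.get('input')
def emitted = java.util.concurrent.ConcurrentHashMap.newKeySet()

// (1) Register the watcher first, so nothing created afterwards can be missed.
def watcher = FileSystems.getDefault().newWatchService()
dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE)

// (2) Query existing files only after the watcher is active.
Files.list(dir).forEach { path ->
    // (3) Emit existing files that were not already caught by the watcher.
    if (emitted.add(path.fileName.toString()))
        println "existing: $path"
}

// Watcher loop: emit newly created files, skipping any already emitted above.
while (true) {
    def key = watcher.take()
    key.pollEvents().each { event ->
        def path = dir.resolve(event.context() as Path)
        if (emitted.add(path.fileName.toString()))
            println "new: $path"
    }
    key.reset()
}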

But taking a step back, the filesystem doesn't seem like a good fit for this kind of event streaming. watchPath is good for simple use cases, but eventually it becomes better to handle event dispatch at a higher level, e.g. deploying an external service that launches a Nextflow pipeline whenever an object is uploaded to an S3 bucket.

Of course you might be more constrained in an HPC context

@siddharthab
Contributor Author

siddharthab commented Oct 16, 2024

I get your point that events, even file creation events, can come from sources other than the file system. But right now there does not seem to be a mechanism to define channels from such event streams, like pub/sub on cloud providers. Moreover, as you said, it is not easy to set up something like that in a local HPC context.

The example you give of launching a new Nextflow pipeline per event only applies when the pipeline takes a single file as input; in most use cases, each file will be an element in a channel.

Short of a more formal way to consume events from the outside, I think modifying watchPath is a better solution here. Meanwhile, the workaround that I posted above seems to be working well.
