Operator subscribe returns wrong results when chained and hangs at the end #5354

Open
siddharthab opened this issue Oct 2, 2024 · 2 comments

@siddharthab
Contributor

Bug report

Expected behavior and actual behavior

When subscribe is chained with another operator such as view, each element from the channel goes to either subscribe or view, but never to both. According to the documentation, subscribe should return the source channel, so every element should flow through every operator in the chain. Execution then hangs at the end, which is arguably a good thing in this case because it prompts the user to look closer; otherwise the incorrect results could go unnoticed.

Steps to reproduce the problem

workflow {
    Channel
        .of('foo', 'bar', 'baz')
        .subscribe { println "Subscribe $it" }
        .view { println "View $it" }
}

Program output

View bar
Subscribe foo
Subscribe baz
^C # Had to manually kill the process

Environment

  • Nextflow version: 24.04.4 build 5917
  • Java version: OpenJDK Runtime Environment Zulu21.28+85-CA (build 21+35)
  • Operating system: macOS
  • Bash version: zsh 5.9 (arm64-apple-darwin24.0)

Additional context

Seems to be a duplicate of #4436, but I wanted to call out the unexpected branching behavior.

@bentsherman
Member

I suspect it is because subscribe returns the same reference to the source channel instead of a copy:

    DataflowReadChannel subscribe(final DataflowReadChannel source, final Closure closure) {
        subscribeImpl( source, [onNext: closure] )
        return source
    }

In which case I think it could be solved with CH.getReadChannel():

    DataflowReadChannel subscribe(final DataflowReadChannel source, final Closure closure) {
        subscribeImpl( source, [onNext: closure] )
        return CH.getReadChannel(source)
    }

I also suspect that if you change the order to view -> subscribe, it will behave correctly.
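
A minimal sketch of that reordering, untested in this thread. It assumes view emits each item on a new downstream channel, so that subscribe becomes the only reader of that channel:

```groovy
workflow {
    Channel
        .of('foo', 'bar', 'baz')
        // view comes first and passes each item on to its output channel
        .view { println "View $it" }
        // subscribe is last, so nothing further is chained on the channel it returns
        .subscribe { println "Subscribe $it" }
}
```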

@siddharthab
Contributor Author

Thanks for taking a look. I tried the change you suggested and it does not seem to work.

% git diff -U1 
diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy
index 3614de19d..c5b74a952 100644
--- a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy
+++ b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy
@@ -72,3 +72,3 @@ class OperatorImpl {
         subscribeImpl( source, [onNext: closure] )
-        return source
+        return CH.getReadChannel(source)
     }
@@ -84,3 +84,3 @@ class OperatorImpl {
         subscribeImpl(source, events)
-        return source
+        return CH.getReadChannel(source)
     }
% ./compile.sh           
% ./launch.sh run main.nf

 N E X T F L O W   ~  version 24.09.2-edge

Launching `main.nf` [reverent_tesla] DSL2 - revision: 1994a3a13a

View bar
Subscribe foo
Subscribe baz
^C
% cat main.nf
workflow {
    Channel
        .of('foo', 'bar', 'baz')
        .subscribe { println "Subscribe $it" }
        .view { println "View $it" }
}
