Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

normal channel async message passing stops after I start up the tcp server #45

Open
kmf-lab opened this issue Jul 6, 2024 · 0 comments

Comments

@kmf-lab
Copy link

kmf-lab commented Jul 6, 2024

Here is the shortest example I could do. If we comment out the TODO: line the message passing works fine but with the tcp server in place we can hit it with the browser to get different results yet the message passing on the same local thread stops. Async should continue working on this thread so I am not sure where it is blocking.

use nuclei::;
use std::net::TcpListener;
use std::time::Duration;
use anyhow::Result;
use async_dup::Arc;
use futures::prelude::
;
use futures_timer::Delay;
use http_types::{Request, Response, StatusCode};

/// Serves a request and returns a response.
async fn serve(req: Request) -> http_types::Result {
let mut res = Response::new(StatusCode::Ok);
res.insert_header("Content-Type", "text/plain");
let now = format!("{:?}", std::time::SystemTime::now()); //Serving the time
res.set_body(now);
Ok(res)
}

/// Listens for incoming connections and serves them.
async fn listen(listener: Handle) -> Result<()> {
// Format the full host address.
let host = format!("http://{}", listener.get_ref().local_addr()?);
println!("Listening on {}", host);

loop {
    // Accept the next connection.
    let (stream, _) = listener.accept().await?;
    println!("Accepted connection");
    // Spawn a background task serving this connection.
    let stream = Arc::new(stream);
    nuclei::block_on(spawn_local(async move {
        if let Err(err) = async_h1::accept(stream, serve).await {
            println!("Connection error: {:#?}", err);
        }
    })); //we do not detach span_local instead we block on it.
}

}

#[nuclei::main]
async
fn main() -> Result<()> {
//nuclei::drive( async {run_now(); } );
run_now();
Ok(())
}

fn run_now() -> Result<()> {

//create an async channel
let (tx, rx) = async_channel::bounded::<String>(10);

let t = async move {
    let mut count = 0;
    loop {
        tx.send("Hello".to_string()).await;
        count += 1;
        println!("Sent {} messages", count);
        Delay::new(Duration::from_secs(1)).await;
    }
};
nuclei::spawn_local(t).detach();

let r = async move {
    let mut count = 0;
    loop {
        rx.recv().await.unwrap();
        count += 1;
        println!("Received {} messages", count);
    }
};
nuclei::spawn_local(r).detach();

let http = listen(Handle::<TcpListener>::bind("0.0.0.0:8001")?);

//TODO: NOTE: this line enables the server but causes the above message passing to stop.
nuclei::spawn_local(http).detach(); //this prevents all other async from working.

//only run our test for 400 seconds
nuclei::block_on(
    nuclei::spawn_local(
        Delay::new(Duration::from_secs(400))
    ));

Ok(())

}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant