diff --git a/CHANGELOG.md b/CHANGELOG.md index 8f07e87e..0304534e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## \[4.0.0\] - unreleased +This release aims to improve Pueue and to rectify some old design decisions. + +### Multi-threaded architecture + Up until recently, Pueue had the subprocesses' (tasks') state live in a dedicated thread. Client commands that directly affected subprocesses, such as `pueue start --immediate`, were forwarded to that special thread via an `mpsc` channel to be further processed. @@ -15,13 +19,29 @@ Commands like `pueue add --immediate install_something && pueue send 0 'y\n'` wo The new state design fixes this issue, which allows Pueue to do subprocess state manipulation directly inside of the client message handlers, effectively removing any delays. +TLDR: Commands that start/stop/kill/pause tasks only return when the task is actually started/stopped/killed/paused. + +### Runtime invariants + +Previously, various state related runtime invariants were enforced by convention. For example, a task that is queued should not have a `start` or `enqueued_at` time set. +However, this approach is highly error-prone as it's really hard to always think of everything that needs to be set or cleaned up on every possible state transition. + +Luckily, this is an issue that can be fixed in a (rather) elegant way in Rust, using struct enums. That way, those invariants are enforced via the type system during the compile time. +The code is a bit more verbose (~25%), but it prevents a whole category of bugs and while doing this refactoring I actually found at least 2 cases where I forgot to clear a variable. + +TLDR: The new task state handling is a bit more verbose, but a lot cleaner and type safe. + ### Fixed - Fixed delay after sending process related commands from client. [#548](https://github.com/Nukesor/pueue/pull/548) ### Change -- **Breaking**: Streamlined `pueue log` parameters to behave the same way was `start`, `pause` or `kill`. [#509](https://github.com/Nukesor/pueue/issues/509) +- **Breaking**: Refactor internal task state. Some task variables have been moved into the `TaskStatus` enum, which now enforces various invariants during compile time via the type system. + Due to this, several subtle time related inconsistencies (task start/stop/enqueue times) have been fixed. [#556](https://github.com/Nukesor/pueue/pull/556) \ + **Important: This completely breaks backwards compatibility, including previous state.** + **Important: The Pueue daemon needs to be restarted and the state will be wiped clean.** +- **Breaking**: Streamlined `pueue log` parameters to behave the same way as `start`, `pause` or `kill`. [#509](https://github.com/Nukesor/pueue/issues/509) - **Breaking**: Remove the `--children` commandline flags, that have been deprecated and no longer serve any function since `v3.0.0`. ### Add diff --git a/Cargo.lock b/Cargo.lock index 64c78854..b6f13667 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -111,6 +111,12 @@ dependencies = [ "wait-timeout", ] +[[package]] +name = "assert_matches" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" + [[package]] name = "async-trait" version = "0.1.80" @@ -1268,6 +1274,7 @@ version = "3.4.1" dependencies = [ "anyhow", "assert_cmd", + "assert_matches", "better-panic", "chrono", "clap", diff --git a/pueue/Cargo.toml b/pueue/Cargo.toml index 5cd7d3f0..e0ab31bf 100644 --- a/pueue/Cargo.toml +++ b/pueue/Cargo.toml @@ -43,6 +43,7 @@ tokio = { workspace = true } [dev-dependencies] anyhow = { workspace = true } assert_cmd = "2" +assert_matches = "1" better-panic = { workspace = true } # Make it easy to view log output for select tests. # Set log level for tests with RUST_LOG=, use with failed tests or diff --git a/pueue/src/client/commands/restart.rs b/pueue/src/client/commands/restart.rs index 0816c19d..ad9c28f1 100644 --- a/pueue/src/client/commands/restart.rs +++ b/pueue/src/client/commands/restart.rs @@ -1,5 +1,6 @@ use anyhow::{bail, Result}; +use chrono::Local; use pueue_lib::network::message::*; use pueue_lib::network::protocol::*; use pueue_lib::settings::Settings; @@ -32,7 +33,9 @@ pub async fn restart( let new_status = if stashed { TaskStatus::Stashed { enqueue_at: None } } else { - TaskStatus::Queued + TaskStatus::Queued { + enqueued_at: Local::now(), + } }; let state = get_state(stream).await?; @@ -50,13 +53,19 @@ pub async fn restart( state.filter_tasks(done_filter, None) }; - // now pick the failed tasks + // Now pick the failed tasks let failed = filtered_tasks .matching_ids .into_iter() .filter(|task_id| { let task = state.tasks.get(task_id).unwrap(); - !matches!(task.status, TaskStatus::Done(TaskResult::Success)) + !matches!( + task.status, + TaskStatus::Done { + result: TaskResult::Success, + .. + } + ) }) .collect(); diff --git a/pueue/src/client/commands/wait.rs b/pueue/src/client/commands/wait.rs index 633fe8d3..a1aeddb3 100644 --- a/pueue/src/client/commands/wait.rs +++ b/pueue/src/client/commands/wait.rs @@ -140,15 +140,27 @@ pub async fn wait( fn reached_target_status(task: &Task, target_status: &WaitTargetStatus) -> bool { match target_status { WaitTargetStatus::Queued => { - task.status == TaskStatus::Queued - || task.status == TaskStatus::Running - || matches!(task.status, TaskStatus::Done(_)) + matches!( + task.status, + TaskStatus::Queued { .. } | TaskStatus::Running { .. } | TaskStatus::Done { .. } + ) } WaitTargetStatus::Running => { - task.status == TaskStatus::Running || matches!(task.status, TaskStatus::Done(_)) + matches!( + task.status, + TaskStatus::Running { .. } | TaskStatus::Done { .. } + ) + } + WaitTargetStatus::Done => matches!(task.status, TaskStatus::Done { .. }), + WaitTargetStatus::Success => { + matches!( + task.status, + TaskStatus::Done { + result: TaskResult::Success, + .. + } + ) } - WaitTargetStatus::Done => matches!(task.status, TaskStatus::Done(_)), - WaitTargetStatus::Success => matches!(task.status, TaskStatus::Done(TaskResult::Success)), } } @@ -201,7 +213,7 @@ fn log_status_change(previous_status: TaskStatus, task: &Task, style: &OutputSty // Check if the task has finished. // In case it has, show the task's result in human-readable form. // Color some parts of the output depending on the task's outcome. - if let TaskStatus::Done(result) = &task.status { + if let TaskStatus::Done { result, .. } = &task.status { let text = match result { TaskResult::Success => { let status = style.style_text("0", Some(Color::Green), None); @@ -246,8 +258,15 @@ fn log_status_change(previous_status: TaskStatus, task: &Task, style: &OutputSty fn get_color_for_status(task_status: &TaskStatus) -> Color { match task_status { - TaskStatus::Running | TaskStatus::Done(_) => Color::Green, - TaskStatus::Paused | TaskStatus::Locked => Color::White, + TaskStatus::Paused { .. } | TaskStatus::Locked { .. } => Color::White, + TaskStatus::Running { .. } => Color::Green, + TaskStatus::Done { result, .. } => { + if matches!(result, TaskResult::Success) { + Color::Green + } else { + Color::Red + } + } _ => Color::White, } } diff --git a/pueue/src/client/display/helper.rs b/pueue/src/client/display/helper.rs index 36c45c04..d44aa1fc 100644 --- a/pueue/src/client/display/helper.rs +++ b/pueue/src/client/display/helper.rs @@ -45,9 +45,10 @@ pub fn sort_tasks_by_group(tasks: Vec) -> BTreeMap> { /// If the task doesn't have a start and/or end yet, an empty string will be returned /// for the respective field. pub fn formatted_start_end(task: &Task, settings: &Settings) -> (String, String) { - // Get the start time. + let (start, end) = task.start_and_end(); + // If the task didn't start yet, just return two empty strings. - let start = match task.start { + let start = match start { Some(start) => start, None => return ("".into(), "".into()), }; @@ -65,8 +66,8 @@ pub fn formatted_start_end(task: &Task, settings: &Settings) -> (String, String) .to_string() }; - // Get finish time, if already set. Otherwise only return the formatted start. - let end = match task.end { + // If the task didn't finish yet, only return the formatted start. + let end = match end { Some(end) => end, None => return (formatted_start, "".into()), }; diff --git a/pueue/src/client/display/log/mod.rs b/pueue/src/client/display/log/mod.rs index 92819235..9b8accb7 100644 --- a/pueue/src/client/display/log/mod.rs +++ b/pueue/src/client/display/log/mod.rs @@ -97,7 +97,7 @@ pub fn print_logs( if let Some((_, task_log)) = task_iter.peek() { if matches!( &task_log.task.status, - TaskStatus::Done(_) | TaskStatus::Running | TaskStatus::Paused, + TaskStatus::Done { .. } | TaskStatus::Running { .. } | TaskStatus::Paused { .. } ) { println!(); } @@ -122,7 +122,7 @@ fn print_log( // We only show logs of finished or running tasks. if !matches!( task.status, - TaskStatus::Done(_) | TaskStatus::Running | TaskStatus::Paused + TaskStatus::Done { .. } | TaskStatus::Running { .. } | TaskStatus::Paused { .. } ) { return; } @@ -148,9 +148,9 @@ fn print_task_info(task: &Task, style: &OutputStyle) { ); let (exit_status, color) = match &task.status { - TaskStatus::Paused => ("paused".into(), Color::White), - TaskStatus::Running => ("running".into(), Color::Yellow), - TaskStatus::Done(result) => match result { + TaskStatus::Paused { .. } => ("paused".into(), Color::White), + TaskStatus::Running { .. } => ("running".into(), Color::Yellow), + TaskStatus::Done { result, .. } => match result { TaskResult::Success => ("completed successfully".into(), Color::Green), TaskResult::Failed(exit_code) => { (format!("failed with exit code {exit_code}"), Color::Red) @@ -197,14 +197,16 @@ fn print_task_info(task: &Task, style: &OutputStyle) { ]); } + let (start, end) = task.start_and_end(); + // Start and end time - if let Some(start) = task.start { + if let Some(start) = start { table.add_row(vec![ style.styled_cell("Start:", None, Some(ComfyAttribute::Bold)), Cell::new(start.to_rfc2822()), ]); } - if let Some(end) = task.end { + if let Some(end) = end { table.add_row(vec![ style.styled_cell("End:", None, Some(ComfyAttribute::Bold)), Cell::new(end.to_rfc2822()), diff --git a/pueue/src/client/display/table_builder.rs b/pueue/src/client/display/table_builder.rs index a55e0c35..1b5ee2be 100644 --- a/pueue/src/client/display/table_builder.rs +++ b/pueue/src/client/display/table_builder.rs @@ -208,9 +208,11 @@ impl<'a> TableBuilder<'a> { // Determine the human readable task status representation and the respective color. let status_string = task.status.to_string(); let (status_text, color) = match &task.status { - TaskStatus::Running => (status_string, Color::Green), - TaskStatus::Paused | TaskStatus::Locked => (status_string, Color::White), - TaskStatus::Done(result) => match result { + TaskStatus::Running { .. } => (status_string, Color::Green), + TaskStatus::Paused { .. } | TaskStatus::Locked { .. } => { + (status_string, Color::White) + } + TaskStatus::Done { result, .. } => match result { TaskResult::Success => (TaskResult::Success.to_string(), Color::Green), TaskResult::DependencyFailed => { ("Dependency failed".to_string(), Color::Red) diff --git a/pueue/src/client/query/filters.rs b/pueue/src/client/query/filters.rs index 48849b93..f9498aeb 100644 --- a/pueue/src/client/query/filters.rs +++ b/pueue/src/client/query/filters.rs @@ -115,13 +115,15 @@ pub fn datetime(section: Pair<'_, Rule>, query_result: &mut QueryResult) -> Resu enqueue_at } Rule::column_start => { - let Some(start) = task.start else { + let (start, _) = task.start_and_end(); + let Some(start) = start else { return false; }; start } Rule::column_end => { - let Some(end) = task.end else { + let (_, end) = task.start_and_end(); + let Some(end) = end else { return false; }; end @@ -311,16 +313,22 @@ pub fn status(section: Pair<'_, Rule>, query_result: &mut QueryResult) -> Result // Build the filter function for the task's status. let filter_function = Box::new(move |task: &Task| -> bool { let matches = match operand { - Rule::status_queued => matches!(task.status, TaskStatus::Queued), + Rule::status_queued => matches!(task.status, TaskStatus::Queued { .. }), Rule::status_stashed => matches!(task.status, TaskStatus::Stashed { .. }), - Rule::status_running => matches!(task.status, TaskStatus::Running), - Rule::status_paused => matches!(task.status, TaskStatus::Paused), + Rule::status_running => matches!(task.status, TaskStatus::Running { .. }), + Rule::status_paused => matches!(task.status, TaskStatus::Paused { .. }), Rule::status_success => { - matches!(&task.status, TaskStatus::Done(TaskResult::Success)) + matches!( + &task.status, + TaskStatus::Done { + result: TaskResult::Success, + .. + } + ) } Rule::status_failed => { let mut matches = false; - if let TaskStatus::Done(result) = &task.status { + if let TaskStatus::Done { result, .. } = &task.status { if !matches!(result, TaskResult::Success) { matches = true; } diff --git a/pueue/src/client/query/mod.rs b/pueue/src/client/query/mod.rs index 0ed13010..7e90093c 100644 --- a/pueue/src/client/query/mod.rs +++ b/pueue/src/client/query/mod.rs @@ -77,11 +77,11 @@ impl QueryResult { fn rank_status(task: &Task) -> u8 { match &task.status { TaskStatus::Stashed { .. } => 0, - TaskStatus::Locked => 1, - TaskStatus::Queued => 2, - TaskStatus::Paused => 3, - TaskStatus::Running => 4, - TaskStatus::Done(result) => match result { + TaskStatus::Locked { .. } => 1, + TaskStatus::Queued { .. } => 2, + TaskStatus::Paused { .. } => 3, + TaskStatus::Running { .. } => 4, + TaskStatus::Done { result, .. } => match result { TaskResult::Success => 6, _ => 5, }, @@ -93,8 +93,16 @@ impl QueryResult { Rule::column_label => task1.label.cmp(&task2.label), Rule::column_command => task1.command.cmp(&task2.command), Rule::column_path => task1.path.cmp(&task2.path), - Rule::column_start => task1.start.cmp(&task2.start), - Rule::column_end => task1.end.cmp(&task2.end), + Rule::column_start => { + let (start1, _) = task1.start_and_end(); + let (start2, _) = task2.start_and_end(); + start1.cmp(&start2) + } + Rule::column_end => { + let (_, end1) = task1.start_and_end(); + let (_, end2) = task2.start_and_end(); + end1.cmp(&end2) + } _ => std::cmp::Ordering::Less, }); diff --git a/pueue/src/daemon/callbacks.rs b/pueue/src/daemon/callbacks.rs index 71c52c5d..338b025f 100644 --- a/pueue/src/daemon/callbacks.rs +++ b/pueue/src/daemon/callbacks.rs @@ -64,7 +64,7 @@ pub fn build_callback_command( parameters.insert("group", task.group.clone()); // Result takes the TaskResult Enum strings, unless it didn't finish yet. - if let TaskStatus::Done(result) = &task.status { + if let TaskStatus::Done { result, .. } = &task.status { parameters.insert("result", result.to_string()); } else { parameters.insert("result", "None".into()); @@ -75,8 +75,9 @@ pub fn build_callback_command( time.map(|time| time.timestamp().to_string()) .unwrap_or_default() }; - parameters.insert("start", print_time(task.start)); - parameters.insert("end", print_time(task.end)); + let (start, end) = task.start_and_end(); + parameters.insert("start", print_time(start)); + parameters.insert("end", print_time(end)); // Read the last lines of the process' output and make it available. if let Ok(output) = read_last_log_file_lines( @@ -95,7 +96,7 @@ pub fn build_callback_command( parameters.insert("output_path", out_path.display().to_string()); // Get the exit code - if let TaskStatus::Done(result) = &task.status { + if let TaskStatus::Done { result, .. } = &task.status { match result { TaskResult::Success => parameters.insert("exit_code", "0".into()), TaskResult::Failed(code) => parameters.insert("exit_code", code.to_string()), diff --git a/pueue/src/daemon/network/message_handler/add.rs b/pueue/src/daemon/network/message_handler/add.rs index e07bf96c..0846eff6 100644 --- a/pueue/src/daemon/network/message_handler/add.rs +++ b/pueue/src/daemon/network/message_handler/add.rs @@ -35,20 +35,19 @@ pub fn add_task(settings: &Settings, state: &SharedState, message: AddMessage) - message.path, message.envs, message.group, - TaskStatus::Locked, + TaskStatus::Queued { + enqueued_at: Local::now(), + }, message.dependencies, message.priority.unwrap_or(0), message.label, ); - // Set the starting status. + // Handle if the command is to be stashed and/or automatically enqueued later. if message.stashed || message.enqueue_at.is_some() { task.status = TaskStatus::Stashed { enqueue_at: message.enqueue_at, }; - } else { - task.status = TaskStatus::Queued; - task.enqueued_at = Some(Local::now()); } // Check if there're any aliases that should be applied. diff --git a/pueue/src/daemon/network/message_handler/clean.rs b/pueue/src/daemon/network/message_handler/clean.rs index 188e7921..455eda0d 100644 --- a/pueue/src/daemon/network/message_handler/clean.rs +++ b/pueue/src/daemon/network/message_handler/clean.rs @@ -28,7 +28,7 @@ pub fn clean(settings: &Settings, state: &SharedState, message: CleanMessage) -> let mut state = state.lock().unwrap(); let filtered_tasks = - state.filter_tasks(|task| matches!(task.status, TaskStatus::Done(_)), None); + state.filter_tasks(|task| matches!(task.status, TaskStatus::Done { .. }), None); for task_id in &filtered_tasks.matching_ids { // Ensure the task is removable, i.e. there are no dependant tasks. @@ -40,7 +40,13 @@ pub fn clean(settings: &Settings, state: &SharedState, message: CleanMessage) -> if let Some(task) = state.tasks.get(task_id) { // Check if we should ignore this task, if only successful tasks should be removed. if message.successful_only - && !matches!(task.status, TaskStatus::Done(TaskResult::Success)) + && !matches!( + task.status, + TaskStatus::Done { + result: TaskResult::Success, + .. + } + ) { continue; } @@ -82,7 +88,7 @@ mod tests { impl TaskAddable for State { fn add_stub_task(&mut self, id: &str, group: &str, task_result: TaskResult) { - let task = get_stub_task_in_group(id, group, TaskStatus::Done(task_result)); + let task = get_stub_task_in_group(id, group, StubStatus::Done(task_result)); self.add_task(task); } } diff --git a/pueue/src/daemon/network/message_handler/edit.rs b/pueue/src/daemon/network/message_handler/edit.rs index 78cb2c7e..6f160acc 100644 --- a/pueue/src/daemon/network/message_handler/edit.rs +++ b/pueue/src/daemon/network/message_handler/edit.rs @@ -19,8 +19,9 @@ pub fn edit_request(state: &SharedState, task_id: usize) -> Message { if !task.is_queued() && !task.is_stashed() { return create_failure_message("You can only edit a queued/stashed task"); } - task.prev_status = task.status.clone(); - task.status = TaskStatus::Locked; + task.status = TaskStatus::Locked { + previous_status: Box::new(task.status.clone()), + }; EditResponseMessage { task_id: task.id, @@ -42,12 +43,12 @@ pub fn edit(settings: &Settings, state: &SharedState, message: EditMessage) -> M let mut state = state.lock().unwrap(); match state.tasks.get_mut(&message.task_id) { Some(task) => { - if !(task.status == TaskStatus::Locked) { + let TaskStatus::Locked { previous_status } = &task.status else { return create_failure_message("Task is no longer locked."); - } + }; // Restore the task to its previous state. - task.status = task.prev_status.clone(); + task.status = *previous_status.clone(); // Update command if applicable. if let Some(command) = message.command { @@ -83,10 +84,12 @@ pub fn edit_restore(state: &SharedState, task_id: usize) -> Message { let mut state = state.lock().unwrap(); match state.tasks.get_mut(&task_id) { Some(task) => { - if task.status != TaskStatus::Locked { + let TaskStatus::Locked { previous_status } = &task.status else { return create_failure_message("The requested task isn't locked"); - } - task.status = task.prev_status.clone(); + }; + + // Restore the task to its previous state. + task.status = *previous_status.clone(); success_msg!( "The requested task's status has been restored to '{}'", diff --git a/pueue/src/daemon/network/message_handler/enqueue.rs b/pueue/src/daemon/network/message_handler/enqueue.rs index 629be52a..a1dfb663 100644 --- a/pueue/src/daemon/network/message_handler/enqueue.rs +++ b/pueue/src/daemon/network/message_handler/enqueue.rs @@ -10,7 +10,12 @@ use crate::daemon::network::response_helper::*; pub fn enqueue(state: &SharedState, message: EnqueueMessage) -> Message { let mut state = state.lock().unwrap(); let filtered_tasks = state.filter_tasks( - |task| matches!(task.status, TaskStatus::Stashed { .. } | TaskStatus::Locked), + |task| { + matches!( + task.status, + TaskStatus::Stashed { .. } | TaskStatus::Locked { .. } + ) + }, Some(message.task_ids), ); @@ -25,8 +30,9 @@ pub fn enqueue(state: &SharedState, message: EnqueueMessage) -> Message { enqueue_at: message.enqueue_at, }; } else { - task.status = TaskStatus::Queued; - task.enqueued_at = Some(Local::now()); + task.status = TaskStatus::Queued { + enqueued_at: Local::now(), + }; } } diff --git a/pueue/src/daemon/network/message_handler/mod.rs b/pueue/src/daemon/network/message_handler/mod.rs index dac89d27..f7404609 100644 --- a/pueue/src/daemon/network/message_handler/mod.rs +++ b/pueue/src/daemon/network/message_handler/mod.rs @@ -75,6 +75,7 @@ macro_rules! ok_or_save_state_failure { #[cfg(test)] mod fixtures { + use chrono::{DateTime, Duration, Local}; use std::collections::HashMap; use std::env::temp_dir; use std::sync::{Arc, Mutex}; @@ -84,6 +85,15 @@ mod fixtures { pub use pueue_lib::state::{SharedState, State, PUEUE_DEFAULT_GROUP}; pub use pueue_lib::task::{Task, TaskResult, TaskStatus}; + // A simple helper struct to keep the boilerplate for TaskStatus creation down. + pub enum StubStatus { + Queued, + Running, + Paused, + Stashed { enqueue_at: Option> }, + Done(TaskResult), + } + pub fn get_settings() -> (Settings, TempDir) { let tempdir = TempDir::new().expect("Failed to create test pueue directory"); let mut settings = Settings::default(); @@ -110,7 +120,25 @@ mod fixtures { } /// Create a new task with stub data in the given group - pub fn get_stub_task_in_group(id: &str, group: &str, status: TaskStatus) -> Task { + pub fn get_stub_task_in_group(id: &str, group: &str, status: StubStatus) -> Task { + // Build a proper Task status based on the simplified requested stub status. + let enqueued_at = Local::now() - Duration::minutes(5); + let start = Local::now() - Duration::minutes(4); + let end = Local::now() - Duration::minutes(1); + + let status = match status { + StubStatus::Stashed { enqueue_at } => TaskStatus::Stashed { enqueue_at }, + StubStatus::Queued => TaskStatus::Queued { enqueued_at }, + StubStatus::Running => TaskStatus::Running { enqueued_at, start }, + StubStatus::Paused => TaskStatus::Paused { enqueued_at, start }, + StubStatus::Done(result) => TaskStatus::Done { + enqueued_at, + start, + end, + result, + }, + }; + Task::new( id.to_string(), temp_dir(), @@ -124,7 +152,7 @@ mod fixtures { } /// Create a new task with stub data - pub fn get_stub_task(id: &str, status: TaskStatus) -> Task { + pub fn get_stub_task(id: &str, status: StubStatus) -> Task { get_stub_task_in_group(id, PUEUE_DEFAULT_GROUP, status) } @@ -133,23 +161,23 @@ mod fixtures { { // Queued task let mut state = state.lock().unwrap(); - let task = get_stub_task("0", TaskStatus::Queued); + let task = get_stub_task("0", StubStatus::Queued); state.add_task(task); // Finished task - let task = get_stub_task("1", TaskStatus::Done(TaskResult::Success)); + let task = get_stub_task("1", StubStatus::Done(TaskResult::Success)); state.add_task(task); // Stashed task - let task = get_stub_task("2", TaskStatus::Stashed { enqueue_at: None }); + let task = get_stub_task("2", StubStatus::Stashed { enqueue_at: None }); state.add_task(task); // Running task - let task = get_stub_task("3", TaskStatus::Running); + let task = get_stub_task("3", StubStatus::Running); state.add_task(task); // Paused task - let task = get_stub_task("4", TaskStatus::Paused); + let task = get_stub_task("4", StubStatus::Paused); state.add_task(task); } diff --git a/pueue/src/daemon/network/message_handler/pause.rs b/pueue/src/daemon/network/message_handler/pause.rs index 466ccf06..e9f11860 100644 --- a/pueue/src/daemon/network/message_handler/pause.rs +++ b/pueue/src/daemon/network/message_handler/pause.rs @@ -22,7 +22,7 @@ pub fn pause(settings: &Settings, state: &SharedState, message: PauseMessage) -> TaskSelection::TaskIds(task_ids) => task_action_response_helper( "Tasks are being paused", task_ids.clone(), - |task| matches!(task.status, TaskStatus::Running), + |task| matches!(task.status, TaskStatus::Running { .. }), &state, ), TaskSelection::Group(group) => { diff --git a/pueue/src/daemon/network/message_handler/remove.rs b/pueue/src/daemon/network/message_handler/remove.rs index 9be67e75..e0c189f3 100644 --- a/pueue/src/daemon/network/message_handler/remove.rs +++ b/pueue/src/daemon/network/message_handler/remove.rs @@ -19,10 +19,10 @@ pub fn remove(settings: &Settings, state: &SharedState, task_ids: Vec) -> let filter = |task: &Task| { matches!( task.status, - TaskStatus::Queued + TaskStatus::Queued { .. } | TaskStatus::Stashed { .. } - | TaskStatus::Done(_) - | TaskStatus::Locked + | TaskStatus::Done { .. } + | TaskStatus::Locked { .. } ) }; let mut filtered_tasks = state.filter_tasks(filter, Some(task_ids)); @@ -82,12 +82,12 @@ mod tests { { let mut state = state.lock().unwrap(); // Add a task with a dependency to a finished task - let mut task = get_stub_task("5", TaskStatus::Queued); + let mut task = get_stub_task("5", StubStatus::Queued); task.dependencies = vec![1]; state.add_task(task); // Add a task depending on the previous task -> Linked dependencies - let mut task = get_stub_task("6", TaskStatus::Queued); + let mut task = get_stub_task("6", StubStatus::Queued); task.dependencies = vec![5]; state.add_task(task); } diff --git a/pueue/src/daemon/network/message_handler/restart.rs b/pueue/src/daemon/network/message_handler/restart.rs index e959a30c..ba4b0f82 100644 --- a/pueue/src/daemon/network/message_handler/restart.rs +++ b/pueue/src/daemon/network/message_handler/restart.rs @@ -69,10 +69,10 @@ fn restart( // Either enqueue the task or stash it. if stashed { task.status = TaskStatus::Stashed { enqueue_at: None }; - task.enqueued_at = None; } else { - task.status = TaskStatus::Queued; - task.enqueued_at = Some(Local::now()); + task.status = TaskStatus::Queued { + enqueued_at: Local::now(), + }; }; // Update command if applicable. @@ -97,8 +97,4 @@ fn restart( if let Some(priority) = to_restart.priority { task.priority = priority; } - - // Reset all variables of any previous run. - task.start = None; - task.end = None; } diff --git a/pueue/src/daemon/network/message_handler/start.rs b/pueue/src/daemon/network/message_handler/start.rs index 428041c6..4428376e 100644 --- a/pueue/src/daemon/network/message_handler/start.rs +++ b/pueue/src/daemon/network/message_handler/start.rs @@ -25,7 +25,9 @@ pub fn start(settings: &Settings, state: &SharedState, message: StartMessage) -> |task| { matches!( task.status, - TaskStatus::Paused | TaskStatus::Queued | TaskStatus::Stashed { .. } + TaskStatus::Paused { .. } + | TaskStatus::Queued { .. } + | TaskStatus::Stashed { .. } ) }, &state, diff --git a/pueue/src/daemon/network/message_handler/stash.rs b/pueue/src/daemon/network/message_handler/stash.rs index a92149fe..d01b4505 100644 --- a/pueue/src/daemon/network/message_handler/stash.rs +++ b/pueue/src/daemon/network/message_handler/stash.rs @@ -10,14 +10,18 @@ use crate::daemon::network::response_helper::*; pub fn stash(state: &SharedState, task_ids: Vec) -> Message { let mut state = state.lock().unwrap(); let filtered_tasks = state.filter_tasks( - |task| matches!(task.status, TaskStatus::Queued | TaskStatus::Locked), + |task| { + matches!( + task.status, + TaskStatus::Queued { .. } | TaskStatus::Locked { .. } + ) + }, Some(task_ids), ); for task_id in &filtered_tasks.matching_ids { if let Some(ref mut task) = state.tasks.get_mut(task_id) { task.status = TaskStatus::Stashed { enqueue_at: None }; - task.enqueued_at = None; } } diff --git a/pueue/src/daemon/network/message_handler/switch.rs b/pueue/src/daemon/network/message_handler/switch.rs index c0180160..0dd70296 100644 --- a/pueue/src/daemon/network/message_handler/switch.rs +++ b/pueue/src/daemon/network/message_handler/switch.rs @@ -16,7 +16,12 @@ pub fn switch(settings: &Settings, state: &SharedState, message: SwitchMessage) let task_ids = [message.task_id_1, message.task_id_2]; let filtered_tasks = state.filter_tasks( - |task| matches!(task.status, TaskStatus::Queued | TaskStatus::Stashed { .. }), + |task| { + matches!( + task.status, + TaskStatus::Queued { .. } | TaskStatus::Stashed { .. } + ) + }, Some(task_ids.to_vec()), ); if !filtered_tasks.non_matching_ids.is_empty() { @@ -80,27 +85,27 @@ mod tests { { let mut state = state.lock().unwrap(); - let task = get_stub_task("0", TaskStatus::Queued); + let task = get_stub_task("0", StubStatus::Queued); state.add_task(task); - let task = get_stub_task("1", TaskStatus::Stashed { enqueue_at: None }); + let task = get_stub_task("1", StubStatus::Stashed { enqueue_at: None }); state.add_task(task); - let task = get_stub_task("2", TaskStatus::Queued); + let task = get_stub_task("2", StubStatus::Queued); state.add_task(task); - let task = get_stub_task("3", TaskStatus::Stashed { enqueue_at: None }); + let task = get_stub_task("3", StubStatus::Stashed { enqueue_at: None }); state.add_task(task); - let mut task = get_stub_task("4", TaskStatus::Queued); + let mut task = get_stub_task("4", StubStatus::Queued); task.dependencies = vec![0, 3]; state.add_task(task); - let mut task = get_stub_task("5", TaskStatus::Stashed { enqueue_at: None }); + let mut task = get_stub_task("5", StubStatus::Stashed { enqueue_at: None }); task.dependencies = vec![1]; state.add_task(task); - let mut task = get_stub_task("6", TaskStatus::Queued); + let mut task = get_stub_task("6", StubStatus::Queued); task.dependencies = vec![2, 3]; state.add_task(task); } diff --git a/pueue/src/daemon/process_handler/finish.rs b/pueue/src/daemon/process_handler/finish.rs index e0ccee56..a124f355 100644 --- a/pueue/src/daemon/process_handler/finish.rs +++ b/pueue/src/daemon/process_handler/finish.rs @@ -23,6 +23,20 @@ pub fn handle_finished_tasks(settings: &Settings, state: &mut LockedState) { } for ((task_id, group, worker_id), error) in finished.iter() { + let (enqueued_at, start) = { + let task = state.tasks.get(task_id).unwrap(); + // Get the enqueued_at/start times from the current state. + match task.status { + TaskStatus::Running { enqueued_at, start } + | TaskStatus::Paused { enqueued_at, start } => (enqueued_at, start), + _ => { + error!("Discovered a finished task in unexpected state! Please report this."); + error!("Task {task_id}: {task:#?}"); + (Local::now(), Local::now()) + } + } + }; + // Handle std::io errors on child processes. // I have never seen something like this, but it might happen. if let Some(error) = error { @@ -34,11 +48,16 @@ pub fn handle_finished_tasks(settings: &Settings, state: &mut LockedState) { .remove(worker_id) .expect("Errored child went missing while handling finished task."); - // Update the tasks's state and return a clone for callbacks and notifications. + // Update the tasks's state and return a clone for callback handling. let task = { let task = state.tasks.get_mut(task_id).unwrap(); - task.status = TaskStatus::Done(TaskResult::Errored); - task.end = Some(Local::now()); + + task.status = TaskStatus::Done { + enqueued_at, + start, + end: Local::now(), + result: TaskResult::Errored, + }; task.clone() }; @@ -78,15 +97,19 @@ pub fn handle_finished_tasks(settings: &Settings, state: &mut LockedState) { None => TaskResult::Killed, }; - // Update the tasks's state and return a clone for callbacks and notifications. + // Update the tasks's state and return a clone for callback handling. let task = { let task = state .tasks .get_mut(task_id) .expect("Task was removed before child process has finished!"); - task.status = TaskStatus::Done(result.clone()); - task.end = Some(Local::now()); + task.status = TaskStatus::Done { + enqueued_at, + start, + end: Local::now(), + result: result.clone(), + }; task.clone() }; diff --git a/pueue/src/daemon/process_handler/kill.rs b/pueue/src/daemon/process_handler/kill.rs index 3e7b602d..d8bdec85 100644 --- a/pueue/src/daemon/process_handler/kill.rs +++ b/pueue/src/daemon/process_handler/kill.rs @@ -47,7 +47,12 @@ pub fn kill( // Determine all running or paused tasks in that group. let filtered_tasks = state.filter_tasks_of_group( - |task| matches!(task.status, TaskStatus::Running | TaskStatus::Paused), + |task| { + matches!( + task.status, + TaskStatus::Running { .. } | TaskStatus::Paused { .. } + ) + }, &group_name, ); diff --git a/pueue/src/daemon/process_handler/pause.rs b/pueue/src/daemon/process_handler/pause.rs index f171053d..1487b66c 100644 --- a/pueue/src/daemon/process_handler/pause.rs +++ b/pueue/src/daemon/process_handler/pause.rs @@ -30,7 +30,7 @@ pub fn pause(settings: &Settings, state: &mut LockedState, selection: TaskSelect info!("Pausing group {group_name}"); let filtered_tasks = state.filter_tasks_of_group( - |task| matches!(task.status, TaskStatus::Running), + |task| matches!(task.status, TaskStatus::Running { .. }), &group_name, ); @@ -48,22 +48,26 @@ pub fn pause(settings: &Settings, state: &mut LockedState, selection: TaskSelect // Pause all tasks that were found. if !wait { for id in keys { - pause_task(state, id); - } - } + // Get the enqueued_at/start times from the current state. + let (enqueued_at, start) = match state.tasks.get(&id).unwrap().status { + TaskStatus::Running { enqueued_at, start } + | TaskStatus::Paused { enqueued_at, start } => (enqueued_at, start), + _ => continue, + }; - ok_or_shutdown!(settings, state, save_state(state, settings)); -} + let success = match perform_action(state, id, ProcessAction::Pause) { + Err(err) => { + error!("Failed pausing task {id}: {err:?}"); + false + } + Ok(success) => success, + }; -/// Pause a specific task. -/// Send a signal to the process to actually pause the OS process. -fn pause_task(state: &mut LockedState, id: usize) { - match perform_action(state, id, ProcessAction::Pause) { - Err(err) => error!("Failed pausing task {id}: {err:?}"), - Ok(success) => { if success { - state.change_status(id, TaskStatus::Paused); + state.change_status(id, TaskStatus::Paused { enqueued_at, start }); } } } + + ok_or_shutdown!(settings, state, save_state(state, settings)); } diff --git a/pueue/src/daemon/process_handler/spawn.rs b/pueue/src/daemon/process_handler/spawn.rs index b6671cf1..d0b1a857 100644 --- a/pueue/src/daemon/process_handler/spawn.rs +++ b/pueue/src/daemon/process_handler/spawn.rs @@ -3,7 +3,7 @@ use std::process::Stdio; use chrono::Local; use command_group::CommandGroup; -use log::{error, info}; +use log::{error, info, warn}; use pueue_lib::log::{create_log_file_handles, get_writable_log_file_handle}; use pueue_lib::process_helper::compile_shell_command; use pueue_lib::settings::Settings; @@ -38,7 +38,7 @@ pub fn get_next_task_id(state: &LockedState) -> Option { let mut potential_tasks: Vec<&Task> = state .tasks .iter() - .filter(|(_, task)| task.status == TaskStatus::Queued) + .filter(|(_, task)| matches!(task.status, TaskStatus::Queued {..})) .filter(|(_, task)| { // Make sure the task is assigned to an existing group. let group = match state.groups.get(&task.group) { @@ -84,7 +84,7 @@ pub fn get_next_task_id(state: &LockedState) -> Option { task.dependencies .iter() .flat_map(|id| state.tasks.get(id)) - .all(|task| matches!(task.status, TaskStatus::Done(TaskResult::Success))) + .all(|task| matches!(task.status, TaskStatus::Done{result: TaskResult::Success, ..})) }) .map(|(_, task)| {task}) .collect(); @@ -111,20 +111,22 @@ pub fn get_next_task_id(state: &LockedState) -> Option { /// The output of subprocesses is piped into a separate file for easier access pub fn spawn_process(settings: &Settings, state: &mut LockedState, task_id: usize) { // Check if the task exists and can actually be spawned. Otherwise do an early return. - match state.tasks.get(&task_id) { - Some(task) => { - if !matches!( - &task.status, - TaskStatus::Stashed { .. } | TaskStatus::Queued | TaskStatus::Paused - ) { - info!("Tried to start task with status: {}", task.status); - return; - } - } - None => { - info!("Tried to start non-existing task: {task_id}"); + let Some(task) = state.tasks.get(&task_id) else { + warn!("Tried to start non-existing task: {task_id}"); + return; + }; + + // Get the task's enqueue time and make sure we don't have invalid states for spawning. + let enqueued_at = match &task.status { + TaskStatus::Stashed { .. } + | TaskStatus::Paused { .. } + | TaskStatus::Running { .. } + | TaskStatus::Done { .. } => { + warn!("Tried to start task with status: {}", task.status); return; } + TaskStatus::Queued { enqueued_at } => *enqueued_at, + TaskStatus::Locked { .. } => Local::now(), }; let pueue_directory = settings.shared.pueue_directory(); @@ -189,9 +191,12 @@ pub fn spawn_process(settings: &Settings, state: &mut LockedState, task_id: usiz // Update all necessary fields on the task. let task = { let task = state.tasks.get_mut(&task_id).unwrap(); - task.status = TaskStatus::Done(TaskResult::FailedToSpawn(error)); - task.start = Some(Local::now()); - task.end = Some(Local::now()); + task.status = TaskStatus::Done { + enqueued_at, + start: Local::now(), + end: Local::now(), + result: TaskResult::FailedToSpawn(error), + }; task.clone() }; @@ -208,8 +213,10 @@ pub fn spawn_process(settings: &Settings, state: &mut LockedState, task_id: usiz state.children.add_child(&group, worker_id, task_id, child); let task = state.tasks.get_mut(&task_id).unwrap(); - task.start = Some(Local::now()); - task.status = TaskStatus::Running; + task.status = TaskStatus::Running { + enqueued_at, + start: Local::now(), + }; // Overwrite the task's environment variables with the new ones, containing the // PUEUE_WORKER_ID and PUEUE_GROUP variables. task.envs = envs; diff --git a/pueue/src/daemon/process_handler/start.rs b/pueue/src/daemon/process_handler/start.rs index 9d6a34b6..e9a2f0ef 100644 --- a/pueue/src/daemon/process_handler/start.rs +++ b/pueue/src/daemon/process_handler/start.rs @@ -46,7 +46,7 @@ pub fn start(settings: &Settings, state: &mut LockedState, tasks: TaskSelection) info!("Resuming group {}", &group_name); let filtered_tasks = state.filter_tasks_of_group( - |task| matches!(task.status, TaskStatus::Paused), + |task| matches!(task.status, TaskStatus::Paused { .. }), &group_name, ); @@ -76,10 +76,26 @@ fn continue_task(state: &mut LockedState, task_id: usize) { return; } - // Task is already done - if state.tasks.get(&task_id).unwrap().is_done() { - return; - } + // Encapsulate to prevent a duplicate borrow on `state`. + let (enqueued_at, start) = { + // Task is already done + let Some(task) = state.tasks.get_mut(&task_id) else { + return; + }; + + // Return early if the task is somehow already done and not cleaned up yet. + if task.is_done() { + warn!("Tried to resume finished task: {:?}", task); + return; + } + + // Don't send a resume signal if the task isn't paused. + let TaskStatus::Paused { enqueued_at, start } = task.status else { + warn!("Tried to resume unpaused task: {:?}", task); + return; + }; + (enqueued_at, start) + }; let success = match perform_action(state, task_id, ProcessAction::Resume) { Err(err) => { @@ -90,6 +106,6 @@ fn continue_task(state: &mut LockedState, task_id: usize) { }; if success { - state.change_status(task_id, TaskStatus::Running); + state.change_status(task_id, TaskStatus::Running { enqueued_at, start }); } } diff --git a/pueue/src/daemon/state_helper.rs b/pueue/src/daemon/state_helper.rs index c06ec46f..e37e132a 100644 --- a/pueue/src/daemon/state_helper.rs +++ b/pueue/src/daemon/state_helper.rs @@ -26,7 +26,7 @@ pub fn is_task_removable(state: &LockedState, task_id: &usize, to_delete: &[usiz .tasks .iter() .filter(|(_, task)| { - task.dependencies.contains(task_id) && !matches!(task.status, TaskStatus::Done(_)) + task.dependencies.contains(task_id) && !matches!(task.status, TaskStatus::Done { .. }) }) .map(|(_, task)| task.id) .collect(); @@ -150,18 +150,25 @@ pub fn restore_state(pueue_directory: &Path) -> Result> { // While restoring the tasks, check for any invalid/broken stati. for (_, task) in state.tasks.iter_mut() { // Handle ungraceful shutdowns while executing tasks. - if task.status == TaskStatus::Running || task.status == TaskStatus::Paused { + if let TaskStatus::Running { start, enqueued_at } + | TaskStatus::Paused { start, enqueued_at } = task.status + { info!( "Setting task {} with previous status {:?} to new status {:?}", task.id, task.status, TaskResult::Killed ); - task.status = TaskStatus::Done(TaskResult::Killed); + task.status = TaskStatus::Done { + start, + end: Local::now(), + enqueued_at, + result: TaskResult::Killed, + }; } // Handle crash during editing of the task command. - if task.status == TaskStatus::Locked { + if matches!(task.status, TaskStatus::Locked { .. }) { task.status = TaskStatus::Stashed { enqueue_at: None }; } @@ -183,7 +190,7 @@ pub fn restore_state(pueue_directory: &Path) -> Result> { // If there are any queued tasks, pause the group. // This should prevent any unwanted execution of tasks due to a system crash. - if task.status == TaskStatus::Queued { + if let TaskStatus::Queued { .. } = task.status { info!( "Pausing group {} to prevent unwanted execution of previous tasks", &task.group diff --git a/pueue/src/daemon/task_handler.rs b/pueue/src/daemon/task_handler.rs index 0ce7119d..939186f6 100644 --- a/pueue/src/daemon/task_handler.rs +++ b/pueue/src/daemon/task_handler.rs @@ -139,8 +139,9 @@ fn enqueue_delayed_tasks(settings: &Settings, state: &mut LockedState) { if time <= Local::now() { info!("Enqueuing delayed task : {}", task.id); - task.status = TaskStatus::Queued; - task.enqueued_at = Some(Local::now()); + task.status = TaskStatus::Queued { + enqueued_at: Local::now(), + }; changed = true; } } @@ -158,7 +159,9 @@ fn check_failed_dependencies(settings: &Settings, state: &mut LockedState) { let has_failed_deps: Vec<_> = state .tasks .iter() - .filter(|(_, task)| task.status == TaskStatus::Queued && !task.dependencies.is_empty()) + .filter(|(_, task)| { + matches!(task.status, TaskStatus::Queued { .. }) && !task.dependencies.is_empty() + }) .filter_map(|(id, task)| { // At this point we got all queued tasks with dependencies. // Go through all dependencies and ensure they didn't fail. @@ -197,9 +200,17 @@ fn check_failed_dependencies(settings: &Settings, state: &mut LockedState) { // Update the task and return a clone to build the callback. let task = { let task = state.tasks.get_mut(&id).unwrap(); - task.status = TaskStatus::Done(TaskResult::DependencyFailed); - task.start = Some(Local::now()); - task.end = Some(Local::now()); + // We know that this must be true, but we have to check anyway. + let TaskStatus::Queued { enqueued_at } = task.status else { + continue; + }; + + task.status = TaskStatus::Done { + enqueued_at, + start: Local::now(), + end: Local::now(), + result: TaskResult::DependencyFailed, + }; task.clone() }; diff --git a/pueue/tests/client/helper/compare_output.rs b/pueue/tests/client/helper/compare_output.rs index 4c232fc2..8a092560 100644 --- a/pueue/tests/client/helper/compare_output.rs +++ b/pueue/tests/client/helper/compare_output.rs @@ -31,7 +31,9 @@ pub async fn get_task_context(settings: &Settings) -> Result Result Result<()> { let state = get_state(shared).await?; let task = state.tasks.get(&0).unwrap(); assert_eq!(task.command, "sleep 60"); - assert_eq!(task.status, TaskStatus::Running); + assert_matches!( + task.status, + TaskStatus::Running { .. }, + "Task should be running" + ); Ok(()) } @@ -98,10 +104,14 @@ async fn restart_and_edit_task_path_and_command() -> Result<()> { assert_eq!(task.label, Some("replaced string".to_owned())); // Also the task should have been restarted and failed. - if let TaskStatus::Done(TaskResult::FailedToSpawn(_)) = task.status { - } else { - bail!("The task should have failed"); - }; + assert_matches!( + task.status, + TaskStatus::Done { + result: TaskResult::FailedToSpawn(_), + .. + }, + "The task should have failed" + ); Ok(()) } @@ -145,10 +155,6 @@ async fn normal_restart_with_edit() -> Result<()> { // Create a task and wait for it to finish. assert_success(add_task(shared, "ls").await?); let original_task = wait_for_task_condition(shared, 0, |task| task.is_done()).await?; - assert!( - original_task.enqueued_at.is_some(), - "Task is done and should have enqueue_at set." - ); // Set the editor to a command which replaces the temporary file's content. let mut envs = HashMap::new(); @@ -162,18 +168,17 @@ async fn normal_restart_with_edit() -> Result<()> { let state = get_state(shared).await?; let task = state.tasks.get(&1).unwrap(); assert_eq!(task.command, "sleep 60"); - assert_eq!(task.status, TaskStatus::Running); + assert_matches!( + task.status, + TaskStatus::Running { .. }, + "Task should be running" + ); // Since we created a copy, the new task should be created after the first one. assert!( original_task.created_at < task.created_at, "New task should have a newer created_at." ); - // The created_at time should also be newer. - assert!( - original_task.enqueued_at.unwrap() < task.enqueued_at.unwrap(), - "The second run should be enqueued before the first run." - ); Ok(()) } diff --git a/pueue/tests/client/unit/status_query.rs b/pueue/tests/client/unit/status_query.rs index 189781b6..a617dc15 100644 --- a/pueue/tests/client/unit/status_query.rs +++ b/pueue/tests/client/unit/status_query.rs @@ -1,6 +1,7 @@ use std::{collections::HashMap, path::PathBuf}; use anyhow::Result; +use assert_matches::assert_matches; use chrono::{Local, TimeZone}; use pretty_assertions::assert_eq; use rstest::rstest; @@ -16,7 +17,9 @@ pub fn build_task() -> Task { PathBuf::from("/tmp"), HashMap::new(), PUEUE_DEFAULT_GROUP.to_owned(), - TaskStatus::Queued, + TaskStatus::Queued { + enqueued_at: Local.with_ymd_and_hms(2022, 1, 10, 10, 0, 0).unwrap(), + }, Vec::new(), 0, None, @@ -30,18 +33,24 @@ pub fn test_tasks() -> Vec { // Failed task let mut failed = build_task(); failed.id = 0; - failed.status = TaskStatus::Done(TaskResult::Failed(255)); - failed.start = Some(Local.with_ymd_and_hms(2022, 1, 10, 10, 0, 0).unwrap()); - failed.end = Some(Local.with_ymd_and_hms(2022, 1, 10, 10, 5, 0).unwrap()); + failed.status = TaskStatus::Done { + result: TaskResult::Failed(255), + enqueued_at: Local.with_ymd_and_hms(2022, 1, 10, 10, 0, 0).unwrap(), + start: Local.with_ymd_and_hms(2022, 1, 10, 10, 5, 0).unwrap(), + end: Local.with_ymd_and_hms(2022, 1, 10, 10, 10, 0).unwrap(), + }; failed.label = Some("label-10-0".to_string()); tasks.insert(failed.id, failed); // Successful task let mut successful = build_task(); successful.id = 1; - successful.status = TaskStatus::Done(TaskResult::Success); - successful.start = Some(Local.with_ymd_and_hms(2022, 1, 8, 10, 0, 0).unwrap()); - successful.end = Some(Local.with_ymd_and_hms(2022, 1, 8, 10, 5, 0).unwrap()); + successful.status = TaskStatus::Done { + result: TaskResult::Success, + enqueued_at: Local.with_ymd_and_hms(2022, 1, 8, 10, 0, 0).unwrap(), + start: Local.with_ymd_and_hms(2022, 1, 8, 10, 5, 0).unwrap(), + end: Local.with_ymd_and_hms(2022, 1, 8, 10, 10, 0).unwrap(), + }; successful.label = Some("label-10-1".to_string()); tasks.insert(successful.id, successful); @@ -63,9 +72,11 @@ pub fn test_tasks() -> Vec { // Running task let mut running = build_task(); - running.status = TaskStatus::Running; + running.status = TaskStatus::Running { + enqueued_at: Local.with_ymd_and_hms(2022, 1, 10, 10, 0, 0).unwrap(), + start: Local.with_ymd_and_hms(2022, 1, 2, 12, 0, 0).unwrap(), + }; running.id = 4; - running.start = Some(Local.with_ymd_and_hms(2022, 1, 2, 12, 0, 0).unwrap()); tasks.insert(running.id, running); // Add two queued tasks @@ -128,29 +139,6 @@ async fn limit_last() -> Result<()> { Ok(()) } -/// Order the test state by task status. -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn order_by_status() -> Result<()> { - let tasks = test_tasks_with_query("order_by status", &None)?; - - let expected = vec![ - TaskStatus::Stashed { enqueue_at: None }, - TaskStatus::Stashed { - enqueue_at: Some(Local.with_ymd_and_hms(2022, 1, 10, 11, 0, 0).unwrap()), - }, - TaskStatus::Queued, - TaskStatus::Queued, - TaskStatus::Running, - TaskStatus::Done(TaskResult::Failed(255)), - TaskStatus::Done(TaskResult::Success), - ]; - - let actual: Vec = tasks.iter().map(|task| task.status.clone()).collect(); - assert_eq!(actual, expected); - - Ok(()) -} - /// Filter by start date #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn filter_start() -> Result<()> { @@ -189,43 +177,110 @@ async fn filter_end_with_time(#[case] format: &'static str) -> Result<()> { /// Filter tasks by status #[rstest] -#[case(TaskStatus::Queued, 2)] -#[case(TaskStatus::Running, 1)] -#[case(TaskStatus::Paused, 0)] -#[case(TaskStatus::Done(TaskResult::Success), 1)] -#[case(TaskStatus::Done(TaskResult::Failed(255)), 1)] +#[case("queued", 2)] +#[case("running", 1)] +#[case("paused", 0)] +#[case("success", 1)] +#[case("failed", 1)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn filter_status(#[case] status: TaskStatus, #[case] match_count: usize) -> Result<()> { +async fn filter_status(#[case] status_filter: &str, #[case] match_count: usize) -> Result<()> { // Get the correct query keyword for the given status. - let status_filter = match status { - TaskStatus::Queued => "queued", - TaskStatus::Stashed { .. } => "stashed", - TaskStatus::Running => "running", - TaskStatus::Paused => "paused", - TaskStatus::Done(TaskResult::Success) => "success", - TaskStatus::Done(TaskResult::Failed(_)) => "failed", - _ => anyhow::bail!("Got unexpected TaskStatus in filter_status"), - }; let tasks = test_tasks_with_query(&format!("status={status_filter}"), &None)?; for task in tasks.iter() { - let id = task.id; - assert_eq!( - task.status, status, - "Expected a different task status on task {id} based on filter {status:?}" - ); + match status_filter { + "queued" => { + assert_matches!( + task.status, + TaskStatus::Queued { .. }, + "Only Queued tasks are allowed" + ); + } + "stashed" => assert_matches!( + task.status, + TaskStatus::Stashed { .. }, + "Only Stashed tasks are allowed" + ), + "running" => assert_matches!( + task.status, + TaskStatus::Running { .. }, + "Only Running tasks are allowed" + ), + "paused" => assert_matches!( + task.status, + TaskStatus::Paused { .. }, + "Only Paused tasks are allowed" + ), + "success" => assert_matches!( + task.status, + TaskStatus::Done { + result: TaskResult::Success, + .. + }, + "Only Succesful tasks are allowed" + ), + "failed" => assert_matches!( + task.status, + TaskStatus::Done { + result: TaskResult::Failed(_), + .. + }, + "Only Failed tasks are allowed" + ), + _ => anyhow::bail!("Got unexpected TaskStatus in filter_status"), + }; } assert_eq!( tasks.len(), match_count, - "Got a different amount of tasks than expected for the status filter {status:?}." + "Got a different amount of tasks than expected for the status filter {status_filter:?}." ); Ok(()) } +/// Order the test state by task status. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn order_by_status() -> Result<()> { + let tasks = test_tasks_with_query("order_by status", &None)?; + + let expected = vec![ + TaskStatus::Stashed { enqueue_at: None }, + TaskStatus::Stashed { + enqueue_at: Some(Local.with_ymd_and_hms(2022, 1, 10, 11, 0, 0).unwrap()), + }, + TaskStatus::Queued { + enqueued_at: Local.with_ymd_and_hms(2022, 1, 10, 10, 0, 0).unwrap(), + }, + TaskStatus::Queued { + enqueued_at: Local.with_ymd_and_hms(2022, 1, 10, 10, 0, 0).unwrap(), + }, + TaskStatus::Running { + enqueued_at: Local.with_ymd_and_hms(2022, 1, 10, 10, 0, 0).unwrap(), + start: Local.with_ymd_and_hms(2022, 1, 2, 12, 0, 0).unwrap(), + }, + TaskStatus::Done { + enqueued_at: Local.with_ymd_and_hms(2022, 1, 10, 10, 0, 0).unwrap(), + start: Local.with_ymd_and_hms(2022, 1, 10, 10, 5, 0).unwrap(), + end: Local.with_ymd_and_hms(2022, 1, 10, 10, 10, 0).unwrap(), + result: TaskResult::Failed(255), + }, + TaskStatus::Done { + enqueued_at: Local.with_ymd_and_hms(2022, 1, 8, 10, 0, 0).unwrap(), + start: Local.with_ymd_and_hms(2022, 1, 8, 10, 5, 0).unwrap(), + end: Local.with_ymd_and_hms(2022, 1, 8, 10, 10, 0).unwrap(), + result: TaskResult::Success, + }, + ]; + + let actual: Vec = tasks.iter().map(|task| task.status.clone()).collect(); + assert_eq!(actual, expected); + + Ok(()) +} + /// Filter tasks by label with the "contains" `%=` filter. #[rstest] #[case("%=", "label", 3)] diff --git a/pueue/tests/daemon/data/v2.0.0_state.json b/pueue/tests/daemon/data/v2.0.0_state.json deleted file mode 100644 index 90da7500..00000000 --- a/pueue/tests/daemon/data/v2.0.0_state.json +++ /dev/null @@ -1,121 +0,0 @@ -{ - "settings": { - "client": { - "restart_in_place": true, - "read_local_logs": true, - "show_confirmation_questions": false, - "show_expanded_aliases": false, - "dark_mode": false, - "max_status_lines": 10, - "status_time_format": "%H:%M:%S", - "status_datetime_format": "%Y-%m-%d %H:%M:%S" - }, - "daemon": { - "pause_group_on_failure": false, - "pause_all_on_failure": false, - "callback": "notify-send \"Task {{ id }}\nCommand: {{ command }}\nPath: {{ path }}\nFinished with status '{{ result }}'\nTook: $(bc <<< \"{{end}} - {{start}}\") seconds\"", - "callback_log_lines": 10 - }, - "shared": { - "pueue_directory": null, - "runtime_directory": null, - "use_unix_socket": true, - "unix_socket_path": null, - "host": "127.0.0.1", - "port": "6924", - "daemon_cert": null, - "daemon_key": null, - "shared_secret_path": null - }, - "profiles": {}, - "herp": [] - }, - "tasks": { - "0": { - "id": 0, - "original_command": "ls", - "command": "ls", - "path": "/home/nuke/.local/share/pueue", - "envs": {}, - "group": "default", - "dependencies": [], - "label": null, - "status": { - "Done": "Success" - }, - "prev_status": "Queued", - "start": "2022-05-09T18:41:29.273563806+02:00", - "end": "2022-05-09T18:41:29.473998692+02:00" - }, - "1": { - "id": 1, - "original_command": "ls", - "command": "ls", - "path": "/home/nuke/.local/share/pueue", - "envs": { - "PUEUE_WORKER_ID": "0", - "PUEUE_GROUP": "test" - }, - "group": "test", - "dependencies": [], - "label": null, - "status": { - "Done": "Success" - }, - "prev_status": "Queued", - "start": "2022-05-09T18:43:30.683677276+02:00", - "end": "2022-05-09T18:43:30.884243263+02:00" - }, - "2": { - "id": 2, - "original_command": "ls", - "command": "ls", - "path": "/home/nuke/.local/share/pueue", - "envs": { - "PUEUE_WORKER_ID": "0", - "PUEUE_GROUP": "test" - }, - "group": "test", - "dependencies": [], - "label": null, - "status": "Queued", - "prev_status": "Queued", - "start": null, - "end": null - }, - "3": { - "id": 3, - "original_command": "ls stash_it", - "command": "ls stash_it", - "path": "/home/nuke/.local/share/pueue", - "envs": {}, - "group": "default", - "dependencies": [], - "label": null, - "status": { - "Stashed": { - "enqueue_at": null - } - }, - "prev_status": { - "Stashed": { - "enqueue_at": null - } - }, - "start": null, - "end": null - } - }, - "groups": { - "default": { - "status": "Running", - "parallel_tasks": 1 - }, - "test": { - "status": "Paused", - "parallel_tasks": 2 - } - }, - "config_path": null, - "derp": null -} diff --git a/pueue/tests/daemon/data/v4.0.0_state.json b/pueue/tests/daemon/data/v4.0.0_state.json new file mode 100644 index 00000000..6a07edbf --- /dev/null +++ b/pueue/tests/daemon/data/v4.0.0_state.json @@ -0,0 +1,125 @@ +{ + "tasks": { + "0": { + "id": 0, + "created_at": "2024-07-18T14:55:02.656354132+02:00", + "original_command": "ls", + "command": "ls", + "path": "/home/nuke", + "envs": {}, + "group": "default", + "dependencies": [], + "priority": 0, + "label": null, + "status": { + "Done": { + "enqueued_at": "2024-07-18T14:55:02.656334442+02:00", + "start": "2024-07-18T14:55:02.890447203+02:00", + "end": "2024-07-18T14:55:03.191922436+02:00", + "result": "Success" + } + } + }, + "1": { + "id": 1, + "created_at": "2024-07-18T14:55:09.965451882+02:00", + "original_command": "ls", + "command": "ls", + "path": "/home/nuke", + "envs": {}, + "group": "default", + "dependencies": [], + "priority": 0, + "label": null, + "status": { + "Done": { + "enqueued_at": "2024-07-18T14:55:09.965430002+02:00", + "start": "2024-07-18T14:55:10.116492792+02:00", + "end": "2024-07-18T14:55:10.418112074+02:00", + "result": { + "Failed": 2 + } + } + } + }, + "2": { + "id": 2, + "created_at": "2024-07-18T14:55:16.045036378+02:00", + "original_command": "ls", + "command": "ls", + "path": "/home/nuke", + "envs": {}, + "group": "default", + "dependencies": [], + "priority": 0, + "label": null, + "status": { + "Stashed": { + "enqueue_at": null + } + } + }, + "3": { + "id": 3, + "created_at": "2024-07-18T14:55:23.253045880+02:00", + "original_command": "sleep 9000000", + "command": "sleep 9000000", + "path": "/home/nuke", + "envs": {}, + "group": "default", + "dependencies": [], + "priority": 0, + "label": null, + "status": { + "Running": { + "enqueued_at": "2024-07-18T14:55:23.253039780+02:00", + "start": "2024-07-18T14:55:23.365190551+02:00" + } + } + }, + "4": { + "id": 4, + "created_at": "2024-07-18T14:55:52.901140728+02:00", + "original_command": "pueue sleep 60", + "command": "pueue sleep 60", + "path": "/home/nuke", + "envs": {}, + "group": "default", + "dependencies": [], + "priority": 0, + "label": null, + "status": { + "Queued": { + "enqueued_at": "2024-07-18T14:55:52.901136338+02:00" + } + } + }, + "5": { + "id": 5, + "created_at": "2024-07-18T14:56:15.733118787+02:00", + "original_command": "days ls", + "command": "days ls", + "path": "/home/nuke", + "envs": {}, + "group": "default", + "dependencies": [], + "priority": 0, + "label": null, + "status": { + "Queued": { + "enqueued_at": "2024-07-18T14:56:35.928788860+02:00" + } + } + } + }, + "groups": { + "default": { + "status": "Running", + "parallel_tasks": 1 + }, + "downloads": { + "status": "Running", + "parallel_tasks": 1 + } + } +} diff --git a/pueue/tests/daemon/integration/add.rs b/pueue/tests/daemon/integration/add.rs index d1b3f7b8..7ff5c027 100644 --- a/pueue/tests/daemon/integration/add.rs +++ b/pueue/tests/daemon/integration/add.rs @@ -1,6 +1,7 @@ use anyhow::Result; - +use assert_matches::assert_matches; use chrono::Local; + use pueue_lib::network::message::TaskSelection; use pueue_lib::task::*; @@ -27,16 +28,14 @@ async fn test_normal_add() -> Result<()> { task.created_at > pre_addition_time && task.created_at < post_addition_time, "Make sure the created_at time is set correctly" ); - assert!( - task.enqueued_at.unwrap() > pre_addition_time - && task.enqueued_at.unwrap() < post_addition_time, - "Make sure the enqueue_at time is set correctly" - ); - // The task finished successfully - assert_eq!( + assert_matches!( get_task_status(shared, 0).await?, - TaskStatus::Done(TaskResult::Success) + TaskStatus::Done { + result: TaskResult::Success, + .. + }, + "Task should finish successfully", ); Ok(()) @@ -54,16 +53,11 @@ async fn test_stashed_add() -> Result<()> { assert_success(send_message(shared, message).await?); // Make sure the task is actually stashed. - let task = wait_for_task_condition(shared, 0, |task| { + wait_for_task_condition(shared, 0, |task| { matches!(task.status, TaskStatus::Stashed { .. }) }) .await?; - assert!( - task.enqueued_at.is_none(), - "An unqueued task shouldn't have enqueue_at set." - ); - Ok(()) } diff --git a/pueue/tests/daemon/integration/aliases.rs b/pueue/tests/daemon/integration/aliases.rs index ecb0a613..97a6b2ad 100644 --- a/pueue/tests/daemon/integration/aliases.rs +++ b/pueue/tests/daemon/integration/aliases.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use anyhow::Result; +use assert_matches::assert_matches; use pueue_lib::network::message::*; use pueue_lib::task::*; @@ -26,7 +27,16 @@ async fn test_add_with_alias() -> Result<()> { let task = get_task(shared, 0).await?; // The task finished successfully and its command has replaced the alias. - assert_eq!(task.status, TaskStatus::Done(TaskResult::Success)); + assert!( + matches!( + task.status, + TaskStatus::Done { + result: TaskResult::Success, + .. + }, + ), + "Task should have finished successfully" + ); assert_eq!(task.command, "echo test"); assert_eq!(task.original_command, "non_existing_cmd test"); @@ -50,7 +60,14 @@ async fn test_restart_with_alias() -> Result<()> { // Ensure the command hasn't been mutated and the task failed. assert_eq!(task.command, "non_existing_cmd test"); - assert_eq!(task.status, TaskStatus::Done(TaskResult::Failed(127))); + assert_matches!( + task.status, + TaskStatus::Done { + result: TaskResult::Failed(127), + .. + }, + "Task should have failed to start" + ); // Create the alias file which will replace the new command with "echo". let mut aliases = HashMap::new(); @@ -76,7 +93,14 @@ async fn test_restart_with_alias() -> Result<()> { // The task finished successfully and its command has replaced the alias. assert_eq!(task.original_command, "replaced_cmd test"); assert_eq!(task.command, "echo test"); - assert_eq!(task.status, TaskStatus::Done(TaskResult::Success)); + assert_matches!( + task.status, + TaskStatus::Done { + result: TaskResult::Success, + .. + }, + "Task should have finished successfully" + ); // Make sure we see an actual "test" in the output. // This ensures that we really called "echo". diff --git a/pueue/tests/daemon/integration/edit.rs b/pueue/tests/daemon/integration/edit.rs index 8d1f2d32..186333fc 100644 --- a/pueue/tests/daemon/integration/edit.rs +++ b/pueue/tests/daemon/integration/edit.rs @@ -1,6 +1,7 @@ use std::path::PathBuf; use anyhow::{bail, Result}; +use assert_matches::assert_matches; use test_log::test; use pueue_lib::network::message::*; @@ -15,7 +16,11 @@ async fn create_edited_task(shared: &Shared) -> Result { assert_success(add_task(shared, "ls").await?); // The task should now be queued - assert_eq!(get_task_status(shared, 0).await?, TaskStatus::Queued); + assert_matches!( + get_task_status(shared, 0).await?, + TaskStatus::Queued { .. }, + "Task should be queued" + ); // Send a request to edit that task let response = send_message(shared, Message::EditRequest(0)).await?; @@ -43,11 +48,19 @@ async fn test_edit_flow() -> Result<()> { assert_eq!(response.priority, 0); // Task should be locked, after the request for editing succeeded. - assert_eq!(get_task_status(shared, 0).await?, TaskStatus::Locked); + assert_matches!( + get_task_status(shared, 0).await?, + TaskStatus::Locked { .. }, + "Expected the task to be locked after first request." + ); // You cannot start a locked task. It should still be locked afterwards. start_tasks(shared, TaskSelection::TaskIds(vec![0])).await?; - assert_eq!(get_task_status(shared, 0).await?, TaskStatus::Locked); + assert_matches!( + get_task_status(shared, 0).await?, + TaskStatus::Locked { .. }, + "Expected the task to still be locked." + ); // Send the final message of the protocol and actually change the task. let response = send_message( @@ -69,7 +82,11 @@ async fn test_edit_flow() -> Result<()> { assert_eq!(task.command, "ls -ahl"); assert_eq!(task.path, PathBuf::from("/tmp")); assert_eq!(task.label, Some("test".to_string())); - assert_eq!(task.status, TaskStatus::Queued); + assert_matches!( + task.status, + TaskStatus::Queued { .. }, + "Task should be queued" + ); assert_eq!(task.priority, 99); Ok(()) diff --git a/pueue/tests/daemon/integration/kill.rs b/pueue/tests/daemon/integration/kill.rs index 67d5786b..78d6620a 100644 --- a/pueue/tests/daemon/integration/kill.rs +++ b/pueue/tests/daemon/integration/kill.rs @@ -65,7 +65,13 @@ async fn test_kill_tasks_with_pause( // Make sure all tasks get killed for id in 0..3 { wait_for_task_condition(shared, id, |task| { - matches!(task.status, TaskStatus::Done(TaskResult::Killed)) + matches!( + task.status, + TaskStatus::Done { + result: TaskResult::Killed, + .. + } + ) }) .await?; } @@ -132,7 +138,13 @@ async fn test_kill_tasks_without_pause(#[case] kill_message: KillMessage) -> Res // Make sure all tasks get killed for id in 0..3 { wait_for_task_condition(shared, id, |task| { - matches!(task.status, TaskStatus::Done(TaskResult::Killed)) + matches!( + task.status, + TaskStatus::Done { + result: TaskResult::Killed, + .. + } + ) }) .await?; } diff --git a/pueue/tests/daemon/integration/parallel_tasks.rs b/pueue/tests/daemon/integration/parallel_tasks.rs index 68e53100..2e1d5b42 100644 --- a/pueue/tests/daemon/integration/parallel_tasks.rs +++ b/pueue/tests/daemon/integration/parallel_tasks.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use pretty_assertions::assert_eq; +use assert_matches::assert_matches; use pueue_lib::{network::message::ParallelMessage, task::*}; @@ -31,7 +31,11 @@ async fn test_parallel_tasks() -> Result<()> { let state = get_state(shared).await?; for task_id in 3..5 { let task = state.tasks.get(&task_id).unwrap(); - assert_eq!(task.status, TaskStatus::Queued); + assert_matches!( + task.status, + TaskStatus::Queued { .. }, + "Task {task_id} should be queued" + ); } // ---- Second group ---- @@ -52,7 +56,11 @@ async fn test_parallel_tasks() -> Result<()> { let state = get_state(shared).await?; for task_id in 7..10 { let task = state.tasks.get(&task_id).unwrap(); - assert_eq!(task.status, TaskStatus::Queued); + assert_matches!( + task.status, + TaskStatus::Queued { .. }, + "Task {task_id} should be queued in second check" + ); } Ok(()) } diff --git a/pueue/tests/daemon/integration/pause.rs b/pueue/tests/daemon/integration/pause.rs index 95a8aeae..18e88c49 100644 --- a/pueue/tests/daemon/integration/pause.rs +++ b/pueue/tests/daemon/integration/pause.rs @@ -1,4 +1,6 @@ use anyhow::{Context, Result}; +use assert_matches::assert_matches; + use pueue_lib::network::message::*; use pueue_lib::state::GroupStatus; use pueue_lib::task::*; @@ -21,7 +23,11 @@ async fn test_pause_daemon() -> Result<()> { sleep_ms(500).await; // Make sure it's not started - assert_eq!(get_task_status(shared, 0).await?, TaskStatus::Queued); + assert_matches!( + get_task_status(shared, 0).await?, + TaskStatus::Queued { .. }, + "Task should not be started yet." + ); Ok(()) } @@ -40,7 +46,10 @@ async fn test_pause_running_task() -> Result<()> { pause_tasks(shared, TaskSelection::All).await?; // Make sure the task as well as the default group get paused - wait_for_task_condition(shared, 0, |task| matches!(task.status, TaskStatus::Paused)).await?; + wait_for_task_condition(shared, 0, |task| { + matches!(task.status, TaskStatus::Paused { .. }) + }) + .await?; let state = get_state(shared).await?; assert_eq!( state.groups.get(PUEUE_DEFAULT_GROUP).unwrap().status, @@ -72,7 +81,11 @@ async fn test_pause_with_wait() -> Result<()> { // Make sure the default group gets paused, but the task is still running wait_for_group_status(shared, PUEUE_DEFAULT_GROUP, GroupStatus::Paused).await?; let state = get_state(shared).await?; - assert_eq!(state.tasks.get(&0).unwrap().status, TaskStatus::Running); + assert_matches!( + state.tasks.get(&0).unwrap().status, + TaskStatus::Running { .. }, + "Task should continue running after group is paused." + ); Ok(()) } diff --git a/pueue/tests/daemon/integration/restart.rs b/pueue/tests/daemon/integration/restart.rs index b6cad310..c08743cd 100644 --- a/pueue/tests/daemon/integration/restart.rs +++ b/pueue/tests/daemon/integration/restart.rs @@ -17,10 +17,6 @@ async fn test_restart_in_place() -> Result<()> { // Wait for task 0 to finish. let original_task = wait_for_task_condition(shared, 0, |task| task.is_done()).await?; - assert!( - original_task.enqueued_at.is_some(), - "Task is done and should have enqueue_at set." - ); // Restart task 0 with an extended sleep command with a different path. let restart_message = RestartMessage { @@ -48,11 +44,6 @@ async fn test_restart_in_place() -> Result<()> { original_task.created_at, task.created_at, "created_at shouldn't change on 'restart -i'" ); - // The created_at time should have been updated - assert!( - original_task.enqueued_at.unwrap() < task.enqueued_at.unwrap(), - "The second run should be enqueued before the first run." - ); // Make sure both command and path were changed let state = get_state(shared).await?; diff --git a/pueue/tests/daemon/integration/spawn.rs b/pueue/tests/daemon/integration/spawn.rs index 19166f8d..2a85b556 100644 --- a/pueue/tests/daemon/integration/spawn.rs +++ b/pueue/tests/daemon/integration/spawn.rs @@ -31,7 +31,10 @@ async fn test_fail_to_spawn_task() -> Result<()> { let task = wait_for_task_condition(shared, 0, |task| task.failed()).await?; assert!(matches!( task.status, - TaskStatus::Done(TaskResult::FailedToSpawn(_)) + TaskStatus::Done { + result: TaskResult::FailedToSpawn(_), + .. + } )); // Get the log output and ensure that there's the expected error log from the daemon. diff --git a/pueue/tests/daemon/integration/start.rs b/pueue/tests/daemon/integration/start.rs index 78b79b48..9fc8dd29 100644 --- a/pueue/tests/daemon/integration/start.rs +++ b/pueue/tests/daemon/integration/start.rs @@ -52,8 +52,10 @@ async fn test_start_tasks(#[case] start_message: StartMessage) -> Result<()> { // Pause the whole daemon and wait until all tasks are paused pause_tasks(shared, TaskSelection::All).await?; for id in 0..3 { - wait_for_task_condition(shared, id, |task| matches!(task.status, TaskStatus::Paused)) - .await?; + wait_for_task_condition(shared, id, |task| { + matches!(task.status, TaskStatus::Paused { .. }) + }) + .await?; } // Send the kill message diff --git a/pueue/tests/daemon/integration/stashed.rs b/pueue/tests/daemon/integration/stashed.rs index 6b9d2078..abf85d97 100644 --- a/pueue/tests/daemon/integration/stashed.rs +++ b/pueue/tests/daemon/integration/stashed.rs @@ -45,11 +45,6 @@ async fn test_enqueued_tasks( // The task should be added in stashed state. let task = wait_for_task_condition(shared, 0, |task| task.is_stashed()).await?; - assert!( - task.enqueued_at.is_none(), - "Enqueued tasks shouldn't have an enqeued_at date set." - ); - // Assert the correct point in time has been set, in case `enqueue_at` is specific. if enqueue_at.is_some() { let status = get_task_status(shared, 0).await?; @@ -60,8 +55,6 @@ async fn test_enqueued_tasks( } } - let pre_enqueue_time = Local::now(); - // Manually enqueue the task let enqueue_message = EnqueueMessage { task_ids: vec![0], @@ -72,12 +65,7 @@ async fn test_enqueued_tasks( .context("Failed to to add task message")?; // Make sure the task is started after being enqueued - let task = wait_for_task_condition(shared, 0, |task| task.is_running()).await?; - - assert!( - task.enqueued_at.unwrap() > pre_enqueue_time, - "Enqueued tasks should have an enqeued_at time set." - ); + wait_for_task_condition(shared, 0, |task| task.is_running()).await?; Ok(()) } @@ -128,10 +116,6 @@ async fn test_stash_queued_task() -> Result<()> { let task = get_task(shared, 0).await?; assert_eq!(task.status, TaskStatus::Stashed { enqueue_at: None }); - assert!( - task.enqueued_at.is_none(), - "Enqueued tasks shouldn't have an enqeued_at date set." - ); Ok(()) } diff --git a/pueue/tests/daemon/state_backward_compatibility.rs b/pueue/tests/daemon/state_backward_compatibility.rs index 971c418d..3d522fd1 100644 --- a/pueue/tests/daemon/state_backward_compatibility.rs +++ b/pueue/tests/daemon/state_backward_compatibility.rs @@ -7,9 +7,9 @@ use tempfile::TempDir; use pueue_lib::settings::Settings; -/// 2.0.0 introduced some breaking changes. +/// 4.0.0 introduced numerous breaking changes. /// From here on, we now aim to once again have full backward compatibility. -/// For this reason, an old v2.0.0 serialized state has been checked in. +/// For this reason, an old v4.0.0 serialized state has been checked in. /// /// We have to be able to restore from that state at all costs. /// Everything else results in a breaking change and needs a major version change. @@ -19,12 +19,12 @@ use pueue_lib::settings::Settings; #[test] fn test_restore_from_old_state() -> Result<()> { better_panic::install(); - let old_state = include_str!("data/v2.0.0_state.json"); + let old_state = include_str!("data/v4.0.0_state.json"); let temp_dir = TempDir::new()?; let temp_path = temp_dir.path(); - // Open v0.12.2 file and write old state to it. + // Open new file and write old state to it. let temp_state_path = temp_dir.path().join("state.json"); let mut file = File::create(temp_state_path)?; file.write_all(old_state.as_bytes())?; diff --git a/pueue/tests/helper/asserts.rs b/pueue/tests/helper/asserts.rs index 5b83ae4f..f05a638e 100644 --- a/pueue/tests/helper/asserts.rs +++ b/pueue/tests/helper/asserts.rs @@ -1,4 +1,5 @@ use anyhow::{bail, Result}; +use assert_matches::assert_matches; use pueue_lib::network::message::*; use pueue_lib::settings::Shared; @@ -8,16 +9,18 @@ use super::send_message; /// Assert that a message is a successful message. pub fn assert_success(message: Message) { - assert!( - matches!(message, Message::Success(_)), + assert_matches!( + message, + Message::Success(_), "Expected to get SuccessMessage, got {message:?}", ); } /// Assert that a message is a failure message. pub fn assert_failure(message: Message) { - assert!( - matches!(message, Message::Failure(_)), + assert_matches!( + message, + Message::Failure(_), "Expected to get FailureMessage, got {message:?}", ); } diff --git a/pueue_lib/src/network/message.rs b/pueue_lib/src/network/message.rs index 9e497799..eb828839 100644 --- a/pueue_lib/src/network/message.rs +++ b/pueue_lib/src/network/message.rs @@ -296,10 +296,8 @@ impl_into_message!(ResetMessage, Message::Reset); #[derive(PartialEq, Eq, Clone, Debug, Deserialize, Serialize)] pub struct CleanMessage { - #[serde(default = "bool::default")] pub successful_only: bool, - #[serde(default = "Option::default")] pub group: Option, } @@ -342,7 +340,6 @@ impl_into_message!(LogRequestMessage, Message::Log); #[derive(PartialEq, Eq, Clone, Deserialize, Serialize)] pub struct TaskLogMessage { pub task: Task, - #[serde(default = "bool::default")] /// Indicates whether the log output has been truncated or not. pub output_complete: bool, pub output: Option>, diff --git a/pueue_lib/src/task.rs b/pueue_lib/src/task.rs index 33dcbeb1..6a43bd6b 100644 --- a/pueue_lib/src/task.rs +++ b/pueue_lib/src/task.rs @@ -10,18 +10,29 @@ use crate::state::PUEUE_DEFAULT_GROUP; /// They basically represent the internal task life-cycle. #[derive(PartialEq, Eq, Clone, Debug, Display, Serialize, Deserialize)] pub enum TaskStatus { - /// The task is queued and waiting for a free slot - Queued, + /// Used while the command of a task is edited (to prevent starting the task) + Locked { previous_status: Box }, /// The task has been manually stashed. It won't be executed until it's manually enqueued Stashed { enqueue_at: Option> }, + /// The task is queued and waiting for a free slot + Queued { enqueued_at: DateTime }, /// The task is started and running - Running, + Running { + enqueued_at: DateTime, + start: DateTime, + }, /// A previously running task has been paused - Paused, + Paused { + enqueued_at: DateTime, + start: DateTime, + }, /// Task finished. The actual result of the task is handled by the [TaskResult] enum. - Done(TaskResult), - /// Used while the command of a task is edited (to prevent starting the task) - Locked, + Done { + enqueued_at: DateTime, + start: DateTime, + end: DateTime, + result: TaskResult, + }, } /// This enum represents the exit status of an actually spawned program. @@ -48,28 +59,16 @@ pub enum TaskResult { #[derive(PartialEq, Eq, Clone, Deserialize, Serialize)] pub struct Task { pub id: usize, - #[serde(default = "Local::now")] pub created_at: DateTime, - #[serde(default = "Default::default")] - pub enqueued_at: Option>, pub original_command: String, pub command: String, pub path: PathBuf, pub envs: HashMap, pub group: String, pub dependencies: Vec, - #[serde(default = "Default::default")] pub priority: i32, pub label: Option, pub status: TaskStatus, - /// This field is only used when editing the path/command of a task. - /// It's necessary, since we enter the `Locked` state during editing. - /// However, we have to go back to the previous state after we finished editing. - /// - /// TODO: Refactor this into a `TaskStatus::Locked{previous_status: TaskStatus}`. - pub prev_status: TaskStatus, - pub start: Option>, - pub end: Option>, } impl Task { @@ -87,7 +86,6 @@ impl Task { Task { id: 0, created_at: Local::now(), - enqueued_at: None, original_command: original_command.clone(), command: original_command, path, @@ -97,9 +95,6 @@ impl Task { priority, label, status: starting_status.clone(), - prev_status: starting_status, - start: None, - end: None, } } @@ -108,7 +103,6 @@ impl Task { Task { id: 0, created_at: Local::now(), - enqueued_at: None, original_command: task.original_command.clone(), command: task.command.clone(), path: task.path.clone(), @@ -117,21 +111,32 @@ impl Task { dependencies: Vec::new(), priority: 0, label: task.label.clone(), - status: TaskStatus::Queued, - prev_status: TaskStatus::Queued, - start: None, - end: None, + status: TaskStatus::Queued { + enqueued_at: Local::now(), + }, + } + } + + pub fn start_and_end(&self) -> (Option>, Option>) { + match self.status { + TaskStatus::Running { start, .. } => (Some(start), None), + TaskStatus::Paused { start, .. } => (Some(start), None), + TaskStatus::Done { start, end, .. } => (Some(start), Some(end)), + _ => (None, None), } } /// Whether the task is having a running process managed by the TaskHandler pub fn is_running(&self) -> bool { - matches!(self.status, TaskStatus::Running | TaskStatus::Paused) + matches!( + self.status, + TaskStatus::Running { .. } | TaskStatus::Paused { .. } + ) } /// Whether the task's process finished. pub fn is_done(&self) -> bool { - matches!(self.status, TaskStatus::Done(_)) + matches!(self.status, TaskStatus::Done { .. }) } /// Check if the task errored. \ @@ -140,7 +145,7 @@ impl Task { /// 2. Didn't finish yet. pub fn failed(&self) -> bool { match &self.status { - TaskStatus::Done(result) => !matches!(result, TaskResult::Success), + TaskStatus::Done { result, .. } => !matches!(result, TaskResult::Success), _ => false, } } @@ -154,7 +159,7 @@ impl Task { pub fn is_queued(&self) -> bool { matches!( self.status, - TaskStatus::Queued + TaskStatus::Queued { .. } | TaskStatus::Stashed { enqueue_at: Some(_) } @@ -188,9 +193,6 @@ impl std::fmt::Debug for Task { .field("dependencies", &self.dependencies) .field("label", &self.label) .field("status", &self.status) - .field("prev_status", &self.prev_status) - .field("start", &self.start) - .field("end", &self.end) .field("priority", &self.priority) .finish() } diff --git a/pueue_lib/tests/data/v0.19.0_state.json b/pueue_lib/tests/data/v0.19.0_state.json deleted file mode 100644 index 5c9c020e..00000000 --- a/pueue_lib/tests/data/v0.19.0_state.json +++ /dev/null @@ -1,119 +0,0 @@ -{ - "settings": { - "client": { - "restart_in_place": true, - "read_local_logs": true, - "show_confirmation_questions": false, - "show_expanded_aliases": false, - "dark_mode": false, - "max_status_lines": 10, - "status_time_format": "%H:%M:%S", - "status_datetime_format": "%Y-%m-%d %H:%M:%S" - }, - "daemon": { - "pause_group_on_failure": false, - "pause_all_on_failure": false, - "callback": "notify-send \"Task {{ id }}\nCommand: {{ command }}\nPath: {{ path }}\nFinished with status '{{ result }}'\nTook: $(bc <<< \"{{end}} - {{start}}\") seconds\"", - "callback_log_lines": 10 - }, - "shared": { - "pueue_directory": null, - "runtime_directory": null, - "use_unix_socket": true, - "unix_socket_path": null, - "host": "127.0.0.1", - "port": "6924", - "daemon_cert": null, - "daemon_key": null, - "shared_secret_path": null - }, - "profiles": {} - }, - "tasks": { - "0": { - "id": 0, - "original_command": "ls", - "command": "ls", - "path": "/home/nuke/.local/share/pueue", - "envs": {}, - "group": "default", - "dependencies": [], - "label": null, - "status": { - "Done": "Success" - }, - "prev_status": "Queued", - "start": "2022-05-09T18:41:29.273563806+02:00", - "end": "2022-05-09T18:41:29.473998692+02:00" - }, - "1": { - "id": 1, - "original_command": "ls", - "command": "ls", - "path": "/home/nuke/.local/share/pueue", - "envs": { - "PUEUE_WORKER_ID": "0", - "PUEUE_GROUP": "test" - }, - "group": "test", - "dependencies": [], - "label": null, - "status": { - "Done": "Success" - }, - "prev_status": "Queued", - "start": "2022-05-09T18:43:30.683677276+02:00", - "end": "2022-05-09T18:43:30.884243263+02:00" - }, - "2": { - "id": 2, - "original_command": "ls", - "command": "ls", - "path": "/home/nuke/.local/share/pueue", - "envs": { - "PUEUE_WORKER_ID": "0", - "PUEUE_GROUP": "test" - }, - "group": "test", - "dependencies": [], - "label": null, - "status": "Queued", - "prev_status": "Queued", - "start": null, - "end": null - }, - "3": { - "id": 3, - "original_command": "ls stash_it", - "command": "ls stash_it", - "path": "/home/nuke/.local/share/pueue", - "envs": {}, - "group": "default", - "dependencies": [], - "label": null, - "status": { - "Stashed": { - "enqueue_at": null - } - }, - "prev_status": { - "Stashed": { - "enqueue_at": null - } - }, - "start": null, - "end": null - } - }, - "groups": { - "default": { - "status": "Running", - "parallel_tasks": 1 - }, - "test": { - "status": "Paused", - "parallel_tasks": 2 - } - }, - "config_path": null -} diff --git a/pueue_lib/tests/data/v4.0.0_state.json b/pueue_lib/tests/data/v4.0.0_state.json new file mode 100644 index 00000000..90342a49 --- /dev/null +++ b/pueue_lib/tests/data/v4.0.0_state.json @@ -0,0 +1,125 @@ +{ + "tasks": { + "0": { + "id": 0, + "created_at": "2024-07-18T14:55:02.656354132+02:00", + "original_command": "ls", + "command": "ls", + "path": "/home/nuke", + "envs": {}, + "group": "default", + "dependencies": [], + "priority": 0, + "label": null, + "status": { + "Done": { + "enqueued_at": "2024-07-18T14:55:02.656334442+02:00", + "start": "2024-07-18T14:55:02.890447203+02:00", + "end": "2024-07-18T14:55:03.191922436+02:00", + "result": "Success" + } + } + }, + "1": { + "id": 1, + "created_at": "2024-07-18T14:55:09.965451882+02:00", + "original_command": "ls", + "command": "ls", + "path": "/home/nuke", + "envs": {}, + "group": "default", + "dependencies": [], + "priority": 0, + "label": null, + "status": { + "Done": { + "enqueued_at": "2024-07-18T14:55:09.965430002+02:00", + "start": "2024-07-18T14:55:10.116492792+02:00", + "end": "2024-07-18T14:55:10.418112074+02:00", + "result": { + "Failed": 2 + } + } + } + }, + "2": { + "id": 2, + "created_at": "2024-07-18T14:55:16.045036378+02:00", + "original_command": "ls", + "command": "ls", + "path": "/home/nuke", + "envs": {}, + "group": "default", + "dependencies": [], + "priority": 0, + "label": null, + "status": { + "Stashed": { + "enqueue_at": null + } + } + }, + "3": { + "id": 3, + "created_at": "2024-07-18T14:55:23.253045880+02:00", + "original_command": "sleep 9000000", + "command": "sleep 9000000", + "path": "/home/nuke", + "envs": {}, + "group": "default", + "dependencies": [], + "priority": 0, + "label": null, + "status": { + "Running": { + "enqueued_at": "2024-07-18T14:55:23.253039780+02:00", + "start": "2024-07-18T14:55:23.365190551+02:00" + } + } + }, + "4": { + "id": 4, + "created_at": "2024-07-18T14:55:52.901140728+02:00", + "original_command": "pueue sleep 60", + "command": "pueue sleep 60", + "path": "/home/nuke", + "envs": {}, + "group": "default", + "dependencies": [], + "priority": 0, + "label": null, + "status": { + "Queued": { + "enqueued_at": "2024-07-18T14:55:52.901136338+02:00" + } + } + }, + "5": { + "id": 5, + "created_at": "2024-07-18T14:56:15.733118787+02:00", + "original_command": "days ls", + "command": "days ls", + "path": "/home/nuke", + "envs": {}, + "group": "default", + "dependencies": [], + "priority": 0, + "label": null, + "status": { + "Queued": { + "enqueued_at": "2024-07-18T14:56:35.928788860+02:00" + } + } + } + }, + "groups": { + "default": { + "status": "Running", + "parallel_tasks": 1 + }, + "test": { + "status": "Running", + "parallel_tasks": 1 + } + } +} diff --git a/pueue_lib/tests/message_backward_compatibility.rs b/pueue_lib/tests/message_backward_compatibility.rs deleted file mode 100644 index 8da8e01f..00000000 --- a/pueue_lib/tests/message_backward_compatibility.rs +++ /dev/null @@ -1,54 +0,0 @@ -use serde_cbor::de::from_slice; -use serde_cbor::ser::to_vec; -use serde_derive::{Deserialize, Serialize}; - -use pueue_lib::network::message::Message as OriginalMessage; - -/// This is the main message enum. \ -/// Everything that's communicated in Pueue can be serialized as this enum. -#[derive(Clone, Debug, Deserialize, Serialize)] -pub enum Message { - Switch(SwitchMessage), - Clean(CleanMessage), -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct SwitchMessage { - pub task_id_1: usize, - pub task_id_2: usize, - pub some_new_field: usize, -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct CleanMessage {} - -/// Make sure we can deserialize old messages as long as we have default values set. -#[test] -fn test_deserialize_old_message() { - let message = Message::Clean(CleanMessage {}); - let payload_bytes = to_vec(&message).unwrap(); - - let message: OriginalMessage = from_slice(&payload_bytes).unwrap(); - if let OriginalMessage::Clean(message) = message { - // The serialized message didn't have the `successful_only` property yet. - // Instead the default `false` should be used. - assert!(!message.successful_only); - } else { - panic!("It must be a clean message"); - } -} - -/// Make sure we can deserialize new messages, even if new values exist. -#[test] -fn test_deserialize_new_message() { - let message = Message::Switch(SwitchMessage { - task_id_1: 0, - task_id_2: 1, - some_new_field: 2, - }); - let payload_bytes = to_vec(&message).unwrap(); - - let message: OriginalMessage = from_slice(&payload_bytes).unwrap(); - // The serialized message did have an additional field. The deserialization works anyway. - assert!(matches!(message, OriginalMessage::Switch(_))); -} diff --git a/pueue_lib/tests/state_backward_compatibility.rs b/pueue_lib/tests/state_backward_compatibility.rs index 61454309..dd3aee0f 100644 --- a/pueue_lib/tests/state_backward_compatibility.rs +++ b/pueue_lib/tests/state_backward_compatibility.rs @@ -4,8 +4,8 @@ use anyhow::{Context, Result}; use pueue_lib::state::{GroupStatus, State, PUEUE_DEFAULT_GROUP}; -/// From 0.18.0 on, we aim to have full backward compatibility for our state deserialization. -/// For this reason, an old (slightly modified) v0.18.0 serialized state has been checked in. +/// We aim to have full backward compatibility for our state deserialization for as long as +/// possible. For this reason, an old v4.0.0 serialized state has been checked in. /// /// **Warning**: This is only one part of our state tests. /// There is another full test suite in the `pueue` project, which deals with domain @@ -14,17 +14,14 @@ use pueue_lib::state::{GroupStatus, State, PUEUE_DEFAULT_GROUP}; /// /// We have to be able to restore from that state at all costs. /// Everything else results in a breaking change and needs a major version change. -/// (For `pueue_lib` as well as `pueue`! -/// -/// On top of simply having an old state, I also removed a few default fields. -/// This should be handled as well. +/// (For `pueue_lib` as well as `pueue`!) #[test] fn test_restore_from_old_state() -> Result<()> { better_panic::install(); let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) .join("tests") .join("data") - .join("v0.19.0_state.json"); + .join("v4.0.0_state.json"); // Try to load the file. let data = fs::read_to_string(path).context("State restore: Failed to read file")?; @@ -46,11 +43,11 @@ fn test_restore_from_old_state() -> Result<()> { ); assert_eq!( state.groups.get("test").unwrap().status, - GroupStatus::Paused + GroupStatus::Running ); assert!(state.tasks.contains_key(&3), "Task 3 should exist"); - assert_eq!(state.tasks.get(&3).unwrap().command, "ls stash_it"); + assert_eq!(state.tasks.get(&3).unwrap().command, "sleep 9000000"); Ok(()) }