Skip to content

Commit

Permalink
Merge pull request #556 from Nukesor/status_refactorings
Browse files Browse the repository at this point in the history
Task Status refactorings
  • Loading branch information
Nukesor authored Jul 25, 2024
2 parents 31878c4 + 1ce4e1f commit 960ed8f
Show file tree
Hide file tree
Showing 52 changed files with 896 additions and 631 deletions.
22 changes: 21 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## \[4.0.0\] - unreleased

This release aims to improve Pueue and to rectify some old design decisions.

### Multi-threaded architecture

Up until recently, Pueue had the subprocesses' (tasks') state live in a dedicated thread.
Client commands that directly affected subprocesses, such as `pueue start --immediate`, were forwarded to that special thread via an `mpsc` channel to be further processed.

Expand All @@ -15,13 +19,29 @@ Commands like `pueue add --immediate install_something && pueue send 0 'y\n'` wo

The new state design fixes this issue, which allows Pueue to do subprocess state manipulation directly inside of the client message handlers, effectively removing any delays.

TLDR: Commands that start/stop/kill/pause tasks only return when the task is actually started/stopped/killed/paused.

### Runtime invariants

Previously, various state related runtime invariants were enforced by convention. For example, a task that is queued should not have a `start` or `enqueued_at` time set.
However, this approach is highly error-prone as it's really hard to always think of everything that needs to be set or cleaned up on every possible state transition.

Luckily, this is an issue that can be fixed in a (rather) elegant way in Rust, using struct enums. That way, those invariants are enforced via the type system during the compile time.
The code is a bit more verbose (~25%), but it prevents a whole category of bugs and while doing this refactoring I actually found at least 2 cases where I forgot to clear a variable.

TLDR: The new task state handling is a bit more verbose, but a lot cleaner and type safe.

### Fixed

- Fixed delay after sending process related commands from client. [#548](https://github.com/Nukesor/pueue/pull/548)

### Change

- **Breaking**: Streamlined `pueue log` parameters to behave the same way was `start`, `pause` or `kill`. [#509](https://github.com/Nukesor/pueue/issues/509)
- **Breaking**: Refactor internal task state. Some task variables have been moved into the `TaskStatus` enum, which now enforces various invariants during compile time via the type system.
Due to this, several subtle time related inconsistencies (task start/stop/enqueue times) have been fixed. [#556](https://github.com/Nukesor/pueue/pull/556) \
**Important: This completely breaks backwards compatibility, including previous state.**
**Important: The Pueue daemon needs to be restarted and the state will be wiped clean.**
- **Breaking**: Streamlined `pueue log` parameters to behave the same way as `start`, `pause` or `kill`. [#509](https://github.com/Nukesor/pueue/issues/509)
- **Breaking**: Remove the `--children` commandline flags, that have been deprecated and no longer serve any function since `v3.0.0`.

### Add
Expand Down
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pueue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ tokio = { workspace = true }
[dev-dependencies]
anyhow = { workspace = true }
assert_cmd = "2"
assert_matches = "1"
better-panic = { workspace = true }
# Make it easy to view log output for select tests.
# Set log level for tests with RUST_LOG=<level>, use with failed tests or
Expand Down
15 changes: 12 additions & 3 deletions pueue/src/client/commands/restart.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::{bail, Result};

use chrono::Local;
use pueue_lib::network::message::*;
use pueue_lib::network::protocol::*;
use pueue_lib::settings::Settings;
Expand Down Expand Up @@ -32,7 +33,9 @@ pub async fn restart(
let new_status = if stashed {
TaskStatus::Stashed { enqueue_at: None }
} else {
TaskStatus::Queued
TaskStatus::Queued {
enqueued_at: Local::now(),
}
};

let state = get_state(stream).await?;
Expand All @@ -50,13 +53,19 @@ pub async fn restart(
state.filter_tasks(done_filter, None)
};

// now pick the failed tasks
// Now pick the failed tasks
let failed = filtered_tasks
.matching_ids
.into_iter()
.filter(|task_id| {
let task = state.tasks.get(task_id).unwrap();
!matches!(task.status, TaskStatus::Done(TaskResult::Success))
!matches!(
task.status,
TaskStatus::Done {
result: TaskResult::Success,
..
}
)
})
.collect();

Expand Down
37 changes: 28 additions & 9 deletions pueue/src/client/commands/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,27 @@ pub async fn wait(
fn reached_target_status(task: &Task, target_status: &WaitTargetStatus) -> bool {
match target_status {
WaitTargetStatus::Queued => {
task.status == TaskStatus::Queued
|| task.status == TaskStatus::Running
|| matches!(task.status, TaskStatus::Done(_))
matches!(
task.status,
TaskStatus::Queued { .. } | TaskStatus::Running { .. } | TaskStatus::Done { .. }
)
}
WaitTargetStatus::Running => {
task.status == TaskStatus::Running || matches!(task.status, TaskStatus::Done(_))
matches!(
task.status,
TaskStatus::Running { .. } | TaskStatus::Done { .. }
)
}
WaitTargetStatus::Done => matches!(task.status, TaskStatus::Done { .. }),
WaitTargetStatus::Success => {
matches!(
task.status,
TaskStatus::Done {
result: TaskResult::Success,
..
}
)
}
WaitTargetStatus::Done => matches!(task.status, TaskStatus::Done(_)),
WaitTargetStatus::Success => matches!(task.status, TaskStatus::Done(TaskResult::Success)),
}
}

Expand Down Expand Up @@ -201,7 +213,7 @@ fn log_status_change(previous_status: TaskStatus, task: &Task, style: &OutputSty
// Check if the task has finished.
// In case it has, show the task's result in human-readable form.
// Color some parts of the output depending on the task's outcome.
if let TaskStatus::Done(result) = &task.status {
if let TaskStatus::Done { result, .. } = &task.status {
let text = match result {
TaskResult::Success => {
let status = style.style_text("0", Some(Color::Green), None);
Expand Down Expand Up @@ -246,8 +258,15 @@ fn log_status_change(previous_status: TaskStatus, task: &Task, style: &OutputSty

fn get_color_for_status(task_status: &TaskStatus) -> Color {
match task_status {
TaskStatus::Running | TaskStatus::Done(_) => Color::Green,
TaskStatus::Paused | TaskStatus::Locked => Color::White,
TaskStatus::Paused { .. } | TaskStatus::Locked { .. } => Color::White,
TaskStatus::Running { .. } => Color::Green,
TaskStatus::Done { result, .. } => {
if matches!(result, TaskResult::Success) {
Color::Green
} else {
Color::Red
}
}
_ => Color::White,
}
}
9 changes: 5 additions & 4 deletions pueue/src/client/display/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ pub fn sort_tasks_by_group(tasks: Vec<Task>) -> BTreeMap<String, Vec<Task>> {
/// If the task doesn't have a start and/or end yet, an empty string will be returned
/// for the respective field.
pub fn formatted_start_end(task: &Task, settings: &Settings) -> (String, String) {
// Get the start time.
let (start, end) = task.start_and_end();

// If the task didn't start yet, just return two empty strings.
let start = match task.start {
let start = match start {
Some(start) => start,
None => return ("".into(), "".into()),
};
Expand All @@ -65,8 +66,8 @@ pub fn formatted_start_end(task: &Task, settings: &Settings) -> (String, String)
.to_string()
};

// Get finish time, if already set. Otherwise only return the formatted start.
let end = match task.end {
// If the task didn't finish yet, only return the formatted start.
let end = match end {
Some(end) => end,
None => return (formatted_start, "".into()),
};
Expand Down
16 changes: 9 additions & 7 deletions pueue/src/client/display/log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub fn print_logs(
if let Some((_, task_log)) = task_iter.peek() {
if matches!(
&task_log.task.status,
TaskStatus::Done(_) | TaskStatus::Running | TaskStatus::Paused,
TaskStatus::Done { .. } | TaskStatus::Running { .. } | TaskStatus::Paused { .. }
) {
println!();
}
Expand All @@ -122,7 +122,7 @@ fn print_log(
// We only show logs of finished or running tasks.
if !matches!(
task.status,
TaskStatus::Done(_) | TaskStatus::Running | TaskStatus::Paused
TaskStatus::Done { .. } | TaskStatus::Running { .. } | TaskStatus::Paused { .. }
) {
return;
}
Expand All @@ -148,9 +148,9 @@ fn print_task_info(task: &Task, style: &OutputStyle) {
);

let (exit_status, color) = match &task.status {
TaskStatus::Paused => ("paused".into(), Color::White),
TaskStatus::Running => ("running".into(), Color::Yellow),
TaskStatus::Done(result) => match result {
TaskStatus::Paused { .. } => ("paused".into(), Color::White),
TaskStatus::Running { .. } => ("running".into(), Color::Yellow),
TaskStatus::Done { result, .. } => match result {
TaskResult::Success => ("completed successfully".into(), Color::Green),
TaskResult::Failed(exit_code) => {
(format!("failed with exit code {exit_code}"), Color::Red)
Expand Down Expand Up @@ -197,14 +197,16 @@ fn print_task_info(task: &Task, style: &OutputStyle) {
]);
}

let (start, end) = task.start_and_end();

// Start and end time
if let Some(start) = task.start {
if let Some(start) = start {
table.add_row(vec![
style.styled_cell("Start:", None, Some(ComfyAttribute::Bold)),
Cell::new(start.to_rfc2822()),
]);
}
if let Some(end) = task.end {
if let Some(end) = end {
table.add_row(vec![
style.styled_cell("End:", None, Some(ComfyAttribute::Bold)),
Cell::new(end.to_rfc2822()),
Expand Down
8 changes: 5 additions & 3 deletions pueue/src/client/display/table_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,11 @@ impl<'a> TableBuilder<'a> {
// Determine the human readable task status representation and the respective color.
let status_string = task.status.to_string();
let (status_text, color) = match &task.status {
TaskStatus::Running => (status_string, Color::Green),
TaskStatus::Paused | TaskStatus::Locked => (status_string, Color::White),
TaskStatus::Done(result) => match result {
TaskStatus::Running { .. } => (status_string, Color::Green),
TaskStatus::Paused { .. } | TaskStatus::Locked { .. } => {
(status_string, Color::White)
}
TaskStatus::Done { result, .. } => match result {
TaskResult::Success => (TaskResult::Success.to_string(), Color::Green),
TaskResult::DependencyFailed => {
("Dependency failed".to_string(), Color::Red)
Expand Down
22 changes: 15 additions & 7 deletions pueue/src/client/query/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,15 @@ pub fn datetime(section: Pair<'_, Rule>, query_result: &mut QueryResult) -> Resu
enqueue_at
}
Rule::column_start => {
let Some(start) = task.start else {
let (start, _) = task.start_and_end();
let Some(start) = start else {
return false;
};
start
}
Rule::column_end => {
let Some(end) = task.end else {
let (_, end) = task.start_and_end();
let Some(end) = end else {
return false;
};
end
Expand Down Expand Up @@ -311,16 +313,22 @@ pub fn status(section: Pair<'_, Rule>, query_result: &mut QueryResult) -> Result
// Build the filter function for the task's status.
let filter_function = Box::new(move |task: &Task| -> bool {
let matches = match operand {
Rule::status_queued => matches!(task.status, TaskStatus::Queued),
Rule::status_queued => matches!(task.status, TaskStatus::Queued { .. }),
Rule::status_stashed => matches!(task.status, TaskStatus::Stashed { .. }),
Rule::status_running => matches!(task.status, TaskStatus::Running),
Rule::status_paused => matches!(task.status, TaskStatus::Paused),
Rule::status_running => matches!(task.status, TaskStatus::Running { .. }),
Rule::status_paused => matches!(task.status, TaskStatus::Paused { .. }),
Rule::status_success => {
matches!(&task.status, TaskStatus::Done(TaskResult::Success))
matches!(
&task.status,
TaskStatus::Done {
result: TaskResult::Success,
..
}
)
}
Rule::status_failed => {
let mut matches = false;
if let TaskStatus::Done(result) = &task.status {
if let TaskStatus::Done { result, .. } = &task.status {
if !matches!(result, TaskResult::Success) {
matches = true;
}
Expand Down
22 changes: 15 additions & 7 deletions pueue/src/client/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ impl QueryResult {
fn rank_status(task: &Task) -> u8 {
match &task.status {
TaskStatus::Stashed { .. } => 0,
TaskStatus::Locked => 1,
TaskStatus::Queued => 2,
TaskStatus::Paused => 3,
TaskStatus::Running => 4,
TaskStatus::Done(result) => match result {
TaskStatus::Locked { .. } => 1,
TaskStatus::Queued { .. } => 2,
TaskStatus::Paused { .. } => 3,
TaskStatus::Running { .. } => 4,
TaskStatus::Done { result, .. } => match result {
TaskResult::Success => 6,
_ => 5,
},
Expand All @@ -93,8 +93,16 @@ impl QueryResult {
Rule::column_label => task1.label.cmp(&task2.label),
Rule::column_command => task1.command.cmp(&task2.command),
Rule::column_path => task1.path.cmp(&task2.path),
Rule::column_start => task1.start.cmp(&task2.start),
Rule::column_end => task1.end.cmp(&task2.end),
Rule::column_start => {
let (start1, _) = task1.start_and_end();
let (start2, _) = task2.start_and_end();
start1.cmp(&start2)
}
Rule::column_end => {
let (_, end1) = task1.start_and_end();
let (_, end2) = task2.start_and_end();
end1.cmp(&end2)
}
_ => std::cmp::Ordering::Less,
});

Expand Down
9 changes: 5 additions & 4 deletions pueue/src/daemon/callbacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub fn build_callback_command(
parameters.insert("group", task.group.clone());

// Result takes the TaskResult Enum strings, unless it didn't finish yet.
if let TaskStatus::Done(result) = &task.status {
if let TaskStatus::Done { result, .. } = &task.status {
parameters.insert("result", result.to_string());
} else {
parameters.insert("result", "None".into());
Expand All @@ -75,8 +75,9 @@ pub fn build_callback_command(
time.map(|time| time.timestamp().to_string())
.unwrap_or_default()
};
parameters.insert("start", print_time(task.start));
parameters.insert("end", print_time(task.end));
let (start, end) = task.start_and_end();
parameters.insert("start", print_time(start));
parameters.insert("end", print_time(end));

// Read the last lines of the process' output and make it available.
if let Ok(output) = read_last_log_file_lines(
Expand All @@ -95,7 +96,7 @@ pub fn build_callback_command(
parameters.insert("output_path", out_path.display().to_string());

// Get the exit code
if let TaskStatus::Done(result) = &task.status {
if let TaskStatus::Done { result, .. } = &task.status {
match result {
TaskResult::Success => parameters.insert("exit_code", "0".into()),
TaskResult::Failed(code) => parameters.insert("exit_code", code.to_string()),
Expand Down
9 changes: 4 additions & 5 deletions pueue/src/daemon/network/message_handler/add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,19 @@ pub fn add_task(settings: &Settings, state: &SharedState, message: AddMessage) -
message.path,
message.envs,
message.group,
TaskStatus::Locked,
TaskStatus::Queued {
enqueued_at: Local::now(),
},
message.dependencies,
message.priority.unwrap_or(0),
message.label,
);

// Set the starting status.
// Handle if the command is to be stashed and/or automatically enqueued later.
if message.stashed || message.enqueue_at.is_some() {
task.status = TaskStatus::Stashed {
enqueue_at: message.enqueue_at,
};
} else {
task.status = TaskStatus::Queued;
task.enqueued_at = Some(Local::now());
}

// Check if there're any aliases that should be applied.
Expand Down
Loading

0 comments on commit 960ed8f

Please sign in to comment.