change: Don't pause groups without queued tasks on kill

Nukesor committed Aug 20, 2023
1 parent ee15a8e commit 0b3d188

Showing 8 changed files with 152 additions and 32 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md

```diff
@@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file.
 
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [3.3.0] - unreleased
+
+### Changed
+
+- QoL improvement: Don't pause groups if there are no queued tasks. [#452](https://github.com/Nukesor/pueue/issues/452)
+  Auto-pausing of groups was only done to prevent the unwanted execution of other tasks, but this isn't necessary if there are no queued tasks.
+
 ## [3.2.0] - 2023-06-13
 
 ### Added
```
11 changes: 5 additions & 6 deletions pueue/src/client/query/filters.rs

```diff
@@ -105,14 +105,13 @@ pub fn datetime(section: Pair<'_, Rule>, query_result: &mut QueryResult) -> Resu
     // Get the field we should apply the filter to.
     let field = match column {
         Rule::column_enqueue_at => {
-            if let TaskStatus::Stashed {
+            let TaskStatus::Stashed {
                 enqueue_at: Some(enqueue_at),
-            } = task.status
-            {
-                enqueue_at
-            } else {
+            } = task.status else {
                 return false;
-            }
+            };
+
+            enqueue_at
         }
         Rule::column_start => {
             let Some(start) = task.start else {
```
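Aside: the rewrite above swaps an `if let ... else` expression for Rust's `let ... else` statement (stable since Rust 1.65), which binds the matched values in the enclosing scope and must diverge when the pattern fails. A minimal, self-contained sketch of the same shape; the `TaskStatus` below is a simplified stand-in, not pueue_lib's actual type:

```rust
// Simplified stand-in for pueue_lib's TaskStatus, for illustration only.
enum TaskStatus {
    Queued,
    Stashed { enqueue_at: Option<u64> },
}

/// Extract the enqueue time of a scheduled stash, or bail out early.
fn enqueue_time(status: &TaskStatus) -> Option<u64> {
    // `let ... else`: on a successful match, `enqueue_at` is bound in this
    // scope; on a failed match, the else-block must diverge (here: return).
    let TaskStatus::Stashed {
        enqueue_at: Some(enqueue_at),
    } = status
    else {
        return None;
    };

    Some(*enqueue_at)
}

fn main() {
    assert_eq!(
        enqueue_time(&TaskStatus::Stashed { enqueue_at: Some(5) }),
        Some(5)
    );
    assert_eq!(enqueue_time(&TaskStatus::Queued), None);
    assert_eq!(enqueue_time(&TaskStatus::Stashed { enqueue_at: None }), None);
}
```

The payoff is the flattened happy path: the bound value is used right after the `let`, and the failure case exits early instead of nesting the rest of the match arm.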
2 changes: 1 addition & 1 deletion pueue/src/daemon/network/message_handler/edit.rs

```diff
@@ -15,7 +15,7 @@ pub fn edit_request(task_id: usize, state: &SharedState) -> Message {
     let mut state = state.lock().unwrap();
     match state.tasks.get_mut(&task_id) {
        Some(task) => {
-            if !task.is_queued() {
+            if !task.is_queued() && !task.is_stashed() {
                return create_failure_message("You can only edit a queued/stashed task");
            }
            task.prev_status = task.status.clone();
```
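Why the extra `is_stashed()` check: this same commit narrows `Task::is_queued()` (see `pueue_lib/src/task.rs` below) so that a task in `Stashed { enqueue_at: None }` no longer counts as queued. Without the added condition, plain stashed tasks would suddenly be rejected by the edit handler. A hedged sketch of the resulting guard semantics, using simplified stand-in types rather than the daemon's real state handling:

```rust
enum TaskStatus {
    Queued,
    Running,
    Stashed { enqueue_at: Option<u64> },
}

struct Task {
    status: TaskStatus,
}

impl Task {
    // Mirrors the narrowed is_queued(): a stash without an enqueue
    // timestamp no longer counts as queued.
    fn is_queued(&self) -> bool {
        matches!(
            self.status,
            TaskStatus::Queued | TaskStatus::Stashed { enqueue_at: Some(_) }
        )
    }

    fn is_stashed(&self) -> bool {
        matches!(self.status, TaskStatus::Stashed { .. })
    }
}

// The combined guard from edit_request: only queued or stashed tasks
// may be edited.
fn is_editable(task: &Task) -> bool {
    task.is_queued() || task.is_stashed()
}

fn main() {
    assert!(is_editable(&Task { status: TaskStatus::Queued }));
    // Still editable, even though it no longer counts as "queued".
    assert!(is_editable(&Task { status: TaskStatus::Stashed { enqueue_at: None } }));
    assert!(!is_editable(&Task { status: TaskStatus::Running }));
}
```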
47 changes: 35 additions & 12 deletions pueue/src/daemon/task_handler/messages/kill.rs

```diff
@@ -5,7 +5,7 @@ use pueue_lib::process_helper::*;
 use pueue_lib::state::GroupStatus;
 use pueue_lib::task::TaskStatus;
 
-use crate::daemon::state_helper::save_state;
+use crate::daemon::state_helper::{save_state, LockedState};
 use crate::daemon::task_handler::{Shutdown, TaskHandler};
 use crate::ok_or_shutdown;
 
@@ -15,24 +15,30 @@ impl TaskHandler {
     /// By default, this kills tasks with Rust's subprocess handling "kill" logic.
     /// However, the user can decide to send unix signals to the processes as well.
     ///
-    /// `pause_groups` If `group` or `all` is given, the groups should be paused under some
-    /// circumstances. This is mostly to prevent any further task execution during an emergency
+    /// `issued_by_user` This is `true` when a kill is issued by an actual user.
+    /// It is `false` if the daemon resets or during shutdown.
+    ///
+    /// In case `true` is given and a `group` or `all` is killed, the affected groups should
+    /// be paused under some circumstances. This is mostly to prevent any further task execution
+    /// during an emergency. These circumstances are:
+    /// - There are further queued or scheduled tasks in a killed group.
     ///
     /// `signal` Don't kill the task as usual, but rather send a unix process signal.
-    pub fn kill(&mut self, tasks: TaskSelection, pause_groups: bool, signal: Option<Signal>) {
+    pub fn kill(&mut self, tasks: TaskSelection, issued_by_user: bool, signal: Option<Signal>) {
         let cloned_state_mutex = self.state.clone();
         let mut state = cloned_state_mutex.lock().unwrap();
         // Get the keys of all tasks that should be resumed
         let task_ids = match tasks {
             TaskSelection::TaskIds(task_ids) => task_ids,
             TaskSelection::Group(group_name) => {
                 // Ensure that a given group exists. (Might not happen due to concurrency)
-                let group = match state.groups.get_mut(&group_name) {
-                    Some(group) => group,
-                    None => return,
+                if !state.groups.contains_key(&group_name) {
+                    return;
                 };
 
-                // Pause this specific group.
-                if pause_groups {
+                // Check whether the group should be paused before killing the tasks.
+                if should_pause_group(&state, issued_by_user, &group_name) {
+                    let group = state.groups.get_mut(&group_name).unwrap();
                     group.status = GroupStatus::Paused;
                 }
 
@@ -46,9 +52,12 @@
                 filtered_tasks.matching_ids
             }
             TaskSelection::All => {
-                // Pause all running tasks
-                if pause_groups {
-                    state.set_status_for_all_groups(GroupStatus::Paused);
+                // Pause all groups, if applicable
+                let group_names: Vec<String> = state.groups.keys().cloned().collect();
+                for group_name in group_names {
+                    if should_pause_group(&state, issued_by_user, &group_name) {
+                        state.set_status_for_all_groups(GroupStatus::Paused);
+                    }
                 }
 
                 info!("Killing all running tasks");
@@ -96,3 +105,17 @@ impl TaskHandler {
             }
         }
     }
 }
+
+/// Determine whether a group should be paused during a kill command.
+/// It should only be paused if:
+/// - The kill was issued by the user, i.e. it wasn't issued by the system during shutdown/reset.
+/// - The group that's being killed has queued or stashed-enqueued tasks.
+fn should_pause_group(state: &LockedState, issued_by_user: bool, group: &str) -> bool {
+    if !issued_by_user {
+        return false;
+    }
+
+    // Check if there are tasks that are queued or enqueued.
+    let filtered_tasks = state.filter_tasks_of_group(|task| task.is_queued(), group);
+    !filtered_tasks.matching_ids.is_empty()
+}
```
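The pause decision now has two independent gates: who issued the kill, and whether the group still has pending work. A small sketch of that decision table against simplified stand-ins (the real helper operates on `LockedState` and `filter_tasks_of_group`, shown above):

```rust
// Illustrative stand-in: `queued` models Task::is_queued(), i.e. the task
// is Queued or Stashed with a set enqueue_at timestamp.
struct Task {
    queued: bool,
}

fn should_pause_group(tasks_in_group: &[Task], issued_by_user: bool) -> bool {
    // Automatic kills (daemon reset or shutdown) never pause groups.
    if !issued_by_user {
        return false;
    }

    // A user-issued kill pauses the group only if more work is pending.
    tasks_in_group.iter().any(|task| task.queued)
}

fn main() {
    let pending = [Task { queued: true }, Task { queued: false }];
    let idle = [Task { queued: false }];

    assert!(should_pause_group(&pending, true)); // user kill + queued work: pause
    assert!(!should_pause_group(&idle, true)); // user kill, nothing queued: keep running
    assert!(!should_pause_group(&pending, false)); // reset/shutdown: never pause
}
```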
81 changes: 78 additions & 3 deletions pueue/tests/daemon/integration/kill.rs

```diff
@@ -15,7 +15,8 @@ use crate::helper::*;
 /// - Via the --group flag, which just kills everything in the default group.
 /// - Via specific ids.
 ///
-/// If a whole group or everything is killed, the respective groups should also be paused!
+/// If a whole group or everything is killed, the respective groups should also be paused,
+/// as long as there's no further queued task.
 /// This is a security measure to prevent unwanted task execution in an emergency.
 #[rstest]
 #[case(
@@ -37,7 +38,7 @@
     }, false
 )]
 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
-async fn test_kill_tasks(
+async fn test_kill_tasks_with_pause(
     #[case] kill_message: KillMessage,
     #[case] group_should_pause: bool,
 ) -> Result<()> {
@@ -46,13 +47,18 @@
 
     // Add multiple tasks and start them immediately
     for _ in 0..3 {
-        assert_success(add_task(shared, "sleep 60", true).await?);
+        assert_success(add_and_start_task(shared, "sleep 60").await?);
     }
     // Wait until all tasks are running
     for id in 0..3 {
         wait_for_task_condition(shared, id, |task| task.is_running()).await?;
     }
 
+    // Add more tasks that will be enqueued normally.
+    for _ in 0..3 {
+        assert_success(add_task(shared, "sleep 60", false).await?);
+    }
+
     // Send the kill message
     send_message(shared, kill_message).await?;
 
@@ -75,3 +81,72 @@
 
     Ok(())
 }
+
+/// This test ensures the following rule:
+/// If a whole group or everything is killed, the respective groups should not be paused, as long
+/// as there's no further queued task in that group.
+///
+/// We test different ways of killing those tasks:
+/// - Via the --all flag, which just kills everything.
+/// - Via the --group flag, which just kills everything in the default group.
+/// - Via specific ids.
+#[rstest]
+#[case(
+    KillMessage {
+        tasks: TaskSelection::All,
+        signal: None,
+    }
+)]
+#[case(
+    KillMessage {
+        tasks: TaskSelection::Group(PUEUE_DEFAULT_GROUP.into()),
+        signal: None,
+    }
+)]
+#[case(
+    KillMessage {
+        tasks: TaskSelection::TaskIds(vec![0, 1, 2]),
+        signal: None,
+    }
+)]
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn test_kill_tasks_without_pause(#[case] kill_message: KillMessage) -> Result<()> {
+    let daemon = daemon().await?;
+    let shared = &daemon.settings.shared;
+
+    // Add multiple tasks and start them immediately
+    for _ in 0..3 {
+        assert_success(add_and_start_task(shared, "sleep 60").await?);
+    }
+    // Wait until all tasks are running
+    for id in 0..3 {
+        wait_for_task_condition(shared, id, |task| task.is_running()).await?;
+    }
+
+    // Add a dummy group that also shouldn't be paused.
+    add_group_with_slots(shared, "testgroup", 1).await?;
+
+    // Send the kill message
+    send_message(shared, kill_message).await?;
+
+    // Make sure all tasks get killed
+    for id in 0..3 {
+        wait_for_task_condition(shared, id, |task| {
+            matches!(task.status, TaskStatus::Done(TaskResult::Killed))
+        })
+        .await?;
+    }
+
+    // Groups should not be paused, since no other queued tasks exist at this point in time.
+    let state = get_state(shared).await?;
+    assert_eq!(
+        state.groups.get(PUEUE_DEFAULT_GROUP).unwrap().status,
+        GroupStatus::Running
+    );
+    assert_eq!(
+        state.groups.get("testgroup").unwrap().status,
+        GroupStatus::Running
+    );
+
+    Ok(())
+}
```
12 changes: 3 additions & 9 deletions pueue/tests/daemon/integration/stashed.rs

```diff
@@ -43,10 +43,7 @@ async fn test_enqueued_tasks(
     assert_success(add_stashed_task(shared, "sleep 10", stashed, enqueue_at).await?);
 
     // The task should be added in stashed state.
-    let task = wait_for_task_condition(shared, 0, |task| {
-        matches!(task.status, TaskStatus::Stashed { .. })
-    })
-    .await?;
+    let task = wait_for_task_condition(shared, 0, |task| task.is_stashed()).await?;
 
     assert!(
         task.enqueued_at.is_none(),
@@ -56,7 +53,7 @@
     // Assert the correct point in time has been set, in case `enqueue_at` is specific.
     if enqueue_at.is_some() {
         let status = get_task_status(shared, 0).await?;
-        assert!(matches!(status, TaskStatus::Stashed { .. }));
+        assert!(task.is_stashed());
 
         if let TaskStatus::Stashed { enqueue_at: inner } = status {
             assert_eq!(inner, enqueue_at);
@@ -102,10 +99,7 @@ async fn test_delayed_tasks() -> Result<()> {
     assert_success(response);
 
     // The task should be added in stashed state for about 1 second.
-    wait_for_task_condition(shared, 0, |task| {
-        matches!(task.status, TaskStatus::Stashed { .. })
-    })
-    .await?;
+    wait_for_task_condition(shared, 0, |task| task.is_stashed()).await?;
 
     // Make sure the task is started after being automatically enqueued.
     sleep_ms(800).await;
```
10 changes: 10 additions & 0 deletions pueue/tests/helper/factories/task.rs

```diff
@@ -15,6 +15,16 @@ pub async fn add_task(shared: &Shared, command: &str, start_immediately: bool) -
         .context("Failed to add task.")
 }
 
+/// Adds a task to the test daemon and starts it immediately.
+pub async fn add_and_start_task(shared: &Shared, command: &str) -> Result<Message> {
+    let mut message = create_add_message(shared, command);
+    message.start_immediately = true;
+
+    send_message(shared, message)
+        .await
+        .context("Failed to add task.")
+}
+
 /// Adds a task to the test daemon.
 pub async fn add_task_with_priority(
     shared: &Shared,
```
14 changes: 13 additions & 1 deletion pueue_lib/src/task.rs

```diff
@@ -145,8 +145,20 @@ impl Task {
         }
     }
 
+    /// Convenience helper on whether a task is stashed
+    pub fn is_stashed(&self) -> bool {
+        matches!(self.status, TaskStatus::Stashed { .. })
+    }
+
+    /// Check whether a task is queued or might soon be enqueued.
     pub fn is_queued(&self) -> bool {
-        matches!(self.status, TaskStatus::Queued | TaskStatus::Stashed { .. })
+        matches!(
+            self.status,
+            TaskStatus::Queued
+                | TaskStatus::Stashed {
+                    enqueue_at: Some(_)
+                }
+        )
     }
 
     /// Small convenience function to set the task's group to the default group.
```
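This narrowing is the behavioral crux of the whole commit: a task stashed without an `enqueue_at` timestamp no longer counts as queued, which is what lets the kill handler skip the auto-pause for idle groups. A before/after sketch with a simplified `TaskStatus` (the real type stores a proper timestamp; `u64` here is just a stand-in):

```rust
// Simplified stand-in for pueue_lib's TaskStatus.
enum TaskStatus {
    Queued,
    Stashed { enqueue_at: Option<u64> },
}

// Old behavior: every stashed task counted as queued.
fn is_queued_old(status: &TaskStatus) -> bool {
    matches!(status, TaskStatus::Queued | TaskStatus::Stashed { .. })
}

// New behavior: only stashes that are scheduled to be enqueued count.
fn is_queued_new(status: &TaskStatus) -> bool {
    matches!(
        status,
        TaskStatus::Queued | TaskStatus::Stashed { enqueue_at: Some(_) }
    )
}

fn main() {
    let plain_stash = TaskStatus::Stashed { enqueue_at: None };
    let scheduled_stash = TaskStatus::Stashed { enqueue_at: Some(1_692_500_000) };

    // The case that changed: a plain stash no longer forces a group
    // pause when its group is killed.
    assert!(is_queued_old(&plain_stash));
    assert!(!is_queued_new(&plain_stash));

    // Queued tasks and scheduled stashes behave as before.
    assert!(is_queued_new(&TaskStatus::Queued));
    assert!(is_queued_new(&scheduled_stash));
}
```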
