Skip to content

Commit

Permalink
Do not submit cancelled tasks
Browse files Browse the repository at this point in the history
Fixes #198.
  • Loading branch information
trowski committed Jan 28, 2024
1 parent 5aeaad2 commit c09b678
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 0 deletions.
20 changes: 20 additions & 0 deletions src/Worker/Internal/ContextWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

namespace Amp\Parallel\Worker\Internal;

use Amp\ByteStream\ReadableBuffer;
use Amp\ByteStream\StreamChannel;
use Amp\ByteStream\WritableBuffer;
use Amp\Cancellation;
use Amp\CancelledException;
use Amp\DeferredFuture;
Expand Down Expand Up @@ -139,6 +142,10 @@ public function submit(Task $task, ?Cancellation $cancellation = null): Executio
throw new StatusError("The worker has been shut down");
}

if ($cancellation?->isRequested()) {
return self::createCancelledExecution($task, $cancellation);
}

$receive = empty($this->jobQueue);
$submission = new Internal\TaskSubmission($task);
$jobId = $submission->getId();
Expand Down Expand Up @@ -220,4 +227,17 @@ public function kill(): void
$this->exitStatus ??= Future::error(new WorkerException("The worker was killed"));
$this->exitStatus->ignore();
}

private static function createCancelledExecution(Task $task, Cancellation $cancellation): Execution
{
$channel = new StreamChannel(new ReadableBuffer(), new WritableBuffer());
$channel->close();

try {
$cancellation->throwIfRequested();
throw new \Error('Expected cancellation to have been requested');
} catch (CancelledException $exception) {
return new Execution($task, $channel, Future::error($exception));
}
}
}
18 changes: 18 additions & 0 deletions test/Worker/AbstractWorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
namespace Amp\Parallel\Test\Worker;

use Amp\Cancellation;
use Amp\CancelledException;
use Amp\DeferredCancellation;
use Amp\Future;
use Amp\Parallel\Context\ContextFactory;
use Amp\Parallel\Context\StatusError;
Expand Down Expand Up @@ -337,6 +339,22 @@ public function testSubmitAfterCancelledTask(): void
$worker->shutdown();
}

public function testCancelBeforeSubmit(): void
{
$this->expectException(CancelledException::class);

$worker = $this->createWorker();

$deferredCancellation = new DeferredCancellation();
$deferredCancellation->cancel();

try {
$worker->submit(new Fixtures\CancellingTask, $deferredCancellation->getCancellation())->await();
} finally {
$worker->shutdown();
}
}

public function testCancellingCompletedTask(): void
{
$worker = $this->createWorker();
Expand Down

0 comments on commit c09b678

Please sign in to comment.