Skip to content

Commit

Permalink
Add an atomic flag to TaskletRunner
Browse files Browse the repository at this point in the history
This flag is shared between the threads, if any thread fails, this is set and before the new threads get dispatched or run, this flag is checked.
In case it is set, the program aborts.
  • Loading branch information
lisajulia committed Jul 8, 2024
1 parent 6110b46 commit 80e2945
Showing 1 changed file with 24 additions and 2 deletions.
26 changes: 24 additions & 2 deletions opm/models/parallel/tasklets.hh
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#ifndef EWOMS_TASKLETS_HH
#define EWOMS_TASKLETS_HH

#include <atomic>
#include <stdexcept>
#include <cassert>
#include <thread>
Expand Down Expand Up @@ -223,6 +224,11 @@ public:
*/
void dispatch(std::shared_ptr<TaskletInterface> tasklet)
{
if (failureFlag_.load(std::memory_order_relaxed)) {
std::cerr << "Failure flag of the TaskletRunner is set. Not dispatching new tasklets.\n";
exit(EXIT_FAILURE);
}

if (threads_.empty()) {
// run the tasklet immediately in synchronous mode.
while (tasklet->referenceCount() > 0) {
Expand All @@ -232,9 +238,11 @@ public:
}
catch (const std::exception& e) {
std::cerr << "ERROR: Uncaught std::exception when running tasklet: " << e.what() << ". Trying to continue.\n";
failureFlag_.store(true, std::memory_order_relaxed);
}
catch (...) {
std::cerr << "ERROR: Uncaught exception (general type) when running tasklet. Trying to continue.\n";
failureFlag_.store(true, std::memory_order_relaxed);
}
}
}
Expand Down Expand Up @@ -280,6 +288,12 @@ public:

barrierTasklet->wait();
}
private:
// Atomic flag that is set to failure if any of the tasklets run by the TaskletRunner fails.
// This flag is checked before new tasklets run or get dispatched and in case it is true, the
// the thread execution will be stopped / no new tasklets will be started and the program will
// abort.
std::atomic<bool> failureFlag_ = false;

protected:
// main function of the worker thread
Expand All @@ -295,6 +309,12 @@ protected:
void run_()
{
while (true) {
// Check if failure flag is set
if (failureFlag_.load(std::memory_order_relaxed)) {
std::cerr << "Failure flag of the TaskletRunner is set. Exiting thread.\n";
exit(EXIT_FAILURE);
}

// wait until tasklets have been pushed to the queue. first we need to lock
// mutex for access to taskletQueue_
std::unique_lock<std::mutex> lock(taskletQueueMutex_);
Expand Down Expand Up @@ -330,10 +350,12 @@ protected:
tasklet->run();
}
catch (const std::exception& e) {
std::cerr << "ERROR: Uncaught std::exception when running tasklet: " << e.what() << ". Trying to continue.\n";
std::cerr << "ERROR: Uncaught std::exception when running tasklet: " << e.what() << ".\n";
failureFlag_.store(true, std::memory_order_relaxed);
}
catch (...) {
std::cerr << "ERROR: Uncaught exception when running tasklet. Trying to continue.\n";
std::cerr << "ERROR: Uncaught exception when running tasklet.\n";
failureFlag_.store(true, std::memory_order_relaxed);
}
}
}
Expand Down

0 comments on commit 80e2945

Please sign in to comment.