Skip to content

Commit

Permalink
fold error_ and cancelled_ together
Browse files Browse the repository at this point in the history
  • Loading branch information
zeroshade committed Oct 22, 2024
1 parent 9239737 commit 0e27e24
Showing 1 changed file with 4 additions and 10 deletions.
14 changes: 4 additions & 10 deletions cpp/src/arrow/c/bridge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2688,7 +2688,6 @@ struct AsyncProducer {
std::mutex mutex_;
std::condition_variable cv_;
uint64_t pending_requests_{0};
bool cancelled_{false};
Status error_{Status::OK()};
};

Expand Down Expand Up @@ -2720,19 +2719,14 @@ struct AsyncProducer {
std::unique_lock<std::mutex> lock(state_->mutex_);
if (state_->pending_requests_ == 0) {
state_->cv_.wait(lock, [this]() -> bool {
return !state_->error_.ok() || state_->cancelled_ ||
state_->pending_requests_ > 0;
return !state_->error_.ok() || state_->pending_requests_ > 0;
});
}

if (!state_->error_.ok()) {
return state_->error_;
}

if (state_->cancelled_) {
return Status::Cancelled("Consumer cancelled");
}

if (state_->pending_requests_ > 0) {
state_->pending_requests_--;
lock.unlock();
Expand All @@ -2754,7 +2748,7 @@ struct AsyncProducer {
auto* self = reinterpret_cast<State*>(producer->private_data);
{
std::lock_guard<std::mutex> lock(self->mutex_);
if (self->cancelled_) {
if (!self->error_.ok()) {
return;
}
self->pending_requests_ += n;
Expand All @@ -2766,10 +2760,10 @@ struct AsyncProducer {
auto* self = reinterpret_cast<State*>(producer->private_data);
{
std::lock_guard<std::mutex> lock(self->mutex_);
if (self->cancelled_) {
if (!self->error_.ok()) {
return;
}
self->cancelled_ = true;
self->error_ = Status::Cancelled("Consumer requested cancellation");
}
self->cv_.notify_all();
}
Expand Down

0 comments on commit 0e27e24

Please sign in to comment.