Skip to content

Commit

Permalink
optimize code structure
Browse files Browse the repository at this point in the history
  • Loading branch information
glcrazier committed Jul 23, 2024
1 parent cd02617 commit 029f03a
Showing 1 changed file with 67 additions and 34 deletions.
101 changes: 67 additions & 34 deletions rust/src/push_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@ impl AckEntryProcessor {
self.ack_entry_sender = Some(ack_entry_sender);
let mut ack_entry_queue: VecDeque<AckEntryItem> = VecDeque::new();
let mut ack_ticker = tokio::time::interval(Duration::from_millis(100));
let mut shadow_processor = self.shadow_self();
let mut processor = self.shadow_self();
let shutdown_token = CancellationToken::new();
self.shutdown_token = Some(shutdown_token.clone());
let task_tracker = TaskTracker::new();
Expand All @@ -867,45 +867,18 @@ impl AckEntryProcessor {
loop {
select! {
_ = ack_ticker.tick() => {
if let Some(ack_entry_item) = ack_entry_queue.front_mut() {
let result = match ack_entry_item {
AckEntryItem::Ack(ack_entry) => {
shadow_processor.ack_message_inner(&ack_entry.message).await
}
AckEntryItem::Nack(nack_entry) => {
shadow_processor.change_invisible_duration_inner(&nack_entry.message, nack_entry.invisible_duration).await
}
AckEntryItem::Dlq(entry) => {
shadow_processor.forward_to_deadletter_queue_inner(&entry.message, entry.delivery_attempt, entry.max_delivery_attempt).await
}
};
if result.is_ok() {
ack_entry_queue.pop_front();
} else {
error!(shadow_processor.logger, "ack message failed: {:?}, will deliver later.", result);
ack_entry_item.inc_attempt();
}
let result = processor.process_ack_entry_queue(&mut ack_entry_queue).await;
if result.is_err() {
error!(processor.logger, "process ack entry queue failed: {:?}", result);
}
}
Some(ack_entry) = ack_entry_receiver.recv() => {
ack_entry_queue.push_back(ack_entry);
debug!(shadow_processor.logger, "ack entry queue size: {}", ack_entry_queue.len());
debug!(processor.logger, "ack entry queue size: {}", ack_entry_queue.len());
}
_ = shutdown_token.cancelled() => {
info!(shadow_processor.logger, "need to process remaining {} entries on shutdown.", ack_entry_queue.len());
for ack_entry_item in ack_entry_queue {
match ack_entry_item {
AckEntryItem::Ack(entry) => {
let _ = shadow_processor.ack_message_inner(&entry.message).await;
}
AckEntryItem::Nack(entry) => {
let _ = shadow_processor.change_invisible_duration_inner(&entry.message, entry.invisible_duration).await;
}
AckEntryItem::Dlq(entry) => {
let _ = shadow_processor.forward_to_deadletter_queue_inner(&entry.message, entry.delivery_attempt, entry.max_delivery_attempt).await;
}
}
}
info!(processor.logger, "need to process remaining {} entries on shutdown.", ack_entry_queue.len());
processor.flush_ack_entry_queue(&mut ack_entry_queue).await;
break;
}
}
Expand All @@ -914,6 +887,66 @@ impl AckEntryProcessor {
Ok(())
}

async fn process_ack_entry_queue(
&mut self,
ack_entry_queue: &mut VecDeque<AckEntryItem>,
) -> Result<(), ClientError> {
if let Some(ack_entry_item) = ack_entry_queue.front_mut() {
let result = match ack_entry_item {
AckEntryItem::Ack(ack_entry) => self.ack_message_inner(&ack_entry.message).await,
AckEntryItem::Nack(nack_entry) => {
self.change_invisible_duration_inner(
&nack_entry.message,
nack_entry.invisible_duration,
)
.await
}
AckEntryItem::Dlq(entry) => {
self.forward_to_deadletter_queue_inner(
&entry.message,
entry.delivery_attempt,
entry.max_delivery_attempt,
)
.await
}
};
if result.is_ok() {
ack_entry_queue.pop_front();
} else {
error!(
self.logger,
"ack message failed: {:?}, will deliver later.", result
);
ack_entry_item.inc_attempt();
}
}
Ok(())
}

async fn flush_ack_entry_queue(&mut self, ack_entry_queue: &mut VecDeque<AckEntryItem>) {
for ack_entry_item in ack_entry_queue {
match ack_entry_item {
AckEntryItem::Ack(entry) => {
let _ = self.ack_message_inner(&entry.message).await;
}
AckEntryItem::Nack(entry) => {
let _ = self
.change_invisible_duration_inner(&entry.message, entry.invisible_duration)
.await;
}
AckEntryItem::Dlq(entry) => {
let _ = self
.forward_to_deadletter_queue_inner(
&entry.message,
entry.delivery_attempt,
entry.max_delivery_attempt,
)
.await;
}
}
}
}

async fn ack_message_inner(&mut self, ack_entry: &MessageView) -> Result<(), ClientError> {
let request = AckMessageRequest {
group: Some(self.consumer_group.clone()),
Expand Down

0 comments on commit 029f03a

Please sign in to comment.