From 8076bd2d630a63ac7ebfa7896741eea21d5d58d7 Mon Sep 17 00:00:00 2001 From: qipingluo Date: Tue, 23 Jul 2024 20:10:36 +0800 Subject: [PATCH] optimize code structure --- rust/src/push_consumer.rs | 101 +++++++++++++++++++++++++------------- 1 file changed, 67 insertions(+), 34 deletions(-) diff --git a/rust/src/push_consumer.rs b/rust/src/push_consumer.rs index 974d94f5..3ccac5cd 100644 --- a/rust/src/push_consumer.rs +++ b/rust/src/push_consumer.rs @@ -858,7 +858,7 @@ impl AckEntryProcessor { self.ack_entry_sender = Some(ack_entry_sender); let mut ack_entry_queue: VecDeque = VecDeque::new(); let mut ack_ticker = tokio::time::interval(Duration::from_millis(100)); - let mut shadow_processor = self.shadow_self(); + let mut processor = self.shadow_self(); let shutdown_token = CancellationToken::new(); self.shutdown_token = Some(shutdown_token.clone()); let task_tracker = TaskTracker::new(); @@ -867,45 +867,18 @@ impl AckEntryProcessor { loop { select! { _ = ack_ticker.tick() => { - if let Some(ack_entry_item) = ack_entry_queue.front_mut() { - let result = match ack_entry_item { - AckEntryItem::Ack(ack_entry) => { - shadow_processor.ack_message_inner(&ack_entry.message).await - } - AckEntryItem::Nack(nack_entry) => { - shadow_processor.change_invisible_duration_inner(&nack_entry.message, nack_entry.invisible_duration).await - } - AckEntryItem::Dlq(entry) => { - shadow_processor.forward_to_deadletter_queue_inner(&entry.message, entry.delivery_attempt, entry.max_delivery_attempt).await - } - }; - if result.is_ok() { - ack_entry_queue.pop_front(); - } else { - error!(shadow_processor.logger, "ack message failed: {:?}, will deliver later.", result); - ack_entry_item.inc_attempt(); - } + let result = processor.process_ack_entry_queue(&mut ack_entry_queue).await; + if result.is_err() { + error!(processor.logger, "process ack entry queue failed: {:?}", result); } } Some(ack_entry) = ack_entry_receiver.recv() => { ack_entry_queue.push_back(ack_entry); - debug!(shadow_processor.logger, "ack entry queue size: {}", ack_entry_queue.len()); + debug!(processor.logger, "ack entry queue size: {}", ack_entry_queue.len()); } _ = shutdown_token.cancelled() => { - info!(shadow_processor.logger, "need to process remaining {} entries on shutdown.", ack_entry_queue.len()); - for ack_entry_item in ack_entry_queue { - match ack_entry_item { - AckEntryItem::Ack(entry) => { - let _ = shadow_processor.ack_message_inner(&entry.message).await; - } - AckEntryItem::Nack(entry) => { - let _ = shadow_processor.change_invisible_duration_inner(&entry.message, entry.invisible_duration).await; - } - AckEntryItem::Dlq(entry) => { - let _ = shadow_processor.forward_to_deadletter_queue_inner(&entry.message, entry.delivery_attempt, entry.max_delivery_attempt).await; - } - } - } + info!(processor.logger, "need to process remaining {} entries on shutdown.", ack_entry_queue.len()); + processor.flush_ack_entry_queue(&mut ack_entry_queue).await; break; } } @@ -914,6 +887,66 @@ impl AckEntryProcessor { Ok(()) } + async fn process_ack_entry_queue( + &mut self, + ack_entry_queue: &mut VecDeque, + ) -> Result<(), ClientError> { + if let Some(ack_entry_item) = ack_entry_queue.front_mut() { + let result = match ack_entry_item { + AckEntryItem::Ack(ack_entry) => self.ack_message_inner(&ack_entry.message).await, + AckEntryItem::Nack(nack_entry) => { + self.change_invisible_duration_inner( + &nack_entry.message, + nack_entry.invisible_duration, + ) + .await + } + AckEntryItem::Dlq(entry) => { + self.forward_to_deadletter_queue_inner( + &entry.message, + entry.delivery_attempt, + entry.max_delivery_attempt, + ) + .await + } + }; + if result.is_ok() { + ack_entry_queue.pop_front(); + } else { + error!( + self.logger, + "ack message failed: {:?}, will deliver later.", result + ); + ack_entry_item.inc_attempt(); + } + } + Ok(()) + } + + async fn flush_ack_entry_queue(&mut self, ack_entry_queue: &mut VecDeque) { + for ack_entry_item in ack_entry_queue { + match ack_entry_item { + AckEntryItem::Ack(entry) => { + let _ = self.ack_message_inner(&entry.message).await; + } + AckEntryItem::Nack(entry) => { + let _ = self + .change_invisible_duration_inner(&entry.message, entry.invisible_duration) + .await; + } + AckEntryItem::Dlq(entry) => { + let _ = self + .forward_to_deadletter_queue_inner( + &entry.message, + entry.delivery_attempt, + entry.max_delivery_attempt, + ) + .await; + } + } + } + } + async fn ack_message_inner(&mut self, ack_entry: &MessageView) -> Result<(), ClientError> { let request = AckMessageRequest { group: Some(self.consumer_group.clone()),