From 5b3f16dfb0be6e17adb5954e3cceed2afbef31f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabriel=20N=C3=BCtzi?= Date: Sat, 16 Mar 2024 23:50:24 +0100 Subject: [PATCH] fix: Implement `flush` function on `Async` --- lib.rs | 60 ++++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 50 insertions(+), 10 deletions(-) diff --git a/lib.rs b/lib.rs index 2f7fb28..754aad9 100644 --- a/lib.rs +++ b/lib.rs @@ -474,6 +474,21 @@ impl Drain for AsyncCore { ) -> AsyncResult<()> { self.send(AsyncRecord::from(record, logger_values)) } + + /// Flush the drain which waits as long (poll interval `interval`) till + /// the channel to the async worker thread becomes empty. + fn flush(&self, interval: std::time::Duration) -> io::Result<()> { + let sender = match self.get_sender() { + Ok(s) => s, + Err(_) => return Err(io::Error::from(io::ErrorKind::BrokenPipe)), + }; + + while !sender.is_empty() { + thread::sleep(interval); + } + + Ok(()) + } } /// Serialized record. @@ -796,6 +811,11 @@ impl Drain for Async { Ok(()) } + + #[inline] + fn flush(&self, interval: std::time::Duration) -> io::Result<()> { + self.core.flush(interval) + } } impl Drop for Async { @@ -806,25 +826,37 @@ impl Drop for Async { // }}} - #[cfg(test)] mod test { use super::*; - use std::sync::mpsc; + use std::{sync::mpsc, time::Duration}; #[test] fn integration_test() { let (mock_drain, mock_drain_rx) = MockDrain::new(); - let async_drain = AsyncBuilder::new(mock_drain) - .build(); - let slog = slog::Logger::root(async_drain.fuse(), o!("field1" => "value1")); + let async_drain = AsyncBuilder::new(mock_drain).build(); + + let e = async_drain.flush(Duration::from_millis(10)); + assert_eq!(matches!(e, Ok(())), true); + + let slog = + slog::Logger::root(async_drain.fuse(), o!("field1" => "value1")); info!(slog, "Message 1"; "field2" => "value2"); warn!(slog, "Message 2"; "field3" => "value3"); - assert_eq!(mock_drain_rx.recv().unwrap(), r#"INFO Message 1: [("field1", "value1"), ("field2", "value2")]"#); - assert_eq!(mock_drain_rx.recv().unwrap(), r#"WARN Message 2: [("field1", "value1"), ("field3", "value3")]"#); - } + let e = slog.flush(Duration::from_millis(10)); + assert_eq!(matches!(e, Ok(())), true); + + assert_eq!( + mock_drain_rx.recv().unwrap(), + r#"INFO Message 1: [("field1", "value1"), ("field2", "value2")]"# + ); + assert_eq!( + mock_drain_rx.recv().unwrap(), + r#"WARN Message 2: [("field1", "value1"), ("field3", "value3")]"# + ); + } /// Test-helper drain #[derive(Debug)] @@ -843,7 +875,11 @@ mod test { type Ok = (); type Err = slog::Never; - fn log(&self, record: &Record, logger_kv: &OwnedKVList) -> Result { + fn log( + &self, + record: &Record, + logger_kv: &OwnedKVList, + ) -> Result { let mut serializer = MockSerializer::default(); logger_kv.serialize(record, &mut serializer).unwrap(); record.kv().serialize(record, &mut serializer).unwrap(); @@ -861,7 +897,11 @@ mod test { } impl slog::Serializer for MockSerializer { - fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> Result<(), slog::Error> { + fn emit_arguments( + &mut self, + key: Key, + val: &fmt::Arguments, + ) -> Result<(), slog::Error> { self.kvs.push((key.to_string(), val.to_string())); Ok(()) }