Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Implement flush function on Async #36

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 50 additions & 10 deletions lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,21 @@ impl Drain for AsyncCore {
) -> AsyncResult<()> {
self.send(AsyncRecord::from(record, logger_values))
}

/// Flush the drain which waits as long (poll interval `interval`) till
/// the channel to the async worker thread becomes empty.
fn flush(&self, interval: std::time::Duration) -> io::Result<()> {
let sender = match self.get_sender() {
Ok(s) => s,
Err(_) => return Err(io::Error::from(io::ErrorKind::BrokenPipe)),
};

while !sender.is_empty() {
thread::sleep(interval);
}

Ok(())
}
}

/// Serialized record.
Expand Down Expand Up @@ -796,6 +811,11 @@ impl Drain for Async {

Ok(())
}

#[inline]
fn flush(&self, interval: std::time::Duration) -> io::Result<()> {
self.core.flush(interval)
}
}

impl Drop for Async {
Expand All @@ -806,25 +826,37 @@ impl Drop for Async {

// }}}


#[cfg(test)]
mod test {
use super::*;
use std::sync::mpsc;
use std::{sync::mpsc, time::Duration};

#[test]
fn integration_test() {
let (mock_drain, mock_drain_rx) = MockDrain::new();
let async_drain = AsyncBuilder::new(mock_drain)
.build();
let slog = slog::Logger::root(async_drain.fuse(), o!("field1" => "value1"));
let async_drain = AsyncBuilder::new(mock_drain).build();

let e = async_drain.flush(Duration::from_millis(10));
assert_eq!(matches!(e, Ok(())), true);

let slog =
slog::Logger::root(async_drain.fuse(), o!("field1" => "value1"));

info!(slog, "Message 1"; "field2" => "value2");
warn!(slog, "Message 2"; "field3" => "value3");
assert_eq!(mock_drain_rx.recv().unwrap(), r#"INFO Message 1: [("field1", "value1"), ("field2", "value2")]"#);
assert_eq!(mock_drain_rx.recv().unwrap(), r#"WARN Message 2: [("field1", "value1"), ("field3", "value3")]"#);
}

let e = slog.flush(Duration::from_millis(10));
assert_eq!(matches!(e, Ok(())), true);

assert_eq!(
mock_drain_rx.recv().unwrap(),
r#"INFO Message 1: [("field1", "value1"), ("field2", "value2")]"#
);
assert_eq!(
mock_drain_rx.recv().unwrap(),
r#"WARN Message 2: [("field1", "value1"), ("field3", "value3")]"#
);
}

/// Test-helper drain
#[derive(Debug)]
Expand All @@ -843,7 +875,11 @@ mod test {
type Ok = ();
type Err = slog::Never;

fn log(&self, record: &Record, logger_kv: &OwnedKVList) -> Result<Self::Ok, Self::Err> {
fn log(
&self,
record: &Record,
logger_kv: &OwnedKVList,
) -> Result<Self::Ok, Self::Err> {
let mut serializer = MockSerializer::default();
logger_kv.serialize(record, &mut serializer).unwrap();
record.kv().serialize(record, &mut serializer).unwrap();
Expand All @@ -861,7 +897,11 @@ mod test {
}

impl slog::Serializer for MockSerializer {
fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> Result<(), slog::Error> {
fn emit_arguments(
&mut self,
key: Key,
val: &fmt::Arguments,
) -> Result<(), slog::Error> {
self.kvs.push((key.to_string(), val.to_string()));
Ok(())
}
Expand Down