Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Poll-based interface for DmaFile #446

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 80 additions & 11 deletions glommio/src/io/dma_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ use futures_lite::{Stream, StreamExt};
use nix::sys::statfs::*;
use std::{
cell::Ref,
future::Future,
io,
os::unix::io::{AsRawFd, RawFd},
path::Path,
pin::Pin,
rc::Rc,
task::{Context, Poll},
};

pub(super) type Result<T> = crate::Result<T, ()>;
Expand Down Expand Up @@ -242,16 +245,23 @@ impl DmaFile {
///
/// The position must be aligned to for Direct I/O. In most platforms
/// that means 512 bytes.
pub async fn read_at_aligned(&self, pos: u64, size: usize) -> Result<ReadResult> {
///
/// Equals to
/// ```ignore
/// pub async fn read_at_aligned(&self, pos: u64, size: usize) -> Result<ReadResult>;
/// ```
pub fn read_at_aligned(&self, pos: u64, size: usize) -> PollDmaReadAtAligned<'_> {
let source = self.file.reactor.upgrade().unwrap().read_dma(
self.as_raw_fd(),
pos,
size,
self.pollable,
self.file.scheduler.borrow().as_ref(),
);
let read_size = enhanced_try!(source.collect_rw().await, "Reading", self.file)?;
Ok(ReadResult::from_sliced_buffer(source, 0, read_size))
PollDmaReadAtAligned {
source: Some(source),
file: &self.file,
}
}

/// Reads into buffer in buf from a specific position in the file.
Expand All @@ -262,7 +272,12 @@ impl DmaFile {
///
/// If you can guarantee proper alignment, prefer [`Self::read_at_aligned`]
/// instead
pub async fn read_at(&self, pos: u64, size: usize) -> Result<ReadResult> {
///
/// Equals to
/// ```ignore
/// pub async fn read_at(&self, pos: u64, size: usize) -> Result<ReadResult>;
/// ```
pub fn read_at(&self, pos: u64, size: usize) -> PollDmaReadAt<'_> {
let eff_pos = self.align_down(pos);
let b = (pos - eff_pos) as usize;

Expand All @@ -274,13 +289,12 @@ impl DmaFile {
self.pollable,
self.file.scheduler.borrow().as_ref(),
);

let read_size = enhanced_try!(source.collect_rw().await, "Reading", self.file)?;
Ok(ReadResult::from_sliced_buffer(
source,
b,
std::cmp::min(read_size, size),
))
PollDmaReadAt {
source: Some(source),
file: &self.file,
begin: b,
size,
}
}

/// Submit many reads and process the results in a stream-like fashion via a
Expand Down Expand Up @@ -424,6 +438,61 @@ impl DmaFile {
}
}

/// Future of [`DmaFile::read_at_aligned`].
#[derive(Debug)]
#[must_use = "future has no effect unless you .await or poll it"]
pub struct PollDmaReadAtAligned<'a> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't figure out a good way to cover these structs into a macro 🤦‍♂️ Both the members and fn poll()'s bodies are different...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I put these structs in here for now. We can consider moving them to a separate file if more are added.

source: Option<ScheduledSource>,
file: &'a GlommioFile,
}

impl Future for PollDmaReadAtAligned<'_> {
type Output = Result<ReadResult>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let read_size = self
.source
.as_ref()
.expect("Polling a finished task")
.poll_collect_rw(cx)
.map(|read_size| enhanced_try!(read_size, "Reading", self.file))?;

read_size.map(|size| {
let source = self.get_mut().source.take().unwrap();
Ok(ReadResult::from_sliced_buffer(source, 0, size))
})
}
}

/// Future of [`DmaFile::read_at`].
#[derive(Debug)]
#[must_use = "future has no effect unless you .await or poll it"]
pub struct PollDmaReadAt<'a> {
source: Option<ScheduledSource>,
file: &'a GlommioFile,
begin: usize,
size: usize,
}
impl Future for PollDmaReadAt<'_> {
type Output = Result<ReadResult>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let read_size = self
.source
.as_ref()
.expect("Polling a finished task")
.poll_collect_rw(cx)
.map(|read_size| enhanced_try!(read_size, "Reading", self.file))?;

read_size.map(|read_size| {
let offset = self.begin;
let len = self.size.min(read_size);
let source = self.get_mut().source.take().unwrap();
Ok(ReadResult::from_sliced_buffer(source, offset, len))
})
}
}

#[cfg(test)]
pub(crate) mod test {
use super::*;
Expand Down
11 changes: 8 additions & 3 deletions glommio/src/io/immutable_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use crate::io::{
DmaStreamWriter,
DmaStreamWriterBuilder,
IoVec,
PollDmaReadAt,
ReadManyResult,
ReadResult,
ScheduledSource,
};
use futures_lite::{future::poll_fn, io::AsyncWrite, Stream};
Expand Down Expand Up @@ -372,8 +372,13 @@ impl ImmutableFile {
/// It is not necessary to respect the `O_DIRECT` alignment of the file, and
/// this API will internally convert the positions and sizes to match,
/// at a cost.
pub async fn read_at(&self, pos: u64, size: usize) -> Result<ReadResult> {
self.stream_builder.file.read_at(pos, size).await
///
/// Equals to
/// ```ignore
/// pub async fn read_at(&self, pos: u64, size: usize) -> Result<ReadResult>;
/// ```
pub fn read_at(&self, pos: u64, size: usize) -> PollDmaReadAt<'_> {
self.stream_builder.file.read_at(pos, size)
}

/// Submit many reads and process the results in a stream-like fashion via a
Expand Down
2 changes: 1 addition & 1 deletion glommio/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ pub use self::{
},
bulk_io::{IoVec, ReadManyResult},
directory::Directory,
dma_file::{CloseResult, DmaFile},
dma_file::{CloseResult, DmaFile, PollDmaReadAt, PollDmaReadAtAligned},
dma_file_stream::{
DmaStreamReader,
DmaStreamReaderBuilder,
Expand Down
11 changes: 10 additions & 1 deletion glommio/src/sys/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use std::{
os::unix::io::RawFd,
path::PathBuf,
rc::Rc,
task::{Poll, Waker},
task::{Context, Poll, Waker},
time::Duration,
};

Expand Down Expand Up @@ -264,6 +264,15 @@ impl Source {
})
.await
}

pub(crate) fn poll_collect_rw(&self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering whether we can make more use of this, like dma write or other files/resources' operation. Maybe we can track the progress in #445.

if let Some(result) = self.result() {
return Poll::Ready(result);
}

self.add_waiter_many(cx.waker().clone());
Poll::Pending
}
}

impl Drop for Source {
Expand Down