storage: use global thread pool to support prefetch
Currently we use one prefetch manager per cache backend. This design has
two drawbacks:
- it spawns too many worker threads for prefetch
- the network rate limit has become defunct

So change the design to use a global prefetch manager with a thread pool
to serve all prefetch requests.

Signed-off-by: Jiang Liu <[email protected]>
jiangliu committed Mar 31, 2023
1 parent fb8db88 commit 5144688
Showing 4 changed files with 243 additions and 225 deletions.
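
In sketch form, the change replaces per-backend worker sets with one shared queue feeding a fixed pool of workers. The code below is a minimal illustration of that shape, with hypothetical, simplified names; the real PrefetchMgr lives in storage/src/cache/worker.rs and additionally enforces the prefetch rate limit:

use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

struct PrefetchRequest {
    blob_id: String,
    offset: u64,
    len: u64,
}

// One process-global pool: a single queue feeding a fixed set of workers,
// shared by every cache backend instead of one worker set per backend.
struct GlobalPrefetchPool {
    sender: Sender<PrefetchRequest>,
    workers: Vec<thread::JoinHandle<()>>,
}

impl GlobalPrefetchPool {
    fn new(threads: usize) -> Self {
        let (sender, receiver) = channel::<PrefetchRequest>();
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..threads)
            .map(|_| {
                let receiver = Arc::clone(&receiver);
                thread::spawn(move || loop {
                    // Lock only long enough to pull one request off the queue.
                    let req = receiver.lock().unwrap().recv();
                    match req {
                        // A real worker would consume shared rate-limit budget
                        // before issuing reads against the blob backend.
                        Ok(r) => println!("prefetch {} @ {}+{}", r.blob_id, r.offset, r.len),
                        // All senders dropped: the process is shutting down.
                        Err(_) => break,
                    }
                })
            })
            .collect();
        GlobalPrefetchPool { sender, workers }
    }

    // Every backend sends into the same queue, so the total number of
    // prefetch threads no longer scales with the number of backends.
    fn send(&self, req: PrefetchRequest) {
        let _ = self.sender.send(req);
    }
}

With a single queue, one rate limiter can sit in front of all prefetch traffic, which is what makes a global network budget enforceable again.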
20 changes: 13 additions & 7 deletions storage/src/cache/cachedfile.rs
@@ -27,7 +27,7 @@ use tokio::runtime::Runtime;

 use crate::backend::BlobReader;
 use crate::cache::state::ChunkMap;
-use crate::cache::worker::{AsyncPrefetchConfig, AsyncPrefetchMessage, AsyncWorkerMgr};
+use crate::cache::worker::{AsyncPrefetchConfig, AsyncPrefetchMessage, PrefetchMgr};
 use crate::cache::{BlobCache, BlobIoMergeState};
 use crate::device::{
     BlobChunkInfo, BlobInfo, BlobIoDesc, BlobIoRange, BlobIoSegment, BlobIoTag, BlobIoVec,
@@ -134,9 +134,9 @@ pub(crate) struct FileCacheEntry {
     pub(crate) meta: Option<FileCacheMeta>,
     pub(crate) metrics: Arc<BlobcacheMetrics>,
     pub(crate) prefetch_state: Arc<AtomicU32>,
+    pub(crate) prefetch_mgr: Arc<PrefetchMgr>,
     pub(crate) reader: Arc<dyn BlobReader>,
     pub(crate) runtime: Arc<Runtime>,
-    pub(crate) workers: Arc<AsyncWorkerMgr>,
 
     pub(crate) blob_compressed_size: u64,
     pub(crate) blob_uncompressed_size: u64,
@@ -458,7 +458,8 @@ impl BlobCache for FileCacheEntry {
             warn!("storage: inaccurate prefetch status");
         }
         if val == 0 || val == 1 {
-            self.workers.flush_pending_prefetch_requests(&self.blob_id);
+            self.prefetch_mgr
+                .flush_pending_prefetch_requests(&self.blob_id);
             return Ok(());
         }
     }
@@ -477,11 +478,12 @@
         // Handle blob prefetch request first, it may help performance.
         for req in prefetches {
             let msg = AsyncPrefetchMessage::new_blob_prefetch(
+                self.prefetch_mgr.clone(),
                 blob_cache.clone(),
                 req.offset as u64,
                 req.len as u64,
             );
-            let _ = self.workers.send_prefetch_message(msg);
+            let _ = self.prefetch_mgr.send_prefetch_message(msg);
         }
 
         // Then handle fs prefetch
@@ -494,8 +496,12 @@
             max_comp_size,
             max_comp_size as u64 >> RAFS_BATCH_SIZE_TO_GAP_SHIFT,
             |req: BlobIoRange| {
-                let msg = AsyncPrefetchMessage::new_fs_prefetch(blob_cache.clone(), req);
-                let _ = self.workers.send_prefetch_message(msg);
+                let msg = AsyncPrefetchMessage::new_fs_prefetch(
+                    self.prefetch_mgr.clone(),
+                    blob_cache.clone(),
+                    req,
+                );
+                let _ = self.prefetch_mgr.send_prefetch_message(msg);
             },
         );
 
@@ -593,7 +599,7 @@

     fn read(&self, iovec: &mut BlobIoVec, buffers: &[FileVolatileSlice]) -> Result<usize> {
         self.metrics.total.inc();
-        self.workers.consume_prefetch_budget(iovec.size());
+        self.prefetch_mgr.consume_prefetch_budget(iovec.size());
 
         if iovec.is_empty() {
             Ok(0)
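
A detail visible in the call sites above: prefetch messages now carry the manager handle, so a worker drawn from the shared pool can apply the right configuration and rate-limit budget per request. The variant shapes below are inferred from the new_blob_prefetch()/new_fs_prefetch() constructors in this diff, not copied from worker.rs (not shown in this excerpt); stub types stand in for the real ones:

use std::sync::Arc;

// Stubs standing in for the real storage crate types.
struct PrefetchMgr;
struct BlobIoRange;
trait BlobCache {}

// Variant shapes inferred from the constructor calls in the hunks above;
// the actual enum lives in storage/src/cache/worker.rs.
enum AsyncPrefetchMessage {
    // (manager, cache, blob offset, length)
    BlobPrefetch(Arc<PrefetchMgr>, Arc<dyn BlobCache>, u64, u64),
    // (manager, cache, merged io range)
    FsPrefetch(Arc<PrefetchMgr>, Arc<dyn BlobCache>, BlobIoRange),
}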
17 changes: 8 additions & 9 deletions storage/src/cache/filecache/mod.rs
@@ -19,7 +19,7 @@ use crate::cache::cachedfile::{FileCacheEntry, FileCacheMeta};
 use crate::cache::state::{
     BlobStateMap, ChunkMap, DigestedChunkMap, IndexedChunkMap, NoopChunkMap,
 };
-use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr};
+use crate::cache::worker::{AsyncPrefetchConfig, PrefetchMgr};
 use crate::cache::{BlobCache, BlobCacheMgr};
 use crate::device::{BlobFeatures, BlobInfo};
 use crate::RAFS_DEFAULT_CHUNK_SIZE;
@@ -32,8 +32,8 @@ pub struct FileCacheMgr {
     backend: Arc<dyn BlobBackend>,
     metrics: Arc<BlobcacheMetrics>,
     prefetch_config: Arc<AsyncPrefetchConfig>,
+    prefetch_mgr: Arc<PrefetchMgr>,
     runtime: Arc<Runtime>,
-    worker_mgr: Arc<AsyncWorkerMgr>,
     work_dir: String,
     validate: bool,
     disable_indexed_map: bool,
@@ -53,15 +53,15 @@ impl FileCacheMgr {
         let work_dir = blob_cfg.get_work_dir()?;
         let metrics = BlobcacheMetrics::new(id, work_dir);
         let prefetch_config: Arc<AsyncPrefetchConfig> = Arc::new((&config.prefetch).into());
-        let worker_mgr = AsyncWorkerMgr::new(metrics.clone(), prefetch_config.clone())?;
+        let worker_mgr = PrefetchMgr::new(metrics.clone(), prefetch_config.clone())?;
 
         Ok(FileCacheMgr {
             blobs: Arc::new(RwLock::new(HashMap::new())),
             backend,
             metrics,
             prefetch_config,
             runtime,
-            worker_mgr: Arc::new(worker_mgr),
+            prefetch_mgr: Arc::new(worker_mgr),
             work_dir: work_dir.to_owned(),
             disable_indexed_map: blob_cfg.disable_indexed_map,
             validate: config.cache_validate,
@@ -87,7 +87,7 @@ impl FileCacheMgr {
             blob.clone(),
             self.prefetch_config.clone(),
             self.runtime.clone(),
-            self.worker_mgr.clone(),
+            self.prefetch_mgr.clone(),
         )?;
         let entry = Arc::new(entry);
         let mut guard = self.blobs.write().unwrap();
@@ -108,13 +108,12 @@

 impl BlobCacheMgr for FileCacheMgr {
     fn init(&self) -> Result<()> {
-        AsyncWorkerMgr::start(self.worker_mgr.clone())
+        self.prefetch_mgr.setup()
     }
 
     fn destroy(&self) {
         if !self.closed.load(Ordering::Acquire) {
             self.closed.store(true, Ordering::Release);
-            self.worker_mgr.stop();
             self.backend().shutdown();
             self.metrics.release().unwrap_or_else(|e| error!("{:?}", e));
         }
@@ -170,7 +169,7 @@ impl FileCacheEntry {
         blob_info: Arc<BlobInfo>,
         prefetch_config: Arc<AsyncPrefetchConfig>,
         runtime: Arc<Runtime>,
-        workers: Arc<AsyncWorkerMgr>,
+        prefetch_mgr: Arc<PrefetchMgr>,
     ) -> Result<Self> {
         let is_separate_meta = blob_info.has_feature(BlobFeatures::SEPARATE);
         let is_tarfs = blob_info.features().is_tarfs();
@@ -296,7 +295,7 @@ impl FileCacheEntry {
             prefetch_state: Arc::new(AtomicU32::new(0)),
             reader,
             runtime,
-            workers,
+            prefetch_mgr,
 
             blob_compressed_size,
             blob_uncompressed_size,
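
The init()/destroy() hunks above also drop the per-manager start()/stop() pair: a pool shared by every cache manager must not be torn down by any single one of them. A rough sketch of that lifecycle, under the assumption (hypothetical names; the real setup() is in worker.rs) that setup() lazily starts the shared workers and is safe to call from every manager's init():

use std::sync::{Arc, Mutex, OnceLock};

// Hypothetical stand-in for the shared worker pool behind every PrefetchMgr.
struct SharedPool {
    running: Mutex<bool>,
}

static POOL: OnceLock<Arc<SharedPool>> = OnceLock::new();

fn shared_pool() -> Arc<SharedPool> {
    POOL.get_or_init(|| Arc::new(SharedPool { running: Mutex::new(false) }))
        .clone()
}

// Hypothetical per-manager front end: owns config and metrics, not threads.
struct PrefetchMgr;

impl PrefetchMgr {
    // Idempotent: every cache manager's init() may call this, and only the
    // first caller actually starts the global workers.
    fn setup(&self) {
        let pool = shared_pool();
        let mut running = pool.running.lock().unwrap();
        if !*running {
            *running = true; // a real implementation would spawn workers here
        }
    }
    // Note: no stop(); workers shared across managers outlive any one of them.
}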
18 changes: 8 additions & 10 deletions storage/src/cache/fscache/mod.rs
@@ -16,7 +16,7 @@ use tokio::runtime::Runtime;
 use crate::backend::BlobBackend;
 use crate::cache::cachedfile::{FileCacheEntry, FileCacheMeta};
 use crate::cache::state::{BlobStateMap, IndexedChunkMap, RangeMap};
-use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr};
+use crate::cache::worker::{AsyncPrefetchConfig, PrefetchMgr};
 use crate::cache::{BlobCache, BlobCacheMgr};
 use crate::device::{BlobFeatures, BlobInfo, BlobObject};
 use crate::factory::BLOB_FACTORY;
@@ -32,8 +32,8 @@ pub struct FsCacheMgr {
     backend: Arc<dyn BlobBackend>,
     metrics: Arc<BlobcacheMetrics>,
     prefetch_config: Arc<AsyncPrefetchConfig>,
+    prefetch_mgr: Arc<PrefetchMgr>,
     runtime: Arc<Runtime>,
-    worker_mgr: Arc<AsyncWorkerMgr>,
     work_dir: String,
     need_validation: bool,
     blobs_check_count: Arc<AtomicU8>,
@@ -56,7 +56,7 @@ impl FsCacheMgr {
         let work_dir = blob_cfg.get_work_dir()?;
         let metrics = BlobcacheMetrics::new(id, work_dir);
         let prefetch_config: Arc<AsyncPrefetchConfig> = Arc::new((&config.prefetch).into());
-        let worker_mgr = AsyncWorkerMgr::new(metrics.clone(), prefetch_config.clone())?;
+        let worker_mgr = PrefetchMgr::new(metrics.clone(), prefetch_config.clone())?;
 
         BLOB_FACTORY.start_mgr_checker();
 
@@ -66,7 +66,7 @@
             metrics,
             prefetch_config,
             runtime,
-            worker_mgr: Arc::new(worker_mgr),
+            prefetch_mgr: Arc::new(worker_mgr),
             work_dir: work_dir.to_owned(),
             need_validation: config.cache_validate,
             blobs_check_count: Arc::new(AtomicU8::new(0)),
@@ -91,7 +91,7 @@ impl FsCacheMgr {
             blob.clone(),
             self.prefetch_config.clone(),
             self.runtime.clone(),
-            self.worker_mgr.clone(),
+            self.prefetch_mgr.clone(),
         )?;
         let entry = Arc::new(entry);
         let mut guard = self.blobs.write().unwrap();
@@ -112,13 +112,12 @@

 impl BlobCacheMgr for FsCacheMgr {
     fn init(&self) -> Result<()> {
-        AsyncWorkerMgr::start(self.worker_mgr.clone())
+        self.prefetch_mgr.setup()
     }
 
     fn destroy(&self) {
         if !self.closed.load(Ordering::Acquire) {
             self.closed.store(true, Ordering::Release);
-            self.worker_mgr.stop();
             self.backend().shutdown();
             self.metrics.release().unwrap_or_else(|e| error!("{:?}", e));
         }
@@ -173,7 +172,6 @@ impl BlobCacheMgr for FsCacheMgr {
         // we should double check blobs stat, in case some blobs hadn't been created when we checked.
         if all_ready {
             if self.blobs_check_count.load(Ordering::Acquire) == FSCACHE_BLOBS_CHECK_NUM {
-                self.worker_mgr.stop();
                 self.metrics.data_all_ready.store(true, Ordering::Release);
             } else {
                 self.blobs_check_count.fetch_add(1, Ordering::Acquire);
@@ -196,7 +194,7 @@ impl FileCacheEntry {
         blob_info: Arc<BlobInfo>,
         prefetch_config: Arc<AsyncPrefetchConfig>,
         runtime: Arc<Runtime>,
-        workers: Arc<AsyncWorkerMgr>,
+        prefetch_mgr: Arc<PrefetchMgr>,
     ) -> Result<Self> {
         if blob_info.has_feature(BlobFeatures::_V5_NO_EXT_BLOB_TABLE) {
             return Err(einval!("fscache does not support Rafs v5 blobs"));
@@ -269,7 +267,7 @@ impl FileCacheEntry {
             prefetch_state: Arc::new(AtomicU32::new(0)),
             reader,
             runtime,
-            workers,
+            prefetch_mgr,
 
             blob_compressed_size,
             blob_uncompressed_size: blob_info.uncompressed_size(),
storage/src/cache/worker.rs (diff not loaded)
