hourly cleanup
This commit is contained in:
@@ -66,6 +66,8 @@ async fn main() -> std::io::Result<()> {
|
||||
.max_size(100_000)
|
||||
.to_owned();
|
||||
crate::flow_debug!("video cache initialized max_size=100000");
|
||||
cache.spawn_hourly_cleanup();
|
||||
crate::flow_debug!("video cache hourly cleanup task spawned");
|
||||
let _ = providers::configure_runtime_validation(pool.clone(), cache.clone(), requester.clone());
|
||||
|
||||
thread::spawn(move || {
|
||||
|
||||
@@ -5,6 +5,11 @@ use std::time::Duration;
|
||||
|
||||
use crate::videos::VideoItem;
|
||||
|
||||
/// Entries older than this are considered stale and dropped.
|
||||
const ENTRY_TTL: Duration = Duration::from_secs(60 * 60);
|
||||
/// How often the background task scans the cache for stale entries.
|
||||
const CLEANUP_INTERVAL: Duration = Duration::from_secs(60 * 60);
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct VideoCache {
|
||||
cache: Arc<Mutex<std::collections::HashMap<String, (SystemTime, Vec<VideoItem>)>>>, // url -> time+Items
|
||||
@@ -55,25 +60,59 @@ impl VideoCache {
|
||||
None
|
||||
}
|
||||
|
||||
/// Number of entries currently held.
|
||||
pub fn len(&self) -> usize {
|
||||
self.cache.lock().map(|c| c.len()).unwrap_or(0)
|
||||
}
|
||||
|
||||
/// Drop every entry older than [`ENTRY_TTL`]. Returns how many were removed.
|
||||
pub fn prune_expired(&self) -> usize {
|
||||
let mut removed = 0;
|
||||
if let Ok(mut cache) = self.cache.lock() {
|
||||
cache.retain(|_key, (time, _items)| match time.elapsed() {
|
||||
// Keep entries within the TTL; drop the rest.
|
||||
Ok(elapsed) => {
|
||||
let keep = elapsed <= ENTRY_TTL;
|
||||
if !keep {
|
||||
removed += 1;
|
||||
}
|
||||
keep
|
||||
}
|
||||
// Clock went backwards: keep the entry rather than guessing.
|
||||
Err(_) => true,
|
||||
});
|
||||
}
|
||||
removed
|
||||
}
|
||||
|
||||
/// Spawn a background task that prunes stale entries once per hour for the
|
||||
/// lifetime of the process.
|
||||
pub fn spawn_hourly_cleanup(&self) {
|
||||
let cache = self.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut interval = tokio::time::interval(CLEANUP_INTERVAL);
|
||||
// The first tick fires immediately; consume it so the cache isn't
|
||||
// scanned the instant the server starts.
|
||||
interval.tick().await;
|
||||
loop {
|
||||
interval.tick().await;
|
||||
let removed = cache.prune_expired();
|
||||
let remaining = cache.len();
|
||||
// Referenced unconditionally so the bindings aren't flagged as
|
||||
// unused when `flow_debug!` compiles to nothing (debug off).
|
||||
let _ = (removed, remaining);
|
||||
crate::flow_debug!(
|
||||
"video cache cleanup removed={} remaining={}",
|
||||
removed,
|
||||
remaining
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub async fn check(&self) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let iter = match self.entries() {
|
||||
Some(iter) => iter,
|
||||
None => {
|
||||
return Err(Box::new(std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
"Could not get entries",
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
for (key, (time, _items)) in iter {
|
||||
if let Ok(elapsed) = time.elapsed() {
|
||||
if elapsed > Duration::from_secs(60 * 60) {
|
||||
self.remove(&key);
|
||||
}
|
||||
}
|
||||
}
|
||||
self.prune_expired();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user