diff --git a/src/main.rs b/src/main.rs index edfd611..a1d66ac 100644 --- a/src/main.rs +++ b/src/main.rs @@ -66,6 +66,8 @@ async fn main() -> std::io::Result<()> { .max_size(100_000) .to_owned(); crate::flow_debug!("video cache initialized max_size=100000"); + cache.spawn_hourly_cleanup(); + crate::flow_debug!("video cache hourly cleanup task spawned"); let _ = providers::configure_runtime_validation(pool.clone(), cache.clone(), requester.clone()); thread::spawn(move || { diff --git a/src/util/cache.rs b/src/util/cache.rs index 28a9b52..dd69c7c 100644 --- a/src/util/cache.rs +++ b/src/util/cache.rs @@ -5,6 +5,11 @@ use std::time::Duration; use crate::videos::VideoItem; +/// Entries older than this are considered stale and dropped. +const ENTRY_TTL: Duration = Duration::from_secs(60 * 60); +/// How often the background task scans the cache for stale entries. +const CLEANUP_INTERVAL: Duration = Duration::from_secs(60 * 60); + #[derive(Clone)] pub struct VideoCache { cache: Arc)>>>, // url -> time+Items @@ -55,25 +60,59 @@ impl VideoCache { None } + /// Number of entries currently held. + pub fn len(&self) -> usize { + self.cache.lock().map(|c| c.len()).unwrap_or(0) + } + + /// Drop every entry older than [`ENTRY_TTL`]. Returns how many were removed. + pub fn prune_expired(&self) -> usize { + let mut removed = 0; + if let Ok(mut cache) = self.cache.lock() { + cache.retain(|_key, (time, _items)| match time.elapsed() { + // Keep entries within the TTL; drop the rest. + Ok(elapsed) => { + let keep = elapsed <= ENTRY_TTL; + if !keep { + removed += 1; + } + keep + } + // Clock went backwards: keep the entry rather than guessing. + Err(_) => true, + }); + } + removed + } + + /// Spawn a background task that prunes stale entries once per hour for the + /// lifetime of the process. + pub fn spawn_hourly_cleanup(&self) { + let cache = self.clone(); + tokio::spawn(async move { + let mut interval = tokio::time::interval(CLEANUP_INTERVAL); + // The first tick fires immediately; consume it so the cache isn't + // scanned the instant the server starts. + interval.tick().await; + loop { + interval.tick().await; + let removed = cache.prune_expired(); + let remaining = cache.len(); + // Referenced unconditionally so the bindings aren't flagged as + // unused when `flow_debug!` compiles to nothing (debug off). + let _ = (removed, remaining); + crate::flow_debug!( + "video cache cleanup removed={} remaining={}", + removed, + remaining + ); + } + }); + } + #[allow(dead_code)] pub async fn check(&self) -> Result<(), Box> { - let iter = match self.entries() { - Some(iter) => iter, - None => { - return Err(Box::new(std::io::Error::new( - std::io::ErrorKind::Other, - "Could not get entries", - ))); - } - }; - - for (key, (time, _items)) in iter { - if let Ok(elapsed) = time.elapsed() { - if elapsed > Duration::from_secs(60 * 60) { - self.remove(&key); - } - } - } + self.prune_expired(); Ok(()) } }