improved all provider
This commit is contained in:
@@ -1,7 +1,9 @@
|
|||||||
use std::fs;
|
use std::fs;
|
||||||
|
use std::time::Duration;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use error_chain::error_chain;
|
use error_chain::error_chain;
|
||||||
use futures::future::join_all;
|
use futures::StreamExt;
|
||||||
|
use futures::stream::FuturesUnordered;
|
||||||
use crate::api::{get_provider, ClientVersion};
|
use crate::api::{get_provider, ClientVersion};
|
||||||
use crate::providers::{DynProvider, Provider};
|
use crate::providers::{DynProvider, Provider};
|
||||||
use crate::status::Channel;
|
use crate::status::Channel;
|
||||||
@@ -41,7 +43,7 @@ impl Provider for AllProvider {
|
|||||||
per_page: String,
|
per_page: String,
|
||||||
options: ServerOptions,
|
options: ServerOptions,
|
||||||
) -> Vec<VideoItem> {
|
) -> Vec<VideoItem> {
|
||||||
let mut sites_str = options.clone().sites.unwrap();
|
let mut sites_str = options.clone().sites.unwrap_or_default();
|
||||||
if sites_str.is_empty() {
|
if sites_str.is_empty() {
|
||||||
let files = fs::read_dir("./src/providers").unwrap();
|
let files = fs::read_dir("./src/providers").unwrap();
|
||||||
let providers = files.map(|entry| entry.unwrap().file_name())
|
let providers = files.map(|entry| entry.unwrap().file_name())
|
||||||
@@ -51,14 +53,16 @@ impl Provider for AllProvider {
|
|||||||
.collect::<Vec<String>>();
|
.collect::<Vec<String>>();
|
||||||
sites_str = providers.join(",");
|
sites_str = providers.join(",");
|
||||||
}
|
}
|
||||||
|
|
||||||
let providers: Vec<DynProvider> = sites_str
|
let providers: Vec<DynProvider> = sites_str
|
||||||
.split(',')
|
.split(',')
|
||||||
.filter(|s| !s.is_empty())
|
.filter(|s| !s.is_empty())
|
||||||
.filter_map(|s| get_provider(s)) // assumes get_provider -> Option<DynProvider>
|
.filter_map(|s| get_provider(s))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let futures = providers.iter().map(|provider| {
|
let mut futures = FuturesUnordered::new();
|
||||||
let provider = provider.clone();
|
|
||||||
|
for provider in providers {
|
||||||
let cache = cache.clone();
|
let cache = cache.clone();
|
||||||
let pool = pool.clone();
|
let pool = pool.clone();
|
||||||
let sort = sort.clone();
|
let sort = sort.clone();
|
||||||
@@ -66,39 +70,52 @@ impl Provider for AllProvider {
|
|||||||
let page = page.clone();
|
let page = page.clone();
|
||||||
let per_page = per_page.clone();
|
let per_page = per_page.clone();
|
||||||
let options = options.clone();
|
let options = options.clone();
|
||||||
async move {
|
|
||||||
match tokio::time::timeout(
|
// Spawn the task so it lives independently of this function
|
||||||
std::time::Duration::from_secs(55),
|
futures.push(tokio::spawn(async move {
|
||||||
provider.get_videos(
|
provider.get_videos(cache, pool, sort, query, page, per_page, options).await
|
||||||
cache,
|
}));
|
||||||
pool,
|
|
||||||
sort,
|
|
||||||
query,
|
|
||||||
page,
|
|
||||||
per_page,
|
|
||||||
options,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(v) => v,
|
|
||||||
Err(_) => {
|
|
||||||
// timed out -> return empty result for this provider
|
|
||||||
vec![]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}).collect::<Vec<_>>();
|
|
||||||
let results:Vec<Vec<VideoItem>> = join_all(futures).await;
|
|
||||||
let video_items: Vec<VideoItem> = interleave(&results);
|
|
||||||
return video_items;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_channel(&self,clientversion:ClientVersion) -> Option<Channel> {
|
let mut all_results = Vec::new();
|
||||||
println!("Getting channel for placeholder with client version: {:?}",clientversion);
|
let timeout_timer = tokio::time::sleep(Duration::from_secs(10));
|
||||||
|
tokio::pin!(timeout_timer);
|
||||||
|
|
||||||
|
// Collect what we can within 55 seconds
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
Some(result) = futures.next() => {
|
||||||
|
// Ignore errors (panics or task cancellations)
|
||||||
|
if let Ok(videos) = result {
|
||||||
|
all_results.push(videos);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ = &mut timeout_timer => {
|
||||||
|
// 55 seconds passed. Stop waiting and return what we have.
|
||||||
|
// The tasks remaining in 'futures' will continue running in the
|
||||||
|
// background because they were 'tokio::spawn'ed.
|
||||||
|
break;
|
||||||
|
},
|
||||||
|
else => break, // All tasks finished before the timeout
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
interleave(&all_results)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_channel(&self, clientversion: ClientVersion) -> Option<Channel> {
|
||||||
let _ = clientversion;
|
let _ = clientversion;
|
||||||
Some(Channel {
|
Some(Channel {
|
||||||
id:"placeholder".to_string(),name:"PLACEHOLDER".to_string(),description:"PLACEHOLDER FOR PARENT CLASS".to_string(),premium:false,favicon:"https://www.google.com/s2/favicons?sz=64&domain=missav.ws".to_string(),status:"active".to_string(),categories:vec![],options:vec![],nsfw:true,cacheDuration:None,
|
id: "placeholder".to_string(),
|
||||||
|
name: "PLACEHOLDER".to_string(),
|
||||||
|
description: "PLACEHOLDER FOR PARENT CLASS".to_string(),
|
||||||
|
premium: false,
|
||||||
|
favicon: "https://hottub.spacemoehre.de/favicon.ico".to_string(),
|
||||||
|
status: "active".to_string(),
|
||||||
|
categories: vec![],
|
||||||
|
options: vec![],
|
||||||
|
nsfw: true,
|
||||||
|
cacheDuration: None,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ use crate::util::requester;
|
|||||||
|
|
||||||
// Global cache: Map<ErrorSignature, LastSentTimestamp>
|
// Global cache: Map<ErrorSignature, LastSentTimestamp>
|
||||||
static ERROR_CACHE: Lazy<DashMap<String, u64>> = Lazy::new(DashMap::new);
|
static ERROR_CACHE: Lazy<DashMap<String, u64>> = Lazy::new(DashMap::new);
|
||||||
const COOLDOWN_SECONDS: u64 = 3600; // 1 Hour cooldown
|
// const COOLDOWN_SECONDS: u64 = 3600; // 1 Hour cooldown
|
||||||
|
|
||||||
pub fn format_error_chain(err: &dyn Error) -> String {
|
pub fn format_error_chain(err: &dyn Error) -> String {
|
||||||
let mut chain_str = String::new();
|
let mut chain_str = String::new();
|
||||||
@@ -40,11 +40,11 @@ pub async fn send_discord_error_report(
|
|||||||
// Create a unique key based on error content and location
|
// Create a unique key based on error content and location
|
||||||
let error_signature = format!("{}-{}-{}", error_msg, file, line);
|
let error_signature = format!("{}-{}-{}", error_msg, file, line);
|
||||||
|
|
||||||
if let Some(last_sent) = ERROR_CACHE.get(&error_signature) {
|
if let Some(_) = ERROR_CACHE.get(&error_signature) {
|
||||||
if now - *last_sent < COOLDOWN_SECONDS {
|
// if now - *last_sent < COOLDOWN_SECONDS {
|
||||||
// Error is still in cooldown, skip sending
|
// Error is still in cooldown, skip sending
|
||||||
return;
|
return;
|
||||||
}
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the cache with the current timestamp
|
// Update the cache with the current timestamp
|
||||||
|
|||||||
Reference in New Issue
Block a user