provider refactors and fixes
This commit is contained in:
@@ -1,12 +1,14 @@
|
||||
use std::fs;
|
||||
use std::time::Duration;
|
||||
use async_trait::async_trait;
|
||||
use capitalize::Capitalize;
|
||||
use cute::c;
|
||||
use error_chain::error_chain;
|
||||
use futures::StreamExt;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use crate::api::{get_provider, ClientVersion};
|
||||
use crate::providers::{DynProvider, Provider};
|
||||
use crate::status::Channel;
|
||||
use crate::providers::{DynProvider, Provider, report_provider_error, run_provider_guarded};
|
||||
use crate::status::{Channel, ChannelOption, FilterOption};
|
||||
use crate::util::cache::VideoCache;
|
||||
use crate::util::interleave;
|
||||
use crate::videos::{ServerOptions, VideoItem};
|
||||
@@ -45,24 +47,50 @@ impl Provider for AllProvider {
|
||||
) -> Vec<VideoItem> {
|
||||
let mut sites_str = options.clone().sites.unwrap_or_default();
|
||||
if sites_str.is_empty() {
|
||||
let files = fs::read_dir("./src/providers").unwrap();
|
||||
let providers = files.map(|entry| entry.unwrap().file_name())
|
||||
.filter(|name| name.to_str().unwrap().ends_with(".rs"))
|
||||
.filter(|name| !name.to_str().unwrap().contains("mod.rs") && !name.to_str().unwrap().contains("all.rs"))
|
||||
.map(|name| name.to_str().unwrap().replace(".rs", ""))
|
||||
let files = match fs::read_dir("./src/providers") {
|
||||
Ok(files) => files,
|
||||
Err(e) => {
|
||||
report_provider_error("all", "all.get_videos.read_dir", &e.to_string()).await;
|
||||
return vec![];
|
||||
}
|
||||
};
|
||||
let providers = files
|
||||
.filter_map(|entry| entry.ok())
|
||||
.filter_map(|entry| entry.file_name().into_string().ok())
|
||||
.filter(|name| name.ends_with(".rs"))
|
||||
.filter(|name| !name.contains("mod.rs") && !name.contains("all.rs"))
|
||||
.map(|name| name.replace(".rs", ""))
|
||||
.collect::<Vec<String>>();
|
||||
sites_str = providers.join(",");
|
||||
}
|
||||
|
||||
let providers: Vec<DynProvider> = sites_str
|
||||
let providers: Vec<(String, DynProvider)> = sites_str
|
||||
.split(',')
|
||||
.map(str::trim)
|
||||
.filter(|s| !s.is_empty())
|
||||
.filter_map(|s| get_provider(s))
|
||||
.filter_map(|s| {
|
||||
let provider = get_provider(s);
|
||||
if provider.is_none() {
|
||||
Some((s.to_string(), None))
|
||||
} else {
|
||||
provider.map(|p| (s.to_string(), Some(p)))
|
||||
}
|
||||
})
|
||||
.filter_map(|(name, provider)| match provider {
|
||||
Some(provider) => Some((name, provider)),
|
||||
None => {
|
||||
// fire-and-forget reporting of missing provider keys
|
||||
tokio::spawn(async move {
|
||||
report_provider_error("all", "all.get_videos.unknown_provider", &name).await;
|
||||
});
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
let mut futures = FuturesUnordered::new();
|
||||
|
||||
for provider in providers {
|
||||
for (provider_name, provider) in providers {
|
||||
let cache = cache.clone();
|
||||
let pool = pool.clone();
|
||||
let sort = sort.clone();
|
||||
@@ -70,10 +98,16 @@ impl Provider for AllProvider {
|
||||
let page = page.clone();
|
||||
let per_page = per_page.clone();
|
||||
let options = options.clone();
|
||||
let provider_name_cloned = provider_name.clone();
|
||||
|
||||
// Spawn the task so it lives independently of this function
|
||||
futures.push(tokio::spawn(async move {
|
||||
provider.get_videos(cache, pool, sort, query, page, per_page, options).await
|
||||
run_provider_guarded(
|
||||
&provider_name_cloned,
|
||||
"all.get_videos.provider_task",
|
||||
provider.get_videos(cache, pool, sort, query, page, per_page, options),
|
||||
)
|
||||
.await
|
||||
}));
|
||||
}
|
||||
|
||||
@@ -85,9 +119,11 @@ impl Provider for AllProvider {
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(result) = futures.next() => {
|
||||
// Ignore errors (panics or task cancellations)
|
||||
if let Ok(videos) = result {
|
||||
all_results.push(videos);
|
||||
match result {
|
||||
Ok(videos) => all_results.push(videos),
|
||||
Err(e) => {
|
||||
report_provider_error("all", "all.get_videos.join_error", &e.to_string()).await;
|
||||
}
|
||||
}
|
||||
},
|
||||
_ = &mut timeout_timer => {
|
||||
@@ -105,17 +141,41 @@ impl Provider for AllProvider {
|
||||
|
||||
fn get_channel(&self, clientversion: ClientVersion) -> Option<Channel> {
|
||||
let _ = clientversion;
|
||||
let files = fs::read_dir("./src/providers").ok()?;
|
||||
let providers = files
|
||||
.filter_map(|entry| entry.ok())
|
||||
.filter_map(|entry| entry.file_name().into_string().ok())
|
||||
.filter(|name| name.ends_with(".rs"))
|
||||
.filter(|name| {
|
||||
!name.contains("mod.rs")
|
||||
&& !name.contains("all.rs")
|
||||
})
|
||||
.map(|name| name.replace(".rs", ""))
|
||||
.collect::<Vec<String>>();
|
||||
let sites = c![FilterOption {
|
||||
id: x.to_string(),
|
||||
title: x.capitalize().to_string(),
|
||||
}, for x in providers.iter()];
|
||||
|
||||
Some(Channel {
|
||||
id: "placeholder".to_string(),
|
||||
name: "PLACEHOLDER".to_string(),
|
||||
description: "PLACEHOLDER FOR PARENT CLASS".to_string(),
|
||||
id: "all".to_string(),
|
||||
name: "All".to_string(),
|
||||
description: "Query from all sites of this Server".to_string(),
|
||||
premium: false,
|
||||
favicon: "https://hottub.spacemoehre.de/favicon.ico".to_string(),
|
||||
status: "active".to_string(),
|
||||
categories: vec![],
|
||||
options: vec![],
|
||||
options: vec![ChannelOption {
|
||||
id: "sites".to_string(),
|
||||
title: "Sites".to_string(),
|
||||
description: "What Sites to use".to_string(),
|
||||
systemImage: "list.number".to_string(),
|
||||
colorName: "green".to_string(),
|
||||
options: sites,
|
||||
multiSelect: true,
|
||||
}],
|
||||
nsfw: true,
|
||||
cacheDuration: None,
|
||||
cacheDuration: Some(1800),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user