178 lines
6.2 KiB
Rust
178 lines
6.2 KiB
Rust
use crate::DbPool;
|
|
use crate::api::{ClientVersion, get_provider};
|
|
use crate::providers::{DynProvider, Provider, report_provider_error, run_provider_guarded};
|
|
use crate::status::{Channel, ChannelOption, FilterOption};
|
|
use crate::util::cache::VideoCache;
|
|
use crate::util::interleave;
|
|
use crate::videos::{ServerOptions, VideoItem};
|
|
use async_trait::async_trait;
|
|
use capitalize::Capitalize;
|
|
use cute::c;
|
|
use error_chain::error_chain;
|
|
use futures::StreamExt;
|
|
use futures::stream::FuturesUnordered;
|
|
use std::fs;
|
|
use std::time::Duration;
|
|
|
|
error_chain! {
|
|
foreign_links {
|
|
Io(std::io::Error);
|
|
HttpRequest(wreq::Error);
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Clone)]
|
|
#[allow(dead_code)]
|
|
pub struct AllProvider {}
|
|
|
|
impl AllProvider {
|
|
pub fn new() -> Self {
|
|
AllProvider {}
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl Provider for AllProvider {
|
|
async fn get_videos(
|
|
&self,
|
|
cache: VideoCache,
|
|
pool: DbPool,
|
|
sort: String,
|
|
query: Option<String>,
|
|
page: String,
|
|
per_page: String,
|
|
options: ServerOptions,
|
|
) -> Vec<VideoItem> {
|
|
let mut sites_str = options.clone().sites.unwrap_or_default();
|
|
if sites_str.is_empty() {
|
|
let files = match fs::read_dir("./src/providers") {
|
|
Ok(files) => files,
|
|
Err(e) => {
|
|
report_provider_error("all", "all.get_videos.read_dir", &e.to_string()).await;
|
|
return vec![];
|
|
}
|
|
};
|
|
let providers = files
|
|
.filter_map(|entry| entry.ok())
|
|
.filter_map(|entry| entry.file_name().into_string().ok())
|
|
.filter(|name| name.ends_with(".rs"))
|
|
.filter(|name| !name.contains("mod.rs") && !name.contains("all.rs"))
|
|
.map(|name| name.replace(".rs", ""))
|
|
.collect::<Vec<String>>();
|
|
sites_str = providers.join(",");
|
|
}
|
|
|
|
let providers: Vec<(String, DynProvider)> = sites_str
|
|
.split(',')
|
|
.map(str::trim)
|
|
.filter(|s| !s.is_empty())
|
|
.filter_map(|s| {
|
|
let provider = get_provider(s);
|
|
if provider.is_none() {
|
|
Some((s.to_string(), None))
|
|
} else {
|
|
provider.map(|p| (s.to_string(), Some(p)))
|
|
}
|
|
})
|
|
.filter_map(|(name, provider)| match provider {
|
|
Some(provider) => Some((name, provider)),
|
|
None => {
|
|
// fire-and-forget reporting of missing provider keys
|
|
tokio::spawn(async move {
|
|
report_provider_error("all", "all.get_videos.unknown_provider", &name)
|
|
.await;
|
|
});
|
|
None
|
|
}
|
|
})
|
|
.collect();
|
|
|
|
let mut futures = FuturesUnordered::new();
|
|
|
|
for (provider_name, provider) in providers {
|
|
let cache = cache.clone();
|
|
let pool = pool.clone();
|
|
let sort = sort.clone();
|
|
let query = query.clone();
|
|
let page = page.clone();
|
|
let per_page = per_page.clone();
|
|
let options = options.clone();
|
|
let provider_name_cloned = provider_name.clone();
|
|
|
|
// Spawn the task so it lives independently of this function
|
|
futures.push(tokio::spawn(async move {
|
|
run_provider_guarded(
|
|
&provider_name_cloned,
|
|
"all.get_videos.provider_task",
|
|
provider.get_videos(cache, pool, sort, query, page, per_page, options),
|
|
)
|
|
.await
|
|
}));
|
|
}
|
|
|
|
let mut all_results = Vec::new();
|
|
let timeout_timer = tokio::time::sleep(Duration::from_secs(10));
|
|
tokio::pin!(timeout_timer);
|
|
|
|
// Collect what we can within 55 seconds
|
|
loop {
|
|
tokio::select! {
|
|
Some(result) = futures.next() => {
|
|
match result {
|
|
Ok(videos) => all_results.push(videos),
|
|
Err(e) => {
|
|
report_provider_error("all", "all.get_videos.join_error", &e.to_string()).await;
|
|
}
|
|
}
|
|
},
|
|
_ = &mut timeout_timer => {
|
|
// 55 seconds passed. Stop waiting and return what we have.
|
|
// The tasks remaining in 'futures' will continue running in the
|
|
// background because they were 'tokio::spawn'ed.
|
|
break;
|
|
},
|
|
else => break, // All tasks finished before the timeout
|
|
}
|
|
}
|
|
|
|
interleave(&all_results)
|
|
}
|
|
|
|
fn get_channel(&self, clientversion: ClientVersion) -> Option<Channel> {
|
|
let _ = clientversion;
|
|
let files = fs::read_dir("./src/providers").ok()?;
|
|
let providers = files
|
|
.filter_map(|entry| entry.ok())
|
|
.filter_map(|entry| entry.file_name().into_string().ok())
|
|
.filter(|name| name.ends_with(".rs"))
|
|
.filter(|name| !name.contains("mod.rs") && !name.contains("all.rs"))
|
|
.map(|name| name.replace(".rs", ""))
|
|
.collect::<Vec<String>>();
|
|
let sites = c![FilterOption {
|
|
id: x.to_string(),
|
|
title: x.capitalize().to_string(),
|
|
}, for x in providers.iter()];
|
|
|
|
Some(Channel {
|
|
id: "all".to_string(),
|
|
name: "All".to_string(),
|
|
description: "Query from all sites of this Server".to_string(),
|
|
premium: false,
|
|
favicon: "https://hottub.spacemoehre.de/favicon.ico".to_string(),
|
|
status: "active".to_string(),
|
|
categories: vec![],
|
|
options: vec![ChannelOption {
|
|
id: "all_provider_sites".to_string(),
|
|
title: "Sites".to_string(),
|
|
description: "What Sites to use".to_string(),
|
|
systemImage: "list.number".to_string(),
|
|
colorName: "green".to_string(),
|
|
options: sites,
|
|
multiSelect: true,
|
|
}],
|
|
nsfw: true,
|
|
cacheDuration: Some(1800),
|
|
})
|
|
}
|
|
}
|