runtime test fixes
This commit is contained in:
@@ -36,8 +36,10 @@ pub static ALL_PROVIDERS: Lazy<HashMap<&'static str, DynProvider>> = Lazy::new(|
|
|||||||
|
|
||||||
const CHANNEL_STATUS_ERROR: &str = "error";
|
const CHANNEL_STATUS_ERROR: &str = "error";
|
||||||
const VALIDATION_RESULTS_REQUIRED: usize = 5;
|
const VALIDATION_RESULTS_REQUIRED: usize = 5;
|
||||||
const VALIDATION_COOLDOWN: Duration = Duration::from_secs(60);
|
const VALIDATION_MIN_SUCCESS: usize = 3;
|
||||||
|
const VALIDATION_COOLDOWN: Duration = Duration::from_secs(3600);
|
||||||
const VALIDATION_MEDIA_TIMEOUT: Duration = Duration::from_secs(100);
|
const VALIDATION_MEDIA_TIMEOUT: Duration = Duration::from_secs(100);
|
||||||
|
const VALIDATION_ERROR_RETEST_INTERVAL: Duration = Duration::from_secs(5 * 60);
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct ProviderValidationContext {
|
struct ProviderValidationContext {
|
||||||
@@ -50,6 +52,7 @@ static PROVIDER_VALIDATION_CONTEXT: OnceLock<ProviderValidationContext> = OnceLo
|
|||||||
static PROVIDER_RUNTIME_STATUS: Lazy<DashMap<String, String>> = Lazy::new(DashMap::new);
|
static PROVIDER_RUNTIME_STATUS: Lazy<DashMap<String, String>> = Lazy::new(DashMap::new);
|
||||||
static PROVIDER_VALIDATION_INFLIGHT: Lazy<DashSet<String>> = Lazy::new(DashSet::new);
|
static PROVIDER_VALIDATION_INFLIGHT: Lazy<DashSet<String>> = Lazy::new(DashSet::new);
|
||||||
static PROVIDER_VALIDATION_LAST_RUN: Lazy<DashMap<String, Instant>> = Lazy::new(DashMap::new);
|
static PROVIDER_VALIDATION_LAST_RUN: Lazy<DashMap<String, Instant>> = Lazy::new(DashMap::new);
|
||||||
|
static PROVIDER_ERROR_REVALIDATION_STARTED: OnceLock<()> = OnceLock::new();
|
||||||
|
|
||||||
fn validation_client_version() -> ClientVersion {
|
fn validation_client_version() -> ClientVersion {
|
||||||
ClientVersion::new(22, 'c' as u32, "Hot%20Tub".to_string())
|
ClientVersion::new(22, 'c' as u32, "Hot%20Tub".to_string())
|
||||||
@@ -252,26 +255,72 @@ async fn run_provider_validation(provider_id: &str) -> Result<(), String> {
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let mut successes = 0usize;
|
||||||
|
let mut failures = Vec::new();
|
||||||
for (item_index, item) in items.iter().take(VALIDATION_RESULTS_REQUIRED).enumerate() {
|
for (item_index, item) in items.iter().take(VALIDATION_RESULTS_REQUIRED).enumerate() {
|
||||||
let (url, headers) = media_target(item);
|
let (url, headers) = media_target(item);
|
||||||
if url.is_empty() {
|
if url.is_empty() {
|
||||||
return Err(format!(
|
failures.push(format!(
|
||||||
"{provider_id} item {} returned an empty media url",
|
"{provider_id} item {} returned an empty media url",
|
||||||
item_index + 1
|
item_index + 1
|
||||||
));
|
));
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
validate_media_response(
|
match validate_media_response(
|
||||||
provider_id,
|
provider_id,
|
||||||
item_index,
|
item_index,
|
||||||
&url,
|
&url,
|
||||||
headers,
|
headers,
|
||||||
context.requester.clone(),
|
context.requester.clone(),
|
||||||
)
|
)
|
||||||
.await?;
|
.await
|
||||||
|
{
|
||||||
|
Ok(()) => {
|
||||||
|
successes += 1;
|
||||||
|
if successes >= VALIDATION_MIN_SUCCESS {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(error) => failures.push(error),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Err(format!(
|
||||||
|
"{provider_id} validation failed: only {successes} media checks passed (required at least {VALIDATION_MIN_SUCCESS}); failures={}",
|
||||||
|
failures.join(" | ")
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn start_periodic_error_revalidation() {
|
||||||
|
if PROVIDER_ERROR_REVALIDATION_STARTED.set(()).is_err() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut interval = tokio::time::interval(VALIDATION_ERROR_RETEST_INTERVAL);
|
||||||
|
loop {
|
||||||
|
interval.tick().await;
|
||||||
|
let errored_providers = PROVIDER_RUNTIME_STATUS
|
||||||
|
.iter()
|
||||||
|
.filter_map(|entry| {
|
||||||
|
if entry.value().as_str() == CHANNEL_STATUS_ERROR {
|
||||||
|
Some(entry.key().clone())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
for provider_id in errored_providers {
|
||||||
|
schedule_provider_validation(
|
||||||
|
&provider_id,
|
||||||
|
"periodic_retest",
|
||||||
|
"provider currently marked as error",
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn configure_runtime_validation(
|
pub fn configure_runtime_validation(
|
||||||
@@ -285,7 +334,9 @@ pub fn configure_runtime_validation(
|
|||||||
cache,
|
cache,
|
||||||
requester,
|
requester,
|
||||||
})
|
})
|
||||||
.map_err(|_| "provider validation context already configured")
|
.map_err(|_| "provider validation context already configured")?;
|
||||||
|
start_periodic_error_revalidation();
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn current_provider_channel_status(provider_id: &str) -> Option<String> {
|
pub fn current_provider_channel_status(provider_id: &str) -> Option<String> {
|
||||||
|
|||||||
Reference in New Issue
Block a user