spring cleaning and refactoring
This commit is contained in:
@@ -1,17 +1,23 @@
|
||||
"""Validate applications.json against schema and check for common issues."""
|
||||
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
from collections import defaultdict
|
||||
from typing import Any
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from constants import VARIANTS
|
||||
from constants import (
|
||||
COMMON_SETTINGS_KEYS,
|
||||
REGEX_SETTINGS_KEYS,
|
||||
SOURCE_HOST_MAP,
|
||||
SOURCE_SPECIFIC_KEYS,
|
||||
VALID_SOURCES,
|
||||
VARIANTS,
|
||||
)
|
||||
from utils import should_include_app
|
||||
|
||||
# Required fields for each app
|
||||
REQUIRED_FIELDS = {"id", "url", "author", "name"}
|
||||
|
||||
# Valid meta keys
|
||||
VALID_META_KEYS = {
|
||||
"excludeFromExport",
|
||||
"excludeFromTable",
|
||||
@@ -21,55 +27,206 @@ VALID_META_KEYS = {
|
||||
"includeInDualScreen",
|
||||
}
|
||||
|
||||
META_TYPO_MAP = {
|
||||
"exludeFromExport": "excludeFromExport",
|
||||
"exludeFromTable": "excludeFromTable",
|
||||
"nameOveride": "nameOverride",
|
||||
"urlOveride": "urlOverride",
|
||||
}
|
||||
|
||||
def validate_app(app: dict[str, Any], index: int) -> list[str]:
|
||||
"""Validate a single app entry and return list of errors."""
|
||||
|
||||
def _check_regex(pattern: str, field_name: str, app_name: str) -> str | None:
|
||||
if not pattern:
|
||||
return None
|
||||
try:
|
||||
re.compile(pattern)
|
||||
return None
|
||||
except re.error as e:
|
||||
return f"{app_name}: invalid regex in '{field_name}': {e} (pattern: {pattern!r})"
|
||||
|
||||
|
||||
def _detect_source_from_url(url: str) -> str | None:
|
||||
try:
|
||||
host = urlparse(url).netloc.lower().lstrip("www.")
|
||||
except Exception:
|
||||
return None
|
||||
for domain, source in SOURCE_HOST_MAP.items():
|
||||
if host == domain or host.endswith(f".{domain}"):
|
||||
return source
|
||||
return None
|
||||
|
||||
|
||||
def _valid_keys_for_source(source: str | None) -> set[str]:
|
||||
valid = set(COMMON_SETTINGS_KEYS)
|
||||
if source and source in SOURCE_SPECIFIC_KEYS:
|
||||
valid |= SOURCE_SPECIFIC_KEYS[source]
|
||||
return valid
|
||||
|
||||
|
||||
def _validate_required_fields(app: dict, app_name: str) -> list[str]:
|
||||
return [
|
||||
f"{app_name}: missing required field '{f}'"
|
||||
for f in REQUIRED_FIELDS
|
||||
if f not in app
|
||||
]
|
||||
|
||||
|
||||
def _validate_url(app: dict, app_name: str) -> list[str]:
|
||||
errors = []
|
||||
app_name = app.get("name", f"app[{index}]")
|
||||
url = app.get("url", "")
|
||||
if not url:
|
||||
return errors
|
||||
try:
|
||||
parsed = urlparse(url)
|
||||
if not parsed.scheme:
|
||||
errors.append(f"{app_name}: URL missing scheme (http/https): {url}")
|
||||
elif parsed.scheme not in ("http", "https"):
|
||||
errors.append(f"{app_name}: URL has non-http scheme: {parsed.scheme}")
|
||||
if not parsed.netloc:
|
||||
errors.append(f"{app_name}: URL missing host: {url}")
|
||||
except Exception as e:
|
||||
errors.append(f"{app_name}: malformed URL: {e}")
|
||||
return errors
|
||||
|
||||
# Check required fields
|
||||
for field in REQUIRED_FIELDS:
|
||||
if field not in app:
|
||||
errors.append(f"{app_name}: missing required field '{field}'")
|
||||
|
||||
# Validate meta keys if present
|
||||
def _validate_override_source(
|
||||
app: dict, app_name: str
|
||||
) -> tuple[list[str], list[str]]:
|
||||
errors, warnings = [], []
|
||||
source = app.get("overrideSource")
|
||||
url = app.get("url", "")
|
||||
|
||||
if source is not None and source not in VALID_SOURCES:
|
||||
errors.append(
|
||||
f"{app_name}: unknown overrideSource '{source}' "
|
||||
f"(valid: {', '.join(sorted(VALID_SOURCES))})"
|
||||
)
|
||||
elif source is None:
|
||||
warnings.append(f"{app_name}: missing overrideSource (auto-detection may be fragile)")
|
||||
|
||||
if url and source:
|
||||
detected = _detect_source_from_url(url)
|
||||
if detected and detected != source and source != "HTML" and detected != "HTML":
|
||||
warnings.append(
|
||||
f"{app_name}: URL host suggests '{detected}' but "
|
||||
f"overrideSource is '{source}'"
|
||||
)
|
||||
|
||||
return errors, warnings
|
||||
|
||||
|
||||
def _validate_apk_index(app: dict, app_name: str) -> list[str]:
|
||||
index = app.get("preferredApkIndex")
|
||||
if index is not None and (not isinstance(index, int) or index < 0):
|
||||
return [
|
||||
f"{app_name}: preferredApkIndex must be a non-negative integer, got {index!r}"
|
||||
]
|
||||
return []
|
||||
|
||||
|
||||
def _validate_meta(app: dict, app_name: str) -> list[str]:
|
||||
errors = []
|
||||
meta = app.get("meta", {})
|
||||
for key in meta.keys():
|
||||
|
||||
for key in meta:
|
||||
if key not in VALID_META_KEYS:
|
||||
errors.append(f"{app_name}: unknown meta key '{key}' (typo?)")
|
||||
|
||||
# Check for common typos
|
||||
typo_checks = {
|
||||
"exludeFromExport": "excludeFromExport",
|
||||
"exludeFromTable": "excludeFromTable",
|
||||
"nameOveride": "nameOverride",
|
||||
"urlOveride": "urlOverride",
|
||||
}
|
||||
for typo, correct in typo_checks.items():
|
||||
for typo, correct in META_TYPO_MAP.items():
|
||||
if typo in meta:
|
||||
errors.append(f"{app_name}: typo in meta key '{typo}', should be '{correct}'")
|
||||
|
||||
# Validate additionalSettings is valid inner JSON
|
||||
additional_settings = app.get("additionalSettings")
|
||||
if additional_settings is not None:
|
||||
if not isinstance(additional_settings, str):
|
||||
errors.append(f"{app_name}: 'additionalSettings' should be a JSON string")
|
||||
else:
|
||||
try:
|
||||
json.loads(additional_settings)
|
||||
except json.JSONDecodeError as e:
|
||||
errors.append(f"{app_name}: 'additionalSettings' contains invalid JSON: {e}")
|
||||
|
||||
# Validate categories is a list
|
||||
categories = app.get("categories")
|
||||
if categories is not None and not isinstance(categories, list):
|
||||
errors.append(f"{app_name}: 'categories' should be a list")
|
||||
|
||||
return errors
|
||||
|
||||
|
||||
def _validate_categories(app: dict, app_name: str) -> list[str]:
|
||||
categories = app.get("categories")
|
||||
if categories is not None and not isinstance(categories, list):
|
||||
return [f"{app_name}: 'categories' should be a list"]
|
||||
return []
|
||||
|
||||
|
||||
def _validate_additional_settings(
|
||||
app: dict, app_name: str
|
||||
) -> tuple[list[str], list[str]]:
|
||||
errors, warnings = [], []
|
||||
raw = app.get("additionalSettings")
|
||||
if raw is None:
|
||||
return errors, warnings
|
||||
|
||||
if not isinstance(raw, str):
|
||||
errors.append(f"{app_name}: 'additionalSettings' should be a JSON string")
|
||||
return errors, warnings
|
||||
|
||||
try:
|
||||
settings = json.loads(raw)
|
||||
except json.JSONDecodeError as e:
|
||||
errors.append(f"{app_name}: 'additionalSettings' contains invalid JSON: {e}")
|
||||
return errors, warnings
|
||||
|
||||
if not isinstance(settings, dict):
|
||||
return errors, warnings
|
||||
|
||||
# Validate regex fields
|
||||
for key in REGEX_SETTINGS_KEYS:
|
||||
value = settings.get(key, "")
|
||||
if isinstance(value, str):
|
||||
err = _check_regex(value, key, app_name)
|
||||
if err:
|
||||
errors.append(err)
|
||||
|
||||
# Validate regex in intermediate link steps
|
||||
for i, link in enumerate(settings.get("intermediateLink", [])):
|
||||
if isinstance(link, dict):
|
||||
regex_val = link.get("customLinkFilterRegex", "")
|
||||
if isinstance(regex_val, str):
|
||||
err = _check_regex(regex_val, f"intermediateLink[{i}].customLinkFilterRegex", app_name)
|
||||
if err:
|
||||
errors.append(err)
|
||||
|
||||
# Check for source-inappropriate keys
|
||||
url = app.get("url", "")
|
||||
effective_source = app.get("overrideSource") or _detect_source_from_url(url)
|
||||
if effective_source:
|
||||
valid_keys = _valid_keys_for_source(effective_source)
|
||||
for key in settings:
|
||||
if key not in valid_keys:
|
||||
belongs_to = [
|
||||
s for s, keys in SOURCE_SPECIFIC_KEYS.items()
|
||||
if key in keys and s != effective_source
|
||||
]
|
||||
if belongs_to:
|
||||
warnings.append(
|
||||
f"{app_name}: additionalSettings key '{key}' "
|
||||
f"is for {'/'.join(belongs_to)}, not {effective_source}"
|
||||
)
|
||||
|
||||
return errors, warnings
|
||||
|
||||
|
||||
def validate_app(app: dict[str, Any], index: int) -> tuple[list[str], list[str]]:
|
||||
errors, warnings = [], []
|
||||
app_name = app.get("name", f"app[{index}]")
|
||||
|
||||
errors += _validate_required_fields(app, app_name)
|
||||
errors += _validate_url(app, app_name)
|
||||
|
||||
src_errors, src_warnings = _validate_override_source(app, app_name)
|
||||
errors += src_errors
|
||||
warnings += src_warnings
|
||||
|
||||
errors += _validate_apk_index(app, app_name)
|
||||
errors += _validate_meta(app, app_name)
|
||||
errors += _validate_categories(app, app_name)
|
||||
|
||||
settings_errors, settings_warnings = _validate_additional_settings(app, app_name)
|
||||
errors += settings_errors
|
||||
warnings += settings_warnings
|
||||
|
||||
return errors, warnings
|
||||
|
||||
|
||||
def check_duplicate_ids(apps: list[dict[str, Any]], variant: str) -> list[str]:
|
||||
"""Check for duplicate app IDs within a variant."""
|
||||
errors = []
|
||||
ids_seen: dict[str, str] = {}
|
||||
|
||||
@@ -79,9 +236,8 @@ def check_duplicate_ids(apps: list[dict[str, Any]], variant: str) -> list[str]:
|
||||
|
||||
app_id = app.get("id", "")
|
||||
app_name = app.get("name", "unknown")
|
||||
|
||||
if not app_id:
|
||||
continue # Already caught by required field check
|
||||
continue
|
||||
|
||||
if app_id in ids_seen:
|
||||
errors.append(
|
||||
@@ -95,7 +251,6 @@ def check_duplicate_ids(apps: list[dict[str, Any]], variant: str) -> list[str]:
|
||||
|
||||
|
||||
def validate_json(input_file: str) -> int:
|
||||
"""Validate applications.json and return exit code (0=success, 1=errors)."""
|
||||
try:
|
||||
with open(input_file, "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
@@ -111,25 +266,33 @@ def validate_json(input_file: str) -> int:
|
||||
return 1
|
||||
|
||||
apps = data["apps"]
|
||||
all_errors = []
|
||||
all_errors, all_warnings = [], []
|
||||
|
||||
# Validate each app
|
||||
for i, app in enumerate(apps):
|
||||
errors = validate_app(app, i)
|
||||
errors, warnings = validate_app(app, i)
|
||||
all_errors.extend(errors)
|
||||
all_warnings.extend(warnings)
|
||||
|
||||
# Check for duplicate IDs per variant
|
||||
for variant in VARIANTS:
|
||||
errors = check_duplicate_ids(apps, variant)
|
||||
all_errors.extend(errors)
|
||||
all_errors.extend(check_duplicate_ids(apps, variant))
|
||||
|
||||
if all_warnings:
|
||||
print(f"Warnings ({len(all_warnings)}):\n")
|
||||
for warning in all_warnings:
|
||||
print(f" ~ {warning}")
|
||||
print()
|
||||
|
||||
if all_errors:
|
||||
print(f"Validation failed with {len(all_errors)} error(s):\n")
|
||||
for error in all_errors:
|
||||
print(f" - {error}")
|
||||
print(f" x {error}")
|
||||
return 1
|
||||
|
||||
print(f"Validation passed: {len(apps)} apps checked")
|
||||
print(f"Validation passed: {len(apps)} apps checked", end="")
|
||||
if all_warnings:
|
||||
print(f" ({len(all_warnings)} warnings)")
|
||||
else:
|
||||
print()
|
||||
return 0
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user