Files
Obtainium-Emulation-Pack/scripts/test-apps.py

747 lines
24 KiB
Python

#!/usr/bin/env python3
"""Live validation that app configs can resolve to downloadable APKs.
Set GITHUB_TOKEN in .env or environment to avoid API rate limits.
"""
import argparse
import json
import os
import re
import ssl
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from html.parser import HTMLParser
from typing import Any
from urllib.parse import urljoin, urlparse
from urllib.request import Request, urlopen
from help_formatter import StyledHelpFormatter
from utils import get_additional_settings, hydrate_settings, load_dotenv
# Browser-style UA: some APK-hosting sites refuse obvious non-browser clients.
USER_AGENT = (
    "Mozilla/5.0 (Linux; Android 10; K) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/114.0.0.0 Mobile Safari/537.36"
)
REQUEST_TIMEOUT = 30  # seconds per HTTP request
MAX_RELEASES_TO_CHECK = 25  # page size when listing releases from forge APIs
APK_EXTENSIONS = (".apk", ".xapk")  # suffixes treated as installable packages
MAX_DISPLAYED_APK_URLS = 3  # APK URLs shown per app in --verbose mode
MAX_DISPLAYED_APKS = 10  # entries shown per app in the --apks listing
def _make_request(
    url: str,
    headers: dict[str, str] | None = None,
    timeout: int = REQUEST_TIMEOUT,
) -> tuple[str, dict[str, str], str]:
    """Fetch *url* and return (body, lowercased response headers, final URL).

    SSL certificate verification is intentionally disabled so hosts with
    self-signed certificates can still be validated.
    """
    hdrs = {"User-Agent": USER_AGENT}
    if headers:
        hdrs.update(headers)
    req = Request(url, headers=hdrs)
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE  # deliberate: accept self-signed certs
    # Use the response as a context manager so the connection is always
    # closed (the previous version leaked the response object).
    with urlopen(req, timeout=timeout, context=ctx) as resp:
        body = resp.read().decode("utf-8", errors="replace")
        resp_headers = {k.lower(): v for k, v in resp.headers.items()}
        final_url = resp.url
    return body, resp_headers, final_url
def _fetch_json(
    url: str,
    headers: dict[str, str] | None = None,
) -> tuple[Any, dict[str, str]]:
    """GET *url* and parse the body as JSON.

    Returns (parsed_json, response_headers). Caller-supplied *headers*
    override the default JSON Accept header.
    """
    merged = {"Accept": "application/json", **(headers or {})}
    body, resp_headers, _ = _make_request(url, headers=merged)
    return json.loads(body), resp_headers
class LinkExtractor(HTMLParser):
def __init__(self, base_url: str):
super().__init__()
self.base_url = base_url
self.links: list[str] = []
def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
if tag == "a":
for name, value in attrs:
if name == "href" and value:
self.links.append(urljoin(self.base_url, value))
def _extract_links(html_body: str, base_url: str) -> list[str]:
    """Return all anchor hrefs in *html_body*, resolved against *base_url*."""
    extractor = LinkExtractor(base_url)
    extractor.feed(html_body)
    return extractor.links
def _filter_links_by_regex(links: list[str], regex: str) -> list[str]:
pattern = re.compile(regex)
return [link for link in links if pattern.search(link)]
def _filter_links_by_extension(links: list[str]) -> list[str]:
    """Keep only links whose lowercase suffix is a known APK extension."""
    # str.endswith accepts a tuple of suffixes, so one call covers them all.
    return [link for link in links if link.lower().endswith(APK_EXTENSIONS)]
def _sort_links(
links: list[str],
skip_sort: bool = False,
reverse_sort: bool = False,
sort_by_last_segment: bool = False,
) -> list[str]:
if skip_sort:
return links
key = (lambda u: u.rsplit("/", 1)[-1]) if sort_by_last_segment else None
result = sorted(links, key=key)
if reverse_sort:
result.reverse()
return result
def _format_filter_context(**filters: str) -> str:
"""Build a diagnostic string of active filters, e.g. ', apkFilter=foo, titleFilter=bar'."""
parts = [f", {name}={value}" for name, value in filters.items() if value]
return "".join(parts)
def _apply_apk_filter(urls: list[str], settings: dict[str, Any]) -> list[str]:
apk_filter = settings.get("apkFilterRegEx", "")
if not apk_filter or not urls:
return urls
pattern = re.compile(apk_filter)
if settings.get("invertAPKFilter", False):
return [u for u in urls if not pattern.search(u)]
return [u for u in urls if pattern.search(u)]
def _replace_match_groups(match: re.Match, group_string: str) -> str | None:
"""Replicate Obtainium's replaceMatchGroupsInString behavior."""
if re.fullmatch(r"\d+", group_string):
group_string = f"${group_string}"
tokens = list(re.finditer(r"\$(\d+)", group_string))
if not tokens:
return None
result = group_string
for token in tokens:
group_num = int(token.group(1))
replacement = match.group(group_num) or ""
result = result.replace(token.group(0), replacement)
return result
def _extract_version(raw_version: str, settings: dict[str, Any]) -> tuple[str, str | None]:
"""Apply versionExtractionRegEx. Returns (version, warning_or_none)."""
regex_str = settings.get("versionExtractionRegEx", "")
if not regex_str or not raw_version:
return raw_version, None
try:
match = re.search(regex_str, raw_version)
if match:
group_to_use = settings.get("matchGroupToUse", "")
if group_to_use:
version = _replace_match_groups(match, group_to_use)
if version:
return version, None
return raw_version, "matchGroupToUse produced no result"
return match.group(0), None
except re.error as e:
return raw_version, f"versionExtractionRegEx error: {e}"
return raw_version, None
def _check_apk_index(app: dict[str, Any], apk_count: int) -> str | None:
"""Returns a warning string if preferredApkIndex is out of bounds."""
index = app.get("preferredApkIndex", 0)
if apk_count > 0 and index >= apk_count:
return f"preferredApkIndex={index} but only {apk_count} APKs found"
return None
class TestResult:
    """Mutable record of one app's validation outcome."""

    # Attribute names serialized by to_dict, in output order.
    _DICT_FIELDS = (
        "app_name", "app_id", "source", "url", "passed", "version",
        "apk_count", "apk_urls", "preferred_apk_index", "error",
        "warnings", "duration_ms",
    )

    def __init__(self, app_name: str, app_id: str, source: str, url: str):
        self.app_name = app_name
        self.app_id = app_id
        self.source = source
        self.url = url
        self.passed = False
        self.version: str | None = None
        self.apk_count = 0
        self.apk_urls: list[str] = []
        self.preferred_apk_index = 0
        self.error: str | None = None
        self.warnings: list[str] = []
        self.duration_ms = 0

    def to_dict(self) -> dict[str, Any]:
        """Serialize for --json output."""
        return {field: getattr(self, field) for field in self._DICT_FIELDS}

    def __repr__(self) -> str:
        outcome = "PASS" if self.passed else "FAIL"
        return f"{outcome}: {self.app_name} ({self.source})"
def _github_headers() -> dict[str, str]:
headers = {"Accept": "application/vnd.github.v3+json"}
token = os.environ.get("GITHUB_TOKEN", "")
if token:
headers["Authorization"] = f"token {token}"
return headers
def _parse_owner_repo(url: str) -> tuple[str, str, str]:
"""Returns (owner, repo, host)."""
parsed = urlparse(url)
parts = parsed.path.strip("/").split("/")
if len(parts) < 2:
raise ValueError(f"Cannot parse owner/repo from: {url}")
return parts[0], parts[1], parsed.netloc
def _collect_apks_from_assets(assets: list[dict], settings: dict[str, Any]) -> list[str]:
    """Pick APK (and, when includeZips is set, .zip) URLs from release assets."""
    include_zips = settings.get("includeZips", False)
    urls: list[str] = []
    for asset in assets:
        name = asset.get("name", "").lower()
        if name.endswith(APK_EXTENSIONS) or (include_zips and name.endswith(".zip")):
            urls.append(asset.get("browser_download_url", ""))
    return urls
def _find_release_with_apks(
    releases: list[dict],
    settings: dict[str, Any],
    title_filter: re.Pattern | None = None,
    notes_filter: re.Pattern | None = None,
) -> tuple[dict | None, list[str]]:
    """Walk releases and return the first one with matching APK assets.

    Returns (target_release, filtered_apk_urls). For track-only apps,
    falls back to any release with a tag_name even if no APKs found.

    Releases are assumed newest-first (API order). Drafts are always
    skipped; prereleases only when includePrereleases is off. When
    fallbackToOlderReleases is off, the newest eligible release without
    APKs ends the search immediately (break, not continue).
    """
    include_prereleases = settings.get("includePrereleases", False)
    track_only = settings.get("trackOnly", False)
    fallback = settings.get("fallbackToOlderReleases", True)
    for release in releases:
        if release.get("draft", False):
            continue
        if release.get("prerelease", False) and not include_prereleases:
            continue
        # Title/notes filters reject a release outright (no fallback involved).
        if title_filter:
            name = release.get("name", "") or ""
            if not title_filter.search(name):
                continue
        if notes_filter:
            body = release.get("body", "") or ""
            if not notes_filter.search(body):
                continue
        apk_urls = _collect_apks_from_assets(release.get("assets", []), settings)
        apk_urls = _apply_apk_filter(apk_urls, settings)
        if not apk_urls and not track_only:
            if fallback:
                continue  # try the next (older) release
            break  # no fallback allowed: give up on the whole list
        # track_only apps accept a release even with an empty APK list.
        return release, apk_urls
    # Track-only fallback: any release with a version tag
    if track_only:
        for release in releases:
            if release.get("tag_name"):
                return release, []
    return None, []
def test_github(app: dict[str, Any], settings: dict[str, Any]) -> TestResult:
    """Validate a GitHub-sourced app via the releases REST API.

    Returns a populated TestResult; never raises.
    """
    result = TestResult(app["name"], app["id"], "GitHub", app["url"])
    try:
        owner, repo, _ = _parse_owner_repo(app["url"])
    except ValueError as e:
        result.error = str(e)
        return result
    api_url = f"https://api.github.com/repos/{owner}/{repo}/releases?per_page={MAX_RELEASES_TO_CHECK}"
    try:
        releases, resp_headers = _fetch_json(api_url, headers=_github_headers())
    except Exception as e:
        result.error = f"GitHub API error: {e}"
        # 403 / "rate" in the message usually means unauthenticated rate limiting.
        if "403" in str(e) or "rate" in str(e).lower():
            result.error += " (rate limited - set GITHUB_TOKEN env var)"
        return result
    # Surface a warning before the API quota is fully exhausted.
    remaining = resp_headers.get("x-ratelimit-remaining", "")
    if remaining and int(remaining) < 10:
        result.warnings.append(f"GitHub API rate limit low: {remaining} remaining")
    if not releases:
        result.error = "No releases found"
        return result
    title_str = settings.get("filterReleaseTitlesByRegEx", "")
    notes_str = settings.get("filterReleaseNotesByRegEx", "")
    title_regex = re.compile(title_str) if title_str else None
    notes_regex = re.compile(notes_str) if notes_str else None
    target, apk_urls = _find_release_with_apks(
        releases, settings, title_filter=title_regex, notes_filter=notes_regex
    )
    if not target:
        # Include the active filters so a failure is diagnosable from the message.
        prerelease_state = "on" if settings.get("includePrereleases", False) else "off"
        context = _format_filter_context(
            titleFilter=title_str,
            apkFilter=settings.get("apkFilterRegEx", ""),
        )
        result.error = (
            f"No releases with matching APK assets found "
            f"(checked {len(releases)} releases, prereleases={prerelease_state}{context})"
        )
        return result
    # Prefer the tag as the version string; fall back to the release title.
    version = target.get("tag_name", "") or target.get("name", "")
    version, warning = _extract_version(version, settings)
    if warning:
        result.warnings.append(warning)
    index_warning = _check_apk_index(app, len(apk_urls))
    if index_warning:
        result.warnings.append(index_warning)
    result.passed = True
    result.version = version
    result.apk_count = len(apk_urls)
    result.apk_urls = apk_urls
    result.preferred_apk_index = app.get("preferredApkIndex", 0)
    return result
def test_codeberg(app: dict[str, Any], settings: dict[str, Any]) -> TestResult:
    """Validate a Codeberg-sourced app via the forge's v1 releases API.

    Returns a populated TestResult; never raises.
    """
    result = TestResult(app["name"], app["id"], "Codeberg", app["url"])
    try:
        owner, repo, host = _parse_owner_repo(app["url"])
    except ValueError as e:
        result.error = str(e)
        return result
    # API host comes from the app URL rather than being hardcoded.
    api_url = f"https://{host}/api/v1/repos/{owner}/{repo}/releases?limit={MAX_RELEASES_TO_CHECK}"
    try:
        releases, _ = _fetch_json(api_url)
    except Exception as e:
        result.error = f"Codeberg API error: {e}"
        return result
    if not releases:
        result.error = "No releases found"
        return result
    target, apk_urls = _find_release_with_apks(releases, settings)
    if not target:
        result.error = "No releases with matching APK assets"
        return result
    # Prefer the tag as the version string; fall back to the release title.
    version = target.get("tag_name", "") or target.get("name", "")
    version, warning = _extract_version(version, settings)
    if warning:
        result.warnings.append(warning)
    index_warning = _check_apk_index(app, len(apk_urls))
    if index_warning:
        result.warnings.append(index_warning)
    result.passed = True
    result.version = version
    result.apk_count = len(apk_urls)
    result.apk_urls = apk_urls
    result.preferred_apk_index = app.get("preferredApkIndex", 0)
    return result
def _parse_request_headers(settings: dict[str, Any]) -> dict[str, str]:
headers = {}
for header_obj in settings.get("requestHeader", []):
if isinstance(header_obj, dict):
header_str = header_obj.get("requestHeader", "")
if ": " in header_str:
key, val = header_str.split(": ", 1)
headers[key] = val
return headers
def _follow_intermediate_links(
    start_url: str,
    steps: list[dict],
    headers: dict[str, str],
) -> tuple[str, str | None]:
    """Walk intermediateLink chain. Returns (final_url, error_or_none).

    Each step fetches the current page, extracts and filters its links,
    and moves to the last link after sorting (mirroring Obtainium).
    """
    current_url = start_url
    for i, step in enumerate(steps):
        if not isinstance(step, dict):
            continue  # tolerate malformed config entries
        try:
            body, _, final_url = _make_request(current_url, headers=headers)
        except Exception as e:
            return current_url, f"Failed to fetch intermediate URL ({current_url}): {e}"
        # Resolve relative links against the post-redirect URL, not the requested one.
        links = _extract_links(body, final_url)
        step_regex = step.get("customLinkFilterRegex", "")
        if step_regex:
            links = _filter_links_by_regex(links, step_regex)
        links = _sort_links(
            links,
            skip_sort=step.get("skipSort", False),
            reverse_sort=step.get("reverseSort", False),
            sort_by_last_segment=step.get("sortByLastLinkSegment", False),
        )
        if not links:
            return current_url, (
                f"Intermediate link step {i} found no matching links "
                f"(url={current_url}, regex={step_regex!r})"
            )
        current_url = links[-1]  # Obtainium takes the last link after sorting
    return current_url, None
def test_html(app: dict[str, Any], settings: dict[str, Any]) -> TestResult:
    """Validate an HTML/DirectAPKLink-sourced app by scraping page links.

    Follows any configured intermediateLink chain first, then searches the
    final page for APK links. Returns a populated TestResult; never raises.
    """
    result = TestResult(app["name"], app["id"], "HTML", app["url"])
    req_headers = _parse_request_headers(settings)
    intermediate_links = settings.get("intermediateLink", [])
    current_url, error = _follow_intermediate_links(app["url"], intermediate_links, req_headers)
    if error:
        result.error = error
        return result
    try:
        body, _, final_url = _make_request(current_url, headers=req_headers)
    except Exception as e:
        result.error = f"Failed to fetch final URL ({current_url}): {e}"
        return result
    links = _extract_links(body, final_url)
    # A custom regex replaces the default extension-based APK detection.
    custom_regex = settings.get("customLinkFilterRegex", "")
    apk_links = _filter_links_by_regex(links, custom_regex) if custom_regex else _filter_links_by_extension(links)
    apk_links = _apply_apk_filter(apk_links, settings)
    track_only = settings.get("trackOnly", False)
    if not apk_links and not track_only:
        context = _format_filter_context(
            customLinkFilterRegex=custom_regex,
            apkFilterRegEx=settings.get("apkFilterRegEx", ""),
        )
        result.error = (
            f"No APK links found on page ({current_url}{context}, "
            f"{len(links)} total links on page)"
        )
        return result
    version = None
    version_regex_str = settings.get("versionExtractionRegEx", "")
    if version_regex_str:
        # The regex runs against either the whole page or the chosen APK link.
        extract_whole_page = settings.get("versionExtractWholePage", False)
        if extract_whole_page:
            search_text = body
        elif apk_links:
            search_text = apk_links[-1]  # Obtainium uses last link
        else:
            search_text = ""
        version, warning = _extract_version(search_text, settings)
        if warning:
            result.warnings.append(warning)
    if not version:
        # Without an extracted version, a configured pseudo-versioning
        # method still counts as a resolvable version.
        pseudo_method = settings.get("defaultPseudoVersioningMethod", "")
        if pseudo_method:
            version = f"<pseudo:{pseudo_method}>"
        else:
            result.warnings.append("No version extracted (no regex match, no pseudo-method)")
    index_warning = _check_apk_index(app, len(apk_links))
    if index_warning:
        result.warnings.append(index_warning)
    result.passed = True
    result.version = version
    result.apk_count = len(apk_links)
    result.apk_urls = apk_links
    result.preferred_apk_index = app.get("preferredApkIndex", 0)
    return result
def _effective_source(app: dict[str, Any]) -> str:
override = app.get("overrideSource")
if override:
return override
host = urlparse(app.get("url", "")).netloc.lower().lstrip("www.")
if "github.com" in host:
return "GitHub"
if "gitlab.com" in host:
return "GitLab"
if "codeberg.org" in host:
return "Codeberg"
if "f-droid.org" in host:
return "FDroid"
return "HTML"
def test_app(app: dict[str, Any]) -> TestResult:
    """Run the source-appropriate live test for one app config and time it.

    Unsupported source types pass with a 'Skipped' warning; a config whose
    additionalSettings cannot be parsed fails immediately.
    """
    source = _effective_source(app)
    try:
        settings = hydrate_settings(get_additional_settings(app), source)
    except (json.JSONDecodeError, TypeError):
        result = TestResult(app.get("name", "?"), app.get("id", "?"), source, app.get("url", "?"))
        result.error = "Cannot parse additionalSettings"
        return result
    handlers = {
        "GitHub": test_github,
        "Codeberg": test_codeberg,
        "HTML": test_html,
        "DirectAPKLink": test_html,
    }
    handler = handlers.get(source)
    start = time.monotonic()
    if handler is None:
        result = TestResult(app.get("name", "?"), app.get("id", "?"), source, app.get("url", "?"))
        result.passed = True
        result.warnings.append(f"Skipped: source type '{source}' not yet supported")
    else:
        result = handler(app, settings)
    result.duration_ms = int((time.monotonic() - start) * 1000)
    return result
def _filename_from_url(url: str) -> str:
return urlparse(url).path.rsplit("/", 1)[-1] or url
def print_result(
result: TestResult,
verbose: bool = False,
show_apks: bool = False,
) -> None:
status = "\033[32mPASS\033[0m" if result.passed else "\033[31mFAIL\033[0m"
version_str = f" v{result.version}" if result.version else ""
apk_str = f" ({result.apk_count} APKs)" if result.apk_count else ""
print(f" {status} {result.app_name}{version_str}{apk_str} [{result.duration_ms}ms]")
if result.error:
print(f" Error: {result.error}")
for warning in result.warnings:
print(f" \033[33mWarn\033[0m: {warning}")
if show_apks and result.apk_urls:
filenames = [_filename_from_url(u) for u in result.apk_urls]
has_dupes = len(set(filenames)) < len(filenames)
display_urls = result.apk_urls[:MAX_DISPLAYED_APKS]
for i, url in enumerate(display_urls):
label = url if has_dupes else filenames[i]
marker = " \033[36m<-- preferredApkIndex\033[0m" if i == result.preferred_apk_index else ""
print(f" [{i}] {label}{marker}")
remaining = len(result.apk_urls) - MAX_DISPLAYED_APKS
if remaining > 0:
print(f" ... and {remaining} more")
elif verbose and result.apk_urls:
for url in result.apk_urls[:MAX_DISPLAYED_APK_URLS]:
print(f" APK: {url}")
def _print_json_error(message: str) -> None:
    """Emit a JSON document with an empty result set and *message* as the error."""
    empty_summary = {
        "total": 0,
        "passed": 0,
        "failed": 0,
        "warned": 0,
        "wall_time_ms": 0,
        "cumulative_time_ms": 0,
    }
    payload = {"summary": empty_summary, "results": [], "error": message}
    print(json.dumps(payload, indent=2))
def main() -> int:
    """CLI entry point. Returns a process exit code (0 = all tests passed).

    Loads the app list, applies name/ID filters, runs the live tests either
    serially or in a thread pool, and prints human-readable or JSON output.
    """
    load_dotenv()
    parser = argparse.ArgumentParser(
        description="Live-test app configs resolve to downloadable APKs.",
        formatter_class=StyledHelpFormatter,
    )
    parser.add_argument(
        "name",
        nargs="?",
        help="Filter by app name (case-insensitive substring match)",
    )
    parser.add_argument(
        "-f", "--file",
        default="src/applications.json",
        help="Path to applications.json (default: src/applications.json)",
    )
    parser.add_argument(
        "--id",
        dest="id_filter",
        help="Filter by exact app ID",
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Show APK download URLs",
    )
    parser.add_argument(
        "--apks",
        action="store_true",
        help="Show numbered APK list with preferredApkIndex marker",
    )
    parser.add_argument(
        "-j", "--jobs",
        type=int,
        default=8,
        help="Number of parallel workers (default: 8, use 1 for serial)",
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output results as JSON (for CI/scripting)",
    )
    args = parser.parse_args()
    json_file = args.file
    name_filter = args.name.lower() if args.name else None
    id_filter = args.id_filter
    verbose = args.verbose
    show_apks = args.apks
    workers = max(args.jobs, 1)
    json_output = args.json
    try:
        with open(json_file, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (json.JSONDecodeError, FileNotFoundError) as e:
        if json_output:
            _print_json_error(f"Error loading {json_file}: {e}")
        else:
            print(f"Error loading {json_file}: {e}")
        return 1
    apps = data.get("apps", [])
    # --id takes precedence over the positional name filter.
    if id_filter:
        apps = [a for a in apps if a.get("id") == id_filter]
    elif name_filter:
        apps = [a for a in apps if name_filter in a.get("name", "").lower()]
    if not apps:
        if json_output:
            _print_json_error("No apps matched the filter.")
        else:
            print("No apps matched the filter.")
        return 1
    if not json_output:
        # Warn up front about likely GitHub rate limiting.
        has_token = bool(os.environ.get("GITHUB_TOKEN"))
        github_count = sum(1 for a in apps if _effective_source(a) == "GitHub")
        if github_count > 0 and not has_token:
            print(
                f"\033[33mNote\033[0m: {github_count} GitHub apps to test, "
                "but GITHUB_TOKEN is not set. You may hit rate limits.\n"
                " Set it with: export GITHUB_TOKEN=<your_token>\n"
            )
    serial = workers == 1 or len(apps) == 1
    if not json_output:
        mode = "serial" if serial else f"{workers} workers"
        print(f"Testing {len(apps)} app(s) ({mode})...\n")
    wall_start = time.monotonic()
    if serial:
        # Serial mode prints each result as soon as it completes.
        results = []
        for app in apps:
            result = test_app(app)
            results.append(result)
            if not json_output:
                print_result(result, verbose=verbose, show_apks=show_apks)
    else:
        # Parallel mode collects results, then prints in input order.
        result_map: dict[str, TestResult] = {}
        with ThreadPoolExecutor(max_workers=workers) as pool:
            futures = {pool.submit(test_app, app): app for app in apps}
            for future in as_completed(futures):
                result = future.result()
                result_map[result.app_id] = result
        results = [result_map[app["id"]] for app in apps]
        if not json_output:
            for result in results:
                print_result(result, verbose=verbose, show_apks=show_apks)
    wall_ms = int((time.monotonic() - wall_start) * 1000)
    passed = sum(1 for r in results if r.passed)
    failed = sum(1 for r in results if not r.passed)
    warned = sum(1 for r in results if r.warnings)
    sum_time = sum(r.duration_ms for r in results)
    if json_output:
        output = {
            "summary": {
                "total": len(results),
                "passed": passed,
                "failed": failed,
                "warned": warned,
                "wall_time_ms": wall_ms,
                "cumulative_time_ms": sum_time,
            },
            "results": [r.to_dict() for r in results],
        }
        print(json.dumps(output, indent=2))
    else:
        print(f"\n{'=' * 60}")
        print(f"Results: {passed} passed, {failed} failed, {warned} with warnings")
        print(f"Time: {wall_ms / 1000:.1f}s wall, {sum_time / 1000:.1f}s cumulative")
        if failed > 0:
            print(f"\nFailed apps:")
            for r in results:
                if not r.passed:
                    print(f" - {r.app_name}: {r.error}")
    return 1 if failed > 0 else 0
# Allow use both as a script and as an importable module.
if __name__ == "__main__":
    sys.exit(main())