spring cleaning
This commit is contained in:
@@ -5,9 +5,8 @@ import json
|
||||
import re
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from utils import load_dotenv
|
||||
from utils import detect_source_from_url, load_dotenv
|
||||
|
||||
CATEGORIES = [
|
||||
"Emulator",
|
||||
@@ -25,22 +24,6 @@ VARIANT_OPTIONS = [
|
||||
"README only",
|
||||
]
|
||||
|
||||
SOURCE_DETECTION = {
|
||||
"github.com": "GitHub",
|
||||
"gitlab.com": "GitLab",
|
||||
"codeberg.org": "Codeberg",
|
||||
"f-droid.org": "F-Droid",
|
||||
}
|
||||
|
||||
|
||||
def detect_source(url: str):
|
||||
parsed = urlparse(url)
|
||||
host = parsed.netloc.lower()
|
||||
for domain, source in SOURCE_DETECTION.items():
|
||||
if domain in host:
|
||||
return source
|
||||
return None
|
||||
|
||||
|
||||
def extract_github_info(url: str) -> tuple[str, str] | None:
|
||||
match = re.match(r"https?://github\.com/([^/]+)/([^/]+)", url)
|
||||
@@ -65,7 +48,6 @@ def prompt_yes_no(message: str, default: bool = True) -> bool:
|
||||
|
||||
|
||||
def select_menu(title: str, choices: list[str], default: int = 0) -> str:
|
||||
# Only use curses if we have a real terminal
|
||||
if not sys.stdin.isatty():
|
||||
return _select_menu_fallback(title, choices, default)
|
||||
|
||||
@@ -74,7 +56,6 @@ def select_menu(title: str, choices: list[str], default: int = 0) -> str:
|
||||
|
||||
return _select_menu_curses(title, choices, default)
|
||||
except Exception:
|
||||
# Fallback to simple input if curses fails
|
||||
return _select_menu_fallback(title, choices, default)
|
||||
|
||||
|
||||
@@ -194,61 +175,49 @@ def main() -> int:
|
||||
print(" Add New App to Obtainium Emulation Pack")
|
||||
print("=" * 50)
|
||||
|
||||
# Get URL first - we can auto-detect a lot from it
|
||||
url = prompt("\nApp URL (GitHub/GitLab/etc.)")
|
||||
if not url:
|
||||
print("URL is required.")
|
||||
return 1
|
||||
|
||||
# Detect source
|
||||
source = detect_source(url)
|
||||
source = detect_source_from_url(url)
|
||||
if source:
|
||||
print(f" Detected source: {source}")
|
||||
else:
|
||||
source = prompt("Source type", "GitHub")
|
||||
|
||||
# Try to extract info from GitHub URL
|
||||
author = ""
|
||||
name = ""
|
||||
github_info = extract_github_info(url)
|
||||
if github_info:
|
||||
author, repo_name = github_info
|
||||
# Clean up name (replace hyphens with spaces, title case)
|
||||
name = repo_name.replace("-", " ").replace("_", " ").title()
|
||||
print(f" Detected author: {author}")
|
||||
print(f" Detected name: {name}")
|
||||
|
||||
# Confirm or override detected values
|
||||
author = prompt("Author", author)
|
||||
name = prompt("App name", name)
|
||||
|
||||
# App ID
|
||||
app_id = prompt("Android package ID (e.g., com.example.app)")
|
||||
if not app_id:
|
||||
print("Package ID is required.")
|
||||
return 1
|
||||
|
||||
# Category - interactive menu
|
||||
category = select_menu("Select category:", CATEGORIES)
|
||||
print(f" Selected: {category}")
|
||||
|
||||
# Variant - interactive menu
|
||||
variant = select_menu("Include in which release(s):", VARIANT_OPTIONS)
|
||||
print(f" Selected: {variant}")
|
||||
|
||||
# Pre-releases
|
||||
include_prereleases = prompt_yes_no("Include pre-releases?", False)
|
||||
print(f" Include pre-releases: {'Yes' if include_prereleases else 'No'}")
|
||||
|
||||
# Verify latest tag
|
||||
verify_latest_tag = prompt_yes_no("Verify latest tag?", False)
|
||||
print(f" Verify latest tag: {'Yes' if verify_latest_tag else 'No'}")
|
||||
|
||||
# Allow ID change
|
||||
allow_id_change = prompt_yes_no("Allow ID change?", False)
|
||||
print(f" Allow ID change: {'Yes' if allow_id_change else 'No'}")
|
||||
|
||||
# Optional overrides
|
||||
print("")
|
||||
app_name_override = input(
|
||||
"App name override - leave blank to skip (sets display name in both Obtainium & README): "
|
||||
@@ -258,7 +227,6 @@ def main() -> int:
|
||||
|
||||
url_override = input("Homepage URL override - leave blank to skip: ").strip()
|
||||
|
||||
# Generate the entry
|
||||
app_entry = generate_app_entry(
|
||||
app_id=app_id,
|
||||
url=url,
|
||||
@@ -274,18 +242,15 @@ def main() -> int:
|
||||
url_override=url_override or None,
|
||||
)
|
||||
|
||||
# Show preview
|
||||
print("\n" + "=" * 50)
|
||||
print(" Generated Entry Preview")
|
||||
print("=" * 50)
|
||||
print(json.dumps(app_entry, indent=2))
|
||||
|
||||
# Confirm and save
|
||||
if not prompt_yes_no("\nAdd this app to applications.json?", True):
|
||||
print("Cancelled.")
|
||||
return 0
|
||||
|
||||
# Load existing file
|
||||
apps_file = Path("src/applications.json")
|
||||
if not apps_file.exists():
|
||||
print(f"Error: {apps_file} not found. Run from repo root.")
|
||||
@@ -294,7 +259,6 @@ def main() -> int:
|
||||
with open(apps_file, "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
|
||||
# Check for duplicate ID
|
||||
existing_ids = {app["id"] for app in data.get("apps", [])}
|
||||
if app_id in existing_ids:
|
||||
print(f"\nWarning: App with ID '{app_id}' already exists!")
|
||||
@@ -302,20 +266,16 @@ def main() -> int:
|
||||
print("Cancelled.")
|
||||
return 0
|
||||
|
||||
# Add the new app
|
||||
data["apps"].append(app_entry)
|
||||
|
||||
# Write back
|
||||
with open(apps_file, "w", encoding="utf-8") as f:
|
||||
json.dump(data, f, indent=2, ensure_ascii=False)
|
||||
f.write("\n")
|
||||
|
||||
print(f"\nApp added to {apps_file}")
|
||||
|
||||
# Offer to live-test the new app config
|
||||
if prompt_yes_no("\nRun live test on this app?", True):
|
||||
load_dotenv()
|
||||
# Import here to avoid circular deps and keep startup fast
|
||||
from importlib import import_module
|
||||
|
||||
test_mod = import_module("test-apps")
|
||||
|
||||
@@ -78,15 +78,10 @@ DEPRECATED_SETTINGS_KEYS: dict[str, str] = {
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Obtainium additionalSettings schema
|
||||
# Single source of truth for key metadata: default value, applicable sources,
|
||||
# whether the value is a regex pattern. Derived from Obtainium source code:
|
||||
# lib/app_sources/*.dart. Reference: ~/code/Obtainium
|
||||
#
|
||||
# Dict insertion order defines the canonical key ordering used by
|
||||
# normalize-json.py and export hydration.
|
||||
# ---------------------------------------------------------------------------
|
||||
# Obtainium additionalSettings schema: single source of truth for key metadata
|
||||
# (default value, applicable sources, regex flag). Derived from Obtainium source
|
||||
# code (lib/app_sources/*.dart). Dict insertion order defines canonical key
|
||||
# ordering used by normalize-json.py and export hydration.
|
||||
|
||||
ALL_SOURCES = frozenset(VALID_SOURCES)
|
||||
|
||||
@@ -106,7 +101,6 @@ class SettingDef(NamedTuple):
|
||||
|
||||
|
||||
SETTINGS_SCHEMA: dict[str, SettingDef] = {
|
||||
# --- GitHub / Codeberg source-specific ---
|
||||
"includePrereleases": SettingDef(False, _GITHUB_LIKE),
|
||||
"fallbackToOlderReleases": SettingDef(True, frozenset({"GitHub", "Codeberg", "GitLab", "SourceHut", "APKPure", "APKMirror"})),
|
||||
"filterReleaseTitlesByRegEx": SettingDef("", frozenset({"GitHub", "Codeberg", "APKMirror"}), is_regex=True),
|
||||
@@ -118,23 +112,18 @@ SETTINGS_SCHEMA: dict[str, SettingDef] = {
|
||||
"github-creds": SettingDef("", frozenset({"GitHub"})),
|
||||
"GHReqPrefix": SettingDef("", frozenset({"GitHub"})),
|
||||
|
||||
# --- GitLab source-specific ---
|
||||
"gitlab-creds": SettingDef("", frozenset({"GitLab"})),
|
||||
|
||||
# --- FDroid / IzzyOnDroid source-specific ---
|
||||
"filterVersionsByRegEx": SettingDef("", frozenset({"FDroid", "IzzyOnDroid"}), is_regex=True),
|
||||
"trySelectingSuggestedVersionCode": SettingDef(True, frozenset({"FDroid", "IzzyOnDroid", "FDroidRepo"})),
|
||||
"autoSelectHighestVersionCode": SettingDef(False, frozenset({"FDroid", "IzzyOnDroid"})),
|
||||
|
||||
# --- FDroidRepo source-specific ---
|
||||
"appIdOrName": SettingDef("", frozenset({"FDroidRepo"})),
|
||||
"pickHighestVersionCode": SettingDef(False, frozenset({"FDroidRepo"})),
|
||||
|
||||
# --- APKPure source-specific ---
|
||||
"stayOneVersionBehind": SettingDef(False, frozenset({"APKPure"})),
|
||||
"useFirstApkOfVersion": SettingDef(True, frozenset({"APKPure", "Farsroid"})),
|
||||
|
||||
# --- HTML source-specific ---
|
||||
"intermediateLink": SettingDef([], frozenset({"HTML"})),
|
||||
"customLinkFilterRegex": SettingDef("", frozenset({"HTML"}), is_regex=True),
|
||||
"filterByLinkText": SettingDef(False, frozenset({"HTML"})),
|
||||
@@ -146,7 +135,6 @@ SETTINGS_SCHEMA: dict[str, SettingDef] = {
|
||||
"requestHeader": SettingDef(_DEFAULT_USER_AGENT_HEADER, frozenset({"HTML", "DirectAPKLink"})),
|
||||
"defaultPseudoVersioningMethod": SettingDef("partialAPKHash", frozenset({"HTML", "DirectAPKLink"})),
|
||||
|
||||
# --- Common keys (all sources) ---
|
||||
"trackOnly": SettingDef(False, ALL_SOURCES),
|
||||
"versionExtractionRegEx": SettingDef("", ALL_SOURCES, is_regex=True),
|
||||
"matchGroupToUse": SettingDef("", ALL_SOURCES),
|
||||
@@ -168,10 +156,6 @@ SETTINGS_SCHEMA: dict[str, SettingDef] = {
|
||||
"zippedApkFilterRegEx": SettingDef("", ALL_SOURCES, is_regex=True),
|
||||
}
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Derived views - computed from SETTINGS_SCHEMA so there's one place to update
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
COMMON_SETTINGS_KEYS: set[str] = {
|
||||
key for key, s in SETTINGS_SCHEMA.items() if s.sources == ALL_SOURCES
|
||||
}
|
||||
|
||||
@@ -2,39 +2,34 @@
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
from typing import Any
|
||||
|
||||
from utils import get_additional_settings, should_include_app, stringify_additional_settings
|
||||
|
||||
|
||||
def minify_json(input_file: str, output_file: str, variant: str = "standard") -> None:
|
||||
try:
|
||||
with open(input_file, "r", encoding="utf-8") as f:
|
||||
data: dict[str, Any] = json.load(f)
|
||||
with open(input_file, "r", encoding="utf-8") as f:
|
||||
data: dict[str, Any] = json.load(f)
|
||||
|
||||
if "apps" in data:
|
||||
filtered_apps = []
|
||||
for app in data["apps"]:
|
||||
if should_include_app(app, variant):
|
||||
app_copy = app.copy()
|
||||
app_copy.pop("meta", None)
|
||||
settings = get_additional_settings(app_copy)
|
||||
source = app_copy.get("overrideSource")
|
||||
app_copy["additionalSettings"] = stringify_additional_settings(settings, source)
|
||||
filtered_apps.append(app_copy)
|
||||
data["apps"] = filtered_apps
|
||||
if "apps" in data:
|
||||
filtered_apps = []
|
||||
for app in data["apps"]:
|
||||
if should_include_app(app, variant):
|
||||
app_copy = app.copy()
|
||||
app_copy.pop("meta", None)
|
||||
settings = get_additional_settings(app_copy)
|
||||
source = app_copy.get("overrideSource")
|
||||
app_copy["additionalSettings"] = stringify_additional_settings(settings, source)
|
||||
filtered_apps.append(app_copy)
|
||||
data["apps"] = filtered_apps
|
||||
|
||||
with open(output_file, "w", encoding="utf-8") as f:
|
||||
json.dump(data, f, separators=(",", ":"), ensure_ascii=False)
|
||||
with open(output_file, "w", encoding="utf-8") as f:
|
||||
json.dump(data, f, separators=(",", ":"), ensure_ascii=False)
|
||||
|
||||
variant_label = f" ({variant})" if variant != "standard" else ""
|
||||
print(
|
||||
f"Minified JSON{variant_label} saved to {output_file} ({len(data.get('apps', []))} apps included)"
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
sys.exit(1)
|
||||
variant_label = f" ({variant})" if variant != "standard" else ""
|
||||
print(
|
||||
f"Minified JSON{variant_label} saved to {output_file} ({len(data.get('apps', []))} apps included)"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Normalize key order and backfill defaults in applications.json."""
|
||||
|
||||
import json
|
||||
import sys
|
||||
@@ -7,7 +8,6 @@ from typing import Any
|
||||
|
||||
from constants import SETTINGS_SCHEMA, SRC_FILE
|
||||
|
||||
# Canonical top-level key order for each app entry
|
||||
KEY_ORDER = [
|
||||
"id",
|
||||
"url",
|
||||
@@ -21,12 +21,10 @@ KEY_ORDER = [
|
||||
"meta",
|
||||
]
|
||||
|
||||
# Fields to backfill with defaults when missing
|
||||
DEFAULTS: dict[str, object] = {
|
||||
"allowIdChange": False,
|
||||
}
|
||||
|
||||
# Settings key order derived from SETTINGS_SCHEMA insertion order
|
||||
_SETTINGS_KEY_ORDER = list(SETTINGS_SCHEMA.keys())
|
||||
|
||||
|
||||
@@ -35,7 +33,6 @@ def _order_dict(d: dict[str, Any], key_order: list[str]) -> dict[str, Any]:
|
||||
for key in key_order:
|
||||
if key in d:
|
||||
ordered[key] = d[key]
|
||||
# Preserve any unexpected keys at the end (safety net)
|
||||
for key in d:
|
||||
if key not in ordered:
|
||||
ordered[key] = d[key]
|
||||
@@ -72,7 +69,6 @@ def normalize(input_path: str) -> int:
|
||||
for i, app in enumerate(apps):
|
||||
normalized = normalize_app(app)
|
||||
|
||||
# Check if anything changed (key order or new defaults)
|
||||
if list(app.keys()) != list(normalized.keys()) or app != normalized:
|
||||
changes += 1
|
||||
|
||||
|
||||
@@ -36,14 +36,7 @@ def _parse_gh_json(result: subprocess.CompletedProcess) -> list | None:
|
||||
|
||||
def _ensure_label_exists() -> None:
|
||||
result = _run_gh(["label", "list", "--search", ISSUE_LABEL, "--json", "name"])
|
||||
labels = _parse_gh_json(result)
|
||||
if labels is None:
|
||||
_run_gh([
|
||||
"label", "create", ISSUE_LABEL,
|
||||
"--description", "Automatically created when a scheduled app test fails",
|
||||
"--color", "d93f0b",
|
||||
])
|
||||
return
|
||||
labels = _parse_gh_json(result) or []
|
||||
if not any(label["name"] == ISSUE_LABEL for label in labels):
|
||||
_run_gh([
|
||||
"label", "create", ISSUE_LABEL,
|
||||
|
||||
@@ -26,7 +26,6 @@ import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from collections import defaultdict
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
@@ -35,8 +34,6 @@ from help_formatter import StyledHelpFormatter
|
||||
from utils import get_additional_settings, get_application_url, get_display_name, load_dotenv, make_obtainium_link, should_include_app
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parent.parent
|
||||
|
||||
# Release artifact paths (relative to repo root)
|
||||
STANDARD_JSON = REPO_ROOT / "obtainium-emulation-pack-latest.json"
|
||||
DUAL_SCREEN_JSON = REPO_ROOT / "obtainium-emulation-pack-dual-screen-latest.json"
|
||||
APPLICATIONS_JSON = REPO_ROOT / "src" / "applications.json"
|
||||
@@ -185,25 +182,22 @@ def diff_apps(
|
||||
new_apps: dict[str, dict[str, Any]],
|
||||
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]:
|
||||
"""Returns (added, changed, removed) app lists. Removed entries use the old version."""
|
||||
old_ids = set(old_apps.keys())
|
||||
new_ids = set(new_apps.keys())
|
||||
old_keys = set(old_apps.keys())
|
||||
new_keys = set(new_apps.keys())
|
||||
|
||||
added = [new_apps[id] for id in sorted(new_ids - old_ids)]
|
||||
|
||||
removed = [old_apps[id] for id in sorted(old_ids - new_ids)]
|
||||
added = [new_apps[k] for k in sorted(new_keys - old_keys)]
|
||||
removed = [old_apps[k] for k in sorted(old_keys - new_keys)]
|
||||
|
||||
changed = []
|
||||
for id in sorted(old_ids & new_ids):
|
||||
old_norm = normalize_app_for_comparison(old_apps[id])
|
||||
new_norm = normalize_app_for_comparison(new_apps[id])
|
||||
for k in sorted(old_keys & new_keys):
|
||||
old_norm = normalize_app_for_comparison(old_apps[k])
|
||||
new_norm = normalize_app_for_comparison(new_apps[k])
|
||||
if json.dumps(old_norm, sort_keys=True) != json.dumps(new_norm, sort_keys=True):
|
||||
changed.append(new_apps[id])
|
||||
changed.append(new_apps[k])
|
||||
|
||||
return added, changed, removed
|
||||
|
||||
|
||||
# Table rendering for release notes
|
||||
|
||||
def _make_ref_key(app: dict[str, Any]) -> str:
|
||||
return get_display_name(app).lower().replace(" ", "-").replace("!", "").replace("(", "").replace(")", "")
|
||||
|
||||
@@ -282,7 +276,6 @@ def _git_log_lines(since_tag: str | None, pretty_format: str) -> list[str]:
|
||||
def extract_github_username(email: str) -> str | None:
|
||||
if not email.endswith(GITHUB_NOREPLY_SUFFIX):
|
||||
return None
|
||||
# Noreply format: "id+username" or just "username"
|
||||
local_part = email[: -len(GITHUB_NOREPLY_SUFFIX)]
|
||||
if "+" in local_part:
|
||||
return local_part.split("+", 1)[1]
|
||||
@@ -480,7 +473,6 @@ def main() -> None:
|
||||
|
||||
latest = args.since or get_latest_tag()
|
||||
|
||||
# Determine version
|
||||
if args.version:
|
||||
version = args.version
|
||||
if not version.startswith("v"):
|
||||
@@ -491,14 +483,12 @@ def main() -> None:
|
||||
else:
|
||||
version = prompt_version(latest)
|
||||
|
||||
# Check if tag already exists
|
||||
if not args.dry_run:
|
||||
result = run(["git", "tag", "-l", version], capture=True)
|
||||
if version in result.stdout.strip().splitlines():
|
||||
print(f"Error: Tag {version} already exists.")
|
||||
sys.exit(1)
|
||||
|
||||
# Detect changed apps
|
||||
print("\nDetecting app changes...")
|
||||
old_apps = load_apps_from_ref(latest) if latest else {}
|
||||
new_apps = load_apps_from_file()
|
||||
@@ -508,13 +498,11 @@ def main() -> None:
|
||||
print(f" Changed: {len(changed)}")
|
||||
print(f" Removed: {len(removed)}")
|
||||
|
||||
# Determine release notes
|
||||
if args.notes_file:
|
||||
notes = Path(args.notes_file).read_text().strip()
|
||||
elif args.notes:
|
||||
notes = args.notes
|
||||
else:
|
||||
# Auto-generate and open in editor
|
||||
notes = generate_release_notes(latest, added, changed, removed, version)
|
||||
|
||||
if args.dry_run:
|
||||
@@ -531,7 +519,6 @@ def main() -> None:
|
||||
print("Warning: Release notes are empty. Using auto-generated notes.")
|
||||
notes = ""
|
||||
|
||||
# Dry run summary
|
||||
if args.dry_run:
|
||||
print()
|
||||
print("=== DRY RUN ===")
|
||||
@@ -549,14 +536,12 @@ def main() -> None:
|
||||
print()
|
||||
return
|
||||
|
||||
# Verify artifacts exist
|
||||
for f in (STANDARD_JSON, DUAL_SCREEN_JSON):
|
||||
if not f.exists():
|
||||
print(f"Error: Expected artifact not found: {f}")
|
||||
print("Did you run `just build` first?")
|
||||
sys.exit(1)
|
||||
|
||||
# Show summary before proceeding
|
||||
std_count = get_app_count(STANDARD_JSON)
|
||||
ds_count = get_app_count(DUAL_SCREEN_JSON)
|
||||
|
||||
@@ -576,7 +561,6 @@ def main() -> None:
|
||||
print("Aborted.")
|
||||
sys.exit(0)
|
||||
|
||||
# Commit any uncommitted changes (e.g. from `just build`)
|
||||
if not check_working_tree_clean():
|
||||
print()
|
||||
print("Working tree has changes. Committing...")
|
||||
@@ -584,7 +568,6 @@ def main() -> None:
|
||||
run(["git", "commit", "-m", f"release: {version}"])
|
||||
run(["git", "push"])
|
||||
|
||||
# Create versioned copies for upload
|
||||
versioned_copies = create_versioned_copies(version)
|
||||
|
||||
try:
|
||||
|
||||
@@ -18,7 +18,7 @@ from urllib.parse import urljoin, urlparse
|
||||
from urllib.request import Request, urlopen
|
||||
|
||||
from help_formatter import StyledHelpFormatter
|
||||
from utils import get_additional_settings, hydrate_settings, load_dotenv
|
||||
from utils import detect_source_from_url, get_additional_settings, hydrate_settings, load_dotenv
|
||||
|
||||
USER_AGENT = (
|
||||
"Mozilla/5.0 (Linux; Android 10; K) "
|
||||
@@ -36,16 +36,17 @@ def _make_request(
|
||||
url: str,
|
||||
headers: dict[str, str] | None = None,
|
||||
timeout: int = REQUEST_TIMEOUT,
|
||||
allow_insecure: bool = False,
|
||||
) -> tuple[str, dict[str, str], str]:
|
||||
"""Returns (body, response_headers, final_url). Allows self-signed certs."""
|
||||
hdrs = {"User-Agent": USER_AGENT}
|
||||
if headers:
|
||||
hdrs.update(headers)
|
||||
|
||||
req = Request(url, headers=hdrs)
|
||||
ctx = ssl.create_default_context()
|
||||
ctx.check_hostname = False
|
||||
ctx.verify_mode = ssl.CERT_NONE
|
||||
if allow_insecure:
|
||||
ctx.check_hostname = False
|
||||
ctx.verify_mode = ssl.CERT_NONE
|
||||
|
||||
resp = urlopen(req, timeout=timeout, context=ctx)
|
||||
body = resp.read().decode("utf-8", errors="replace")
|
||||
@@ -56,11 +57,12 @@ def _make_request(
|
||||
def _fetch_json(
|
||||
url: str,
|
||||
headers: dict[str, str] | None = None,
|
||||
allow_insecure: bool = False,
|
||||
) -> tuple[Any, dict[str, str]]:
|
||||
hdrs = {"Accept": "application/json"}
|
||||
if headers:
|
||||
hdrs.update(headers)
|
||||
body, resp_headers, _ = _make_request(url, headers=hdrs)
|
||||
body, resp_headers, _ = _make_request(url, headers=hdrs, allow_insecure=allow_insecure)
|
||||
return json.loads(body), resp_headers
|
||||
|
||||
|
||||
@@ -89,7 +91,7 @@ def _filter_links_by_regex(links: list[str], regex: str) -> list[str]:
|
||||
|
||||
|
||||
def _filter_links_by_extension(links: list[str]) -> list[str]:
|
||||
return [link for link in links if any(link.lower().endswith(ext) for ext in APK_EXTENSIONS)]
|
||||
return [link for link in links if link.lower().endswith(APK_EXTENSIONS)]
|
||||
|
||||
|
||||
def _sort_links(
|
||||
@@ -166,6 +168,22 @@ def _check_apk_index(app: dict[str, Any], apk_count: int) -> str | None:
|
||||
return None
|
||||
|
||||
|
||||
def _finalize_success(
|
||||
result: "TestResult",
|
||||
app: dict[str, Any],
|
||||
version: str | None,
|
||||
apk_urls: list[str],
|
||||
) -> None:
|
||||
index_warning = _check_apk_index(app, len(apk_urls))
|
||||
if index_warning:
|
||||
result.warnings.append(index_warning)
|
||||
result.passed = True
|
||||
result.version = version
|
||||
result.apk_count = len(apk_urls)
|
||||
result.apk_urls = apk_urls
|
||||
result.preferred_apk_index = app.get("preferredApkIndex", 0)
|
||||
|
||||
|
||||
class TestResult:
|
||||
def __init__(self, app_name: str, app_id: str, source: str, url: str):
|
||||
self.app_name = app_name
|
||||
@@ -225,7 +243,7 @@ def _collect_apks_from_assets(assets: list[dict], settings: dict[str, Any]) -> l
|
||||
for asset in assets:
|
||||
name = asset.get("name", "").lower()
|
||||
dl_url = asset.get("browser_download_url", "")
|
||||
if any(name.endswith(ext) for ext in APK_EXTENSIONS):
|
||||
if name.endswith(APK_EXTENSIONS):
|
||||
urls.append(dl_url)
|
||||
elif name.endswith(".zip") and settings.get("includeZips", False):
|
||||
urls.append(dl_url)
|
||||
@@ -283,6 +301,7 @@ def _find_release_with_apks(
|
||||
|
||||
def test_github(app: dict[str, Any], settings: dict[str, Any]) -> TestResult:
|
||||
result = TestResult(app["name"], app["id"], "GitHub", app["url"])
|
||||
insecure = settings.get("allowInsecure", False)
|
||||
|
||||
try:
|
||||
owner, repo, _ = _parse_owner_repo(app["url"])
|
||||
@@ -293,7 +312,7 @@ def test_github(app: dict[str, Any], settings: dict[str, Any]) -> TestResult:
|
||||
api_url = f"https://api.github.com/repos/{owner}/{repo}/releases?per_page={MAX_RELEASES_TO_CHECK}"
|
||||
|
||||
try:
|
||||
releases, resp_headers = _fetch_json(api_url, headers=_github_headers())
|
||||
releases, resp_headers = _fetch_json(api_url, headers=_github_headers(), allow_insecure=insecure)
|
||||
except Exception as e:
|
||||
result.error = f"GitHub API error: {e}"
|
||||
if "403" in str(e) or "rate" in str(e).lower():
|
||||
@@ -334,20 +353,13 @@ def test_github(app: dict[str, Any], settings: dict[str, Any]) -> TestResult:
|
||||
if warning:
|
||||
result.warnings.append(warning)
|
||||
|
||||
index_warning = _check_apk_index(app, len(apk_urls))
|
||||
if index_warning:
|
||||
result.warnings.append(index_warning)
|
||||
|
||||
result.passed = True
|
||||
result.version = version
|
||||
result.apk_count = len(apk_urls)
|
||||
result.apk_urls = apk_urls
|
||||
result.preferred_apk_index = app.get("preferredApkIndex", 0)
|
||||
_finalize_success(result, app, version, apk_urls)
|
||||
return result
|
||||
|
||||
|
||||
def test_codeberg(app: dict[str, Any], settings: dict[str, Any]) -> TestResult:
|
||||
result = TestResult(app["name"], app["id"], "Codeberg", app["url"])
|
||||
insecure = settings.get("allowInsecure", False)
|
||||
|
||||
try:
|
||||
owner, repo, host = _parse_owner_repo(app["url"])
|
||||
@@ -358,7 +370,7 @@ def test_codeberg(app: dict[str, Any], settings: dict[str, Any]) -> TestResult:
|
||||
api_url = f"https://{host}/api/v1/repos/{owner}/{repo}/releases?limit={MAX_RELEASES_TO_CHECK}"
|
||||
|
||||
try:
|
||||
releases, _ = _fetch_json(api_url)
|
||||
releases, _ = _fetch_json(api_url, allow_insecure=insecure)
|
||||
except Exception as e:
|
||||
result.error = f"Codeberg API error: {e}"
|
||||
return result
|
||||
@@ -378,15 +390,7 @@ def test_codeberg(app: dict[str, Any], settings: dict[str, Any]) -> TestResult:
|
||||
if warning:
|
||||
result.warnings.append(warning)
|
||||
|
||||
index_warning = _check_apk_index(app, len(apk_urls))
|
||||
if index_warning:
|
||||
result.warnings.append(index_warning)
|
||||
|
||||
result.passed = True
|
||||
result.version = version
|
||||
result.apk_count = len(apk_urls)
|
||||
result.apk_urls = apk_urls
|
||||
result.preferred_apk_index = app.get("preferredApkIndex", 0)
|
||||
_finalize_success(result, app, version, apk_urls)
|
||||
return result
|
||||
|
||||
|
||||
@@ -405,6 +409,7 @@ def _follow_intermediate_links(
|
||||
start_url: str,
|
||||
steps: list[dict],
|
||||
headers: dict[str, str],
|
||||
allow_insecure: bool = False,
|
||||
) -> tuple[str, str | None]:
|
||||
"""Walk intermediateLink chain. Returns (final_url, error_or_none)."""
|
||||
current_url = start_url
|
||||
@@ -412,7 +417,7 @@ def _follow_intermediate_links(
|
||||
if not isinstance(step, dict):
|
||||
continue
|
||||
try:
|
||||
body, _, final_url = _make_request(current_url, headers=headers)
|
||||
body, _, final_url = _make_request(current_url, headers=headers, allow_insecure=allow_insecure)
|
||||
except Exception as e:
|
||||
return current_url, f"Failed to fetch intermediate URL ({current_url}): {e}"
|
||||
|
||||
@@ -441,17 +446,18 @@ def _follow_intermediate_links(
|
||||
|
||||
def test_html(app: dict[str, Any], settings: dict[str, Any]) -> TestResult:
|
||||
result = TestResult(app["name"], app["id"], "HTML", app["url"])
|
||||
insecure = settings.get("allowInsecure", False)
|
||||
|
||||
req_headers = _parse_request_headers(settings)
|
||||
intermediate_links = settings.get("intermediateLink", [])
|
||||
|
||||
current_url, error = _follow_intermediate_links(app["url"], intermediate_links, req_headers)
|
||||
current_url, error = _follow_intermediate_links(app["url"], intermediate_links, req_headers, allow_insecure=insecure)
|
||||
if error:
|
||||
result.error = error
|
||||
return result
|
||||
|
||||
try:
|
||||
body, _, final_url = _make_request(current_url, headers=req_headers)
|
||||
body, _, final_url = _make_request(current_url, headers=req_headers, allow_insecure=insecure)
|
||||
except Exception as e:
|
||||
result.error = f"Failed to fetch final URL ({current_url}): {e}"
|
||||
return result
|
||||
@@ -495,33 +501,12 @@ def test_html(app: dict[str, Any], settings: dict[str, Any]) -> TestResult:
|
||||
else:
|
||||
result.warnings.append("No version extracted (no regex match, no pseudo-method)")
|
||||
|
||||
index_warning = _check_apk_index(app, len(apk_links))
|
||||
if index_warning:
|
||||
result.warnings.append(index_warning)
|
||||
|
||||
result.passed = True
|
||||
result.version = version
|
||||
result.apk_count = len(apk_links)
|
||||
result.apk_urls = apk_links
|
||||
result.preferred_apk_index = app.get("preferredApkIndex", 0)
|
||||
_finalize_success(result, app, version, apk_links)
|
||||
return result
|
||||
|
||||
|
||||
def _effective_source(app: dict[str, Any]) -> str:
|
||||
override = app.get("overrideSource")
|
||||
if override:
|
||||
return override
|
||||
|
||||
host = urlparse(app.get("url", "")).netloc.lower().lstrip("www.")
|
||||
if "github.com" in host:
|
||||
return "GitHub"
|
||||
if "gitlab.com" in host:
|
||||
return "GitLab"
|
||||
if "codeberg.org" in host:
|
||||
return "Codeberg"
|
||||
if "f-droid.org" in host:
|
||||
return "FDroid"
|
||||
return "HTML"
|
||||
return app.get("overrideSource") or detect_source_from_url(app.get("url", "")) or "HTML"
|
||||
|
||||
|
||||
def test_app(app: dict[str, Any]) -> TestResult:
|
||||
|
||||
@@ -6,8 +6,9 @@ import os
|
||||
import urllib.parse
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from constants import OBTAINIUM_SCHEME, REDIRECT_URL, SETTINGS_SCHEMA
|
||||
from constants import OBTAINIUM_SCHEME, REDIRECT_URL, SETTINGS_SCHEMA, SOURCE_HOST_MAP
|
||||
|
||||
|
||||
def load_dotenv() -> None:
|
||||
@@ -28,6 +29,18 @@ def load_dotenv() -> None:
|
||||
os.environ[key] = value
|
||||
|
||||
|
||||
def detect_source_from_url(url: str) -> str | None:
|
||||
"""Match a URL's host against SOURCE_HOST_MAP, including subdomains."""
|
||||
try:
|
||||
host = urlparse(url).netloc.lower().lstrip("www.")
|
||||
except Exception:
|
||||
return None
|
||||
for domain, source in SOURCE_HOST_MAP.items():
|
||||
if host == domain or host.endswith(f".{domain}"):
|
||||
return source
|
||||
return None
|
||||
|
||||
|
||||
def should_include_app(app: dict[str, Any], variant: str) -> bool:
|
||||
meta = app.get("meta", {})
|
||||
if meta.get("excludeFromExport", False):
|
||||
|
||||
@@ -10,12 +10,11 @@ from constants import (
|
||||
COMMON_SETTINGS_KEYS,
|
||||
DEPRECATED_SETTINGS_KEYS,
|
||||
REGEX_SETTINGS_KEYS,
|
||||
SOURCE_HOST_MAP,
|
||||
SOURCE_SPECIFIC_KEYS,
|
||||
VALID_SOURCES,
|
||||
VARIANTS,
|
||||
)
|
||||
from utils import get_additional_settings, should_include_app
|
||||
from utils import detect_source_from_url, get_additional_settings, should_include_app
|
||||
|
||||
REQUIRED_FIELDS = {"id", "url", "author", "name"}
|
||||
|
||||
@@ -46,17 +45,6 @@ def _check_regex(pattern: str, field_name: str, app_name: str) -> str | None:
|
||||
return f"{app_name}: invalid regex in '{field_name}': {e} (pattern: {pattern!r})"
|
||||
|
||||
|
||||
def _detect_source_from_url(url: str) -> str | None:
|
||||
try:
|
||||
host = urlparse(url).netloc.lower().lstrip("www.")
|
||||
except Exception:
|
||||
return None
|
||||
for domain, source in SOURCE_HOST_MAP.items():
|
||||
if host == domain or host.endswith(f".{domain}"):
|
||||
return source
|
||||
return None
|
||||
|
||||
|
||||
def _valid_keys_for_source(source: str | None) -> set[str]:
|
||||
valid = set(COMMON_SETTINGS_KEYS) | set(DEPRECATED_SETTINGS_KEYS)
|
||||
if source and source in SOURCE_SPECIFIC_KEYS:
|
||||
@@ -106,7 +94,7 @@ def _validate_override_source(
|
||||
warnings.append(f"{app_name}: missing overrideSource (auto-detection may be fragile)")
|
||||
|
||||
if url and source:
|
||||
detected = _detect_source_from_url(url)
|
||||
detected = detect_source_from_url(url)
|
||||
if detected and detected != source and source != "HTML" and detected != "HTML":
|
||||
warnings.append(
|
||||
f"{app_name}: URL host suggests '{detected}' but "
|
||||
@@ -164,7 +152,6 @@ def _validate_additional_settings(
|
||||
if not isinstance(settings, dict):
|
||||
return errors, warnings
|
||||
|
||||
# Validate regex fields
|
||||
for key in REGEX_SETTINGS_KEYS:
|
||||
value = settings.get(key, "")
|
||||
if isinstance(value, str):
|
||||
@@ -172,7 +159,6 @@ def _validate_additional_settings(
|
||||
if err:
|
||||
errors.append(err)
|
||||
|
||||
# Validate regex in intermediate link steps
|
||||
for i, link in enumerate(settings.get("intermediateLink", [])):
|
||||
if isinstance(link, dict):
|
||||
regex_val = link.get("customLinkFilterRegex", "")
|
||||
@@ -181,16 +167,14 @@ def _validate_additional_settings(
|
||||
if err:
|
||||
errors.append(err)
|
||||
|
||||
# Warn on deprecated keys
|
||||
for key, replacement in DEPRECATED_SETTINGS_KEYS.items():
|
||||
if key in settings:
|
||||
warnings.append(
|
||||
f"{app_name}: deprecated key '{key}', use '{replacement}' instead"
|
||||
)
|
||||
|
||||
# Check for source-inappropriate keys
|
||||
url = app.get("url", "")
|
||||
effective_source = app.get("overrideSource") or _detect_source_from_url(url)
|
||||
effective_source = app.get("overrideSource") or detect_source_from_url(url)
|
||||
if effective_source:
|
||||
valid_keys = _valid_keys_for_source(effective_source)
|
||||
for key in settings:
|
||||
|
||||
Reference in New Issue
Block a user