import argparse
import base64
import hashlib
import hmac
import json
import math
import os
import re
import secrets
import socket
import ssl
import struct
import sys
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from html.parser import HTMLParser
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
from urllib.parse import parse_qs, urlparse
from urllib.request import ProxyHandler, Request, build_opener, urlopen


# Fixed GUID that _ws_accept_for_key appends to the client key when computing
# the expected Sec-WebSocket-Accept value.
_WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
# Serialises the one-time table creation in _mysql_ensure_db_and_schema.
_MYSQL_LOCK = threading.Lock()
# Flipped to True once the CREATE TABLE statements have run in this process.
_MYSQL_SCHEMA_READY = False
# Name of the importable MySQL driver, cached by _mysql_import_driver.
_MYSQL_DRIVER: str | None = None
# Time and text of the last persist error written to stderr; used by
# _maybe_persist_payload to suppress identical messages for 10 seconds.
_MYSQL_PERSIST_LAST_ERR_AT = 0.0
_MYSQL_PERSIST_LAST_ERR_MSG = ""
# Guards _RATE_STATE.
_RATE_LOCK = threading.Lock()
# Per-player rate bookkeeping, keyed by the string returned by _row_rate_key.
_RATE_STATE: dict[str, dict[str, Any]] = {}
# Cache for _get_id_custom_color_map: last fetch time and the fetched mapping.
_ID_CUSTOM_COLOR_LOCK = threading.Lock()
_ID_CUSTOM_COLOR_STATE: dict[str, Any] = {"fetched_at": 0, "map": {}}
# Cache for _get_id_custom_color_css: last fetch time and the extracted CSS.
_ID_CUSTOM_COLOR_CSS_LOCK = threading.Lock()
_ID_CUSTOM_COLOR_CSS_STATE: dict[str, Any] = {"fetched_at": 0, "css": ""}
# NOTE(review): the readers/writers of the two states below are not visible in
# this part of the file; presumably the lock next to each one guards it.
_SW_AI_LOCK = threading.Lock()
_SW_AI_STATE: dict[str, Any] = {
    "weights": None,
    "weights_loaded_at": 0,
    "player_state": {},
}
_SW_ALERT_LOCK = threading.Lock()
_SW_ALERT_STATE: dict[str, Any] = {"last_by_player": {}}


def _mysql_enabled() -> bool:
    """Decide whether MySQL persistence should be attempted.

    ``MYSQL_ENABLE`` wins when it holds a recognised on/off word.  Otherwise a
    non-blank ``MYSQL_HOST`` or ``MYSQL_URL`` enables persistence, and as a
    last resort 127.0.0.1:3306 is probed with a 0.2 second timeout.
    """
    flag = (os.getenv("MYSQL_ENABLE") or "").strip().lower()
    if flag in {"1", "true", "yes", "on"}:
        return True
    if flag in {"0", "false", "no", "off"}:
        return False

    for var_name in ("MYSQL_HOST", "MYSQL_URL"):
        if (os.getenv(var_name) or "").strip():
            return True

    # Nothing configured explicitly: a reachable local server counts as "on".
    try:
        probe = socket.socket()
        try:
            probe.settimeout(0.2)
            probe.connect(("127.0.0.1", 3306))
        finally:
            probe.close()
    except Exception:
        return False
    return True


def _mysql_settings() -> dict[str, Any]:
    """Collect MySQL connection settings from the environment.

    Every value except the password is stripped and falls back to its default
    when unset or blank.  An unparsable ``MYSQL_PORT`` falls back to 3306.
    The password is taken verbatim (empty string when unset).
    """

    def env_or(name: str, fallback: str) -> str:
        return (os.getenv(name) or fallback).strip() or fallback

    try:
        port = int(env_or("MYSQL_PORT", "3306"))
    except Exception:
        port = 3306

    return {
        "host": env_or("MYSQL_HOST", "127.0.0.1"),
        "port": port,
        "user": env_or("MYSQL_USER", "root"),
        "password": os.getenv("MYSQL_PASSWORD") or "",
        "db": env_or("MYSQL_DB", "ninjasage"),
        "prefix": env_or("MYSQL_TABLE_PREFIX", "sw_"),
    }


def _mysql_import_driver() -> str:
    """Return the name of the first importable MySQL driver.

    ``mysql.connector`` is tried before ``pymysql``.  The winning name is
    cached in ``_MYSQL_DRIVER`` so later calls skip the import attempts.

    Raises:
        RuntimeError: ``missing_mysql_driver`` when neither module imports.
    """
    global _MYSQL_DRIVER
    if not _MYSQL_DRIVER:
        for candidate in ("mysql.connector", "pymysql"):
            try:
                __import__(candidate)
            except Exception:
                continue
            _MYSQL_DRIVER = candidate
            break
        else:
            raise RuntimeError("missing_mysql_driver")
    return _MYSQL_DRIVER


def _mysql_connect(with_db: bool) -> Any:
    """Open a new connection with whichever driver is available.

    Args:
        with_db: select the configured database on connect when true;
            otherwise connect without a default database.

    Returns:
        The driver's connection object, with autocommit disabled.

    Raises:
        RuntimeError: ``unsupported_mysql_driver`` for an unknown driver name
            (plus whatever ``_mysql_import_driver`` or the driver raises).
    """
    settings = _mysql_settings()
    driver = _mysql_import_driver()
    if driver not in ("mysql.connector", "pymysql"):
        raise RuntimeError("unsupported_mysql_driver")

    params: dict[str, Any] = {key: settings[key] for key in ("host", "port", "user", "password")}
    params["autocommit"] = False
    if driver == "pymysql":
        params["charset"] = "utf8mb4"
    database = settings["db"] if with_db else None
    if database:
        params["database"] = database

    if driver == "mysql.connector":
        import mysql.connector  # type: ignore

        return mysql.connector.connect(**params)

    import pymysql  # type: ignore

    return pymysql.connect(**params)


def _mysql_exec(conn: Any, sql: str, args: tuple[Any, ...] | None = None) -> None:
    cur = conn.cursor()
    try:
        cur.execute(sql, args or ())
    finally:
        try:
            cur.close()
        except Exception:
            pass


def _mysql_ensure_db_and_schema(conn: Any) -> None:
    """Create the prefixed tables once per process.

    The ready flag is checked before taking ``_MYSQL_LOCK`` and again while
    holding it, so only one caller issues the ``CREATE TABLE IF NOT EXISTS``
    statements.  The flag is set only after every statement has succeeded.
    Table names are built from the configured prefix.
    """
    global _MYSQL_SCHEMA_READY
    if _MYSQL_SCHEMA_READY:
        return
    with _MYSQL_LOCK:
        if _MYSQL_SCHEMA_READY:
            return
        prefix = _mysql_settings()["prefix"]

        snapshots = f"{prefix}snapshots"
        rows = f"{prefix}rows"
        squads = f"{prefix}squads"
        ai_snapshots = f"{prefix}ai_snapshots"
        timeline = f"{prefix}timeline_history"
        alerts = f"{prefix}alert_history"
        feedback = f"{prefix}scenario_feedback"

        # Snapshots comes first because rows and squads reference it.
        ddl_statements = (
            f"""CREATE TABLE IF NOT EXISTS `{snapshots}` (
  `id` BIGINT NOT NULL AUTO_INCREMENT,
  `url_hash` CHAR(40) NOT NULL,
  `url` VARCHAR(512) NOT NULL,
  `fetched_at` INT NOT NULL,
  `season_end_at` INT NULL,
  `season_remaining` VARCHAR(64) NULL,
  `season_remaining_s` INT NULL,
  `season_ended` TINYINT(1) NULL,
  `row_count` INT NOT NULL,
  `payload_json` LONGTEXT NOT NULL,
  `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uniq_urlhash_fetched` (`url_hash`, `fetched_at`),
  KEY `idx_fetched_at` (`fetched_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""",
            f"""CREATE TABLE IF NOT EXISTS `{rows}` (
  `snapshot_id` BIGINT NOT NULL,
  `rank` INT NULL,
  `cid` VARCHAR(32) NULL,
  `name` VARCHAR(255) NOT NULL,
  `level` INT NULL,
  `squad` VARCHAR(32) NULL,
  `league` VARCHAR(32) NULL,
  `trophy` BIGINT NULL,
  `gap` BIGINT NULL,
  KEY `idx_snapshot` (`snapshot_id`),
  KEY `idx_cid` (`cid`),
  CONSTRAINT `fk_{rows}_snapshot` FOREIGN KEY (`snapshot_id`) REFERENCES `{snapshots}` (`id`) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""",
            f"""CREATE TABLE IF NOT EXISTS `{squads}` (
  `snapshot_id` BIGINT NOT NULL,
  `name` VARCHAR(32) NOT NULL,
  `score` BIGINT NULL,
  `gap` BIGINT NULL,
  PRIMARY KEY (`snapshot_id`, `name`),
  CONSTRAINT `fk_{squads}_snapshot` FOREIGN KEY (`snapshot_id`) REFERENCES `{snapshots}` (`id`) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""",
            f"""CREATE TABLE IF NOT EXISTS `{ai_snapshots}` (
  `id` BIGINT NOT NULL AUTO_INCREMENT,
  `player_key` VARCHAR(64) NOT NULL,
  `player_name` VARCHAR(255) NULL,
  `fetched_at` INT NOT NULL,
  `rank` INT NULL,
  `trophy` BIGINT NULL,
  `gap_to_above` BIGINT NULL,
  `avg_hr` DOUBLE NULL,
  `thr_peak` DOUBLE NULL,
  `momentum` VARCHAR(8) NULL,
  `remaining_min` INT NULL,
  `scenario` VARCHAR(32) NOT NULL,
  `command` VARCHAR(32) NOT NULL,
  `eta_burn_min` INT NULL,
  `eta_normal_min` INT NULL,
  `success_rate` INT NULL,
  `risk_counter` INT NULL,
  `risk_drop` INT NULL,
  `confidence` INT NULL,
  `reason` VARCHAR(512) NULL,
  `explain_json` TEXT NULL,
  `output_json` LONGTEXT NULL,
  `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uniq_player_fetched` (`player_key`, `fetched_at`),
  KEY `idx_player` (`player_key`),
  KEY `idx_fetched` (`fetched_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""",
            f"""CREATE TABLE IF NOT EXISTS `{timeline}` (
  `id` BIGINT NOT NULL AUTO_INCREMENT,
  `player_key` VARCHAR(64) NOT NULL,
  `ts` INT NOT NULL,
  `trophy` BIGINT NULL,
  `type` VARCHAR(16) NOT NULL,
  `ai_snapshot_id` BIGINT NULL,
  `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  KEY `idx_player_ts` (`player_key`, `ts`),
  KEY `idx_type_ts` (`type`, `ts`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""",
            f"""CREATE TABLE IF NOT EXISTS `{alerts}` (
  `id` BIGINT NOT NULL AUTO_INCREMENT,
  `player_key` VARCHAR(64) NOT NULL,
  `ts` INT NOT NULL,
  `channel` VARCHAR(16) NOT NULL,
  `event` VARCHAR(32) NOT NULL,
  `state_hash` CHAR(40) NOT NULL,
  `message` TEXT NOT NULL,
  `sent` TINYINT(1) NOT NULL,
  `error` TEXT NULL,
  `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  KEY `idx_player_ts` (`player_key`, `ts`),
  KEY `idx_event_ts` (`event`, `ts`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""",
            f"""CREATE TABLE IF NOT EXISTS `{feedback}` (
  `id` BIGINT NOT NULL AUTO_INCREMENT,
  `player_key` VARCHAR(64) NOT NULL,
  `ts` INT NOT NULL,
  `scenario` VARCHAR(32) NOT NULL,
  `command` VARCHAR(32) NOT NULL,
  `success` TINYINT(1) NOT NULL,
  `delta_rank` INT NULL,
  `delta_gap` BIGINT NULL,
  `weights_json` TEXT NULL,
  `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  KEY `idx_player_ts` (`player_key`, `ts`),
  KEY `idx_scenario_ts` (`scenario`, `ts`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""",
        )
        for statement in ddl_statements:
            _mysql_exec(conn, statement)
        _MYSQL_SCHEMA_READY = True


def _mysql_open_ready() -> Any:
    """Return a connection to the configured database with the schema in place.

    If connecting with the database selected fails, a second connection
    without a default database is used to issue ``CREATE DATABASE IF NOT
    EXISTS`` and the original connect is retried.  The schema is then ensured
    on the resulting connection.

    Returns:
        An open driver connection; the caller is responsible for closing it.

    Raises:
        Whatever the driver or the schema setup raises.  When schema setup
        fails the connection is closed before the error propagates, so it is
        not leaked.
    """
    db = _mysql_settings()["db"]
    try:
        conn = _mysql_connect(with_db=True)
    except Exception:
        # Any connect failure is treated as "database may be missing"; if the
        # real cause is something else, the calls below raise it again.
        bootstrap = _mysql_connect(with_db=False)
        try:
            _mysql_exec(
                bootstrap,
                "CREATE DATABASE IF NOT EXISTS `%s` DEFAULT CHARACTER SET utf8mb4" % db.replace("`", ""),
            )
            bootstrap.commit()
        finally:
            try:
                bootstrap.close()
            except Exception:
                pass
        conn = _mysql_connect(with_db=True)

    try:
        _mysql_ensure_db_and_schema(conn)
    except BaseException:
        # The caller never receives this connection, so close it here.
        try:
            conn.close()
        except Exception:
            pass
        raise
    return conn


def _mysql_trunc(s: Any, max_len: int) -> str:
    """Stringify ``s`` and cut it to ``max_len`` characters.

    Any falsy value (``None``, ``0``, ``""`` ...) becomes the empty string.
    """
    text = str(s) if s else ""
    return text[:max_len]


def _mysql_int(v: Any) -> int | None:
    """Convert ``v`` to an ``int`` for storage, or ``None`` when it cannot be.

    The value is first normalised by ``_to_number``; a result that ``int()``
    rejects yields ``None`` rather than an exception.
    """
    number = _to_number(v)
    if number is not None:
        try:
            return int(number)
        except Exception:
            pass
    return None


def _mysql_persist_payload(payload: Any) -> None:
    """Store one leaderboard payload as a snapshot with its rows and squads.

    Nothing is written when persistence is disabled, when ``payload`` is not a
    dict, when it carries a truthy ``"error"``, or when ``"rows"`` is missing
    or empty.

    The snapshot is upserted on ``(url_hash, fetched_at)``.  Its existing row
    and squad records are then deleted and re-inserted from the payload, all
    in one transaction on a fresh connection that is closed before returning.

    Raises:
        Whatever the driver raises; the caller decides how to report it.
    """
    if not _mysql_enabled():
        return
    if not isinstance(payload, dict):
        return
    if payload.get("error"):
        return
    rows_val = payload.get("rows")
    if not isinstance(rows_val, list) or not rows_val:
        return

    prefix = _mysql_settings()["prefix"]
    snapshots = f"{prefix}snapshots"
    rows_t = f"{prefix}rows"
    squads_t = f"{prefix}squads"

    url = str(payload.get("url") or "")
    # The hash covers the full URL even though the stored copy is truncated.
    url_hash = hashlib.sha1(url.encode("utf-8", "replace")).hexdigest()
    fetched_at = int(_mysql_int(payload.get("fetched_at")) or int(time.time()))
    season_end_at = _mysql_int(payload.get("season_end_at"))
    season_remaining = payload.get("season_remaining")
    season_remaining_s = _mysql_int(payload.get("season_remaining_s"))
    season_ended = payload.get("season_ended")
    # Only real booleans are stored; anything else becomes NULL.
    season_ended_i = (1 if season_ended else 0) if isinstance(season_ended, bool) else None
    payload_json = _dump_json(payload, pretty=False)
    row_count = int(_mysql_int(payload.get("count")) or len(rows_val))

    conn = _mysql_open_ready()
    try:
        cur = conn.cursor()
        try:
            cur.execute(
                f"""INSERT INTO `{snapshots}` (
  url_hash, url, fetched_at, season_end_at, season_remaining, season_remaining_s, season_ended, row_count, payload_json
) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON DUPLICATE KEY UPDATE
  season_end_at=VALUES(season_end_at),
  season_remaining=VALUES(season_remaining),
  season_remaining_s=VALUES(season_remaining_s),
  season_ended=VALUES(season_ended),
  row_count=VALUES(row_count),
  payload_json=VALUES(payload_json)""",
                (
                    url_hash,
                    _mysql_trunc(url, 512),
                    fetched_at,
                    season_end_at,
                    _mysql_trunc(season_remaining, 64) if season_remaining is not None else None,
                    season_remaining_s,
                    season_ended_i,
                    row_count,
                    payload_json,
                ),
            )
            # Look the id up explicitly: the upsert may have hit an existing row.
            cur.execute(f"SELECT id FROM `{snapshots}` WHERE url_hash=%s AND fetched_at=%s", (url_hash, fetched_at))
            row = cur.fetchone()
            if not row:
                conn.commit()
                return
            snapshot_id = int(row[0])

            # Replace the child records wholesale so a re-persisted snapshot
            # does not accumulate duplicates.
            cur.execute(f"DELETE FROM `{rows_t}` WHERE snapshot_id=%s", (snapshot_id,))
            cur.execute(f"DELETE FROM `{squads_t}` WHERE snapshot_id=%s", (snapshot_id,))

            squads_val = payload.get("squads")
            if isinstance(squads_val, list) and squads_val:
                squad_rows: list[tuple[Any, ...]] = [
                    (
                        snapshot_id,
                        _mysql_trunc(s.get("name"), 32),
                        _mysql_int(s.get("score")),
                        _mysql_int(s.get("gap")),
                    )
                    for s in squads_val
                    if isinstance(s, dict)
                ]
                if squad_rows:
                    cur.executemany(
                        f"INSERT INTO `{squads_t}` (snapshot_id, name, score, gap) VALUES (%s,%s,%s,%s)",
                        squad_rows,
                    )

            row_rows: list[tuple[Any, ...]] = [
                (
                    snapshot_id,
                    _mysql_int(r.get("rank")),
                    _mysql_trunc(r.get("cid"), 32) if r.get("cid") is not None else None,
                    _mysql_trunc(r.get("name") or r.get("player_name") or "", 255),
                    _mysql_int(r.get("level")),
                    _mysql_trunc(r.get("squad_name") or r.get("squad") or "", 32) or None,
                    _mysql_trunc(r.get("league_name") or r.get("league") or "", 32) or None,
                    _mysql_int(r.get("trophy") if r.get("trophy") is not None else r.get("point")),
                    _mysql_int(r.get("gap")),
                )
                for r in rows_val
                if isinstance(r, dict)
            ]
            if row_rows:
                # Column names are backtick-quoted because RANK is a reserved
                # word in MySQL 8.0.2+; unquoted it makes this INSERT a syntax
                # error there.
                cur.executemany(
                    f"""INSERT INTO `{rows_t}` (
`snapshot_id`, `rank`, `cid`, `name`, `level`, `squad`, `league`, `trophy`, `gap`
) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
                    row_rows,
                )
        finally:
            try:
                cur.close()
            except Exception:
                pass
        conn.commit()
    finally:
        try:
            conn.close()
        except Exception:
            pass


def _maybe_persist_payload(payload: Any) -> None:
    """Persist ``payload`` on a best-effort basis; never raises for DB errors.

    A failure whose message mentions a deadlock (or error code 1213) is
    retried once after a 0.15 second pause.  Any remaining error is written to
    stderr, except that an identical message seen less than 10 seconds ago is
    suppressed.
    """
    global _MYSQL_PERSIST_LAST_ERR_AT, _MYSQL_PERSIST_LAST_ERR_MSG
    if not _mysql_enabled():
        return

    try:
        _mysql_persist_payload(payload)
        return
    except Exception as first_err:
        message = str(first_err) or "error"

    if "deadlock" in message.lower() or "1213" in message:
        try:
            time.sleep(0.15)
            _mysql_persist_payload(payload)
            return
        except Exception as retry_err:
            message = str(retry_err) or message

    now = time.time()
    is_repeat = message == _MYSQL_PERSIST_LAST_ERR_MSG and now - _MYSQL_PERSIST_LAST_ERR_AT < 10
    if is_repeat:
        return
    _MYSQL_PERSIST_LAST_ERR_AT = now
    _MYSQL_PERSIST_LAST_ERR_MSG = message
    sys.stderr.write("mysql_persist_error: " + message + "\n")
    sys.stderr.flush()


def _ws_read_http_headers(sock: socket.socket, timeout_s: int) -> bytes:
    """Read from ``sock`` until the blank line that ends an HTTP header block.

    Reading also stops when the peer closes the connection or once more than
    65536 bytes have been collected.  Everything received is returned,
    including any bytes that arrived after the header terminator.
    """
    sock.settimeout(timeout_s)
    terminator = b"\r\n\r\n"
    data = b""
    while terminator not in data and len(data) <= 65536:
        piece = sock.recv(4096)
        if not piece:
            break
        data += piece
    return data


def _ws_parse_http_response(raw: bytes) -> tuple[int, dict[str, str]]:
    """Parse the status line and header fields of a raw HTTP response.

    Only the part of ``raw`` before the first blank line is examined.

    Returns:
        ``(status_code, headers)``; header names are lower-cased, names and
        values are stripped, and lines without a colon or with an empty name
        or value are skipped.  A repeated header keeps its last value.

    Raises:
        RuntimeError: when the response is empty or the first line is not an
            HTTP status line.
    """
    header_bytes, _, _ = raw.partition(b"\r\n\r\n")
    # split() always yields at least one element, so emptiness has to be
    # tested on the bytes themselves for this error to be reachable.
    if not header_bytes.strip():
        raise RuntimeError("websocket handshake failed: empty response")

    status_raw, *field_lines = header_bytes.split(b"\r\n")
    status_line = status_raw.decode("utf-8", "replace")
    # The reason phrase is optional in practice ("HTTP/1.1 101"), so accept
    # end-of-line directly after the status code as well as whitespace.
    m = re.match(r"HTTP/\d+\.\d+\s+(\d+)(?:\s|$)", status_line)
    if not m:
        raise RuntimeError(f"websocket handshake failed: {status_line.strip()}")
    status = int(m.group(1))

    headers: dict[str, str] = {}
    for line in field_lines:
        name, sep, value = line.partition(b":")
        if not sep:
            continue
        key = name.decode("utf-8", "replace").strip().lower()
        val = value.decode("utf-8", "replace").strip()
        if key and val:
            headers[key] = val
    return status, headers


def _ws_accept_for_key(key: str) -> str:
    """Compute the ``Sec-WebSocket-Accept`` value expected for ``key``.

    The result is the base64 form of the SHA-1 digest of ``key`` followed by
    ``_WS_GUID``.
    """
    hasher = hashlib.sha1()
    hasher.update((key + _WS_GUID).encode("utf-8"))
    return base64.b64encode(hasher.digest()).decode("ascii")


def _ws_mask_payload(mask_key: bytes, payload: bytes) -> bytes:
    """XOR ``payload`` with the first four bytes of ``mask_key``, repeating.

    Applying the function twice with the same key restores the input.  An
    empty payload yields ``b""``.
    """
    if not payload:
        return b""
    return bytes(byte ^ mask_key[pos % 4] for pos, byte in enumerate(payload))


def _ws_build_frame(opcode: int, payload: bytes, masked: bool) -> bytes:
    """Serialise one final (FIN=1) WebSocket frame.

    The payload length is written in the 7-bit, 16-bit or 64-bit form
    depending on its size.  When ``masked`` is true a random 4-byte key is
    generated, emitted after the length, and applied to the payload.
    """
    size = len(payload)
    mask_flag = 0x80 if masked else 0x00
    first_byte = 0x80 | (opcode & 0x0F)

    if size <= 125:
        head = bytes([first_byte, mask_flag | size])
    elif size <= 65535:
        head = bytes([first_byte, mask_flag | 126]) + struct.pack("!H", size)
    else:
        head = bytes([first_byte, mask_flag | 127]) + struct.pack("!Q", size)

    if masked:
        key = os.urandom(4)
        return head + key + _ws_mask_payload(key, payload)
    return head + payload


class _WebSocketConn:
    def __init__(self, sock: socket.socket) -> None:
        self.sock = sock
        self._recv_buf = b""

    def close(self) -> None:
        try:
            self.sock.close()
        except Exception:
            pass

    def send_text(self, text: str) -> None:
        data = text.encode("utf-8")
        frame = _ws_build_frame(1, data, masked=True)
        self.sock.sendall(frame)

    def send_pong(self, payload: bytes) -> None:
        frame = _ws_build_frame(10, payload, masked=True)
        self.sock.sendall(frame)

    def send_close(self) -> None:
        frame = _ws_build_frame(8, b"", masked=True)
        try:
            self.sock.sendall(frame)
        except Exception:
            pass

    def _recv_exact(self, n: int, timeout_s: int) -> bytes:
        self.sock.settimeout(timeout_s)
        out = b""
        while len(out) < n:
            chunk = self.sock.recv(n - len(out))
            if not chunk:
                raise EOFError("websocket closed")
            out += chunk
        return out

    def recv_message(self, timeout_s: int) -> str | bytes:
        message_parts: list[bytes] = []
        message_opcode: int | None = None
        while True:
            h = self._recv_exact(2, timeout_s=timeout_s)
            b1, b2 = h[0], h[1]
            fin = (b1 & 0x80) != 0
            opcode = b1 & 0x0F
            masked = (b2 & 0x80) != 0
            length = b2 & 0x7F

            if length == 126:
                length = struct.unpack("!H", self._recv_exact(2, timeout_s=timeout_s))[0]
            elif length == 127:
                length = struct.unpack("!Q", self._recv_exact(8, timeout_s=timeout_s))[0]

            mask_key = self._recv_exact(4, timeout_s=timeout_s) if masked else b""
            payload = self._recv_exact(int(length), timeout_s=timeout_s) if length else b""
            if masked:
                payload = _ws_mask_payload(mask_key, payload)

            if opcode == 8:
                self.send_close()
                raise EOFError("websocket closed")
            if opcode == 9:
                self.send_pong(payload)
                continue
            if opcode == 10:
                continue

            if opcode in (1, 2):
                message_opcode = opcode
                message_parts = [payload]
            elif opcode == 0 and message_opcode is not None:
                message_parts.append(payload)
            else:
                continue

            if fin and message_opcode is not None:
                data = b"".join(message_parts)
                if message_opcode == 1:
                    return data.decode("utf-8", "replace")
                return data


def _ws_connect(ws_url: str, origin: str | None, timeout_s: int) -> _WebSocketConn:
    parsed = urlparse(ws_url)
    scheme = (parsed.scheme or "").lower()
    if scheme not in {"ws", "wss"}:
        raise ValueError("ws_url must start with ws:// or wss://")
    host = parsed.hostname or ""
    if not host:
        raise ValueError("ws_url missing hostname")
    port = int(parsed.port or (443 if scheme == "wss" else 80))
    path = parsed.path or "/"
    if parsed.query:
        path += "?" + parsed.query

    raw_sock = socket.create_connection((host, port), timeout=timeout_s)
    if scheme == "wss":
        ctx = ssl.create_default_context()
        sock: socket.socket = ctx.wrap_socket(raw_sock, server_hostname=host)
    else:
        sock = raw_sock

    key = base64.b64encode(os.urandom(16)).decode("ascii")
    host_hdr = f"{host}:{port}" if (scheme == "ws" and port != 80) or (scheme == "wss" and port != 443) else host
    req_lines = [
        f"GET {path} HTTP/1.1",
        f"Host: {host_hdr}",
        "Upgrade: websocket",
        "Connection: Upgrade",
        f"Sec-WebSocket-Key: {key}",
        "Sec-WebSocket-Version: 13",
        "User-Agent: Mozilla/5.0",
    ]
    if origin:
        req_lines.append(f"Origin: {origin}")
    req = ("\r\n".join(req_lines) + "\r\n\r\n").encode("utf-8")
    sock.sendall(req)

    raw = _ws_read_http_headers(sock, timeout_s=timeout_s)
    status, headers = _ws_parse_http_response(raw)
    if status != 101:
        raise RuntimeError(f"websocket handshake failed: status={status}")
    accept = headers.get("sec-websocket-accept", "")
    if accept != _ws_accept_for_key(key):
        raise RuntimeError("websocket handshake failed: bad sec-websocket-accept")
    return _WebSocketConn(sock)


def _now_iso() -> str:
    """Return the current UTC time in ISO 8601 form with a ``Z`` suffix."""
    stamp = datetime.now(tz=timezone.utc).isoformat()
    return stamp.replace("+00:00", "Z")


def socketio_listen(
    ws_url: str,
    origin: str | None,
    timeout_s: int,
    max_packets: int,
    max_seconds: int,
    out: Path | None,
) -> int:
    """Connect to a Socket.IO websocket endpoint and log every packet as JSON.

    Each received packet becomes one JSON line, appended to ``out`` when given
    and printed otherwise.  An engine "open" packet (``0...``) is answered
    with ``40`` and a ping (``2...``) with the matching ``3...`` pong.

    Args:
        ws_url: ``ws://`` or ``wss://`` URL of the endpoint.
        origin: optional ``Origin`` header for the handshake.
        timeout_s: connect/handshake timeout; the per-read timeout is this
            value clamped to the range 1..5 seconds.
        max_packets: stop after this many logged packets (0 or less: no limit).
        max_seconds: stop after this many seconds (0 or less: no limit).
        out: file to append to, or ``None`` for stdout.

    Returns:
        130 when interrupted with Ctrl-C, otherwise 0 (limit reached or the
        server closed the connection).
    """
    conn = _ws_connect(ws_url, origin=origin, timeout_s=timeout_s)
    # Monotonic clock: the time limit must not be affected by clock changes.
    started = time.monotonic()
    packet_count = 0
    out_fp = None

    def emit(payload: dict[str, Any]) -> None:
        nonlocal packet_count
        packet_count += 1
        line = json.dumps(payload, ensure_ascii=False)
        if out_fp:
            out_fp.write(line + "\n")
            out_fp.flush()
        else:
            print(line)

    try:
        # Opened inside the try so that a failure here still closes the
        # already-established connection in the finally clause.
        if out:
            out_fp = out.open("a", encoding="utf-8")

        while True:
            if max_seconds and max_seconds > 0 and (time.monotonic() - started) >= max_seconds:
                break
            if max_packets and max_packets > 0 and packet_count >= max_packets:
                break

            try:
                msg = conn.recv_message(timeout_s=max(1, min(5, timeout_s)))
            except (TimeoutError, socket.timeout):
                # Short read timeouts keep the limit checks above responsive.
                continue
            if not isinstance(msg, str):
                emit({"ts": _now_iso(), "kind": "binary", "size": len(msg)})
                continue

            raw_s = msg.strip()
            if raw_s == "":
                continue

            entry: dict[str, Any] = {"ts": _now_iso(), "kind": "engine", "raw": raw_s}

            if raw_s.startswith("0"):
                try:
                    entry["open"] = json.loads(raw_s[1:])
                except Exception:
                    pass
                emit(entry)
                conn.send_text("40")
                continue

            if raw_s.startswith("2"):
                conn.send_text("3" + raw_s[1:])
                entry["kind"] = "ping"
                emit(entry)
                continue

            if raw_s.startswith("3"):
                entry["kind"] = "pong"
                emit(entry)
                continue

            if raw_s.startswith("40"):
                entry["kind"] = "socket_connect"
                emit(entry)
                continue

            if raw_s.startswith("41"):
                entry["kind"] = "socket_disconnect"
                emit(entry)
                continue

            if raw_s.startswith("42"):
                entry["kind"] = "event"
                try:
                    arr = json.loads(raw_s[2:])
                    if isinstance(arr, list) and arr:
                        entry["event"] = arr[0]
                        if len(arr) > 1:
                            entry["args"] = arr[1:]
                except Exception:
                    # Unparsable event bodies are still logged via "raw".
                    pass
                emit(entry)
                continue

            # Must come after the more specific "40"/"41"/"42" checks.
            if raw_s.startswith("4"):
                entry["kind"] = "engine_message"
                emit(entry)
                continue

            emit(entry)
    except KeyboardInterrupt:
        return 130
    except EOFError:
        return 0
    finally:
        try:
            conn.send_close()
        except Exception:
            pass
        conn.close()
        if out_fp:
            out_fp.close()
    return 0


# Display name for each numeric league id.
# NOTE(review): presumably the ids match the upstream leaderboard data; the
# code that uses this table is not visible here, so confirm against it.
_LEAGUE_NAMES_BY_ID: dict[int, str] = {
    7: "Sage",
    6: "Grand Master",
    5: "Master",
    4: "Gold",
    3: "Silver",
    2: "Bronze",
}

# Display name for each numeric squad id (same caveat as the league table).
_SQUAD_NAMES_BY_ID: dict[int, str] = {
    0: "Assault",
    1: "Ambush",
    2: "Medic",
    3: "Kage",
    4: "HQ",
}


def _to_number(value: Any) -> int | float | None:
    if value is None:
        return None
    if isinstance(value, bool):
        return None
    if isinstance(value, (int, float)):
        return value
    s = str(value).strip()
    if s == "":
        return None
    try:
        if re.fullmatch(r"[+-]?\d+", s):
            return int(s)
        return float(s)
    except Exception:
        return None


def _row_rate_key(row: dict[str, Any]) -> str:
    """Build the key that identifies a player row for rate tracking.

    A non-blank ``cid`` gives ``"cid:<cid>"``.  Otherwise the lower-cased
    ``name`` (or ``player_name``) gives ``"name:<name>"``.  The result is the
    empty string when neither is usable.
    """
    cid = row.get("cid")
    cid_text = "" if cid is None else str(cid).strip()
    if cid_text:
        return "cid:" + cid_text

    name = str(row.get("name") or row.get("player_name") or "").strip()
    return ("name:" + name.lower()) if name else ""


def _get_id_custom_color_map(now_i: int) -> dict[str, str]:
    """Return the id-to-colour mapping, refreshing it at most every 30 minutes.

    A non-empty cached mapping younger than 1800 seconds (relative to
    ``now_i``) is returned directly.  Otherwise the mapping is fetched while
    holding ``_ID_CUSTOM_COLOR_LOCK``; the cache is re-checked under the lock
    so that concurrent callers do not all fetch.

    Args:
        now_i: current time in seconds, used for the age check and stored as
            the new fetch time.

    Returns:
        The fresh mapping on success.  When the fetch fails or yields nothing
        usable, the previously cached mapping (possibly empty) is returned and
        the fetch time is still advanced to ``now_i``.
    """

    def read_cache() -> tuple[dict[str, str], bool]:
        """Return the cached mapping and whether it is still fresh."""
        try:
            fetched_at = int(_ID_CUSTOM_COLOR_STATE.get("fetched_at") or 0)
        except Exception:
            fetched_at = 0
        cached = _ID_CUSTOM_COLOR_STATE.get("map")
        mapping: dict[str, str] = cached if isinstance(cached, dict) else {}
        return mapping, bool(mapping) and now_i - fetched_at < 1800

    mapping, fresh = read_cache()
    if fresh:
        return mapping

    with _ID_CUSTOM_COLOR_LOCK:
        # Another thread may have refreshed the cache while we waited.
        mapping, fresh = read_cache()
        if fresh:
            return mapping

        try:
            # Context manager so the HTTP response is closed deterministically.
            with urlopen("https://ninjasage.id/api/id-custom-color", timeout=10) as resp:
                data = resp.read().decode("utf-8", "replace")
            j = json.loads(data)
            src = j.get("data") if isinstance(j, dict) else None
            out: dict[str, str] = {}
            if isinstance(src, dict):
                for k, v in src.items():
                    kk = str(k).strip()
                    vv = str(v).strip()
                    if kk and vv:
                        out[kk] = vv
            if out:
                _ID_CUSTOM_COLOR_STATE["map"] = out
                _ID_CUSTOM_COLOR_STATE["fetched_at"] = int(now_i)
                return out
        except Exception:
            # Best effort: fall through and serve whatever was cached.
            pass

        _ID_CUSTOM_COLOR_STATE["fetched_at"] = int(now_i)
        return mapping


def _get_id_custom_color_css(now_i: int) -> str:
    """Return the cached ``.id-<n> { ... }`` CSS rules, refreshing when stale.

    Non-empty cached CSS younger than 1800 seconds (relative to ``now_i``) is
    returned directly.  Otherwise the realtime leaderboard page is fetched
    under ``_ID_CUSTOM_COLOR_CSS_LOCK`` and the matching rules are extracted.
    On failure, or when no rule is found, the previously cached CSS is
    returned and the fetch time is still advanced to ``now_i``.
    """
    state = _ID_CUSTOM_COLOR_CSS_STATE

    def read_cache() -> tuple[str, bool]:
        try:
            stamp = int(state.get("fetched_at") or 0)
        except Exception:
            stamp = 0
        value = state.get("css")
        text = value if isinstance(value, str) else ""
        return text, bool(text) and now_i - stamp < 1800

    css_text, is_fresh = read_cache()
    if is_fresh:
        return css_text

    with _ID_CUSTOM_COLOR_CSS_LOCK:
        # Re-check: another caller may have refreshed while we waited.
        css_text, is_fresh = read_cache()
        if is_fresh:
            return css_text

        try:
            page = fetch_html(
                "https://ninjasage.id/en/leaderboards/shadow-war/realtime",
                cookie=None,
                timeout_s=20,
                no_proxy=False,
            )
            extracted = "\n".join(re.findall(r"\.id-\d+\s*\{[^}]*\}", page)).strip()
            if extracted:
                state["css"] = extracted + "\n"
                state["fetched_at"] = int(now_i)
                return state["css"]
        except Exception:
            pass

        state["fetched_at"] = int(now_i)
        return css_text


def _enrich_payload_rates(payload: dict[str, Any]) -> dict[str, Any]:
    fetched_at = payload.get("fetched_at")
    try:
        now_i = int(fetched_at) if fetched_at is not None else int(time.time())
    except Exception:
        now_i = int(time.time())

    rows = payload.get("rows")
    if not isinstance(rows, list) or not rows:
        return payload

    keys: list[str] = []
    trophies: list[float | int | None] = []
    for r in rows:
        if not isinstance(r, dict):
            keys.append("")
            trophies.append(None)
            continue
        k = _row_rate_key(r)
        keys.append(k)
        trophies.append(_to_number(r.get("trophy")))

    per_hr_now: dict[str, float] = {}
    with _RATE_LOCK:
        for k, tr in zip(keys, trophies):
            if not k or tr is None:
                continue
            st = _RATE_STATE.get(k)
            if st is None:
                _RATE_STATE[k] = {
                    "last_at": now_i,
                    "last_trophy": tr,
                    "ema_per_hr": None,
                    "mad_ema": None,
                    "last_rate": None,
                    "last_rate_at": None,
                }
                continue

            try:
                last_at = int(st.get("last_at") or 0)
            except Exception:
                last_at = 0
            last_trophy = _to_number(st.get("last_trophy"))
            dt_s = now_i - last_at
            if dt_s >= 1 and last_trophy is not None:
                rate = float(tr - last_trophy) / (float(dt_s) / 3600.0)
                if abs(rate) <= 20000:
                    per_hr_now[k] = rate
                    ema_prev = st.get("ema_per_hr")
                    try:
                        ema_prev_f = float(ema_prev) if ema_prev is not None else None
                    except Exception:
                        ema_prev_f = None
                    alpha = 0.3
                    ema_new = rate if ema_prev_f is None else (ema_prev_f * (1.0 - alpha) + rate * alpha)
                    st["ema_per_hr"] = ema_new
                    try:
                        mad_prev = st.get("mad_ema")
                        mad_prev_f = float(mad_prev) if mad_prev is not None else None
                    except Exception:
                        mad_prev_f = None
                    dev = abs(float(rate) - float(ema_new))
                    beta = 0.25
                    mad_new = dev if mad_prev_f is None else (mad_prev_f * (1.0 - beta) + dev * beta)
                    st["mad_ema"] = mad_new
                    st["last_rate"] = float(rate)
                    st["last_rate_at"] = int(now_i)
            st["last_at"] = now_i
            st["last_trophy"] = tr

        ema_by_key: dict[str, float | None] = {}
        for k in set(keys):
            if not k:
                continue
            st = _RATE_STATE.get(k)
            if not st:
                ema_by_key[k] = None
                continue
            v = st.get("ema_per_hr")
            try:
                ema_by_key[k] = float(v) if v is not None else None
            except Exception:
                ema_by_key[k] = None

    for i, r in enumerate(rows):
        if not isinstance(r, dict):
            continue
        k = keys[i]
        if not k:
            continue
        t_per_hr = per_hr_now.get(k)
        avg_per_hr = ema_by_key.get(k)
        r["tPerHour"] = t_per_hr
        r["avgPerHour"] = avg_per_hr
        r["avgPerDay"] = (avg_per_hr * 24.0) if isinstance(avg_per_hr, (int, float)) else None

    for i, r in enumerate(rows):
        if not isinstance(r, dict):
            continue
        if i <= 0:
            r["overtake"] = None
            continue
        cur_t = _to_number(r.get("trophy"))
        above = rows[i - 1]
        above_t = _to_number(above.get("trophy")) if isinstance(above, dict) else None
        if cur_t is None or above_t is None:
            r["overtake"] = None
            continue
        diff = float(above_t - cur_t)
        if diff <= 0:
            r["overtake"] = 0.0
            continue
        avg = r.get("avgPerHour")
        try:
            avg_f = float(avg) if avg is not None else None
        except Exception:
            avg_f = None
        if avg_f is None or avg_f <= 0:
            r["overtake"] = None
            continue
        hours = diff / avg_f
        if hours < 0 or hours > 1e6:
            r["overtake"] = None
            continue
        r["overtake"] = hours

    color_map = _get_id_custom_color_map(now_i)
    if color_map:
        for r in rows:
            if not isinstance(r, dict):
                continue
            cid = r.get("cid")
            if cid is None:
                continue
            c = color_map.get(str(cid))
            if c:
                r["id_color_name"] = c

    return payload


def _clamp(x: float, lo: float, hi: float) -> float:
    if x < lo:
        return lo
    if x > hi:
        return hi
    return x


def _normal_cdf(z: float) -> float:
    return 0.5 * (1.0 + math.erf(z / math.sqrt(2.0)))


def _predict_row_metrics(row: dict[str, Any], now_i: int) -> dict[str, Any]:
    """Derive prediction inputs for one leaderboard row.

    Combines values stored on the row (``avgPerHour``/``tPerHour``, ``trophy``,
    ``gap``, ``rank``, ``cid``/``name``) with the player's entry in
    ``_RATE_STATE``.

    Args:
        row: Leaderboard row mapping.
        now_i: Reference time in epoch seconds, used to age the last rate sample.

    Returns:
        Dict with ``avg_hr``, ``avg_day``, ``gap`` (each may be ``None``) and the
        floats ``consistency``, ``idle_risk``, ``spike_risk``, ``data_quality``.
    """
    avg_raw = row.get("avgPerHour")
    inst_raw = row.get("tPerHour")
    try:
        # The smoothed rate wins; the instantaneous rate is only a fallback.
        if avg_raw is not None:
            avg_hr = float(avg_raw)
        elif inst_raw is not None:
            avg_hr = float(inst_raw)
        else:
            avg_hr = None
    except Exception:
        avg_hr = None

    trophy = _to_number(row.get("trophy"))
    gap = _to_number(row.get("gap"))

    # Share of the expected fields that are present and usable.
    checks = (
        _to_number(row.get("rank")) is not None,
        trophy is not None,
        gap is not None,
        avg_hr is not None and avg_hr > 0,
        bool(str(row.get("cid") or row.get("name") or "").strip()),
    )
    data_quality = float(sum(1 for passed in checks if passed)) / float(len(checks))

    rate_key = _row_rate_key(row)
    state = _RATE_STATE.get(rate_key) if rate_key else None

    def _stored(name: str, cast: Any) -> Any:
        # Read one field of the rate state; missing or unconvertible -> None.
        raw = state.get(name)
        if raw is None:
            return None
        try:
            return cast(raw)
        except Exception:
            return None

    ema = mad = last_rate_at = last_rate = None
    if isinstance(state, dict):
        ema = _stored("ema_per_hr", float)
        mad = _stored("mad_ema", float)
        last_rate_at = _stored("last_rate_at", int)
        last_rate = _stored("last_rate", float)

    if ema is None or mad is None or ema == 0:
        consistency = 0.5
    else:
        # Smoothed deviation relative to the smoothed rate; 1.0 means steady.
        ratio = mad / (abs(ema) + 1.0)
        consistency = _clamp(1.0 - _clamp(ratio, 0.0, 1.0), 0.0, 1.0)

    if last_rate_at is None:
        idle_risk = 0.5
    else:
        # The older the last accepted rate sample, the higher the idle risk.
        age_s = max(0, int(now_i) - int(last_rate_at))
        idle_risk = 0.90
        for limit_s, tier_risk in ((600, 0.10), (1800, 0.30), (3600, 0.60), (7200, 0.80)):
            if age_s <= limit_s:
                idle_risk = tier_risk
                break

    if last_rate is None or ema is None:
        spike_risk = 0.25
    else:
        dev = abs(float(last_rate) - float(ema))
        spike_risk = _clamp((dev / (abs(float(ema)) + 50.0)) / 3.0, 0.0, 1.0)

    return {
        "avg_hr": avg_hr,
        "avg_day": (avg_hr * 24.0) if isinstance(avg_hr, (int, float)) else None,
        "gap": gap,
        "consistency": float(consistency),
        "idle_risk": float(idle_risk),
        "spike_risk": float(spike_risk),
        "data_quality": float(data_quality),
    }


def _prob_overtake_leader_before_season_end(
    gap: int | float | None,
    avg_hr: float | None,
    leader_avg_hr: float | None,
    consistency: float,
    idle_risk: float,
    spike_risk: float,
    data_quality: float,
    remaining_s: int | None,
) -> float:
    if remaining_s is None or remaining_s <= 0:
        return 0.05
    rem_h = float(remaining_s) / 3600.0
    if gap is None:
        return 0.05
    if gap <= 0:
        return 0.95
    if avg_hr is None or avg_hr <= 0 or leader_avg_hr is None or leader_avg_hr <= 0:
        return 0.05

    a_eff = float(avg_hr) * (1.0 - 0.55 * _clamp(float(idle_risk), 0.0, 1.0))
    leader_idle_baseline = 0.25
    l_eff = float(leader_avg_hr) * (1.0 - 0.30 * leader_idle_baseline)

    net = (a_eff - l_eff) * rem_h
    sigma = (abs(a_eff) + abs(l_eff)) * rem_h * (
        0.18
        + 0.60 * (1.0 - _clamp(float(consistency), 0.0, 1.0))
        + 0.25 * _clamp(float(spike_risk), 0.0, 1.0)
        + 0.20 * (1.0 - _clamp(float(data_quality), 0.0, 1.0))
    )
    sigma = max(float(sigma), 1.0)
    z = (float(net) - float(gap)) / sigma
    return _clamp(_normal_cdf(float(z)), 0.05, 0.95)


def _prob_overtake_target_before_season_end(
    gap_to_target: int | float | None,
    avg_hr: float | None,
    target_avg_hr: float | None,
    consistency: float,
    idle_risk: float,
    spike_risk: float,
    data_quality: float,
    target_consistency: float,
    target_idle_risk: float,
    target_spike_risk: float,
    target_data_quality: float,
    remaining_s: int | None,
) -> float:
    if remaining_s is None or remaining_s <= 0:
        return 0.05
    rem_h = float(remaining_s) / 3600.0
    if gap_to_target is None:
        return 0.05
    if gap_to_target <= 0:
        return 0.95
    if avg_hr is None or avg_hr <= 0 or target_avg_hr is None or target_avg_hr <= 0:
        return 0.05

    a_eff = float(avg_hr) * (1.0 - 0.55 * _clamp(float(idle_risk), 0.0, 1.0))
    t_eff = float(target_avg_hr) * (1.0 - 0.30 * _clamp(float(target_idle_risk), 0.0, 1.0))

    net = (a_eff - t_eff) * rem_h
    sigma = (abs(a_eff) + abs(t_eff)) * rem_h * (
        0.18
        + 0.35 * (1.0 - _clamp(float(consistency), 0.0, 1.0))
        + 0.35 * (1.0 - _clamp(float(target_consistency), 0.0, 1.0))
        + 0.15 * _clamp(float(spike_risk), 0.0, 1.0)
        + 0.15 * _clamp(float(target_spike_risk), 0.0, 1.0)
        + 0.10 * (1.0 - _clamp(float(data_quality), 0.0, 1.0))
        + 0.10 * (1.0 - _clamp(float(target_data_quality), 0.0, 1.0))
    )
    sigma = max(float(sigma), 1.0)
    z = (float(net) - float(gap_to_target)) / sigma
    return _clamp(_normal_cdf(float(z)), 0.05, 0.95)


def _eta_overtake_target_hours(
    gap_to_target: int | float | None,
    avg_hr: float | None,
    target_avg_hr: float | None,
    idle_risk: float,
    target_idle_risk: float,
) -> float | None:
    if gap_to_target is None or gap_to_target <= 0:
        return 0.0 if gap_to_target is not None and gap_to_target <= 0 else None
    if avg_hr is None or avg_hr <= 0 or target_avg_hr is None or target_avg_hr <= 0:
        return None
    a_eff = float(avg_hr) * (1.0 - 0.55 * _clamp(float(idle_risk), 0.0, 1.0))
    t_eff = float(target_avg_hr) * (1.0 - 0.30 * _clamp(float(target_idle_risk), 0.0, 1.0))
    net_per_h = a_eff - t_eff
    if net_per_h <= 0:
        return None
    h = float(gap_to_target) / float(net_per_h)
    if h < 0 or h > 1e6:
        return None
    return float(h)


def _predict_shadow_war(payload: dict[str, Any]) -> dict[str, Any]:
    rows = payload.get("rows")
    if not isinstance(rows, list) or not rows:
        return {
            "url": payload.get("url"),
            "fetched_at": payload.get("fetched_at"),
            "season_end_at": payload.get("season_end_at"),
            "season_remaining_s": payload.get("season_remaining_s"),
            "count": 0,
            "rows": [],
            "error": payload.get("error") or "missing_rows",
            "model": {"name": "sw_outcome_predictor_v2", "target": "overtake_leader"},
        }

    fetched_at = payload.get("fetched_at")
    try:
        now_i = int(fetched_at) if fetched_at is not None else int(time.time())
    except Exception:
        now_i = int(time.time())

    leader = rows[0] if isinstance(rows[0], dict) else {}
    leader_metrics = _predict_row_metrics(leader, now_i=now_i)
    leader_avg_hr = leader_metrics.get("avg_hr")
    try:
        leader_avg_hr_f = float(leader_avg_hr) if leader_avg_hr is not None else None
    except Exception:
        leader_avg_hr_f = None
    if leader_avg_hr_f is None or leader_avg_hr_f <= 0:
        candidates: list[float] = []
        for rr in rows:
            if not isinstance(rr, dict):
                continue
            v = rr.get("avgPerHour")
            if v is None:
                v = rr.get("tPerHour")
            try:
                vf = float(v) if v is not None else None
            except Exception:
                vf = None
            if vf is not None and vf > 0:
                candidates.append(vf)
        if candidates:
            leader_avg_hr_f = max(candidates) * 0.9

    remaining_s = payload.get("season_remaining_s")
    try:
        remaining_s_i = int(remaining_s) if remaining_s is not None else None
    except Exception:
        remaining_s_i = None

    out_rows: list[dict[str, Any]] = []
    metrics_by_cid: dict[str, dict[str, Any]] = {}
    for r in rows:
        if not isinstance(r, dict):
            continue
        m = _predict_row_metrics(r, now_i=now_i)
        cid_key = str(r.get("cid") or "").strip()
        if cid_key:
            metrics_by_cid[cid_key] = m
        p = _prob_overtake_leader_before_season_end(
            gap=m.get("gap"),
            avg_hr=m.get("avg_hr"),
            leader_avg_hr=leader_avg_hr_f,
            consistency=float(m.get("consistency") or 0.0),
            idle_risk=float(m.get("idle_risk") or 0.0),
            spike_risk=float(m.get("spike_risk") or 0.0),
            data_quality=float(m.get("data_quality") or 0.0),
            remaining_s=remaining_s_i,
        )

        risk = _clamp(0.60 * float(m["idle_risk"]) + 0.40 * float(m["spike_risk"]), 0.0, 1.0)
        confidence = _clamp(
            0.45 * float(m["data_quality"]) + 0.35 * float(m["consistency"]) + 0.20 * (1.0 - float(m["idle_risk"])),
            0.05,
            0.95,
        )

        gap_to_above: int | float | None = None
        above_cid: Any = None
        above_avg_hr: float | None = None
        p_up: float | None = None
        eta_up_h: float | None = None

        rank_v = r.get("rank")
        try:
            rank_i = int(rank_v) if rank_v is not None else None
        except Exception:
            rank_i = None

        if rank_i is not None and rank_i > 1:
            above = rows[rank_i - 2] if rank_i - 2 >= 0 and rank_i - 2 < len(rows) else None
            if isinstance(above, dict):
                above_cid = above.get("cid")
                cur_t = _to_number(r.get("trophy"))
                above_t = _to_number(above.get("trophy"))
                if cur_t is not None and above_t is not None:
                    gap_to_above = float(above_t - cur_t)
                above_cid_key = str(above_cid or "").strip()
                above_m = metrics_by_cid.get(above_cid_key) if above_cid_key else None
                if above_m:
                    try:
                        above_avg_hr = float(above_m.get("avg_hr")) if above_m.get("avg_hr") is not None else None
                    except Exception:
                        above_avg_hr = None

                    p_up = _prob_overtake_target_before_season_end(
                        gap_to_target=gap_to_above,
                        avg_hr=m.get("avg_hr"),
                        target_avg_hr=above_avg_hr,
                        consistency=float(m.get("consistency") or 0.0),
                        idle_risk=float(m.get("idle_risk") or 0.0),
                        spike_risk=float(m.get("spike_risk") or 0.0),
                        data_quality=float(m.get("data_quality") or 0.0),
                        target_consistency=float(above_m.get("consistency") or 0.0),
                        target_idle_risk=float(above_m.get("idle_risk") or 0.0),
                        target_spike_risk=float(above_m.get("spike_risk") or 0.0),
                        target_data_quality=float(above_m.get("data_quality") or 0.0),
                        remaining_s=remaining_s_i,
                    )
                    eta_up_h = _eta_overtake_target_hours(
                        gap_to_target=gap_to_above,
                        avg_hr=m.get("avg_hr"),
                        target_avg_hr=above_avg_hr,
                        idle_risk=float(m.get("idle_risk") or 0.0),
                        target_idle_risk=float(above_m.get("idle_risk") or 0.0),
                    )

        out_rows.append(
            {
                "rank": r.get("rank"),
                "cid": r.get("cid"),
                "name": r.get("name") or r.get("player_name"),
                "trophy": r.get("trophy"),
                "gap": m["gap"],
                "gap_to_above": gap_to_above,
                "avg_hr": m["avg_hr"],
                "avg_day": m["avg_day"],
                "consistency": m["consistency"],
                "idle_risk": m["idle_risk"],
                "spike_risk": m["spike_risk"],
                "data_quality": m["data_quality"],
                "prob_overtake_leader_before_season_end": float(p),
                "prob_overtake_above_before_season_end": float(p_up) if p_up is not None else None,
                "eta_overtake_above_h": float(eta_up_h) if eta_up_h is not None else None,
                "confidence": float(confidence),
                "risk_score": float(risk),
            }
        )

    return {
        "url": payload.get("url"),
        "fetched_at": payload.get("fetched_at"),
        "season_end_at": payload.get("season_end_at"),
        "season_remaining": payload.get("season_remaining"),
        "season_remaining_s": payload.get("season_remaining_s"),
        "season_ended": payload.get("season_ended"),
        "leader": {
            "rank": leader.get("rank"),
            "cid": leader.get("cid"),
            "name": leader.get("name") or leader.get("player_name"),
            "trophy": leader.get("trophy"),
            "avg_hr": leader_avg_hr_f,
        },
        "count": len(out_rows),
        "rows": out_rows,
        "model": {"name": "sw_outcome_predictor_v2", "target": "overtake_leader"},
    }


def _sw_ai_int(v: Any) -> int | None:
    """Convert ``v`` to ``int`` via ``_to_number``; ``None`` when not possible."""
    number = _to_number(v)
    try:
        return None if number is None else int(number)
    except Exception:
        return None


def _sw_ai_float(v: Any) -> float | None:
    """Convert ``v`` to ``float`` via ``_to_number``; ``None`` when not possible."""
    number = _to_number(v)
    try:
        return None if number is None else float(number)
    except Exception:
        return None


def _sw_ai_hhmm_duration(minutes: float | int | None) -> str:
    if minutes is None or not isinstance(minutes, (int, float)) or minutes != minutes:
        return "99:59"
    m = int(max(0, round(float(minutes))))
    h = min(99, m // 60)
    mm = min(59, m % 60)
    return f"{h:02d}:{mm:02d}"


def _sw_ai_hhmm_clock(ts_i: int) -> str:
    return datetime.fromtimestamp(int(ts_i), tz=timezone.utc).strftime("%H:%M")


def _sw_ai_default_weights() -> dict[str, float]:
    return {
        "burn_gap_small": 0.35,
        "burn_momentum": 0.20,
        "burn_endgame": 0.20,
        "burn_potential": 0.15,
        "burn_urgency": 0.10,
        "hold_gap_large": 0.30,
        "hold_counter": 0.35,
        "hold_drop": 0.20,
        "hold_quality": 0.15,
        "update_step": 0.06,
    }


def _sw_ai_load_weights(now_i: int) -> dict[str, float]:
    """Return the scenario weights, reloading them at most every 120 seconds.

    Starts from the defaults and, when MySQL is enabled, overlays the newest
    ``__global__`` entry of the ``scenario_feedback`` table. Keys that are not
    default weight names are ignored. Database failures are swallowed, keeping
    whatever had been applied up to that point.

    Args:
        now_i: Current time in epoch seconds, used for the cache-age check.

    Returns:
        A copy of the weights; callers may modify it freely.
    """
    with _SW_AI_LOCK:
        cached = _SW_AI_STATE.get("weights")
        loaded_at = int(_SW_AI_STATE.get("weights_loaded_at") or 0)
        if isinstance(cached, dict) and now_i - loaded_at < 120:
            return dict(cached)

    weights = _sw_ai_default_weights()

    def _overlay_stored_weights() -> None:
        # Mutates ``weights`` in place so partial progress survives a failure.
        config = _mysql_settings()
        table = f"{config['prefix']}scenario_feedback"
        conn = _mysql_open_ready()
        try:
            cur = conn.cursor()
            try:
                cur.execute(
                    f"SELECT weights_json FROM `{table}` WHERE player_key=%s ORDER BY id DESC LIMIT 1",
                    ("__global__",),
                )
                record = cur.fetchone()
                stored = json.loads(record[0]) if record and record[0] else None
                if isinstance(stored, dict):
                    for name, value in stored.items():
                        if name not in weights:
                            continue
                        try:
                            weights[name] = float(value)
                        except Exception:
                            pass
            finally:
                try:
                    cur.close()
                except Exception:
                    pass
            conn.commit()
        finally:
            try:
                conn.close()
            except Exception:
                pass

    if _mysql_enabled():
        try:
            _overlay_stored_weights()
        except Exception:
            pass

    with _SW_AI_LOCK:
        _SW_AI_STATE["weights"] = dict(weights)
        _SW_AI_STATE["weights_loaded_at"] = int(now_i)
    return dict(weights)


def _sw_ai_adjust_weights(weights: dict[str, float], success: bool) -> dict[str, float]:
    """Return a copy of ``weights`` shifted by one update step.

    Every ``burn_*`` and ``hold_*`` weight moves up by ``update_step`` on
    success and down by it otherwise, then is limited to ``[0.05, 0.90]``.
    ``update_step`` itself and any other keys are copied unchanged; the input
    dict is not modified.
    """
    step = float(weights.get("update_step") or 0.06)
    delta = step if success else -step
    tunable = (
        "burn_gap_small",
        "burn_momentum",
        "burn_endgame",
        "burn_potential",
        "burn_urgency",
        "hold_gap_large",
        "hold_counter",
        "hold_drop",
        "hold_quality",
    )
    adjusted = dict(weights)
    for name in tunable:
        current = float(adjusted.get(name) or 0.0)
        adjusted[name] = float(_clamp(current + delta, 0.05, 0.90))
    return adjusted


def _sw_ai_find_row(rows: list[Any], player_key: str | None, rank_i: int | None, name_q: str | None) -> dict[str, Any] | None:
    key = (player_key or "").strip()
    name = (name_q or "").strip().lower()
    for r in rows:
        if not isinstance(r, dict):
            continue
        if rank_i is not None:
            try:
                if int(r.get("rank") or 0) == int(rank_i):
                    return r
            except Exception:
                pass
        if key:
            cid = str(r.get("cid") or r.get("player_id") or r.get("id") or "").strip()
            if cid and cid == key:
                return r
        if name:
            rn = str(r.get("name") or r.get("player_name") or "").strip().lower()
            if rn and name in rn:
                return r
    return None


def _sw_ai_burn_speed(avg_hr: float | None, thr_peak: float | None) -> float | None:
    if avg_hr is None or avg_hr <= 0:
        return None
    a = float(avg_hr)
    t = float(thr_peak) if thr_peak is not None and thr_peak > 0 else 0.0
    return float(max(a * 3.5, t))


def _sw_ai_idle_speed(avg_hr: float | None) -> float | None:
    if avg_hr is None or avg_hr <= 0:
        return None
    return float(avg_hr) * 0.1


def _sw_ai_eta_minutes(gap: float | None, speed: float | None) -> float | None:
    if gap is None:
        return None
    if gap <= 0:
        return 0.0
    if speed is None or speed <= 0:
        return None
    h = float(gap) / float(speed)
    if h < 0 or h > 1e6:
        return None
    return float(h) * 60.0


def _sw_ai_momentum(row: dict[str, Any], metrics: dict[str, Any]) -> str:
    m = str(row.get("momentum") or "").strip().lower()
    if m in {"up", "down", "flat"}:
        return m
    k = _row_rate_key(row)
    st = _RATE_STATE.get(k) if k else None
    last_rate = None
    if isinstance(st, dict):
        try:
            last_rate = float(st.get("last_rate")) if st.get("last_rate") is not None else None
        except Exception:
            last_rate = None
    avg_hr = metrics.get("avg_hr")
    try:
        a = float(avg_hr) if avg_hr is not None else None
    except Exception:
        a = None
    if last_rate is None or a is None or a <= 0:
        return "flat"
    if last_rate > max(50.0, 0.20 * a):
        return "up"
    if last_rate < -max(50.0, 0.20 * a):
        return "down"
    return "flat"


def _sw_ai_thr_peak(row: dict[str, Any], avg_hr: float | None) -> float | None:
    """Return the player's peak rate per hour, estimating it when not supplied.

    The first positive value among the row's peak fields (``thr_peak`` and its
    spelling variants) is used. Otherwise the largest positive value among the
    last recorded rate and 1.5x / 3.5x ``avg_hr`` is returned, or ``None`` when
    none of those is available.
    """
    for field_name in ("thr_peak", "thrPeak", "thr_peak_hr", "thrPeakPerHour", "thrPeakHr"):
        supplied = _sw_ai_float(row.get(field_name))
        if supplied is not None and supplied > 0:
            return float(supplied)

    rate_key = _row_rate_key(row)
    state = _RATE_STATE.get(rate_key) if rate_key else None
    last_rate = None
    if isinstance(state, dict):
        stored = state.get("last_rate")
        try:
            last_rate = None if stored is None else float(stored)
        except Exception:
            last_rate = None

    options: list[float | None] = [last_rate]
    if avg_hr is not None and avg_hr > 0:
        base = float(avg_hr)
        options.extend((base * 1.5, base * 3.5))

    positive = [v for v in options if v is not None and v > 0]
    if not positive:
        return None
    return float(max(positive))


def _sw_ai_counter_risk(
    gap: float | None,
    remaining_min: int | None,
    opp_metrics: dict[str, Any] | None,
) -> int:
    """Score, from 0 to 100, the risk that the opponent counters.

    Blends three signals: the opponent signal (weight 0.55), how small the gap
    is relative to 4000 (0.25) and how close the end is within the last 90
    minutes (0.20). The opponent signal comes from ``spike_risk`` in
    ``opp_metrics`` and is raised when the rate state of ``opp_metrics["row"]``
    shows the last rate above its smoothed rate.
    """
    if remaining_min is None:
        endgame = 0.0
    else:
        endgame = float(_clamp(1.0 - (float(remaining_min) / 90.0), 0.0, 1.0))

    if gap is None:
        gap_small = 0.4
    else:
        gap_small = float(_clamp(1.0 - (float(gap) / 4000.0), 0.0, 1.0))

    opp_signal = 0.30
    if isinstance(opp_metrics, dict):
        try:
            spike = float(opp_metrics.get("spike_risk") or 0.0)
        except Exception:
            spike = 0.25
        opp_signal = float(_clamp(0.20 + 0.80 * spike, 0.0, 1.0))

        opp_row = opp_metrics.get("row")
        if isinstance(opp_row, dict):
            rate_key = _row_rate_key(opp_row)
            state = _RATE_STATE.get(rate_key) if rate_key else None
            if isinstance(state, dict):

                def _stored(name: str) -> float | None:
                    raw = state.get(name)
                    try:
                        return None if raw is None else float(raw)
                    except Exception:
                        return None

                last_rate = _stored("last_rate")
                ema = _stored("ema_per_hr")
                if last_rate is not None and ema is not None and ema > 0:
                    # A last rate well above the smoothed rate lifts the signal.
                    surge = (last_rate / (ema + 1.0) - 1.0) / 1.5
                    opp_signal = float(_clamp(max(opp_signal, surge), 0.0, 1.0))

    blended = _clamp(0.55 * opp_signal + 0.25 * gap_small + 0.20 * endgame, 0.0, 1.0)
    return int(round(100.0 * blended))


def _sw_ai_fake_burn_risk(opp_metrics: dict[str, Any] | None) -> int:
    if not isinstance(opp_metrics, dict):
        return 25
    try:
        opp_spike = float(opp_metrics.get("spike_risk") or 0.0)
    except Exception:
        opp_spike = 0.25
    try:
        opp_cons = float(opp_metrics.get("consistency") or 0.0)
    except Exception:
        opp_cons = 0.5
    try:
        opp_idle = float(opp_metrics.get("idle_risk") or 0.0)
    except Exception:
        opp_idle = 0.5
    p = _clamp(0.60 * opp_spike + 0.25 * (1.0 - opp_cons) + 0.15 * opp_idle, 0.0, 1.0)
    return int(round(100.0 * p))


def _sw_ai_drop_risk(rows: list[Any], player_row: dict[str, Any], player_metrics: dict[str, Any]) -> int:
    try:
        rank_i = int(player_row.get("rank") or 0)
    except Exception:
        rank_i = 0
    if rank_i <= 0:
        return 35
    below = rows[rank_i] if rank_i < len(rows) else None
    if not isinstance(below, dict):
        return 25
    cur_t = _to_number(player_row.get("trophy"))
    below_t = _to_number(below.get("trophy"))
    if cur_t is None or below_t is None:
        return 35
    gap_to_below = float(cur_t - below_t)
    below_m = _predict_row_metrics(below, now_i=int(time.time()))
    try:
        below_avg = float(below_m.get("avg_hr")) if below_m.get("avg_hr") is not None else None
    except Exception:
        below_avg = None
    try:
        our_avg = float(player_metrics.get("avg_hr")) if player_metrics.get("avg_hr") is not None else None
    except Exception:
        our_avg = None
    if below_avg is None or below_avg <= 0 or our_avg is None or our_avg <= 0:
        return 30
    net = float(below_avg - our_avg)
    if net <= 0:
        base = _clamp(1.0 - (gap_to_below / max(our_avg * 1.0, 800.0)), 0.0, 1.0) * 0.50
        return int(round(100.0 * base))
    eta_min = (gap_to_below / net) * 60.0 if net > 0 else 1e9
    p = _clamp(1.0 - (eta_min / 45.0), 0.0, 1.0)
    return int(round(100.0 * p))


def _sw_ai_timeline(
    now_i: int,
    remaining_min: int,
    trophy_now: float,
    avg_hr: float,
    burn_speed: float,
    opp_trophy_now: float | None,
    opp_avg_hr: float | None,
    burn_min: int,
    idle_min: int,
) -> list[dict[str, Any]]:
    """Project trophy counts in 5-minute steps over the remaining time.

    The horizon is ``remaining_min`` limited to ``[0, 360]`` minutes. Three
    series are emitted one after another, each point being
    ``{"time": "HH:MM", "trophy": int, "type": ...}``:

    * ``"normal"``   - steady progress at ``avg_hr``.
    * ``"burn"``     - ``idle_min`` minutes at 10% of ``avg_hr``, then
      ``burn_min`` minutes at ``burn_speed``, then ``avg_hr`` again.
    * ``"opponent"`` - steady progress at ``opp_avg_hr``; only emitted when the
      opponent's trophy count and a positive rate are known.
    """
    step_min = 5
    horizon_min = min(max(0, int(remaining_min)), 360)
    ticks = range(0, horizon_min + 1, step_min)

    def _point(offset_min: int, value: float, kind: str) -> dict[str, Any]:
        stamp = int(now_i + offset_min * 60)
        return {"time": _sw_ai_hhmm_clock(stamp), "trophy": int(round(value)), "type": kind}

    def _phase_speed(minute: int) -> float:
        # Rate applying to the 5-minute slice that starts at ``minute``.
        if idle_min > 0 and minute < idle_min:
            return float(avg_hr) * 0.1
        if burn_min > 0 and minute < idle_min + burn_min:
            return float(burn_speed)
        return float(avg_hr)

    points: list[dict[str, Any]] = []

    for t in ticks:
        gained = float(avg_hr) * (float(t) / 60.0)
        points.append(_point(t, trophy_now + gained, "normal"))

    # Running total over the slices before ``t``, added in chronological order.
    gained = 0.0
    for t in ticks:
        if t > 0:
            gained += _phase_speed(t - step_min) * (step_min / 60.0)
        points.append(_point(t, trophy_now + gained, "burn"))

    if opp_trophy_now is not None and opp_avg_hr is not None and opp_avg_hr > 0:
        for t in ticks:
            projected = float(opp_trophy_now) + float(opp_avg_hr) * (t / 60.0)
            points.append(_point(t, projected, "opponent"))

    return points


def _sw_ai_recent_actual_points(player_key: str, limit: int) -> list[dict[str, Any]]:
    """Load a player's most recent ``actual`` timeline points from MySQL.

    Reads up to ``limit`` newest rows of the ``timeline_history`` table and
    returns them oldest first as
    ``{"time": "HH:MM", "trophy": int, "type": "actual"}``. Rows whose
    timestamp or trophy cannot be converted to ``int`` are skipped.

    Returns:
        The points, or an empty list when MySQL is disabled or any error occurs.
    """
    if not _mysql_enabled():
        return []
    try:
        config = _mysql_settings()
        table = f"{config['prefix']}timeline_history"
        conn = _mysql_open_ready()
        try:
            cur = conn.cursor()
            try:
                cur.execute(
                    f"SELECT ts, trophy FROM `{table}` WHERE player_key=%s AND type=%s ORDER BY ts DESC LIMIT %s",
                    (player_key, "actual", int(limit)),
                )
                records = cur.fetchall() or []
            finally:
                try:
                    cur.close()
                except Exception:
                    pass
            conn.commit()
        finally:
            try:
                conn.close()
            except Exception:
                pass

        points: list[dict[str, Any]] = []
        # The query returns newest first; flip to chronological order.
        for raw_ts, raw_trophy in reversed(records):
            if raw_trophy is None:
                continue
            try:
                ts_i = int(raw_ts)
                trophy_i = int(raw_trophy)
            except Exception:
                continue
            points.append({"time": _sw_ai_hhmm_clock(ts_i), "trophy": trophy_i, "type": "actual"})
        return points
    except Exception:
        return []


def _sw_ai_mysql_persist(ai_snapshot: dict[str, Any], explain: dict[str, Any], output: dict[str, Any], timeline_points: list[dict[str, Any]]) -> int | None:
    """Persist one AI snapshot and its timeline points to MySQL.

    The snapshot is upserted into ``<prefix>ai_snapshots``; its row id is then
    looked up by ``(player_key, fetched_at)`` and every valid timeline point is
    inserted into ``<prefix>timeline_history`` referencing that id.

    Args:
        ai_snapshot: Flat snapshot values (player_key, fetched_at, rank, ...).
        explain: Explanation dict, stored as JSON.
        output: Public output dict, stored as JSON.
        timeline_points: Points with ``time`` ("HH:MM"), ``trophy`` and
            ``type`` (one of actual/normal/burn/opponent); others are skipped.

    Returns:
        The snapshot row id, or ``None`` when MySQL is disabled, the row could
        not be found, or any error occurred (best-effort; nothing is raised).
    """
    if not _mysql_enabled():
        return None
    try:
        settings = _mysql_settings()
        prefix = settings["prefix"]
        ai_t = f"{prefix}ai_snapshots"
        tl_t = f"{prefix}timeline_history"

        # Resolve these once: the time.time() fallback must not differ between
        # the INSERT and the follow-up SELECT, and the SELECT must use the same
        # (truncated) key that was actually stored.
        fetched_at = int(ai_snapshot.get("fetched_at") or int(time.time()))
        player_key_db = _mysql_trunc(ai_snapshot.get("player_key"), 64)

        ai_id: int | None = None
        conn = _mysql_open_ready()
        try:
            cur = conn.cursor()
            try:
                cur.execute(
                    f"""INSERT INTO `{ai_t}` (
player_key, player_name, fetched_at, rank, trophy, gap_to_above, avg_hr, thr_peak, momentum, remaining_min,
scenario, command, eta_burn_min, eta_normal_min, success_rate, risk_counter, risk_drop, confidence, reason,
explain_json, output_json
) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON DUPLICATE KEY UPDATE
player_name=VALUES(player_name),
rank=VALUES(rank),
trophy=VALUES(trophy),
gap_to_above=VALUES(gap_to_above),
avg_hr=VALUES(avg_hr),
thr_peak=VALUES(thr_peak),
momentum=VALUES(momentum),
remaining_min=VALUES(remaining_min),
scenario=VALUES(scenario),
command=VALUES(command),
eta_burn_min=VALUES(eta_burn_min),
eta_normal_min=VALUES(eta_normal_min),
success_rate=VALUES(success_rate),
risk_counter=VALUES(risk_counter),
risk_drop=VALUES(risk_drop),
confidence=VALUES(confidence),
reason=VALUES(reason),
explain_json=VALUES(explain_json),
output_json=VALUES(output_json)""",
                    (
                        player_key_db,
                        _mysql_trunc(ai_snapshot.get("player_name"), 255) if ai_snapshot.get("player_name") is not None else None,
                        fetched_at,
                        _mysql_int(ai_snapshot.get("rank")),
                        _mysql_int(ai_snapshot.get("trophy")),
                        _mysql_int(ai_snapshot.get("gap_to_above")),
                        ai_snapshot.get("avg_hr"),
                        ai_snapshot.get("thr_peak"),
                        _mysql_trunc(ai_snapshot.get("momentum"), 8) if ai_snapshot.get("momentum") is not None else None,
                        _mysql_int(ai_snapshot.get("remaining_min")),
                        _mysql_trunc(ai_snapshot.get("scenario"), 32),
                        _mysql_trunc(ai_snapshot.get("command"), 32),
                        _mysql_int(ai_snapshot.get("eta_burn_min")),
                        _mysql_int(ai_snapshot.get("eta_normal_min")),
                        _mysql_int(ai_snapshot.get("success_rate")),
                        _mysql_int(ai_snapshot.get("risk_counter")),
                        _mysql_int(ai_snapshot.get("risk_drop")),
                        _mysql_int(ai_snapshot.get("confidence")),
                        _mysql_trunc(ai_snapshot.get("reason"), 512) if ai_snapshot.get("reason") is not None else None,
                        _dump_json(explain, pretty=False),
                        _dump_json(output, pretty=False),
                    ),
                )
                # The upsert may have updated an existing row, so the id is
                # looked up explicitly rather than taken from the cursor.
                cur.execute(f"SELECT id FROM `{ai_t}` WHERE player_key=%s AND fetched_at=%s", (player_key_db, fetched_at))
                row = cur.fetchone()
                ai_id = int(row[0]) if row and row[0] is not None else None

                if ai_id is not None:
                    # "HH:MM" labels are mapped back onto the UTC day of fetched_at.
                    # NOTE(review): assumes _sw_ai_hhmm_clock emits UTC clock times — confirm.
                    base_dt = datetime.fromtimestamp(fetched_at, tz=timezone.utc).replace(second=0, microsecond=0)
                    base_ts = int(base_dt.timestamp())
                    # Midnight rollover is tracked per series type: each series
                    # (normal, burn, opponent, ...) restarts at "now", so a single
                    # shared tracker would wrongly push later series a day ahead.
                    last_ts_by_type: dict[str, int] = {}
                    for p in timeline_points:
                        if not isinstance(p, dict):
                            continue
                        t = p.get("type")
                        if t not in {"actual", "normal", "burn", "opponent"}:
                            continue
                        hhmm = str(p.get("time") or "").strip()
                        if not hhmm:
                            continue
                        trophy = _mysql_int(p.get("trophy"))
                        try:
                            hh, mm = hhmm.split(":")
                            cand = base_dt.replace(hour=int(hh), minute=int(mm))
                            ts = int(cand.timestamp())
                        except Exception:
                            # Unparseable clock label: skip the point.
                            continue
                        prev_ts = last_ts_by_type.get(t, base_ts - 60)
                        if ts < prev_ts - 60:
                            # Clock went backwards within the series: crossed midnight.
                            ts += 86400
                        last_ts_by_type[t] = ts
                        cur.execute(
                            f"INSERT INTO `{tl_t}` (player_key, ts, trophy, type, ai_snapshot_id) VALUES (%s,%s,%s,%s,%s)",
                            (
                                player_key_db,
                                int(ts),
                                trophy,
                                _mysql_trunc(t, 16),
                                int(ai_id),
                            ),
                        )
            finally:
                try:
                    cur.close()
                except Exception:
                    pass
            conn.commit()
        finally:
            try:
                conn.close()
            except Exception:
                pass
        return ai_id
    except Exception:
        return None


def _sw_ai_http_post_json(url: str, payload: Any, timeout_s: int) -> tuple[int, str]:
    """POST ``payload`` as JSON to ``url`` and return ``(status, body_text)``.

    The body is decoded with the response charset (UTF-8 when absent),
    replacing undecodable bytes. Any exception is swallowed and reported as
    ``(0, str(exception))``.
    """
    body = _dump_json(payload, pretty=False).encode("utf-8")
    request_headers = {"Content-Type": "application/json", "User-Agent": "analizer-king-sw-ai"}
    request = Request(url, data=body, headers=request_headers, method="POST")
    try:
        with urlopen(request, timeout=timeout_s) as response:
            content = response.read()
            encoding = response.headers.get_content_charset() or "utf-8"
            # Fall back to 200 when the response object exposes no usable status.
            status = int(getattr(response, "status", 200) or 200)
            return status, content.decode(encoding, errors="replace")
    except Exception as exc:
        return 0, str(exc)


def _sw_ai_send_alert(player_key: str, event: str, message: str) -> None:
    msg = str(message or "").strip()
    if not msg:
        return
    token = (os.getenv("TELEGRAM_BOT_TOKEN") or "").strip()
    chat_id = (os.getenv("TELEGRAM_CHAT_ID") or "").strip()
    discord_webhook = (os.getenv("DISCORD_WEBHOOK_URL") or "").strip()
    if not token and not discord_webhook:
        return

    cooldown_s = _sw_ai_int(os.getenv("ALERT_COOLDOWN_S")) or 120
    now_i = int(time.time())
    st_key = hashlib.sha1((event + "|" + msg).encode("utf-8", "replace")).hexdigest()
    with _SW_ALERT_LOCK:
        prev = _SW_ALERT_STATE["last_by_player"].get(player_key)
        if isinstance(prev, dict):
            if prev.get("hash") == st_key and now_i - int(prev.get("at") or 0) < cooldown_s:
                return
        _SW_ALERT_STATE["last_by_player"][player_key] = {"hash": st_key, "at": now_i}

    if token and chat_id:
        url = f"https://api.telegram.org/bot{token}/sendMessage"
        payload = {"chat_id": chat_id, "text": msg, "disable_web_page_preview": True}
        _sw_ai_http_post_json(url, payload, timeout_s=10)

    if discord_webhook:
        payload = {"content": msg}
        _sw_ai_http_post_json(discord_webhook, payload, timeout_s=10)

    if _mysql_enabled():
        try:
            settings = _mysql_settings()
            prefix = settings["prefix"]
            alerts = f"{prefix}alert_history"
            conn = _mysql_open_ready()
            try:
                cur = conn.cursor()
                try:
                    if token and chat_id:
                        cur.execute(
                            f"INSERT INTO `{alerts}` (player_key, ts, channel, event, state_hash, message, sent, error) VALUES (%s,%s,%s,%s,%s,%s,%s,%s)",
                            (_mysql_trunc(player_key, 64), now_i, "telegram", _mysql_trunc(event, 32), st_key, msg, 1, None),
                        )
                    if discord_webhook:
                        cur.execute(
                            f"INSERT INTO `{alerts}` (player_key, ts, channel, event, state_hash, message, sent, error) VALUES (%s,%s,%s,%s,%s,%s,%s,%s)",
                            (_mysql_trunc(player_key, 64), now_i, "discord", _mysql_trunc(event, 32), st_key, msg, 1, None),
                        )
                finally:
                    try:
                        cur.close()
                    except Exception:
                        pass
                conn.commit()
            finally:
                try:
                    conn.close()
                except Exception:
                    pass
        except Exception:
            pass


def sw_ai_command_system(payload: dict[str, Any], player_key: str | None, rank_i: int | None, name_q: str | None, burn_min: int, idle_min: int) -> tuple[dict[str, Any], dict[str, Any]]:
    rows = payload.get("rows")
    if not isinstance(rows, list) or not rows:
        out = {
            "scenario": "IDLE",
            "command": "ABORT",
            "eta": {"burn": "99:59", "normal": "99:59"},
            "success_rate": 40,
            "risk": {"counter_burn": 50, "drop_rank": 40},
            "confidence": 15,
            "timeline": [],
        }
        explain = {"error": payload.get("error") or "missing_rows"}
        return out, explain

    fetched_at = payload.get("fetched_at")
    try:
        now_i = int(fetched_at) if fetched_at is not None else int(time.time())
    except Exception:
        now_i = int(time.time())

    remaining_min = None
    rem_s = payload.get("season_remaining_s")
    if rem_s is not None:
        try:
            remaining_min = max(0, int(int(rem_s) // 60))
        except Exception:
            remaining_min = None

    player_row = _sw_ai_find_row(rows, player_key=player_key, rank_i=rank_i, name_q=name_q)
    if not player_row:
        out = {
            "scenario": "IDLE",
            "command": "ABORT",
            "eta": {"burn": "99:59", "normal": "99:59"},
            "success_rate": 40,
            "risk": {"counter_burn": 50, "drop_rank": 40},
            "confidence": 15,
            "timeline": [],
        }
        explain = {"error": "player_not_found", "query": {"player_key": player_key, "rank": rank_i, "name": name_q}}
        return out, explain

    player_metrics = _predict_row_metrics(player_row, now_i=now_i)
    try:
        avg_hr = float(player_metrics.get("avg_hr")) if player_metrics.get("avg_hr") is not None else None
    except Exception:
        avg_hr = None
    thr_peak = _sw_ai_thr_peak(player_row, avg_hr)
    burn_speed = _sw_ai_burn_speed(avg_hr, thr_peak)

    rank_v = player_row.get("rank")
    try:
        pr = int(rank_v) if rank_v is not None else None
    except Exception:
        pr = None

    above_row = None
    if pr is not None and pr > 1 and pr - 2 >= 0:
        rr = rows[pr - 2]
        if isinstance(rr, dict):
            above_row = rr

    below_row = None
    if pr is not None and pr > 0 and pr < len(rows):
        rr = rows[pr]
        if isinstance(rr, dict):
            below_row = rr

    trophy = _to_number(player_row.get("trophy"))
    trophy_now = float(trophy) if trophy is not None else None
    gap_to_above = None
    opp_trophy_now = None
    opp_avg_hr = None
    opp_metrics = None
    if isinstance(above_row, dict):
        above_t = _to_number(above_row.get("trophy"))
        if trophy is not None and above_t is not None:
            gap_to_above = float(above_t - trophy)
            opp_trophy_now = float(above_t)
        opp_metrics = _predict_row_metrics(above_row, now_i=now_i)
        opp_metrics["row"] = above_row
        try:
            opp_avg_hr = float(opp_metrics.get("avg_hr")) if opp_metrics.get("avg_hr") is not None else None
        except Exception:
            opp_avg_hr = None

    momentum = _sw_ai_momentum(player_row, player_metrics)

    eta_normal_min = _sw_ai_eta_minutes(gap_to_above, avg_hr)
    eta_burn_min = _sw_ai_eta_minutes(gap_to_above, burn_speed)

    counter_risk = _sw_ai_counter_risk(gap_to_above, remaining_min, opp_metrics)
    fake_burn_risk = _sw_ai_fake_burn_risk(opp_metrics)
    drop_risk = _sw_ai_drop_risk(rows, player_row, player_metrics)

    weights = _sw_ai_load_weights(now_i)
    if trophy_now is None or avg_hr is None or avg_hr <= 0 or gap_to_above is None or gap_to_above < 0:
        burn_score = 0.15
        hold_score = 0.65
        quality = float(player_metrics.get("data_quality") or 0.0)
    else:
        scale_gap = max(700.0, float(avg_hr) * 0.75)
        gap_small = float(_clamp(1.0 - (float(gap_to_above) / scale_gap), 0.0, 1.0))
        gap_large = 1.0 - gap_small
        mom = 1.0 if momentum == "up" else (0.5 if momentum == "flat" else 0.0)
        endgame = 0.0
        if remaining_min is not None:
            endgame = float(_clamp(1.0 - (float(remaining_min) / 75.0), 0.0, 1.0))
        potential = 0.0
        if burn_speed is not None and avg_hr is not None and avg_hr > 0:
            potential = float(_clamp(((burn_speed / avg_hr) - 1.0) / 2.5, 0.0, 1.0))
        urgency = 0.0
        if eta_burn_min is not None and remaining_min is not None and remaining_min > 0:
            urgency = float(_clamp(1.0 - (float(eta_burn_min) / max(15.0, float(remaining_min))), 0.0, 1.0))
        quality = float(player_metrics.get("data_quality") or 0.0)

        burn_score = (
            float(weights["burn_gap_small"]) * gap_small
            + float(weights["burn_momentum"]) * mom
            + float(weights["burn_endgame"]) * endgame
            + float(weights["burn_potential"]) * potential
            + float(weights["burn_urgency"]) * urgency
        )
        hold_score = (
            float(weights["hold_gap_large"]) * gap_large
            + float(weights["hold_counter"]) * (float(counter_risk) / 100.0)
            + float(weights["hold_drop"]) * (float(drop_risk) / 100.0)
            + float(weights["hold_quality"]) * (1.0 - _clamp(quality, 0.0, 1.0))
        )
        burn_score = float(_clamp(burn_score, 0.0, 1.0))
        hold_score = float(_clamp(hold_score, 0.0, 1.0))

    scenario = "NORMAL"
    command = "HOLD"
    reason = ""

    if pr is not None and pr <= 1:
        scenario = "SECURE_RANK" if (remaining_min is not None and remaining_min <= 60) else "NORMAL"
        command = "SECURE" if scenario == "SECURE_RANK" else "HOLD"
        reason = "already_rank_1"
    else:
        endgame_hard = bool(remaining_min is not None and remaining_min <= 60)
        eta_burn_soon = bool(eta_burn_min is not None and eta_burn_min <= 15.0)
        if endgame_hard and burn_score >= 0.45:
            scenario = "LAST_HOUR_PUSH"
            command = "BURN NOW"
            reason = "last_hour_push"
        elif eta_burn_soon and burn_score >= 0.55 and counter_risk < 70:
            scenario = "BURN"
            command = "BURN NOW"
            reason = "eta_under_15min"
        elif burn_score >= 0.72 and counter_risk < 60:
            scenario = "BURN"
            command = "BURN NOW"
            reason = "high_burn_score_low_counter"
        elif burn_score >= 0.60 and counter_risk >= 60:
            scenario = "REACTIVE_BURN"
            command = "DELAY BURN"
            reason = "counter_risk_high"
        elif counter_risk >= 75 and burn_score >= 0.50:
            scenario = "COUNTER_BURN"
            command = "BURN NOW"
            reason = "counter_burn_expected"
        elif hold_score >= 0.68 and burn_score <= 0.40:
            scenario = "IDLE"
            command = "HOLD"
            reason = "hold_dominant"
        else:
            scenario = "NORMAL"
            command = "DELAY BURN" if burn_score >= 0.55 else "HOLD"
            reason = "balanced"

    if command == "BURN NOW":
        base_sr = int(round(100.0 * burn_score))
    elif command == "SECURE":
        base_sr = int(round(100.0 * (1.0 - (float(drop_risk) / 120.0))))
    else:
        base_sr = int(round(100.0 * hold_score))

    success_rate = int(_clamp(float(base_sr) - 0.35 * float(counter_risk) - 0.20 * float(drop_risk), 5.0, 95.0))

    confidence = int(
        round(
            100.0
            * _clamp(
                0.40 * float(_clamp(float(quality), 0.0, 1.0))
                + 0.35 * float(abs(float(burn_score) - float(hold_score)))
                + 0.25 * (1.0 - float(counter_risk) / 100.0),
                0.05,
                0.95,
            )
        )
    )

    if remaining_min is None:
        remaining_min = 0

    if trophy_now is None:
        trophy_now = 0.0
    if avg_hr is None or avg_hr <= 0:
        avg_hr = 1.0
    if burn_speed is None or burn_speed <= 0:
        burn_speed = float(avg_hr) * 3.5

    player_key_norm = str(player_row.get("cid") or player_row.get("player_id") or player_row.get("id") or player_key or "").strip() or f"rank:{pr or 0}"
    actual_points = _sw_ai_recent_actual_points(player_key_norm, limit=24)
    timeline_points = []
    if actual_points:
        timeline_points.extend(actual_points)
    timeline_points.append({"time": _sw_ai_hhmm_clock(now_i), "trophy": int(round(trophy_now)), "type": "actual"})
    timeline_points.extend(
        _sw_ai_timeline(
            now_i=now_i,
            remaining_min=int(remaining_min),
            trophy_now=float(trophy_now),
            avg_hr=float(avg_hr),
            burn_speed=float(burn_speed),
            opp_trophy_now=opp_trophy_now,
            opp_avg_hr=opp_avg_hr,
            burn_min=int(max(0, burn_min)),
            idle_min=int(max(0, idle_min)),
        )
    )

    eta = {
        "burn": _sw_ai_hhmm_duration(eta_burn_min),
        "normal": _sw_ai_hhmm_duration(eta_normal_min),
    }

    output = {
        "scenario": str(scenario),
        "command": str(command),
        "eta": eta,
        "success_rate": int(_clamp(float(success_rate), 0.0, 100.0)),
        "risk": {"counter_burn": int(_clamp(float(counter_risk), 0.0, 100.0)), "drop_rank": int(_clamp(float(drop_risk), 0.0, 100.0))},
        "confidence": int(_clamp(float(confidence), 0.0, 100.0)),
        "timeline": timeline_points,
    }

    explain = {
        "player_key": player_key_norm,
        "player_name": player_row.get("name") or player_row.get("player_name"),
        "rank": pr,
        "trophy": int(round(trophy_now)),
        "gap_to_above": gap_to_above,
        "avg_hr": avg_hr,
        "thr_peak": thr_peak,
        "momentum": momentum,
        "remaining_min": remaining_min,
        "scores": {"burn": burn_score, "hold": hold_score},
        "risk": {"counter_burn": counter_risk, "drop_rank": drop_risk, "fake_burn": fake_burn_risk},
        "reason": reason,
    }

    ai_snapshot = {
        "player_key": player_key_norm,
        "player_name": player_row.get("name") or player_row.get("player_name"),
        "fetched_at": now_i,
        "rank": pr,
        "trophy": int(round(trophy_now)),
        "gap_to_above": int(round(gap_to_above)) if gap_to_above is not None else None,
        "avg_hr": float(avg_hr),
        "thr_peak": float(thr_peak) if thr_peak is not None else None,
        "momentum": momentum,
        "remaining_min": int(remaining_min),
        "scenario": scenario,
        "command": command,
        "eta_burn_min": int(round(eta_burn_min)) if eta_burn_min is not None else None,
        "eta_normal_min": int(round(eta_normal_min)) if eta_normal_min is not None else None,
        "success_rate": int(success_rate),
        "risk_counter": int(counter_risk),
        "risk_drop": int(drop_risk),
        "confidence": int(confidence),
        "reason": reason,
    }

    now_hhmm = _sw_ai_hhmm_clock(now_i)
    persist_points: list[dict[str, Any]] = [{"time": now_hhmm, "trophy": int(round(trophy_now)), "type": "actual"}]
    for p in timeline_points:
        if not isinstance(p, dict):
            continue
        if p.get("type") in {"normal", "burn", "opponent"}:
            persist_points.append(p)
    _sw_ai_mysql_persist(ai_snapshot, explain=explain, output=output, timeline_points=persist_points[:80])

    with _SW_AI_LOCK:
        ps = _SW_AI_STATE["player_state"].get(player_key_norm)
        prev = dict(ps) if isinstance(ps, dict) else None
        _SW_AI_STATE["player_state"][player_key_norm] = {
            "at": now_i,
            "rank": pr,
            "gap_to_above": gap_to_above,
            "scenario": scenario,
            "command": command,
            "success_rate": success_rate,
        }

    if prev and isinstance(prev, dict):
        prev_rank = prev.get("rank")
        prev_gap = prev.get("gap_to_above")
        delta_rank = None
        if prev_rank is not None and pr is not None:
            try:
                delta_rank = int(prev_rank) - int(pr)
            except Exception:
                delta_rank = None
        delta_gap = None
        if prev_gap is not None and gap_to_above is not None:
            try:
                delta_gap = float(prev_gap) - float(gap_to_above)
            except Exception:
                delta_gap = None
        success = False
        if prev.get("command") == "BURN NOW":
            success = bool(delta_rank is not None and delta_rank > 0) or bool(delta_gap is not None and delta_gap > 0)
        elif prev.get("command") in {"HOLD", "SECURE", "DELAY BURN"}:
            success = bool(delta_rank is not None and delta_rank >= 0)
        else:
            success = False

        if _mysql_enabled():
            try:
                settings = _mysql_settings()
                prefix = settings["prefix"]
                feedback_t = f"{prefix}scenario_feedback"
                conn = _mysql_open_ready()
                try:
                    cur = conn.cursor()
                    try:
                        new_w = _sw_ai_adjust_weights(_sw_ai_load_weights(now_i), success=success)
                        cur.execute(
                            f"INSERT INTO `{feedback_t}` (player_key, ts, scenario, command, success, delta_rank, delta_gap, weights_json) VALUES (%s,%s,%s,%s,%s,%s,%s,%s)",
                            (
                                "__global__",
                                now_i,
                                _mysql_trunc(str(prev.get("scenario") or "NORMAL"), 32),
                                _mysql_trunc(str(prev.get("command") or "HOLD"), 32),
                                1 if success else 0,
                                _mysql_int(delta_rank),
                                _mysql_int(delta_gap),
                                _dump_json(new_w, pretty=False),
                            ),
                        )
                        conn.commit()
                    finally:
                        try:
                            cur.close()
                        except Exception:
                            pass
                finally:
                    try:
                        conn.close()
                    except Exception:
                        pass
            except Exception:
                pass

    event = None
    if prev and isinstance(prev, dict) and prev.get("command") != command:
        event = "command_change"
    if eta_burn_min is not None and eta_burn_min <= 15:
        event = event or "eta_under_15m"
    if counter_risk >= 60:
        event = event or "counter_risk_high"
    if prev and isinstance(prev, dict) and prev.get("rank") is not None and pr is not None and int(prev.get("rank")) != int(pr):
        event = event or "rank_change"

    if event:
        risk_level = max(counter_risk, drop_risk)
        pname = str(player_row.get("name") or player_row.get("player_name") or player_key_norm)
        msg = (
            f"Shadow War AI | {event}\n"
            f"Player: {pname}\n"
            f"Rank: {pr if pr is not None else '-'}\n"
            f"Command: {command} ({scenario})\n"
            f"ETA burn: {eta['burn']} | normal: {eta['normal']}\n"
            f"Risk: {risk_level}% (counter {counter_risk} / drop {drop_risk})\n"
            f"Reason: {reason}"
        )
        _sw_ai_send_alert(player_key_norm, event=event, message=msg)

    return output, explain


def _sw_ai_process_tracked(payload: dict[str, Any]) -> None:
    spec = (os.getenv("AI_TRACK_PLAYERS") or "").strip()
    if not spec:
        return
    burn_min = _sw_ai_int(os.getenv("AI_TRACK_BURN_MIN")) or 0
    idle_min = _sw_ai_int(os.getenv("AI_TRACK_IDLE_MIN")) or 0
    for raw in [x.strip() for x in spec.split(",")]:
        if not raw:
            continue
        key = None
        rank_i = None
        name_q = None
        rl = raw.lower()
        if rl.startswith("rank:"):
            try:
                rank_i = int(raw.split(":", 1)[1].strip())
            except Exception:
                rank_i = None
        elif rl.startswith("name:"):
            name_q = raw.split(":", 1)[1].strip()
        elif raw.isdigit():
            try:
                rank_i = int(raw)
            except Exception:
                rank_i = None
        else:
            key = raw
        try:
            sw_ai_command_system(payload, player_key=key, rank_i=rank_i, name_q=name_q, burn_min=int(burn_min), idle_min=int(idle_min))
        except Exception:
            pass


def _league_name_from_id(value: Any) -> str:
    """Map a league id or free-form league label to a display name.

    Whole-number values present in ``_LEAGUE_NAMES_BY_ID`` are looked up
    directly. Otherwise the text is matched case-insensitively against known
    keywords; unknown non-empty text is returned stripped, and empty input
    yields ``""``.
    """
    number = _to_number(value)
    if isinstance(number, (int, float)) and float(number).is_integer():
        league_id = int(number)
        if league_id in _LEAGUE_NAMES_BY_ID:
            return _LEAGUE_NAMES_BY_ID[league_id]

    text = str(value or "").strip()
    if not text:
        return ""

    lowered = text.lower()
    # Order matters: the first keyword found in the text decides the league.
    keyword_ids = (
        ("sage", 7),
        ("grand", 6),
        ("master", 5),
        ("gold", 4),
        ("silver", 3),
        ("bronze", 2),
    )
    for keyword, league_id in keyword_ids:
        if keyword in lowered:
            return _LEAGUE_NAMES_BY_ID[league_id]
    return text


def _squad_name_from_id(value: Any) -> str:
    """Return the squad name for a whole-number squad id, or ``""`` if unknown."""
    number = _to_number(value)
    if not isinstance(number, (int, float)) or not float(number).is_integer():
        return ""
    squad_id = int(number)
    return _SQUAD_NAMES_BY_ID[squad_id] if squad_id in _SQUAD_NAMES_BY_ID else ""


def _socketio_payload_to_ui(payload: Any, url: str, limit: int) -> dict[str, Any]:
    fetched_at = int(time.time())
    participants = payload.get("participants") if isinstance(payload, dict) else None
    squads_src = payload.get("squads") if isinstance(payload, dict) else None
    participants = participants if isinstance(participants, list) else []
    squads_src = squads_src if isinstance(squads_src, list) else []

    merged_by_id: dict[int | str, dict[str, Any]] = {}
    for s in squads_src:
        if not isinstance(s, dict):
            continue
        sp = s.get("participants")
        if not isinstance(sp, list):
            continue
        for p in sp:
            if not isinstance(p, dict):
                continue
            pid = p.get("id")
            if pid is None:
                continue
            pid_n = _to_number(pid)
            key: int | str = int(pid_n) if isinstance(pid_n, (int, float)) and float(pid_n).is_integer() else str(pid)
            prev = merged_by_id.get(key)
            if prev is None:
                merged_by_id[key] = p
                continue
            t_prev = _to_number(prev.get("trophy"))
            t_now = _to_number(p.get("trophy"))
            if t_prev is None or (t_now is not None and t_now > t_prev):
                merged_by_id[key] = p

    if len(merged_by_id) > len(participants):
        participants = list(merged_by_id.values())
        participants.sort(
            key=lambda x: (
                _to_number(x.get("trophy")) is None,
                -float(_to_number(x.get("trophy")) or 0),
                str(x.get("id") or ""),
            )
        )

    top_trophy: int | float | None = None
    for p in participants:
        if not isinstance(p, dict):
            continue
        v = _to_number(p.get("trophy"))
        if v is None:
            continue
        top_trophy = v if top_trophy is None else max(top_trophy, v)

    rows_all: list[dict[str, Any]] = []
    for i, p in enumerate(participants):
        if not isinstance(p, dict):
            continue
        trophy = _to_number(p.get("trophy"))
        gap = (top_trophy - trophy) if top_trophy is not None and trophy is not None else None
        rows_all.append(
            {
                "rank": i + 1,
                "name": p.get("name") or "",
                "cid": p.get("id"),
                "level": p.get("level"),
                "squad": _squad_name_from_id(p.get("squad")),
                "league": _league_name_from_id(p.get("league")),
                "trophy": trophy,
                "gap": gap,
            }
        )

    rows = rows_all
    if limit and limit > 0:
        rows = rows_all[: max(0, int(limit))]

    squads: list[dict[str, Any]] = []
    for s in squads_src:
        if not isinstance(s, dict):
            continue
        score = _to_number(s.get("trophy"))
        name = _squad_name_from_id(s.get("squad"))
        if not name:
            continue
        squads.append({"name": name, "score": score, "gap": None})

    squads.sort(key=lambda x: (x.get("score") is None, -(x.get("score") or 0)))
    top_score = squads[0].get("score") if squads else None
    if top_score is not None:
        for s in squads:
            sc = s.get("score")
            s["gap"] = (top_score - sc) if sc is not None else None

    return {
        "url": url,
        "fetched_at": fetched_at,
        "season_end_at": None,
        "season_remaining": None,
        "season_remaining_s": None,
        "season_ended": None,
        "squads": squads,
        "count": len(rows),
        "rows": rows,
    }


def scrape_shadow_war_socketio(
    url: str,
    ws_url: str,
    origin: str | None,
    timeout_s: int,
    limit: int,
    channel: str,
    event_name: str,
) -> dict[str, Any]:
    """Fetch one leaderboard event over a socket.io websocket connection.

    Connects to ``ws_url``, answers handshake/heartbeat frames, subscribes to
    ``channel`` (default ``"sw.leaderboard"``) and returns the first
    ``event_name`` payload converted with ``_socketio_payload_to_ui``.

    Raises:
        TimeoutError: when no matching event arrives within
            ``max(2, timeout_s)`` seconds.

    The connection is always closed before returning or raising.
    """
    ws = _ws_connect(ws_url, origin=origin, timeout_s=timeout_s)
    t0 = time.monotonic()
    is_subscribed = False

    try:
        while True:
            if time.monotonic() - t0 > max(2, int(timeout_s)):
                raise TimeoutError("socket.io timeout waiting for leaderboard event")

            try:
                frame = ws.recv_message(timeout_s=max(1, min(5, int(timeout_s))))
            except (TimeoutError, socket.timeout):
                # Short receive timeouts just re-check the overall deadline.
                continue
            if not isinstance(frame, str):
                continue

            packet = frame.strip()
            if not packet:
                continue

            # Frames starting with "0" are answered with "40"
            # (presumably the Engine.IO open -> Socket.IO connect handshake).
            if packet.startswith("0"):
                ws.send_text("40")
                continue

            # Frames starting with "2" are echoed back with a "3" prefix
            # (presumably ping -> pong).
            if packet.startswith("2"):
                ws.send_text("3" + packet[1:])
                continue

            # "40" acknowledges the connect; subscribe exactly once.
            if packet.startswith("40"):
                if not is_subscribed:
                    subscribe_channel = channel.strip() or "sw.leaderboard"
                    subscription: dict[str, Any] = {"channel": subscribe_channel, "auth": {"headers": {}}}
                    if limit and limit > 0:
                        subscription["limit"] = int(limit)
                    encoded = json.dumps(subscription, ensure_ascii=False, separators=(",", ":"))
                    ws.send_text('42["subscribe",' + encoded + "]")
                    is_subscribed = True
                continue

            # Only "42" (event) frames carry data we care about.
            if not packet.startswith("42"):
                continue

            try:
                decoded = json.loads(packet[2:])
            except Exception:
                continue
            if not isinstance(decoded, list) or not decoded or decoded[0] != event_name:
                continue

            # Event arguments are either (channel, payload) or just (payload,).
            rest = decoded[1:]
            if len(rest) >= 2 and isinstance(rest[0], str) and isinstance(rest[1], (dict, list)):
                msg_channel: str | None = rest[0]
                payload: Any = rest[1]
            elif len(rest) >= 1 and isinstance(rest[0], (dict, list)):
                msg_channel = None
                payload = rest[0]
            else:
                continue

            wanted = channel.strip() or "sw.leaderboard"
            if msg_channel and msg_channel.strip() and msg_channel.strip() != wanted:
                continue

            return _socketio_payload_to_ui(payload, url=url, limit=limit)
    finally:
        try:
            ws.send_close()
        except Exception:
            pass
        ws.close()


def _to_int(value: str | None) -> int | None:
    s = (value or "").strip()
    s = re.sub(r"[^0-9]", "", s)
    return int(s) if s else None


def _norm_header(s: str) -> str:
    return re.sub(r"[^a-z0-9]+", "", (s or "").strip().lower())


def _extract_cid(name: str) -> int | None:
    m = re.search(r"\[(\d+)\]\s*$", (name or "").strip())
    return int(m.group(1)) if m else None


@dataclass
class _Table:
    rows: list[list[str]] = field(default_factory=list)


class _HTMLTables(HTMLParser):
    def __init__(self) -> None:
        super().__init__(convert_charrefs=True)
        self.tables: list[_Table] = []
        self._in_table = False
        self._in_row = False
        self._in_cell = False
        self._cell_tag: str | None = None
        self._cell_text_parts: list[str] = []
        self._current_row: list[str] = []

    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
        if tag == "table":
            self._in_table = True
            self.tables.append(_Table())
            return

        if not self._in_table:
            return

        if tag == "tr":
            self._in_row = True
            self._current_row = []
            return

        if self._in_row and tag in ("td", "th"):
            self._in_cell = True
            self._cell_tag = tag
            self._cell_text_parts = []
            return

    def handle_endtag(self, tag: str) -> None:
        if tag == "table" and self._in_table:
            self._in_table = False
            self._in_row = False
            self._in_cell = False
            self._cell_tag = None
            self._cell_text_parts = []
            self._current_row = []
            return

        if not self._in_table:
            return

        if tag in ("td", "th") and self._in_cell and self._cell_tag == tag:
            self._in_cell = False
            text = " ".join("".join(self._cell_text_parts).split())
            self._current_row.append(text)
            self._cell_tag = None
            self._cell_text_parts = []
            return

        if tag == "tr" and self._in_row:
            self._in_row = False
            if self.tables and any(c.strip() for c in self._current_row):
                self.tables[-1].rows.append(self._current_row)
            self._current_row = []
            return

    def handle_data(self, data: str) -> None:
        if self._in_table and self._in_cell and data:
            self._cell_text_parts.append(data)


def _pick_best_table(tables: list[_Table]) -> _Table | None:
    """Choose the table most likely to be the leaderboard.

    A table qualifies only with at least two rows and a widest row of at
    least five cells. Qualifying tables are ranked by
    ``rows * 1000 + widest_row``; on equal scores the later table wins.

    Returns:
        The winning table, or ``None`` when no table qualifies.
    """
    best_key: tuple[int, int] | None = None
    best_table: _Table | None = None
    for position, table in enumerate(tables):
        n_rows = len(table.rows)
        widest = max(map(len, table.rows), default=0)
        if n_rows < 2 or widest < 5:
            continue
        # The position in the key reproduces the "later table wins" tie-break.
        key = (n_rows * 1000 + widest, position)
        if best_key is None or key > best_key:
            best_key, best_table = key, table
    return best_table


def _infer_header_and_body(rows: list[list[str]]) -> tuple[list[str] | None, list[list[str]]]:
    """Split table rows into an optional header row and the data rows.

    The first row is taken as the header when at least two of its cells are
    labels that ``_header_map`` can assign to a field. Using ``_header_map``
    as the single source of truth keeps detection in step with the labels it
    maps (including ``#``, ``No``, ``Username``, ``Clan`` and ``Trophies``),
    so such a header row is no longer mistaken for a data row.

    Returns:
        ``(header, body)``; ``header`` is ``None`` when the first row does
        not look like a header, in which case ``body`` is all of ``rows``.
    """
    if not rows:
        return None, []
    first = rows[0]
    if len(_header_map(first)) >= 2:
        return first, rows[1:]
    return None, rows


def _header_map(headers: list[str]) -> dict[int, str]:
    """Map column indexes to item field names based on header labels.

    Labels are compared after ``_norm_header`` normalisation; a literal ``#``
    label also counts as the rank column. Columns with an unrecognised label
    are left out of the result.
    """
    aliases = {
        "no": "rank",
        "rank": "rank",
        "player": "name",
        "name": "name",
        "username": "name",
        "lvl": "level",
        "level": "level",
        "squad": "squad",
        "clan": "squad",
        "league": "league",
        "trophy": "trophy",
        "trophies": "trophy",
        "gap": "gap",
    }
    mapping: dict[int, str] = {}
    for index, header in enumerate(headers):
        label = (header or "").strip()
        target = "rank" if label == "#" else aliases.get(_norm_header(label))
        if target is not None:
            mapping[index] = target
    return mapping


def _finalize_row_item(item: dict[str, Any]) -> dict[str, Any] | None:
    """Attach ``cid`` when the name carries one and drop rows without data.

    A row is dropped (``None``) when its name is empty or ``"[]"`` and it has
    no rank, level, trophy or gap value and no squad or league text.
    """
    name = (item.get("name") or "").strip()
    cid = _extract_cid(name)
    if cid is not None:
        item["cid"] = cid

    if name not in {"", "[]"}:
        return item
    if any(item.get(key) is not None for key in ("rank", "level", "trophy", "gap")):
        return item
    if any((item.get(key) or "").strip() for key in ("squad", "league")):
        return item
    return None


def _row_to_item(cells: list[str], mapping: dict[int, str] | None) -> dict[str, Any] | None:
    """Convert one table row into a leaderboard item.

    Args:
        cells: Cell texts of the row.
        mapping: Column index -> field name, as produced by ``_header_map``.
            When empty or ``None`` the columns are read positionally as
            rank, name, level, squad, league, trophy, gap.

    Returns:
        The item dict, or ``None`` for blank rows, rows that yield no mapped
        field, positional rows with fewer than five cells, and rows that
        carry no data at all. Positional items also keep the original cells
        under ``"raw"``.
    """
    if not cells or not any(c.strip() for c in cells):
        return None

    if mapping:
        numeric_fields = {"rank", "level", "trophy", "gap"}
        item: dict[str, Any] = {}
        for column, key in mapping.items():
            if column < len(cells):
                text = cells[column]
                item[key] = _to_int(text) if key in numeric_fields else text
        return _finalize_row_item(item) if item else None

    if len(cells) < 5:
        return None
    # At least five cells are guaranteed here; trophy and gap are optional.
    rank, name, level, squad, league = cells[:5]
    return _finalize_row_item(
        {
            "rank": _to_int(rank),
            "name": name,
            "level": _to_int(level),
            "squad": squad,
            "league": league,
            "trophy": _to_int(cells[5]) if len(cells) > 5 else None,
            "gap": _to_int(cells[6]) if len(cells) > 6 else None,
            "raw": cells,
        }
    )


def fetch_html(url: str, cookie: str | None, timeout_s: int, no_proxy: bool) -> str:
    headers = {
        "User-Agent": "Mozilla/5.0",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.8,id;q=0.7",
        "Connection": "close",
    }
    if cookie:
        headers["Cookie"] = cookie
    req = Request(url, headers=headers, method="GET")
    if no_proxy:
        opener = build_opener(ProxyHandler({}))
        resp_ctx = opener.open(req, timeout=timeout_s)
    else:
        resp_ctx = urlopen(req, timeout=timeout_s)

    with resp_ctx as resp:
        charset = resp.headers.get_content_charset() or "utf-8"
        return resp.read().decode(charset, errors="replace")


def fetch_json(url: str, cookie: str | None, timeout_s: int, no_proxy: bool) -> tuple[int, Any]:
    headers = {
        "User-Agent": "Mozilla/5.0",
        "Accept": "application/json",
        "Accept-Language": "en-US,en;q=0.8,id;q=0.7",
        "Connection": "close",
    }
    if cookie:
        headers["Cookie"] = cookie
    req = Request(url, headers=headers, method="GET")
    try:
        if no_proxy:
            opener = build_opener(ProxyHandler({}))
            resp_ctx = opener.open(req, timeout=timeout_s)
        else:
            resp_ctx = urlopen(req, timeout=timeout_s)
        with resp_ctx as resp:
            charset = resp.headers.get_content_charset() or "utf-8"
            text = resp.read().decode(charset, errors="replace")
            try:
                return int(getattr(resp, "status", 200) or 200), json.loads(text)
            except Exception:
                return int(getattr(resp, "status", 200) or 200), {"error": "bad_json", "raw": text}
    except Exception as e:
        code = getattr(e, "code", None)
        if code is None:
            raise
        try:
            raw = e.read()
        except Exception:
            raw = b""
        text = raw.decode("utf-8", errors="replace") if isinstance(raw, (bytes, bytearray)) else str(raw)
        try:
            return int(code), json.loads(text)
        except Exception:
            return int(code), {"error": "http_error", "status": int(code), "raw": text}


def fetch_bytes(url: str, timeout_s: int, no_proxy: bool) -> tuple[int, bytes, str | None]:
    headers = {
        "User-Agent": "Mozilla/5.0",
        "Accept": "*/*",
        "Accept-Language": "en-US,en;q=0.8,id;q=0.7",
        "Connection": "close",
    }
    req = Request(url, headers=headers, method="GET")
    try:
        if no_proxy:
            opener = build_opener(ProxyHandler({}))
            resp_ctx = opener.open(req, timeout=timeout_s)
        else:
            resp_ctx = urlopen(req, timeout=timeout_s)
        with resp_ctx as resp:
            status = int(getattr(resp, "status", 200) or 200)
            raw = resp.read()
            ct = None
            try:
                ct = resp.headers.get("Content-Type")
            except Exception:
                ct = None
            return status, raw, ct
    except Exception as e:
        code = getattr(e, "code", None)
        if code is None:
            raise
        try:
            raw = e.read()
        except Exception:
            raw = b""
        ct = None
        try:
            ct = getattr(e, "headers", None).get("Content-Type") if getattr(e, "headers", None) is not None else None
        except Exception:
            ct = None
        return int(code), raw if isinstance(raw, (bytes, bytearray)) else b"", ct


def _parse_countdown(html: str, fetched_at: int) -> dict[str, Any] | None:
    m = re.search(
        r'countDownDate\s*=\s*new\s+Date\("([^"]+)"\)\.getTime\(\)\s*;',
        html,
    )
    if not m:
        return None
    iso = m.group(1).strip()
    try:
        dt = datetime.fromisoformat(iso.replace("Z", "+00:00"))
    except Exception:
        return None

    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    end_at = int(dt.timestamp())
    remaining_s = int(end_at - fetched_at)
    ended = remaining_s <= 0
    remaining_s = max(0, remaining_s)

    days = remaining_s // 86400
    hours = (remaining_s % 86400) // 3600
    minutes = (remaining_s % 3600) // 60
    seconds = remaining_s % 60

    return {
        "end_at": end_at,
        "remaining_s": remaining_s,
        "ended": ended,
        "remaining": {
            "days": int(days),
            "hours": int(hours),
            "minutes": int(minutes),
            "seconds": int(seconds),
        },
    }


def _parse_squad_scores(html: str) -> list[dict[str, Any]]:
    """Pull per-squad scores out of the page markup.

    Matches a squad label (HQ, Assault, Medic, Kage or Ambush, in any letter
    case) closing a ``<span>``, followed by a ``<span>`` holding the score
    and optionally a ``<span>`` reading ``(Gap N)``.

    Returns:
        One ``{"name", "score"}`` dict per match, in page order, with a
        ``"gap"`` entry added only when a gap value was present and numeric.
    """
    squad_re = re.compile(
        r">\s*(HQ|Assault|Medic|Kage|Ambush)\s*</span>\s*<span>\s*([0-9.,]+)\s*</span>"
        r"(?:\s*<span[^>]*>\s*\(Gap\s*([0-9.,]+)\)\s*</span>)?",
        flags=re.IGNORECASE,
    )
    results: list[dict[str, Any]] = []
    for hit in squad_re.finditer(html):
        squad, score, gap = hit.groups(default="")
        entry: dict[str, Any] = {"name": squad, "score": _to_int(score)}
        gap_value = _to_int(gap) if gap else None
        if gap_value is not None:
            entry["gap"] = gap_value
        results.append(entry)
    return results


def _scrape_from_html(url: str, html: str) -> dict[str, Any]:
    """Build the leaderboard payload from an already downloaded page.

    The best candidate table is converted row by row (by header labels when a
    header row is recognised, positionally otherwise), and the season
    countdown and squad scores are parsed from the same markup.

    Returns:
        A dict with ``url``, ``fetched_at`` (current Unix time), the four
        ``season_*`` fields (``None`` when no countdown was found),
        ``squads``, ``count`` and ``rows``.
    """
    fetched_at = int(time.time())

    table_parser = _HTMLTables()
    table_parser.feed(html)
    table = _pick_best_table(table_parser.tables)

    header_cells, data_rows = _infer_header_and_body(table.rows) if table else (None, [])
    # An empty mapping means "no usable header": fall back to positional columns.
    column_map = (_header_map(header_cells) if header_cells else {}) or None

    items = [item for item in (_row_to_item(cells, column_map) for cells in data_rows) if item]

    countdown = _parse_countdown(html, fetched_at=fetched_at)
    squads = _parse_squad_scores(html)
    season = countdown or {}

    return {
        "url": url,
        "fetched_at": fetched_at,
        "season_end_at": season.get("end_at"),
        "season_remaining": season.get("remaining"),
        "season_remaining_s": season.get("remaining_s"),
        "season_ended": season.get("ended"),
        "squads": squads,
        "count": len(items),
        "rows": items,
    }


def scrape_shadow_war(url: str, cookie: str | None, timeout_s: int, no_proxy: bool) -> dict[str, Any]:
    """Fetch the page at ``url`` and parse it into a leaderboard payload.

    The download is delegated to ``fetch_html`` (``cookie``, ``timeout_s``
    and ``no_proxy`` are passed through unchanged) and the parsing to
    ``_scrape_from_html``; exceptions from either call propagate.
    """
    html = fetch_html(url, cookie=cookie, timeout_s=timeout_s, no_proxy=no_proxy)
    return _scrape_from_html(url, html)


def _dump_json(obj: Any, pretty: bool) -> str:
    if pretty:
        return json.dumps(obj, ensure_ascii=False, indent=2)
    return json.dumps(obj, ensure_ascii=False, separators=(",", ":"))


def _b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")


def _b64url_decode(text: str) -> bytes:
    s = (text or "").strip()
    if not s:
        return b""
    pad = "=" * ((4 - (len(s) % 4)) % 4)
    return base64.urlsafe_b64decode((s + pad).encode("ascii"))


def _jwt_sign_hs256(secret: bytes, msg: bytes) -> bytes:
    return hmac.new(secret, msg, hashlib.sha256).digest()


def _jwt_encode_hs256(secret: bytes, payload: dict[str, Any]) -> str:
    """Create a compact ``header.payload.signature`` JWT signed with HS256.

    Header and payload are serialised as compact JSON and base64url-encoded;
    the signature covers ``header.payload`` and is base64url-encoded too.
    """
    segments = [
        _b64url_encode(_dump_json(part, pretty=False).encode("utf-8"))
        for part in ({"alg": "HS256", "typ": "JWT"}, payload)
    ]
    signing_input = ".".join(segments).encode("ascii")
    segments.append(_b64url_encode(_jwt_sign_hs256(secret, signing_input)))
    return ".".join(segments)


def _jwt_verify_hs256(secret: bytes, token: str) -> dict[str, Any] | None:
    """Verify an HS256-signed compact JWT and return its claims.

    Args:
        secret: HMAC key the token must have been signed with.
        token: ``header.payload.signature`` text; surrounding whitespace is
            ignored.

    Returns:
        The payload dict when the token is well formed, declares
        ``alg == "HS256"``, carries a matching signature and is not expired;
        ``None`` in every other case. This function never raises for a bad
        token.

    An absent or falsy ``exp`` claim means "no expiry". A truthy ``exp``
    that cannot be read as a number is rejected: treating it as "no expiry"
    would let a malformed claim produce a token that never expires.
    """
    t = (token or "").strip()
    if not t or t.count(".") != 2:
        return None
    header_b64, payload_b64, sig_b64 = t.split(".", 2)
    try:
        header_raw = _b64url_decode(header_b64)
        payload_raw = _b64url_decode(payload_b64)
        sig_raw = _b64url_decode(sig_b64)
    except Exception:
        return None
    try:
        header = json.loads(header_raw.decode("utf-8", errors="strict"))
        payload = json.loads(payload_raw.decode("utf-8", errors="strict"))
    except Exception:
        return None
    if not isinstance(header, dict) or not isinstance(payload, dict):
        return None
    # Pin the algorithm so a token cannot choose how it gets verified.
    if (header.get("alg") or "") != "HS256":
        return None

    signing_input = f"{header_b64}.{payload_b64}".encode("ascii")
    expected = _jwt_sign_hs256(secret, signing_input)
    # Constant-time comparison of the signatures.
    if not hmac.compare_digest(expected, sig_raw):
        return None

    exp_claim = payload.get("exp")
    if exp_claim:
        try:
            exp_i = int(exp_claim)
        except (TypeError, ValueError, OverflowError):
            # Non-numeric, NaN or infinite expiry: fail closed.
            return None
        if exp_i and exp_i <= int(time.time()):
            return None
    return payload


def _write_text_atomic(path: Path, text: str) -> None:
    tmp = path.with_suffix(path.suffix + ".tmp")
    tmp.write_text(text, encoding="utf-8")
    tmp.replace(path)


@dataclass
class _ServerState:
    """Mutable status record bundled with its own lock.

    NOTE(review): nothing in this class enforces locking; presumably callers
    hold ``lock`` while reading or updating the other fields -- confirm at
    the call sites.
    """

    # Created per instance through default_factory, so instances never share a lock.
    lock: threading.Lock = field(default_factory=threading.Lock)
    # Presumably the most recent successful payload -- TODO confirm with the writer.
    last_ok: dict[str, Any] | None = None
    # Presumably the message of the most recent failure -- TODO confirm.
    last_error: str | None = None
    # Presumably a Unix timestamp of the last update -- TODO confirm the unit.
    last_updated_at: int | None = None


def _error_payload(url: str, message: str) -> dict[str, Any]:
    return {
        "url": url,
        "fetched_at": int(time.time()),
        "count": 0,
        "rows": [],
        "error": message,
    }


def _scrape_now(
    url: str,
    cookie: str | None,
    timeout_s: int,
    no_proxy: bool,
    limit: int,
) -> dict[str, Any]:
    """Produce a fresh leaderboard payload, preferring the socket.io feed.

    When ``limit`` exceeds 100 the socket.io leaderboard is tried first and
    returned if its rows pass a plausibility check. Otherwise, or if that
    attempt fails for any reason, the HTML page is scraped and truncated to
    ``limit`` rows (when ``limit`` is positive). Either way the payload is
    passed through ``_enrich_payload_rates`` and handed to
    ``_maybe_persist_payload`` before being returned.

    On the HTML path a failed socket.io attempt is reported under
    ``"socketio_error"`` unless the payload already carries an ``"error"``.
    Exceptions from the HTML path propagate.
    """

    def _socketio_rows_ok(rows: Any) -> bool:
        # A non-list, or fewer than 80 entries, is not accepted as a leaderboard.
        if not isinstance(rows, list) or len(rows) < 80:
            return False

        ranks: list[int] = []
        for row in rows:
            if not isinstance(row, dict):
                continue
            rank_value = row.get("rank")
            try:
                rank = int(rank_value)
            except Exception:
                continue
            if rank > 0:
                ranks.append(rank)

        # Ranks must start at 1, be mostly distinct and reach far enough down.
        if not ranks or min(ranks) != 1:
            return False
        mostly_distinct = len(set(ranks)) >= max(10, int(len(ranks) * 0.6))
        deep_enough = max(ranks) >= min(100, len(rows))
        return mostly_distinct and deep_enough

    socketio_error: str | None = None
    if limit and limit > 100:
        try:
            live = scrape_shadow_war_socketio(
                url=url,
                ws_url="wss://ws-ldns.ninjasage.id/socket.io/?EIO=3&transport=websocket",
                origin="https://ninjasage.id",
                timeout_s=max(5, int(timeout_s)),
                limit=int(limit),
                channel="sw.leaderboard",
                event_name="UpdateSWTrophy",
            )
            if _socketio_rows_ok(live.get("rows")):
                live = _enrich_payload_rates(live)
                _maybe_persist_payload(live)
                return live
            socketio_error = "socketio_payload_not_leaderboard"
        except Exception as exc:
            # Any failure on the live path falls through to the HTML scrape.
            socketio_error = str(exc) or "socketio_error"

    scraped = _enrich_payload_rates(
        scrape_shadow_war(url, cookie=cookie, timeout_s=timeout_s, no_proxy=no_proxy)
    )
    if limit and limit > 0:
        kept = scraped["rows"][:limit]
        scraped["rows"] = kept
        scraped["count"] = len(kept)
    if socketio_error and not scraped.get("error"):
        scraped["socketio_error"] = socketio_error
    _maybe_persist_payload(scraped)
    return scraped


class _APIHandler(BaseHTTPRequestHandler):
    server: "_APIServer"
    protocol_version = "HTTP/1.1"

    def _send_json(self, status: int, payload: Any) -> None:
        """Send ``payload`` as an indented JSON response with CORS headers.

        The response is marked ``no-store`` and carries an explicit
        ``Content-Length``.
        """
        body = _dump_json(payload, pretty=True).encode("utf-8")
        self.send_response(status)
        for header_name, header_value in (
            ("Content-Type", "application/json; charset=utf-8"),
            ("Cache-Control", "no-store"),
            ("Access-Control-Allow-Origin", "*"),
            ("Access-Control-Allow-Methods", "GET, POST, OPTIONS"),
            ("Access-Control-Allow-Headers", "Content-Type, X-NinjaSage-Cookie"),
            ("Content-Length", str(len(body))),
        ):
            self.send_header(header_name, header_value)
        self.end_headers()
        self.wfile.write(body)

    def _send_css(self, status: int, css: str) -> None:
        body = (css or "").encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "text/css; charset=utf-8")
        self.send_header("Cache-Control", "no-store")
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type, X-NinjaSage-Cookie")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _send_html(self, status: int, html: str) -> None:
        body = html.encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.send_header("Cache-Control", "no-store")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _send_bytes(self, status: int, body: bytes, content_type: str | None) -> None:
        raw = body if isinstance(body, (bytes, bytearray)) else b""
        self.send_response(status)
        self.send_header("Content-Type", content_type or "application/octet-stream")
        self.send_header("Cache-Control", "public, max-age=86400")
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, HEAD, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type, X-NinjaSage-Cookie")
        self.send_header("Content-Length", str(len(raw)))
        self.end_headers()
        if (getattr(self, "command", "") or "").upper() != "HEAD":
            self.wfile.write(raw)

    def _send_sse_headers(self) -> None:
        self.send_response(200)
        self.send_header("Content-Type", "text/event-stream; charset=utf-8")
        self.send_header("Cache-Control", "no-cache, no-store, must-revalidate")
        self.send_header("Connection", "close")
        self.send_header("X-Accel-Buffering", "no")
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type, X-NinjaSage-Cookie")
        self.end_headers()

    def _write_sse_event(self, event: str, data: str) -> None:
        payload = f"event: {event}\ndata: {data}\n\n"
        self.wfile.write(payload.encode("utf-8"))
        self.wfile.flush()

    def _write_sse_comment(self, text: str) -> None:
        payload = f": {text}\n\n"
        self.wfile.write(payload.encode("utf-8"))
        self.wfile.flush()

    def _read_body_bytes(self) -> bytes:
        try:
            n = int(self.headers.get("Content-Length") or "0")
        except Exception:
            n = 0
        if n <= 0:
            return b""
        return self.rfile.read(n)

    def _read_body_fields(self) -> dict[str, str]:
        raw = self._read_body_bytes()
        if not raw:
            return {}
        content_type = (self.headers.get("Content-Type") or "").split(";")[0].strip().lower()
        text = raw.decode("utf-8", errors="replace")
        if content_type == "application/json":
            try:
                obj = json.loads(text)
            except Exception:
                return {}
            if not isinstance(obj, dict):
                return {}
            out: dict[str, str] = {}
            for k, v in obj.items():
                if isinstance(v, bool):
                    out[str(k)] = "1" if v else "0"
                elif v is None:
                    out[str(k)] = ""
                else:
                    out[str(k)] = str(v)
            return out
        if content_type == "application/x-www-form-urlencoded":
            q = parse_qs(text, keep_blank_values=True)
            return {k: (v[0] if v else "") for k, v in q.items()}
        return {}

    def _parse_cookie_header(self) -> dict[str, str]:
        raw = self.headers.get("Cookie") or ""
        out: dict[str, str] = {}
        for part in raw.split(";"):
            p = part.strip()
            if not p or "=" not in p:
                continue
            k, v = p.split("=", 1)
            key = k.strip()
            val = v.strip()
            if key:
                out[key] = val
        return out

    def _admin_auth_required(self) -> bool:
        return bool(getattr(self.server, "admin_password", None))

    def _admin_is_authed(self) -> bool:
        if not self._admin_auth_required():
            return True
        jwt_cookie_name = getattr(self.server, "admin_jwt_cookie_name", "sw_admin_jwt")
        jwt_token = self._parse_cookie_header().get(jwt_cookie_name)
        if jwt_token:
            secret = getattr(self.server, "admin_jwt_secret", None)
            if isinstance(secret, (bytes, bytearray)) and secret:
                payload = _jwt_verify_hs256(bytes(secret), jwt_token)
                if payload is not None and (payload.get("sub") or "") == "admin":
                    return True
        cookie_name = getattr(self.server, "admin_cookie_name", "sw_admin_session")
        token = self._parse_cookie_header().get(cookie_name)
        if not token:
            return False
        now_i = int(time.time())
        with self.server.admin_lock:
            exp = self.server.admin_sessions.get(token)
            if exp is None:
                return False
            if exp <= now_i:
                self.server.admin_sessions.pop(token, None)
                return False
        return True

    def _admin_require(self) -> bool:
        if self._admin_is_authed():
            return True
        self._send_json(401, {"error": "unauthorized"})
        return False

    def _admin_html(self) -> str:
        auth_required = bool(getattr(self.server, "admin_password", None))
        html = """<!doctype html>
<html lang="en">
  <head>
    <meta charset="utf-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1" />
    <title>NinjaSage Realtime Admin</title>
    <script>window.__ADMIN_AUTH_REQUIRED__ = __AUTH_REQUIRED__;</script>
    <script src="https://unpkg.com/react@18/umd/react.production.min.js" crossorigin></script>
    <script src="https://unpkg.com/react-dom@18/umd/react-dom.production.min.js" crossorigin></script>
    <script src="https://cdn.tailwindcss.com"></script>
    <style>
      :root {
        --bg0: #070b14;
        --bg1: #0b1020;
        --panel: rgba(255,255,255,0.06);
        --stroke: rgba(136,255,255,0.22);
        --fg: rgba(232,255,255,0.92);
        --muted: rgba(232,255,255,0.65);
        --cyan: #2efcff;
        --magenta: #ff4fd8;
        --green: #63ffa7;
        --red: #ff5b5b;
      }
      html, body { height: 100%; }
      body {
        margin: 0;
        color: var(--fg);
        font-family: ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, Helvetica, Arial;
        background: radial-gradient(1200px 800px at 20% 10%, rgba(46,252,255,0.16), transparent 60%),
                    radial-gradient(900px 600px at 80% 30%, rgba(255,79,216,0.10), transparent 55%),
                    linear-gradient(180deg, var(--bg0), var(--bg1));
      }
      .wrap { max-width: 1180px; margin: 0 auto; padding: 20px; }
      .title { display: flex; align-items: baseline; justify-content: space-between; gap: 12px; }
      h1 { font-size: 20px; margin: 0; letter-spacing: 0.3px; }
      .sub { font-size: 12px; color: var(--muted); }
      .grid { display: grid; grid-template-columns: 380px 1fr; gap: 16px; margin-top: 14px; }
      .card {
        background: var(--panel);
        border: 1px solid var(--stroke);
        border-radius: 14px;
        box-shadow: 0 0 24px rgba(46,252,255,0.08), 0 0 40px rgba(255,79,216,0.06);
        padding: 14px;
        backdrop-filter: blur(10px);
      }
      .row { display: grid; grid-template-columns: 1fr; gap: 10px; }
      label { font-size: 12px; color: var(--muted); display: block; margin-bottom: 6px; }
      input[type="text"], input[type="number"], input[type="password"] {
        width: 100%;
        box-sizing: border-box;
        background: rgba(0,0,0,0.35);
        color: var(--fg);
        border: 1px solid rgba(46,252,255,0.25);
        border-radius: 10px;
        padding: 10px 10px;
        outline: none;
      }
      input[type="checkbox"] { transform: translateY(1px); }
      .btns { display: flex; flex-wrap: wrap; gap: 8px; margin-top: 10px; }
      button {
        appearance: none;
        border: 1px solid rgba(46,252,255,0.35);
        background: rgba(0,0,0,0.25);
        color: var(--fg);
        padding: 9px 12px;
        border-radius: 10px;
        cursor: pointer;
      }
      button.primary { border-color: rgba(255,79,216,0.40); box-shadow: 0 0 16px rgba(255,79,216,0.12); }
      button.good { border-color: rgba(99,255,167,0.40); box-shadow: 0 0 16px rgba(99,255,167,0.12); }
      button.bad { border-color: rgba(255,91,91,0.45); box-shadow: 0 0 16px rgba(255,91,91,0.10); }
      button:disabled { opacity: 0.55; cursor: not-allowed; }
      .kv { display: grid; grid-template-columns: 140px 1fr; gap: 8px; font-size: 12px; margin-top: 8px; }
      .k { color: var(--muted); }
      .v { color: var(--fg); word-break: break-word; }
      .pill { display: inline-block; padding: 2px 8px; border-radius: 999px; border: 1px solid rgba(46,252,255,0.35); color: var(--muted); font-size: 12px; }
      .pill.ok { border-color: rgba(99,255,167,0.45); color: rgba(99,255,167,0.95); }
      .pill.err { border-color: rgba(255,91,91,0.55); color: rgba(255,91,91,0.95); }
      table { width: 100%; border-collapse: collapse; font-size: 12px; min-width: 1120px; }
      th, td { padding: 8px 8px; border-bottom: 1px solid rgba(46,252,255,0.12); text-align: left; }
      th { color: var(--muted); font-weight: 600; }
      .mono { font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", monospace; }
      .foot { margin-top: 10px; color: var(--muted); font-size: 12px; }
      select {
        background: rgba(0,0,0,0.35);
        color: var(--fg);
        border: 1px solid rgba(46,252,255,0.25);
        border-radius: 10px;
        padding: 7px 10px;
        outline: none;
      }
      .pager { display: flex; align-items: center; justify-content: space-between; gap: 10px; flex-wrap: wrap; margin-top: 10px; }
      .pagerBtns { display: flex; align-items: center; gap: 8px; flex-wrap: wrap; }
      @keyframes glowing {
        0% { background-position: 0 0; }
        100% { background-position: 400% 0; }
      }
      .squadNeon { color: var(--green); font-weight: 800; text-shadow: 0 0 12px rgba(99,255,167,0.75), 0 0 28px rgba(46,252,255,0.28); }
      .trophyUpdated { color: #ffd166; font-weight: 800; text-shadow: 0 0 12px rgba(255,209,102,0.55); }
      @keyframes loginIn { from { transform: translateY(10px) scale(0.985); opacity: 0; } to { transform: translateY(0) scale(1); opacity: 1; } }
      @keyframes shakeX { 0%, 100% { transform: translateX(0); } 20% { transform: translateX(-6px); } 40% { transform: translateX(6px); } 60% { transform: translateX(-4px); } 80% { transform: translateX(4px); } }
      .loginOverlay { position: fixed; inset: 0; display: flex; align-items: center; justify-content: center; padding: 20px; background: rgba(7,11,20,0.72); backdrop-filter: blur(8px); z-index: 50; }
      .loginCard { width: min(520px, 100%); background: rgba(255,255,255,0.06); border: 1px solid rgba(136,255,255,0.22); border-radius: 14px; box-shadow: 0 0 24px rgba(46,252,255,0.10), 0 0 40px rgba(255,79,216,0.08); padding: 14px; animation: loginIn 220ms ease-out; }
      .loginCard.shake { animation: shakeX 300ms ease-in-out; }
      .loginTitle { font-size: 16px; font-weight: 700; margin: 0 0 4px 0; }
      .loginHint { color: var(--muted); font-size: 12px; margin: 0 0 12px 0; }
      .loginErr { margin-top: 10px; color: rgba(255,91,91,0.95); font-size: 12px; }
      .loginGrid { display: grid; grid-template-columns: 1fr; gap: 10px; }
      .loginHelp { margin-top: 6px; font-size: 12px; color: var(--muted); }
      .loginHelp.bad { color: rgba(255,91,91,0.95); }
      .inputWrap { position: relative; }
      .iconBtn {
        position: absolute;
        top: 50%;
        transform: translateY(-50%);
        right: 8px;
        border: 1px solid rgba(46,252,255,0.20);
        background: rgba(0,0,0,0.20);
        padding: 6px 8px;
        border-radius: 10px;
        font-size: 12px;
      }
      .linkBtn {
        border: none;
        background: transparent;
        color: rgba(46,252,255,0.92);
        padding: 0;
        cursor: pointer;
        text-decoration: underline;
        text-underline-offset: 3px;
      }
      .divider { height: 1px; background: rgba(46,252,255,0.14); margin: 10px 0; }
      .socialRow { display: grid; grid-template-columns: 1fr 1fr; gap: 10px; }
      .socialBtn { border-color: rgba(136,255,255,0.22); background: rgba(0,0,0,0.22); }
      .socialBtn:disabled { opacity: 0.45; }
      .modalOverlay { position: fixed; inset: 0; display: flex; align-items: center; justify-content: center; padding: 20px; background: rgba(7,11,20,0.72); backdrop-filter: blur(8px); z-index: 60; }
      .modalCard { width: min(520px, 100%); background: rgba(255,255,255,0.06); border: 1px solid rgba(136,255,255,0.22); border-radius: 14px; box-shadow: 0 0 24px rgba(46,252,255,0.10), 0 0 40px rgba(255,79,216,0.08); padding: 14px; animation: loginIn 220ms ease-out; }
    </style>
    <style id="idColorCss"></style>
  </head>
  <body>
    <div id="loginRoot"></div>
    <div class="wrap" id="adminRoot">
      <div class="title">
        <h1>Realtime Admin</h1>
        <div style="display:flex; gap:10px; align-items:center;">
          <div class="sub mono" id="serverBase"></div>
          <button class="bad" id="btnLogout" style="display:none;">Logout</button>
        </div>
      </div>
      <div class="grid">
        <div class="card">
          <div class="row">
            <div>
              <label>Shadow War URL</label>
              <input id="cfgUrl" type="text" spellcheck="false" />
            </div>
            <div style="display:grid; grid-template-columns: 1fr 1fr; gap: 10px;">
              <div>
                <label>Timeout (s)</label>
                <input id="cfgTimeout" type="number" min="1" step="1" />
              </div>
              <div>
                <label>Watch Interval (s)</label>
                <input id="cfgWatch" type="number" min="0" step="1" />
              </div>
            </div>
            <div style="display:flex; align-items:center; gap:10px;">
              <label style="margin:0;">
                <input id="cfgNoProxy" type="checkbox" />
                No Proxy
              </label>
              <span class="pill mono" id="cookieBadge">cookie: unknown</span>
            </div>
            <div>
              <label>Cookie (optional, not displayed back)</label>
              <input id="cfgCookie" type="password" placeholder="Paste Cookie header value…" />
              <div style="margin-top:6px; display:flex; gap:10px; align-items:center;">
                <label style="margin:0;">
                  <input id="cfgClearCookie" type="checkbox" />
                  Clear stored cookie
                </label>
              </div>
            </div>
            <div style="display:grid; grid-template-columns: 1fr 1fr; gap: 10px;">
              <div>
                <label>Leaderboard Popup Side</label>
                <select id="cfgPopupSide">
                  <option value="left">Left</option>
                  <option value="right">Right</option>
                </select>
              </div>
              <div>
                <label>Leaderboard Popup Auto Open</label>
                <label style="margin:0; display:flex; align-items:center; gap:10px;">
                  <input id="cfgPopupAutoOpen" type="checkbox" />
                  Enabled
                </label>
              </div>
            </div>
          </div>
          <div class="btns">
            <button class="primary" id="btnSave">Save Config</button>
            <button class="good" id="btnScrape">Scrape Now</button>
            <button class="bad" id="btnStopWatch">Stop Watcher</button>
            <button id="btnRefresh">Refresh View</button>
          </div>
          <div class="foot mono" id="statusLine"></div>
          <div class="kv" style="margin-top: 12px;">
            <div class="k">Squads</div>
            <div class="v" style="display:flex; flex-wrap:wrap; gap:6px;">
              <span class="pill mono">0: Assault</span>
              <span class="pill mono">1: Ambush</span>
              <span class="pill mono">2: Medic</span>
              <span class="pill mono">3: Kage</span>
              <span class="pill mono">4: HQ</span>
            </div>
            <div class="k">Leagues</div>
            <div class="v" style="display:flex; flex-wrap:wrap; gap:6px;">
              <span class="pill mono">2: Bronze</span>
              <span class="pill mono">3: Silver</span>
              <span class="pill mono">4: Gold</span>
              <span class="pill mono">5: Master</span>
              <span class="pill mono">6: Grand Master</span>
              <span class="pill mono">7: Sage</span>
            </div>
          </div>
        </div>

        <div class="card">
          <div style="display:flex; align-items:center; justify-content:space-between; gap: 10px;">
            <div>
              <span class="pill mono" id="stateBadge">state: unknown</span>
            </div>
            <div class="sub mono" id="updatedAt"></div>
          </div>
          <div class="kv">
            <div class="k">Last Error</div>
            <div class="v mono" id="lastError">-</div>
            <div class="k">Rows</div>
            <div class="v mono" id="rowCount">-</div>
            <div class="k">Season</div>
            <div class="v mono" id="seasonInfo">-</div>
          </div>
          <div style="margin-top: 12px; overflow:auto;">
            <table>
              <thead>
                <tr>
                  <th style="width:56px;">#</th>
                  <th>Name</th>
                  <th style="width:72px;">Level</th>
                  <th style="width:120px;">Squad</th>
                  <th style="width:120px;">League</th>
                  <th style="width:110px;">Trophy</th>
                  <th style="width:110px;">Gap</th>
                  <th style="width:96px;">T/hr</th>
                  <th style="width:96px;">AVG/hr</th>
                  <th style="width:110px;">AVG/DAY</th>
                  <th style="width:120px;">OVERTAKE</th>
                </tr>
              </thead>
              <tbody id="rowsTbody"></tbody>
            </table>
            <div class="pager">
              <div class="sub mono" id="pagerInfo">-</div>
              <div class="pagerBtns">
                <button id="btnFirst">First</button>
                <button id="btnPrev">Prev</button>
                <span id="pagerNums" style="display:flex; gap:6px; align-items:center;"></span>
                <span class="pill mono" id="pageBadge">page -/-</span>
                <button id="btnNext">Next</button>
                <button id="btnLast">Last</button>
                <span class="sub mono">size</span>
                <select id="pageSize">
                  <option value="25">25</option>
                  <option value="50" selected>50</option>
                  <option value="100">100</option>
                  <option value="250">250</option>
                </select>
              </div>
            </div>
          </div>
        </div>
      </div>
    </div>

    <script>
      // Tiny DOM helpers used throughout this script.
      // $ looks an element up by id; the two setters write to that element and
      // throw if the id does not exist (no null guard, same as a direct access).
      const $ = (elementId) => document.getElementById(elementId);
      const setText = (id, text) => {
        const node = $(id);
        node.textContent = text;
      };
      const setHtml = (id, html) => {
        const node = $(id);
        node.innerHTML = html;
      };

      // GET `path` asking for JSON.
      // Resolves with the parsed body when the response status is ok; otherwise
      // throws the parsed body. A body that is not valid JSON is replaced by
      // { error: "bad_json", raw: <response text> } before either outcome.
      const apiGet = async (path) => {
        const response = await fetch(path, { headers: { "Accept": "application/json" } });
        const bodyText = await response.text();
        let parsed;
        try {
          parsed = JSON.parse(bodyText);
        } catch {
          parsed = { error: "bad_json", raw: bodyText };
        }
        if (response.ok) return parsed;
        throw parsed;
      };

      // POST `body` (an empty object when falsy) to `path` as JSON.
      // Resolves with the parsed response body when the status is ok; otherwise
      // throws the parsed body. A response that is not valid JSON is replaced by
      // { error: "bad_json", raw: <response text> } before either outcome.
      const apiPost = async (path, body) => {
        const request = {
          method: "POST",
          headers: { "Content-Type": "application/json", "Accept": "application/json" },
          body: JSON.stringify(body || {}),
        };
        const response = await fetch(path, request);
        const bodyText = await response.text();
        let parsed;
        try {
          parsed = JSON.parse(bodyText);
        } catch {
          parsed = { error: "bad_json", raw: bodyText };
        }
        if (response.ok) return parsed;
        throw parsed;
      };

      // Parse `v` as a base-10 integer, returning `fallback` when no integer can
      // be read from it. Only null/undefined count as missing, so a numeric 0 is
      // parsed as 0 instead of being replaced by the fallback.
      const toInt = (v, fallback) => {
        const n = parseInt(String(v == null ? "" : v), 10);
        return Number.isFinite(n) ? n : fallback;
      };

      // Format a rate for display: a locale-formatted number with at most one
      // fraction digit, or "-" when the value is missing or not a finite number.
      const fmtRate = (v) => {
        let num;
        if (typeof v === "number") {
          num = v;
        } else if (v == null) {
          num = null;
        } else {
          num = Number(v);
        }
        if (!Number.isFinite(num)) return "-";
        try {
          return num.toLocaleString(undefined, { maximumFractionDigits: 1 });
        } catch {
          // Fall back to plain stringification if locale formatting throws.
          return String(num);
        }
      };

      // Format a duration given in hours as "Nd Nh Nm".
      // Days are omitted when zero; hours are omitted only when both days and
      // hours are zero; minutes are always shown. Missing, non-finite or
      // negative input yields "-".
      const fmtOvertake = (v) => {
        let hours;
        if (typeof v === "number") {
          hours = v;
        } else if (v == null) {
          hours = null;
        } else {
          hours = Number(v);
        }
        if (!Number.isFinite(hours) || hours < 0) return "-";

        const minutes = Math.max(0, Math.round(hours * 60));
        const days = Math.floor(minutes / 1440);
        const hoursPart = Math.floor((minutes % 1440) / 60);
        const minutesPart = minutes % 60;

        const out = [];
        if (days) out.push(days + "d");
        if (days || hoursPart) out.push(hoursPart + "h");
        out.push(minutesPart + "m");
        return out.join(" ");
      };

      // Return `v` trimmed when it is a 3- or 6-digit "#rgb" / "#rrggbb" hex
      // colour; return "" for anything else (including empty input).
      const safeHex = (v) => {
        const candidate = String(v || "").trim();
        const isHexColor = /^#(?:[0-9a-fA-F]{3}|[0-9a-fA-F]{6})$/.test(candidate);
        return isHexColor ? candidate : "";
      };

      // Escape the five HTML-significant characters (& < > double quote, single
      // quote) so the result is safe inside element content and quoted attribute
      // values. null/undefined become the empty string; other values are
      // stringified first.
      // The double quote is spelled with single-quote delimiters rather than a
      // backslash escape so this line reads the same to the browser whether or
      // not the enclosing template literal processes backslash escapes.
      const escapeHtml = (v) => {
        const entities = { "&": "&amp;", "<": "&lt;", ">": "&gt;", '"': "&quot;", "'": "&#39;" };
        const s = String(v == null ? "" : v);
        return s.replace(/[&<>"']/g, (ch) => entities[ch]);
      };

      // Fetch the per-id colour stylesheet and place its text in the
      // <style id="idColorCss"> element. Best effort: a missing element, a
      // non-ok response or a network failure leaves the page unchanged.
      const loadIdColorCss = async () => {
        const styleEl = document.getElementById("idColorCss");
        if (!styleEl) return;
        try {
          const resp = await fetch("/shadow-war/realtime/id-color.css", { cache: "no-store" });
          if (resp.ok) {
            styleEl.textContent = await resp.text();
          }
        } catch {}
      };

      // Shared view state for the leaderboard table.
      // Payload most recently stored by refreshAll (null until the first load).
      let currentPayload = null;
      // 1-based number of the page currently shown.
      let page = 1;
      // Rows per page; updated from the #pageSize selector.
      let pageSize = 50;
      // cid -> trophy value recorded by the last renderRows call that had rows;
      // compared against the next payload to find changed players.
      let prevTrophyByCid = new Map();
      // cids currently marked as "just updated" for highlighting.
      const updatedCidSet = new Set();
      // cid -> timeout handle that removes the cid from updatedCidSet.
      const updatedTimers = new Map();

      // Mark the given cids as "just updated" for 2 seconds and re-render.
      // Each cid gets its own timer; flashing a cid again restarts its timer.
      // Blank entries are ignored, and nothing is rendered when no usable cid
      // was supplied.
      const flashCids = (cids) => {
        if (!Array.isArray(cids) || !cids.length) return;
        let flagged = 0;
        for (const raw of cids) {
          const cid = String(raw || "").trim();
          if (!cid) continue;
          flagged += 1;
          updatedCidSet.add(cid);
          const pending = updatedTimers.get(cid);
          if (pending) window.clearTimeout(pending);
          const handle = window.setTimeout(() => {
            updatedCidSet.delete(cid);
            updatedTimers.delete(cid);
            renderRows(currentPayload, { flash: false });
          }, 2000);
          updatedTimers.set(cid, handle);
        }
        if (flagged > 0) renderRows(currentPayload, { flash: false });
      };

      // Limit v to the range [lo, hi]; the upper bound wins if lo exceeds hi.
      const clamp = (v, lo, hi) => {
        const raised = Math.max(v, lo);
        return Math.min(raised, hi);
      };

      // Number of pages needed for `len` rows at the current pageSize (never below 1).
      const totalPagesFor = (len) => {
        const perPage = Math.max(1, pageSize);
        return Math.max(1, Math.ceil(len / perPage));
      };

      // Build the list of page-number buttons to show.
      // Up to 9 pages: every page number. Otherwise: page 1, the current page
      // with two neighbours on each side, and the last page, with `null`
      // standing in for each skipped run (rendered as an ellipsis).
      const pageItemsFor = (totalPages) => {
        if (totalPages <= 9) {
          return Array.from({ length: totalPages }, (_, idx) => idx + 1);
        }
        const windowStart = Math.max(2, page - 2);
        const windowEnd = Math.min(totalPages - 1, page + 2);
        const items = [1];
        if (windowStart > 2) items.push(null);
        for (let n = windowStart; n <= windowEnd; n += 1) {
          items.push(n);
        }
        if (windowEnd < totalPages - 1) items.push(null);
        items.push(totalPages);
        return items;
      };

      // Render the numbered page buttons into #pagerNums.
      // The current page is shown as a disabled "primary" button; gaps from
      // pageItemsFor are shown as an ellipsis pill. Cleared when there is only
      // one page.
      const renderPagerNums = (totalPages) => {
        const container = $("pagerNums");
        if (!container) return;
        if (totalPages <= 1) {
          container.innerHTML = "";
          return;
        }
        const pieces = [];
        for (const p of pageItemsFor(totalPages)) {
          if (p == null) {
            pieces.push("<span class='pill mono' style='opacity:0.7;'>…</span>");
            continue;
          }
          const isCurrent = p === page;
          const cls = isCurrent ? "primary" : "";
          const dis = isCurrent ? "disabled" : "";
          pieces.push("<button class='" + cls + "' data-page='" + p + "' " + dis + ">" + p + "</button>");
        }
        container.innerHTML = pieces.join("");
      };

      // Sync every pager widget with the current page for a list of `len` rows:
      // clamps `page` into range, updates the "Showing a-b of n" text and page
      // badge, enables or disables the First/Prev/Next/Last buttons, and
      // redraws the numbered buttons.
      const setPagerUi = (len) => {
        const pageCount = totalPagesFor(len);
        page = clamp(page, 1, pageCount);

        let first = 0;
        let last = 0;
        if (len) {
          const offset = (page - 1) * pageSize;
          first = offset + 1;
          last = Math.min(len, offset + pageSize);
        }
        setText("pagerInfo", len ? `Showing ${first}-${last} of ${len}` : "No data");
        setText("pageBadge", `page ${page}/${pageCount}`);

        const atStart = page <= 1;
        const atEnd = page >= pageCount;
        $("btnFirst").disabled = atStart;
        $("btnPrev").disabled = atStart;
        $("btnNext").disabled = atEnd;
        $("btnLast").disabled = atEnd;
        renderPagerNums(pageCount);
      };

      // Render the current page of leaderboard rows into #rowsTbody and refresh
      // the pager.
      //   payload    - object whose `rows` array holds the entries; anything
      //                else (or an empty array) renders a single "No data" row.
      //   opts.flash - when truthy, players whose trophy value differs from the
      //                value recorded by the previous call are highlighted via
      //                flashCids.
      // Row values come from a scraped payload and are written through
      // innerHTML, so every row-derived string (including the cid used inside
      // the class attribute) is passed through escapeHtml before insertion.
      const renderRows = (payload, opts) => {
        const flash = !!(opts && opts.flash);
        const rows = (payload && Array.isArray(payload.rows)) ? payload.rows : [];
        if (!rows.length) {
          setHtml("rowsTbody", "<tr><td colspan='11' class='mono' style='color: rgba(232,255,255,0.65); padding: 14px 8px;'>No data</td></tr>");
          setPagerUi(0);
          return;
        }

        // Record the trophy value per cid across ALL rows (not just the visible
        // page) so changes are detected regardless of which page is shown.
        const nextMap = new Map();
        const changed = [];
        for (const r of rows) {
          if (!r) continue;
          const cid = r.cid != null ? String(r.cid) : "";
          if (!cid) continue;
          const trophyRaw = (r.trophy != null || r.point != null) ? (r.trophy != null ? r.trophy : r.point) : null;
          const trophyNum = (typeof trophyRaw === "number") ? trophyRaw : (trophyRaw == null ? null : Number(trophyRaw));
          if (!Number.isFinite(trophyNum)) continue;
          nextMap.set(cid, trophyNum);
          if (flash) {
            const prev = prevTrophyByCid.get(cid);
            if (prev != null && prev !== trophyNum) changed.push(cid);
          }
        }
        prevTrophyByCid = nextMap;
        if (flash && changed.length) flashCids(changed);

        const totalPages = totalPagesFor(rows.length);
        page = clamp(page, 1, totalPages);
        const startIdx = (page - 1) * pageSize;
        const endIdx = Math.min(rows.length, startIdx + pageSize);
        const slice = rows.slice(startIdx, endIdx);
        const html = slice.map((r) => {
          const cid = (r && r.cid != null) ? String(r.cid) : "";
          const isUpdated = cid && updatedCidSet.has(cid);
          const rank = (r && r.rank != null) ? r.rank : "-";
          const name = (r && (r.name || r.player_name)) ? (r.name || r.player_name) : "-";
          const level = (r && r.level != null) ? r.level : "-";
          const squad = (r && (r.squad_name || r.squad)) ? (r.squad_name || r.squad) : "-";
          const league = (r && (r.league_name || r.league)) ? (r.league_name || r.league) : "-";
          const trophy = (r && (r.trophy != null || r.point != null)) ? (r.trophy != null ? r.trophy : r.point) : "-";
          const gap = (r && r.gap != null) ? r.gap : "-";
          const tPerHour = (r && (r.tPerHour != null || r.t_per_hr != null)) ? (r.tPerHour != null ? r.tPerHour : r.t_per_hr) : null;
          const avgPerHour = (r && (r.avgPerHour != null || r.avg_per_hr != null)) ? (r.avgPerHour != null ? r.avgPerHour : r.avg_per_hr) : null;
          const avgPerDay = (r && (r.avgPerDay != null || r.avg_per_day != null)) ? (r.avgPerDay != null ? r.avgPerDay : r.avg_per_day) : null;
          const overtake = (r && (r.overtake != null || r.overtake_hr != null)) ? (r.overtake != null ? r.overtake : r.overtake_hr) : null;
          const squadText = escapeHtml(squad);
          const squadHtml = isUpdated ? ("<span class='squadNeon'>" + squadText + "</span>") : squadText;
          const trophyCls = isUpdated ? "mono trophyUpdated" : "mono";
          const nameHtml = cid ? ("<span class='id-" + escapeHtml(cid) + "' style='font-weight:800;'>" + escapeHtml(name) + "</span>") : escapeHtml(name);
          // fmtRate / fmtOvertake only ever return formatted numbers or "-",
          // so their output needs no further escaping.
          return "<tr>"
            + "<td class='mono'>" + escapeHtml(rank) + "</td>"
            + "<td>" + nameHtml + "</td>"
            + "<td class='mono'>" + escapeHtml(level) + "</td>"
            + "<td>" + squadHtml + "</td>"
            + "<td class='mono'>" + escapeHtml(league) + "</td>"
            + "<td class='" + trophyCls + "'>" + escapeHtml(trophy) + "</td>"
            + "<td class='mono'>" + escapeHtml(gap) + "</td>"
            + "<td class='mono' style='text-align:right;'>" + String(fmtRate(tPerHour)) + "</td>"
            + "<td class='mono' style='text-align:right;'>" + String(fmtRate(avgPerHour)) + "</td>"
            + "<td class='mono' style='text-align:right;'>" + String(fmtRate(avgPerDay)) + "</td>"
            + "<td class='mono' style='text-align:right;'>" + String(fmtOvertake(overtake)) + "</td>"
            + "</tr>";
        }).join("");
        setHtml("rowsTbody", html);
        setPagerUi(rows.length);
      };

      // Load /admin/api/state and push it into the page: config form fields,
      // status badge and key/value panel, season info, and the leaderboard
      // table. `opts.refresh` truthy adds refresh=1 to the query. Row
      // highlighting is enabled only when a payload was already loaded before
      // this call. Rejects with whatever apiGet throws.
      const refreshAll = async (opts) => {
        const query = new URLSearchParams();
        if (opts && opts.refresh) query.set("refresh", "1");
        query.set("limit", "250");
        const state = await apiGet("/admin/api/state?" + query.toString());

        // Config form. The cookie value itself is never echoed back; only
        // whether one is stored.
        const cfg = (state && state.cfg) ? state.cfg : {};
        const asFieldText = (value) => (value != null ? String(value) : "");
        $("cfgUrl").value = cfg.url || "";
        $("cfgTimeout").value = asFieldText(cfg.timeout_s);
        $("cfgWatch").value = asFieldText(cfg.watch_s);
        $("cfgNoProxy").checked = !!cfg.no_proxy;
        $("cfgClearCookie").checked = false;
        $("cfgCookie").value = "";
        $("cookieBadge").textContent = cfg.cookie_set ? "cookie: set" : "cookie: empty";
        $("cfgPopupSide").value = cfg.ui_popup_side === "right" ? "right" : "left";
        $("cfgPopupAutoOpen").checked = cfg.ui_popup_auto_open !== false;

        // Status panel: an error takes precedence over a previous success.
        const st = (state && state.state) ? state.state : {};
        const hasOk = !!(st && st.has_last_ok);
        const hasErr = !!(st && st.last_error);
        let badgeCls = "";
        let badgeText = "state: empty";
        if (hasErr) {
          badgeCls = "err";
          badgeText = "state: error";
        } else if (hasOk) {
          badgeCls = "ok";
          badgeText = "state: ok";
        }
        const badge = $("stateBadge");
        badge.className = "pill mono " + badgeCls;
        badge.textContent = badgeText;
        setText("updatedAt", st.last_updated_iso || "-");
        setText("lastError", st.last_error || "-");
        setText("rowCount", st.last_ok_count != null ? String(st.last_ok_count) : "-");

        const payload = (state && state.payload) ? state.payload : null;
        const season = (payload && payload.season_remaining) ? payload.season_remaining : "-";
        setText("seasonInfo", season);

        const hadPayloadBefore = currentPayload != null;
        currentPayload = payload;
        renderRows(payload, { flash: hadPayloadBefore });
      };

      // Collect the config form values and POST them to /admin/api/config.
      // Timeout falls back to 30 and the watch interval to 0 (never negative)
      // when the field cannot be parsed. Rejects with whatever apiPost throws.
      const saveConfig = async () => {
        const timeoutSeconds = toInt($("cfgTimeout").value, 30);
        const watchSeconds = Math.max(0, toInt($("cfgWatch").value, 0));
        const cookieValue = ($("cfgCookie").value || "").trim();
        const payload = {
          url: $("cfgUrl").value || "",
          timeout_s: timeoutSeconds,
          watch_s: watchSeconds,
          no_proxy: $("cfgNoProxy").checked,
          cookie: cookieValue,
          clear_cookie: $("cfgClearCookie").checked,
          ui_popup_side: $("cfgPopupSide").value || "left",
          ui_popup_auto_open: $("cfgPopupAutoOpen").checked,
        };
        await apiPost("/admin/api/config", payload);
      };

      // POST to /admin/api/scrape with a row limit of 250; the response is discarded.
      const scrapeNow = async () => {
        const request = { limit: 250 };
        await apiPost("/admin/api/scrape", request);
      };

      // POST a watch interval of 0 to /admin/api/watch; the response is discarded.
      const stopWatcher = async () => {
        const request = { watch_s: 0 };
        await apiPost("/admin/api/watch", request);
      };

      // Enable or disable the four action buttons together.
      const setBusy = (busy) => {
        const actionButtonIds = ["btnSave", "btnScrape", "btnStopWatch", "btnRefresh"];
        for (const buttonId of actionButtonIds) {
          $(buttonId).disabled = busy;
        }
      };

      // Start the admin view: show the server origin, load the id colour CSS,
      // wire the pager and action buttons, then perform the initial state load
      // (with refresh requested). Outcomes are reported on #statusLine.
      const run = async () => {
        // A thrown value with a truthy `error` field is shown by that field;
        // anything else is JSON-serialised whole.
        const describeError = (e) => (e && e.error ? e.error : JSON.stringify(e));

        // Run `task` with the action buttons disabled. A failure is written to
        // the status line as errPrefix + description; buttons are always
        // re-enabled afterwards.
        const guarded = async (errPrefix, task) => {
          setBusy(true);
          try {
            await task();
          } catch (e) {
            setText("statusLine", errPrefix + describeError(e));
          } finally {
            setBusy(false);
          }
        };

        const rerender = () => {
          renderRows(currentPayload);
        };

        $("serverBase").textContent = location.origin;
        await loadIdColorCss();

        // Pager controls. renderRows clamps `page`, so handlers may overshoot.
        $("pagerNums").addEventListener("click", (ev) => {
          const target = ev.target;
          const btn = target && target.closest ? target.closest("button[data-page]") : null;
          if (!btn) return;
          page = clamp(toInt(btn.dataset.page, 1), 1, 999999);
          rerender();
        });
        $("btnFirst").addEventListener("click", () => {
          page = 1;
          rerender();
        });
        $("btnPrev").addEventListener("click", () => {
          page = Math.max(1, page - 1);
          rerender();
        });
        $("btnNext").addEventListener("click", () => {
          page = page + 1;
          rerender();
        });
        $("btnLast").addEventListener("click", () => {
          const hasRows = currentPayload && Array.isArray(currentPayload.rows);
          const rowCount = hasRows ? currentPayload.rows.length : 0;
          page = totalPagesFor(rowCount);
          rerender();
        });
        $("pageSize").addEventListener("change", () => {
          pageSize = Math.max(1, toInt($("pageSize").value, 50));
          page = 1;
          rerender();
        });

        // Action buttons: perform the action, report success, then reload state.
        $("btnSave").addEventListener("click", () => guarded("save_error: ", async () => {
          await saveConfig();
          setText("statusLine", "saved");
          await refreshAll({ refresh: false });
        }));
        $("btnScrape").addEventListener("click", () => guarded("scrape_error: ", async () => {
          await scrapeNow();
          setText("statusLine", "scraped");
          await refreshAll({ refresh: false });
        }));
        $("btnStopWatch").addEventListener("click", () => guarded("stop_error: ", async () => {
          await stopWatcher();
          setText("statusLine", "watcher_stopped");
          await refreshAll({ refresh: false });
        }));
        // Refresh reports success only after the reload itself has finished.
        $("btnRefresh").addEventListener("click", () => guarded("refresh_error: ", async () => {
          await refreshAll({ refresh: false });
          setText("statusLine", "refreshed");
        }));

        // Initial load.
        await guarded("init_error: ", async () => {
          await refreshAll({ refresh: true });
          setText("statusLine", "ready");
        });
      };

      // Mount the React login form into #loginRoot.
      // Does nothing when the container or the React / ReactDOM globals are
      // missing. On a successful POST to /admin/api/login the form is
      // unmounted, #adminRoot and the logout button are revealed, and run() is
      // started once (guarded by window.__ADMIN_STARTED__).
      const mountLogin = () => {
        const rootEl = document.getElementById("loginRoot");
        if (!rootEl || !window.React || !window.ReactDOM) return;
        const e = window.React.createElement;
        const useState = window.React.useState;
        // React root handle; assigned at the bottom of this function so that
        // submit() can unmount the form cleanly.
        let root = null;

        // True when `s` looks like local@domain.tld and contains no whitespace.
        // Deliberately written without backslash escapes: this script lives
        // inside a host-language string literal, and a pattern that relies on
        // escaped character classes changes meaning depending on whether that
        // literal processes backslashes. Whitespace is detected per character
        // via trim(), which strips the same characters a regex whitespace
        // class matches.
        const isEmail = (s) => {
          if (Array.from(s).some((ch) => ch.trim() === "")) return false;
          return /^[^@]+@[^@]+[.][^@]+$/.test(s);
        };

        const AdminLogin = () => {
          const [identity, setIdentity] = useState("");
          const [password, setPassword] = useState("");
          const [showPassword, setShowPassword] = useState(false);
          const [busy, setBusy] = useState(false);
          const [err, setErr] = useState("");
          const [shake, setShake] = useState(false);
          const [touched, setTouched] = useState(() => ({ identity: false, password: false }));
          const [resetOpen, setResetOpen] = useState(false);
          const [resetBusy, setResetBusy] = useState(false);
          const [resetMsg, setResetMsg] = useState("");

          // Accept either an email-shaped value or a 3-32 character username.
          const validateIdentity = (v) => {
            const s = String(v || "").trim();
            if (!s) return { ok: false, msg: "Email/username wajib diisi." };
            if (isEmail(s)) return { ok: true };
            if (/^[a-zA-Z0-9_.-]{3,32}$/.test(s)) return { ok: true };
            return { ok: false, msg: "Gunakan email valid atau username (3–32, a-z 0-9 _ . -)." };
          };

          // Only requires a non-empty password.
          const validatePassword = (v) => {
            const s = String(v || "");
            if (!s) return { ok: false, msg: "Password wajib diisi." };
            return { ok: true };
          };

          const idV = validateIdentity(identity);
          const pwV = validatePassword(password);
          const canSubmit = !busy && idV.ok && pwV.ok;

          // Briefly add the shake class to the card to signal an error.
          const triggerShake = () => {
            setShake(true);
            window.setTimeout(() => setShake(false), 320);
          };

          const submit = async (ev) => {
            ev && ev.preventDefault && ev.preventDefault();
            setTouched({ identity: true, password: true });
            setErr("");
            setBusy(true);
            try {
              const v1 = validateIdentity(identity);
              const v2 = validatePassword(password);
              if (!v1.ok || !v2.ok) {
                triggerShake();
                return;
              }
              await apiPost("/admin/api/login", { identity, password });
              // Unmount through React instead of wiping innerHTML: clearing
              // the container behind React's back leaves the root (state and
              // listeners) alive with no DOM to manage.
              if (root) root.unmount();
              else rootEl.innerHTML = "";
              const adminRoot = document.getElementById("adminRoot");
              if (adminRoot) adminRoot.style.display = "";
              const logoutBtn = document.getElementById("btnLogout");
              if (logoutBtn) logoutBtn.style.display = "";
              if (!window.__ADMIN_STARTED__) {
                window.__ADMIN_STARTED__ = true;
                run();
              }
            } catch (e) {
              const msg = e && (e.message || e.error) ? String(e.message || e.error) : "login_failed";
              setErr(msg);
              triggerShake();
            } finally {
              setBusy(false);
            }
          };

          const openReset = () => {
            setResetMsg("");
            setResetOpen(true);
          };

          // Request a password reset for the identity field; email only.
          const doReset = async (ev) => {
            ev && ev.preventDefault && ev.preventDefault();
            setResetMsg("");
            setResetBusy(true);
            try {
              const idNow = String(identity || "").trim();
              if (!isEmail(idNow)) {
                setResetMsg("Masukkan email yang valid untuk reset password.");
                return;
              }
              await apiPost("/admin/api/reset-password", { identity: idNow });
              setResetMsg("Jika email terdaftar, instruksi reset akan diproses.");
            } catch (e) {
              setResetMsg("Gagal memulai reset. Coba lagi.");
            } finally {
              setResetBusy(false);
            }
          };

          const socialLogin = async (provider) => {
            setErr("");
            setBusy(true);
            try {
              await apiPost(`/admin/api/oauth/${provider}`, {});
            } catch (e) {
              const msg = e && (e.message || e.error) ? String(e.message || e.error) : "oauth_failed";
              setErr(msg);
              triggerShake();
            } finally {
              setBusy(false);
            }
          };

          return e(
            "div",
            { className: "loginOverlay" },
            e(
              "div",
              { className: "loginCard shadow-2xl" + (shake ? " shake" : "") },
              e("div", { className: "loginTitle" }, "Admin Login"),
              e("div", { className: "loginHint mono" }, "Masukkan password admin untuk lanjut."),
              e(
                "form",
                { onSubmit: submit },
                e(
                  "div",
                  null,
                  e("label", null, "Email / Username"),
                  e("input", {
                    type: "text",
                    value: identity,
                    autoFocus: true,
                    onBlur: () => setTouched((t) => ({ ...t, identity: true })),
                    onChange: (ev) => setIdentity(ev && ev.target ? ev.target.value : ""),
                    placeholder: "admin@email.com atau admin",
                    spellCheck: false,
                  }),
                  touched.identity && !idV.ok ? e("div", { className: "loginHelp bad" }, idV.msg) : e("div", { className: "loginHelp" }, "Gunakan email valid atau username."),
                  e("div", { style: { height: "8px" } }),
                  e("label", null, "Password"),
                  e(
                    "div",
                    { className: "inputWrap" },
                    e("input", {
                      type: showPassword ? "text" : "password",
                      value: password,
                      onBlur: () => setTouched((t) => ({ ...t, password: true })),
                      onChange: (ev) => setPassword(ev && ev.target ? ev.target.value : ""),
                      placeholder: "SW_ADMIN_PASSWORD",
                    }),
                    e(
                      "button",
                      {
                        type: "button",
                        className: "iconBtn",
                        disabled: busy,
                        onClick: () => setShowPassword((v) => !v),
                      },
                      showPassword ? "Hide" : "Show"
                    )
                  ),
                  touched.password && !pwV.ok ? e("div", { className: "loginHelp bad" }, pwV.msg) : null
                ),
                e(
                  "div",
                  { className: "btns" },
                  e("button", { className: "primary", type: "submit", disabled: !canSubmit }, busy ? "Logging in..." : "Login"),
                  e(
                    "button",
                    {
                      type: "button",
                      disabled: busy,
                      onClick: () => {
                        setIdentity("");
                        setPassword("");
                        setErr("");
                        setTouched({ identity: false, password: false });
                      },
                    },
                    "Reset"
                  )
                ),
                e(
                  "div",
                  { style: { marginTop: "8px", display: "flex", justifyContent: "space-between", alignItems: "center" } },
                  e(
                    "button",
                    { type: "button", className: "linkBtn text-cyan-300 hover:text-cyan-200", disabled: busy, onClick: openReset },
                    "Lupa Password?"
                  ),
                  e("div", { className: "mono", style: { fontSize: "12px", color: "rgba(232,255,255,0.55)" } }, "JWT cookie")
                ),
                e("div", { className: "divider" }),
                e("div", { className: "loginHint mono", style: { margin: 0 } }, "Atau login dengan"),
                e(
                  "div",
                  { className: "socialRow", style: { marginTop: "10px" } },
                  e(
                    "button",
                    { type: "button", className: "socialBtn", disabled: busy, onClick: () => socialLogin("google") },
                    "Google"
                  ),
                  e(
                    "button",
                    { type: "button", className: "socialBtn", disabled: busy, onClick: () => socialLogin("facebook") },
                    "Facebook"
                  )
                ),
                err ? e("div", { className: "loginErr mono" }, err) : null
              ),
              resetOpen
                ? e(
                    "div",
                    // Clicking the backdrop closes the modal unless a reset is in flight.
                    { className: "modalOverlay", onClick: () => (!resetBusy ? setResetOpen(false) : null) },
                    e(
                      "div",
                      { className: "modalCard", onClick: (ev) => ev && ev.stopPropagation && ev.stopPropagation() },
                      e("div", { className: "loginTitle" }, "Reset Password"),
                      e("div", { className: "loginHint mono" }, "Masukkan email. Jika terdaftar, instruksi akan diproses."),
                      e(
                        "form",
                        { onSubmit: doReset },
                        e("label", null, "Email"),
                        e("input", {
                          type: "text",
                          value: identity,
                          onChange: (ev) => setIdentity(ev && ev.target ? ev.target.value : ""),
                          placeholder: "admin@email.com",
                          spellCheck: false,
                        }),
                        e(
                          "div",
                          { className: "btns" },
                          e("button", { className: "primary", type: "submit", disabled: resetBusy }, resetBusy ? "Processing..." : "Kirim"),
                          e(
                            "button",
                            { type: "button", disabled: resetBusy, onClick: () => setResetOpen(false) },
                            "Tutup"
                          )
                        ),
                        resetMsg ? e("div", { className: "loginHelp" }, resetMsg) : null
                      )
                    )
                  )
                : null
            )
          );
        };

        root = window.ReactDOM.createRoot(rootEl);
        root.render(e(AdminLogin));
      };

      const startAdmin = async () => {
        const adminRoot = document.getElementById("adminRoot");
        if (adminRoot) adminRoot.style.display = "none";

        const logoutBtn = document.getElementById("btnLogout");
        if (logoutBtn) logoutBtn.style.display = "none";
        if (logoutBtn && !window.__LOGOUT_WIRED__) {
          window.__LOGOUT_WIRED__ = true;
          logoutBtn.addEventListener("click", async () => {
            try {
              await apiPost("/admin/api/logout", {});
            } catch {}
            location.reload();
          });
        }

        if (!window.__ADMIN_AUTH_REQUIRED__) {
          if (adminRoot) adminRoot.style.display = "";
          if (logoutBtn) logoutBtn.style.display = "none";
          if (!window.__ADMIN_STARTED__) {
            window.__ADMIN_STARTED__ = true;
            run();
          }
          return;
        }

        try {
          const auth = await apiGet("/admin/api/auth");
          if (auth && auth.authed) {
            if (adminRoot) adminRoot.style.display = "";
            if (logoutBtn) logoutBtn.style.display = "";
            if (!window.__ADMIN_STARTED__) {
              window.__ADMIN_STARTED__ = true;
              run();
            }
            return;
          }
        } catch {}

        mountLogin();
      };

      startAdmin();
    </script>
  </body>
</html>"""
        return html.replace("__AUTH_REQUIRED__", "true" if auth_required else "false")

    def do_OPTIONS(self) -> None:
        """Reply to a CORS preflight request.

        Sends a bodiless 204 response that allows any origin, the GET/POST/OPTIONS
        methods, the ``Content-Type`` and ``X-NinjaSage-Cookie`` request headers,
        and lets the preflight result be cached for 86400 seconds.
        """
        preflight_headers = (
            ("Access-Control-Allow-Origin", "*"),
            ("Access-Control-Allow-Methods", "GET, POST, OPTIONS"),
            ("Access-Control-Allow-Headers", "Content-Type, X-NinjaSage-Cookie"),
            ("Access-Control-Max-Age", "86400"),
        )
        self.send_response(204)
        for header_name, header_value in preflight_headers:
            self.send_header(header_name, header_value)
        self.end_headers()

    def do_POST(self) -> None:
        """Dispatch POST requests for the admin API.

        Routes are matched on the exact URL path:

        * ``/admin/api/login`` -- checks the submitted password against
          ``server.admin_password`` (constant-time comparison) and, on success,
          registers a session token and sets the session cookie (plus a JWT
          cookie when a JWT secret is configured).
        * ``/admin/api/logout`` -- drops the session token named by the session
          cookie and expires both cookies.
        * ``/admin/api/reset-password`` -- only reports whether an identity was
          supplied; no reset is performed in this handler.
        * ``/admin/api/oauth/google`` / ``/admin/api/oauth/facebook`` -- always
          answer 501 ``oauth_not_configured``.
        * ``/admin/api/config`` -- (auth required) updates the scrape settings.
        * ``/admin/api/watch`` -- (auth required) updates the watch interval.
        * ``/admin/api/scrape`` -- (auth required) scrapes immediately and
          records the outcome in ``server.state``.

        Any other path is answered with a 404 JSON error.
        """
        parsed = urlparse(self.path)
        truthy = {"1", "true", "yes", "on"}

        def _is_on(raw) -> bool:
            # Shared parsing for boolean-ish form fields ("1", "true", "yes", "on").
            return (raw or "").strip().lower() in truthy

        if parsed.path == "/admin/api/login":
            fields = self._read_body_fields()
            identity = (fields.get("identity") or fields.get("email") or fields.get("username") or "").strip()
            password = (fields.get("password") or "").strip()
            expected = getattr(self.server, "admin_password", None)
            if not expected:
                # No admin password configured: login is a no-op.
                self._send_json(200, {"ok": True, "required": False})
                return

            # Compare in constant time so response timing does not leak how much
            # of the password matched. compare_digest only accepts ASCII str, so
            # compare UTF-8 bytes; "surrogatepass" keeps lone surrogates from
            # raising, so any mismatch is still a plain 401.
            password_ok = (
                isinstance(password, str)
                and isinstance(expected, str)
                and hmac.compare_digest(
                    password.encode("utf-8", "surrogatepass"),
                    expected.encode("utf-8", "surrogatepass"),
                )
            )
            if not password_ok:
                self._send_json(401, {"error": "bad_password"})
                return

            now_i = int(time.time())
            jwt_ttl_s = int(getattr(self.server, "admin_jwt_ttl_s", 12 * 3600) or 12 * 3600)
            jwt_ttl_s = max(60, int(jwt_ttl_s))  # never issue a JWT shorter than one minute
            jwt_cookie_name = getattr(self.server, "admin_jwt_cookie_name", "sw_admin_jwt")
            secret = getattr(self.server, "admin_jwt_secret", b"")
            jwt_payload = {"sub": "admin", "iat": now_i, "exp": now_i + jwt_ttl_s}
            if identity:
                jwt_payload["id"] = identity
            # A JWT is only issued when a non-empty bytes secret is configured.
            jwt_token = _jwt_encode_hs256(bytes(secret), jwt_payload) if isinstance(secret, (bytes, bytearray)) and secret else ""

            # Opaque server-side session token, valid for seven days.
            token = secrets.token_urlsafe(24)
            exp = now_i + 7 * 86400
            with self.server.admin_lock:
                self.server.admin_sessions[token] = exp

            max_age = 7 * 86400
            cookie_name = getattr(self.server, "admin_cookie_name", "sw_admin_session")
            body = _dump_json({"ok": True, "required": True}, pretty=True).encode("utf-8")
            self.send_response(200)
            self.send_header("Content-Type", "application/json; charset=utf-8")
            self.send_header("Cache-Control", "no-store")
            if jwt_token:
                self.send_header("Set-Cookie", f"{jwt_cookie_name}={jwt_token}; Path=/; HttpOnly; SameSite=Lax; Max-Age={jwt_ttl_s}")
            self.send_header("Set-Cookie", f"{cookie_name}={token}; Path=/; HttpOnly; SameSite=Lax; Max-Age={max_age}")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
            return

        if parsed.path == "/admin/api/logout":
            cookie_name = getattr(self.server, "admin_cookie_name", "sw_admin_session")
            jwt_cookie_name = getattr(self.server, "admin_jwt_cookie_name", "sw_admin_jwt")
            token = self._parse_cookie_header().get(cookie_name)
            if token:
                with self.server.admin_lock:
                    self.server.admin_sessions.pop(token, None)
            body = _dump_json({"ok": True}, pretty=True).encode("utf-8")
            self.send_response(200)
            self.send_header("Content-Type", "application/json; charset=utf-8")
            self.send_header("Cache-Control", "no-store")
            # Max-Age=0 asks the browser to discard both cookies.
            self.send_header("Set-Cookie", f"{jwt_cookie_name}=; Path=/; HttpOnly; SameSite=Lax; Max-Age=0")
            self.send_header("Set-Cookie", f"{cookie_name}=; Path=/; HttpOnly; SameSite=Lax; Max-Age=0")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
            return

        if parsed.path == "/admin/api/reset-password":
            fields = self._read_body_fields()
            identity = (fields.get("identity") or fields.get("email") or fields.get("username") or "").strip()
            # Always 200; "queued" only reflects whether an identity was given.
            self._send_json(200, {"ok": True, "queued": bool(identity)})
            return

        if parsed.path in {"/admin/api/oauth/google", "/admin/api/oauth/facebook"}:
            self._send_json(501, {"error": "oauth_not_configured"})
            return

        if parsed.path == "/admin/api/config":
            if not self._admin_require():
                return
            fields = self._read_body_fields()
            url_raw = (fields.get("url") or "").strip()
            with self.server.cfg_lock:
                # An empty "url" field keeps the currently configured URL.
                url = url_raw or self.server.cfg_url
            if not url:
                self._send_json(400, {"error": "missing_url"})
                return
            # Unparseable numbers fall back to the current settings.
            try:
                timeout_s = int((fields.get("timeout_s") or "").strip() or self.server.cfg_timeout_s)
            except Exception:
                timeout_s = self.server.cfg_timeout_s
            try:
                watch_s = int((fields.get("watch_s") or "").strip() or self.server.cfg_watch_s)
            except Exception:
                watch_s = self.server.cfg_watch_s
            watch_s = max(0, int(watch_s))
            no_proxy = _is_on(fields.get("no_proxy"))
            clear_cookie = _is_on(fields.get("clear_cookie"))
            cookie = (fields.get("cookie") or "").strip()

            with self.server.cfg_lock:
                self.server.cfg_url = url
                self.server.cfg_timeout_s = max(1, int(timeout_s))
                self.server.cfg_no_proxy = bool(no_proxy)
                self.server.cfg_watch_s = int(watch_s)
                ui_side_raw = (fields.get("ui_popup_side") or "").strip().lower()
                if ui_side_raw in {"left", "right"}:
                    self.server.cfg_ui_popup_side = ui_side_raw
                # Only touch the auto-open flag when the field was submitted.
                if "ui_popup_auto_open" in fields:
                    self.server.cfg_ui_popup_auto_open = _is_on(fields.get("ui_popup_auto_open"))
                # clear_cookie wins over a supplied cookie; an empty cookie
                # field leaves the stored cookie untouched.
                if clear_cookie:
                    self.server.cfg_cookie = None
                elif cookie:
                    self.server.cfg_cookie = cookie

            self.server.ensure_watcher_running()
            self._send_json(200, {"ok": True})
            return

        if parsed.path == "/admin/api/watch":
            if not self._admin_require():
                return
            fields = self._read_body_fields()
            # Missing or invalid values are treated as 0.
            try:
                watch_s = int((fields.get("watch_s") or "").strip() or 0)
            except Exception:
                watch_s = 0
            watch_s = max(0, int(watch_s))
            with self.server.cfg_lock:
                self.server.cfg_watch_s = int(watch_s)
            self.server.ensure_watcher_running()
            self._send_json(200, {"ok": True, "watch_s": watch_s})
            return

        if parsed.path == "/admin/api/scrape":
            if not self._admin_require():
                return
            fields = self._read_body_fields()
            try:
                limit = int((fields.get("limit") or "").strip() or 0)
            except Exception:
                limit = 0
            limit = max(0, int(limit))
            # Snapshot the settings under the lock, then scrape without holding it.
            with self.server.cfg_lock:
                url = self.server.cfg_url
                cookie = self.server.cfg_cookie
                timeout_s = self.server.cfg_timeout_s
                no_proxy = self.server.cfg_no_proxy
            try:
                payload = _scrape_now(
                    url,
                    cookie=cookie,
                    timeout_s=timeout_s,
                    no_proxy=no_proxy,
                    limit=limit,
                )
                with self.server.state.lock:
                    self.server.state.last_ok = payload
                    self.server.state.last_error = None
                    self.server.state.last_updated_at = int(time.time())
                self._send_json(200, payload)
            except Exception as e:
                # Record the failure in shared state and report it as a 502.
                msg = str(e)
                with self.server.state.lock:
                    self.server.state.last_error = msg
                    self.server.state.last_updated_at = int(time.time())
                self._send_json(502, _error_payload(url, msg))
            return

        self._send_json(404, {"error": "not_found"})

    def do_GET(self) -> None:
        parsed = urlparse(self.path)
        if parsed.path.startswith("/proxy/ns-assets/"):
            tail = parsed.path[len("/proxy/ns-assets") :]
            if not tail.startswith("/"):
                tail = "/" + tail
            if ".." in tail:
                self._send_json(400, {"error": "bad_path"})
                return

            ext = os.path.splitext(tail)[1].lower()
            if ext not in {".swf", ".png", ".jpg", ".jpeg", ".gif", ".webp", ".svg"}:
                self._send_json(400, {"error": "bad_ext"})
                return

            upstream = "https://ns-assets.ninjasage.id" + tail
            if parsed.query:
                upstream += "?" + parsed.query
            try:
                status, raw, ct = fetch_bytes(upstream, timeout_s=self.server.cfg_timeout_s, no_proxy=self.server.cfg_no_proxy)
                self._send_bytes(int(status), raw, ct)
            except Exception as e:
                self._send_json(502, {"error": "upstream_error", "message": str(e)})
            return
        if parsed.path == "/healthz":
            self._send_json(200, {"ok": True})
            return

        if parsed.path == "/shadow-war/predict":
            q = parse_qs(parsed.query or "")
            with self.server.cfg_lock:
                url = self.server.cfg_url
                cookie_cfg = self.server.cfg_cookie
                timeout_s = self.server.cfg_timeout_s
                no_proxy_cfg = self.server.cfg_no_proxy
                watch_s = int(self.server.cfg_watch_s or 0)

            url_q = (q.get("url", [None])[0] or "").strip()
            if url_q:
                url = url_q

            try:
                limit = int(q.get("limit", ["0"])[0] or 0)
            except Exception:
                limit = 0
            limit = max(0, int(limit))

            no_proxy = no_proxy_cfg
            if (q.get("no_proxy", [""])[0] or "").strip() in {"1", "true", "yes"}:
                no_proxy = True

            refresh = (q.get("refresh", [""])[0] or "").strip() in {"1", "true", "yes"}

            cookie = self.headers.get("X-NinjaSage-Cookie") or self.headers.get("Cookie") or cookie_cfg
            cookie = cookie.strip() if isinstance(cookie, str) else None
            if self.headers.get("X-NinjaSage-Cookie") and cookie:
                with self.server.cfg_lock:
                    self.server.cfg_cookie = cookie

            payload: dict[str, Any] | None = None
            if watch_s > 0 and not refresh:
                with self.server.state.lock:
                    if self.server.state.last_ok is not None:
                        payload = self.server.state.last_ok
                    elif self.server.state.last_error is not None:
                        self._send_json(502, _error_payload(url, self.server.state.last_error))
                        return

            if payload is None:
                try:
                    payload = _scrape_now(
                        url,
                        cookie=cookie,
                        timeout_s=timeout_s,
                        no_proxy=no_proxy,
                        limit=limit,
                    )
                    with self.server.state.lock:
                        self.server.state.last_ok = payload
                        self.server.state.last_error = None
                        self.server.state.last_updated_at = int(time.time())
                except Exception as e:
                    msg = str(e)
                    with self.server.state.lock:
                        self.server.state.last_error = msg
                        self.server.state.last_updated_at = int(time.time())
                    self._send_json(502, _error_payload(url, msg))
                    return
            elif limit and limit > 0:
                rows = payload.get("rows", [])
                payload = {**payload, "rows": rows[:limit], "count": len(rows[:limit])}

            self._send_json(200, _predict_shadow_war(payload))
            return

        if parsed.path == "/shadow-war/realtime/id-color.css":
            css = _get_id_custom_color_css(int(time.time()))
            self._send_css(200, css)
            return

        if parsed.path.startswith("/shadow-war/realtime/profile/"):
            cid_raw = parsed.path[len("/shadow-war/realtime/profile/") :].strip("/")
            if not cid_raw or not cid_raw.isdigit():
                self._send_json(400, {"error": "bad_cid"})
                return
            cid = cid_raw

            q = parse_qs(parsed.query or "")
            no_proxy = self.server.cfg_no_proxy
            if (q.get("no_proxy", [""])[0] or "").strip() in {"1", "true", "yes"}:
                no_proxy = True

            cookie = self.headers.get("X-NinjaSage-Cookie") or self.headers.get("Cookie") or self.server.cfg_cookie
            cookie = cookie.strip() if isinstance(cookie, str) else None
            if self.headers.get("X-NinjaSage-Cookie") and cookie:
                with self.server.cfg_lock:
                    self.server.cfg_cookie = cookie

            try:
                upstream_url = f"https://ninjasage.id/api/sw/profile/{cid}"
                status, payload = fetch_json(
                    upstream_url,
                    cookie=cookie,
                    timeout_s=self.server.cfg_timeout_s,
                    no_proxy=no_proxy,
                )
                self._send_json(int(status), payload)
            except Exception as e:
                self._send_json(502, {"error": "upstream_error", "message": str(e)})
            return

        if parsed.path in {"/admin", "/admin/"}:
            self._send_html(200, self._admin_html())
            return

        if parsed.path == "/admin/api/auth":
            self._send_json(
                200,
                {
                    "required": bool(getattr(self.server, "admin_password", None)),
                    "authed": self._admin_is_authed(),
                },
            )
            return

        if parsed.path == "/admin/api/config":
            if not self._admin_require():
                return
            with self.server.cfg_lock:
                cfg = {
                    "url": self.server.cfg_url,
                    "timeout_s": self.server.cfg_timeout_s,
                    "no_proxy": self.server.cfg_no_proxy,
                    "watch_s": int(self.server.cfg_watch_s or 0),
                    "cookie_set": bool(self.server.cfg_cookie),
                    "ui_popup_side": self.server.cfg_ui_popup_side,
                    "ui_popup_auto_open": bool(self.server.cfg_ui_popup_auto_open),
                }
            self._send_json(200, cfg)
            return

        if parsed.path == "/admin/api/state":
            if not self._admin_require():
                return
            q = parse_qs(parsed.query or "")
            refresh = (q.get("refresh", [""])[0] or "").strip() in {"1", "true", "yes"}
            try:
                limit = int(q.get("limit", ["0"])[0] or 0)
            except Exception:
                limit = 0
            limit = max(0, int(limit))

            with self.server.cfg_lock:
                url = self.server.cfg_url
                cookie = self.server.cfg_cookie
                timeout_s = self.server.cfg_timeout_s
                no_proxy = self.server.cfg_no_proxy
                cfg = {
                    "url": url,
                    "timeout_s": timeout_s,
                    "no_proxy": no_proxy,
                    "watch_s": int(self.server.cfg_watch_s or 0),
                    "cookie_set": bool(cookie),
                    "ui_popup_side": self.server.cfg_ui_popup_side,
                    "ui_popup_auto_open": bool(self.server.cfg_ui_popup_auto_open),
                }

            if refresh:
                try:
                    payload = _scrape_now(
                        url,
                        cookie=cookie,
                        timeout_s=timeout_s,
                        no_proxy=no_proxy,
                        limit=limit,
                    )
                    with self.server.state.lock:
                        self.server.state.last_ok = payload
                        self.server.state.last_error = None
                        self.server.state.last_updated_at = int(time.time())
                except Exception as e:
                    msg = str(e)
                    with self.server.state.lock:
                        self.server.state.last_error = msg
                        self.server.state.last_updated_at = int(time.time())

            with self.server.state.lock:
                updated_at = self.server.state.last_updated_at
                ok = self.server.state.last_ok
                err = self.server.state.last_error

            payload: dict[str, Any] | None = None
            if ok is not None:
                payload = ok
                if limit and limit > 0:
                    rows = payload.get("rows", [])
                    payload = {**payload, "rows": rows[:limit], "count": len(rows[:limit])}
            elif err is not None:
                payload = _error_payload(url, err)

            state = {
                "last_updated_at": updated_at,
                "last_updated_iso": datetime.fromtimestamp(updated_at, tz=timezone.utc).isoformat().replace("+00:00", "Z")
                if updated_at
                else None,
                "has_last_ok": ok is not None,
                "last_ok_count": (ok or {}).get("count") if isinstance(ok, dict) else None,
                "last_error": err,
            }
            self._send_json(200, {"cfg": cfg, "state": state, "payload": payload})
            return

        if parsed.path == "/shadow-war/realtime/stream":
            q = parse_qs(parsed.query or "")
            url = (q.get("url", [None])[0] or self.server.cfg_url).strip()
            try:
                limit = int(q.get("limit", ["0"])[0] or 0)
            except Exception:
                limit = 0
            no_proxy = self.server.cfg_no_proxy
            if (q.get("no_proxy", [""])[0] or "").strip() in {"1", "true", "yes"}:
                no_proxy = True
            refresh = (q.get("refresh", [""])[0] or "").strip() in {"1", "true", "yes"}

            cookie = self.headers.get("X-NinjaSage-Cookie") or self.headers.get("Cookie") or self.server.cfg_cookie
            cookie = cookie.strip() if isinstance(cookie, str) else None
            if self.headers.get("X-NinjaSage-Cookie") and cookie:
                with self.server.cfg_lock:
                    self.server.cfg_cookie = cookie

            def _apply_limit(payload: dict[str, Any]) -> dict[str, Any]:
                if not limit or limit <= 0:
                    return payload
                rows = payload.get("rows", [])
                return {**payload, "rows": rows[:limit], "count": len(rows[:limit])}

            try:
                self._send_sse_headers()
                self.wfile.write(b"retry: 2000\n\n")
                self.wfile.flush()

                last_seen: int | None = None
                interval = max(1, int(self.server.cfg_watch_s or 1))
                ping_s = 15
                next_update_at = time.monotonic()
                next_ping_at = time.monotonic() + ping_s

                if self.server.state is not None:
                    with self.server.state.lock:
                        updated_at = self.server.state.last_updated_at
                        ok = self.server.state.last_ok
                        err = self.server.state.last_error
                    last_seen = updated_at
                    if ok is not None:
                        self._write_sse_event("update", _dump_json(_apply_limit(ok), pretty=False))
                        next_ping_at = time.monotonic() + ping_s
                    elif err is not None:
                        self._write_sse_event("update", _dump_json(_error_payload(url, err), pretty=False))
                        next_ping_at = time.monotonic() + ping_s
                    elif refresh or updated_at is None:
                        try:
                            payload = _scrape_now(
                                url,
                                cookie=cookie,
                                timeout_s=self.server.cfg_timeout_s,
                                no_proxy=no_proxy,
                                limit=limit,
                            )
                            with self.server.state.lock:
                                self.server.state.last_ok = payload
                                self.server.state.last_error = None
                                self.server.state.last_updated_at = int(time.time())
                            self._write_sse_event("update", _dump_json(payload, pretty=False))
                            next_ping_at = time.monotonic() + ping_s
                        except Exception as e:
                            with self.server.state.lock:
                                self.server.state.last_error = str(e)
                                self.server.state.last_updated_at = int(time.time())
                            self._write_sse_event("update", _dump_json(_error_payload(url, str(e)), pretty=False))
                            next_ping_at = time.monotonic() + ping_s
                elif refresh or self.server.state is None:
                    try:
                        payload = _scrape_now(
                            url,
                            cookie=cookie,
                            timeout_s=self.server.cfg_timeout_s,
                            no_proxy=no_proxy,
                            limit=limit,
                        )
                        if self.server.state:
                            with self.server.state.lock:
                                self.server.state.last_ok = payload
                                self.server.state.last_error = None
                                self.server.state.last_updated_at = int(time.time())
                        self._write_sse_event("update", _dump_json(payload, pretty=False))
                        next_ping_at = time.monotonic() + ping_s
                    except Exception as e:
                        self._write_sse_event("update", _dump_json(_error_payload(url, str(e)), pretty=False))
                        next_ping_at = time.monotonic() + ping_s

                while True:
                    now = time.monotonic()
                    if now >= next_update_at:
                        if self.server.state is not None:
                            with self.server.state.lock:
                                updated_at = self.server.state.last_updated_at
                                ok = self.server.state.last_ok
                                err = self.server.state.last_error
                            if updated_at is not None and updated_at != last_seen:
                                last_seen = updated_at
                                if ok is not None:
                                    self._write_sse_event("update", _dump_json(_apply_limit(ok), pretty=False))
                                    next_ping_at = time.monotonic() + ping_s
                                elif err is not None:
                                    self._write_sse_event("update", _dump_json(_error_payload(url, err), pretty=False))
                                    next_ping_at = time.monotonic() + ping_s
                        else:
                            try:
                                payload = _scrape_now(
                                    url,
                                    cookie=cookie,
                                    timeout_s=self.server.cfg_timeout_s,
                                    no_proxy=no_proxy,
                                    limit=limit,
                                )
                                self._write_sse_event("update", _dump_json(payload, pretty=False))
                                next_ping_at = time.monotonic() + ping_s
                            except Exception as e:
                                self._write_sse_event("update", _dump_json(_error_payload(url, str(e)), pretty=False))
                                next_ping_at = time.monotonic() + ping_s
                        next_update_at = time.monotonic() + interval

                    now = time.monotonic()
                    if now >= next_ping_at:
                        self._write_sse_comment("ping")
                        next_ping_at = time.monotonic() + ping_s

                    sleep_s = max(0.05, min(next_update_at, next_ping_at) - time.monotonic())
                    time.sleep(sleep_s)
            except (BrokenPipeError, ConnectionResetError, ConnectionAbortedError):
                return
            except Exception as e:
                try:
                    self._write_sse_event("update", _dump_json(_error_payload(url, str(e)), pretty=False))
                except Exception:
                    pass
                return

        if parsed.path == "/shadow-war/predict":
            q = parse_qs(parsed.query or "")
            url = (q.get("url", [None])[0] or self.server.cfg_url).strip()
            try:
                limit = int(q.get("limit", ["0"])[0] or 0)
            except Exception:
                limit = 0
            limit = max(0, int(limit))
            no_proxy = self.server.cfg_no_proxy
            if (q.get("no_proxy", [""])[0] or "").strip() in {"1", "true", "yes"}:
                no_proxy = True
            refresh = (q.get("refresh", [""])[0] or "").strip() in {"1", "true", "yes"}

            cookie = self.headers.get("X-NinjaSage-Cookie") or self.headers.get("Cookie") or self.server.cfg_cookie
            cookie = cookie.strip() if isinstance(cookie, str) else None
            if self.headers.get("X-NinjaSage-Cookie") and cookie:
                with self.server.cfg_lock:
                    self.server.cfg_cookie = cookie

            if self.server.state and self.server.cfg_watch_s and not refresh:
                with self.server.state.lock:
                    if self.server.state.last_ok is not None:
                        payload = self.server.state.last_ok
                        if limit and limit > 0:
                            payload = {**payload, "rows": payload.get("rows", [])[:limit]}
                            payload["count"] = len(payload["rows"])
                        self._send_json(200, _predict_shadow_war(payload))
                        return
                    if self.server.state.last_error is not None:
                        self._send_json(
                            502,
                            _predict_shadow_war(_error_payload(url, self.server.state.last_error)),
                        )
                        return

            try:
                payload = _scrape_now(
                    url,
                    cookie=cookie,
                    timeout_s=self.server.cfg_timeout_s,
                    no_proxy=no_proxy,
                    limit=limit,
                )
                if self.server.state:
                    with self.server.state.lock:
                        self.server.state.last_ok = payload
                        self.server.state.last_error = None
                        self.server.state.last_updated_at = int(time.time())
                self._send_json(200, _predict_shadow_war(payload))
            except Exception as e:
                msg = str(e)
                if self.server.state:
                    with self.server.state.lock:
                        self.server.state.last_error = msg
                        self.server.state.last_updated_at = int(time.time())
                self._send_json(502, _predict_shadow_war(_error_payload(url, msg)))
            return

        if parsed.path in {"/shadow-war/ai", "/shadow-war/ai/explain"}:
            q = parse_qs(parsed.query or "")
            url = (q.get("url", [None])[0] or self.server.cfg_url).strip()
            try:
                limit = int(q.get("limit", ["0"])[0] or 0)
            except Exception:
                limit = 0
            limit = max(0, int(limit))
            no_proxy = self.server.cfg_no_proxy
            if (q.get("no_proxy", [""])[0] or "").strip() in {"1", "true", "yes"}:
                no_proxy = True
            refresh = (q.get("refresh", [""])[0] or "").strip() in {"1", "true", "yes"}

            player_key = (q.get("player_id", [None])[0] or q.get("player_key", [None])[0] or q.get("cid", [None])[0] or None)
            player_key = player_key.strip() if isinstance(player_key, str) else None
            name_q = (q.get("name", [None])[0] or None)
            name_q = name_q.strip() if isinstance(name_q, str) else None
            try:
                rank_i = int((q.get("rank", [None])[0] or "").strip()) if q.get("rank") else None
            except Exception:
                rank_i = None
            try:
                burn_min = int((q.get("burn_min", ["0"])[0] or "0").strip())
            except Exception:
                burn_min = 0
            try:
                idle_min = int((q.get("idle_min", ["0"])[0] or "0").strip())
            except Exception:
                idle_min = 0
            burn_min = max(0, min(240, int(burn_min)))
            idle_min = max(0, min(240, int(idle_min)))

            cookie = self.headers.get("X-NinjaSage-Cookie") or self.headers.get("Cookie") or self.server.cfg_cookie
            cookie = cookie.strip() if isinstance(cookie, str) else None
            if self.headers.get("X-NinjaSage-Cookie") and cookie:
                with self.server.cfg_lock:
                    self.server.cfg_cookie = cookie

            payload: dict[str, Any] | None = None
            if self.server.state and self.server.cfg_watch_s and not refresh:
                with self.server.state.lock:
                    if self.server.state.last_ok is not None:
                        payload = self.server.state.last_ok
                    elif self.server.state.last_error is not None:
                        self._send_json(502, {"error": "upstream_error", "message": str(self.server.state.last_error)})
                        return

            if payload is None:
                try:
                    payload = _scrape_now(
                        url,
                        cookie=cookie,
                        timeout_s=self.server.cfg_timeout_s,
                        no_proxy=no_proxy,
                        limit=limit,
                    )
                    if self.server.state:
                        with self.server.state.lock:
                            self.server.state.last_ok = payload
                            self.server.state.last_error = None
                            self.server.state.last_updated_at = int(time.time())
                except Exception as e:
                    msg = str(e)
                    if self.server.state:
                        with self.server.state.lock:
                            self.server.state.last_error = msg
                            self.server.state.last_updated_at = int(time.time())
                    self._send_json(502, {"error": "upstream_error", "message": msg})
                    return

            if not isinstance(payload, dict):
                self._send_json(502, {"error": "bad_payload"})
                return
            if limit and limit > 0:
                rows = payload.get("rows", [])
                payload = {**payload, "rows": rows[:limit], "count": len(rows[:limit])}

            output, explain = sw_ai_command_system(
                payload,
                player_key=player_key,
                rank_i=rank_i,
                name_q=name_q,
                burn_min=burn_min,
                idle_min=idle_min,
            )
            if parsed.path.endswith("/explain"):
                self._send_json(200, {"output": output, "explain": explain})
                return
            self._send_json(200, output)
            return

        if parsed.path != "/shadow-war/realtime":
            self._send_json(404, {"error": "not_found"})
            return

        q = parse_qs(parsed.query or "")
        url = (q.get("url", [None])[0] or self.server.cfg_url).strip()
        limit = int(q.get("limit", ["0"])[0] or 0)
        no_proxy = self.server.cfg_no_proxy
        if (q.get("no_proxy", [""])[0] or "").strip() in {"1", "true", "yes"}:
            no_proxy = True
        refresh = (q.get("refresh", [""])[0] or "").strip() in {"1", "true", "yes"}

        cookie = self.headers.get("X-NinjaSage-Cookie") or self.headers.get("Cookie") or self.server.cfg_cookie
        cookie = cookie.strip() if isinstance(cookie, str) else None
        if self.headers.get("X-NinjaSage-Cookie") and cookie:
            with self.server.cfg_lock:
                self.server.cfg_cookie = cookie

        if self.server.state and self.server.cfg_watch_s and not refresh:
            with self.server.state.lock:
                if self.server.state.last_ok is not None:
                    payload = self.server.state.last_ok
                    if limit and limit > 0:
                        payload = {**payload, "rows": payload.get("rows", [])[:limit]}
                        payload["count"] = len(payload["rows"])
                    self._send_json(200, payload)
                    return
                if self.server.state.last_error is not None:
                    self._send_json(
                        502,
                        _error_payload(url, self.server.state.last_error),
                    )
                    return

        try:
            payload = _scrape_now(
                url,
                cookie=cookie,
                timeout_s=self.server.cfg_timeout_s,
                no_proxy=no_proxy,
                limit=limit,
            )
            if self.server.state:
                with self.server.state.lock:
                    self.server.state.last_ok = payload
                    self.server.state.last_error = None
                    self.server.state.last_updated_at = int(time.time())
            self._send_json(200, payload)
        except Exception as e:
            msg = str(e)
            if self.server.state:
                with self.server.state.lock:
                    self.server.state.last_error = msg
                    self.server.state.last_updated_at = int(time.time())
            self._send_json(502, _error_payload(url, msg))

    def log_message(self, fmt: str, *args: Any) -> None:
        """Discard the log line: this override intentionally emits nothing.

        Args:
            fmt: Format string of the log message (ignored).
            *args: Values for ``fmt`` (ignored).
        """
        return None

    def handle_one_request(self) -> None:
        """Delegate to the parent implementation, tolerating a dropped client.

        A broken pipe or a reset/aborted connection raised while the parent
        handles the request is not propagated; the connection is just flagged
        as closed.
        """
        dropped_peer_errors = (
            BrokenPipeError,
            ConnectionResetError,
            ConnectionAbortedError,
        )
        try:
            super().handle_one_request()
        except dropped_peer_errors:
            # The peer is gone, so mark the connection finished rather than
            # letting the error escape from this method.
            self.close_connection = True


class _APIServer(ThreadingHTTPServer):
    """Threaded HTTP server carrying scrape settings, cached state and admin settings.

    Request handlers read the ``cfg_*`` values and ``state`` through
    ``self.server``; the background watcher (``_watch_loop``) reads the same
    ``cfg_*`` values while holding ``cfg_lock``.
    """

    def __init__(
        self,
        host: str,
        port: int,
        url: str,
        cookie: str | None,
        timeout_s: int,
        no_proxy: bool,
        watch_s: int,
    ) -> None:
        """Bind to ``(host, port)`` and set up configuration, state and admin settings.

        Args:
            host: Interface to bind the listening socket to.
            port: TCP port to listen on.
            url: Default URL to scrape when a request does not supply one.
            cookie: Default cookie passed to the scraper, or None.
            timeout_s: Timeout handed to the scraper.
            no_proxy: Default for the scraper's ``no_proxy`` flag.
            watch_s: Interval for the background watcher; a value of 0 or
                less keeps the watcher idle.
        """
        super().__init__((host, port), _APIHandler)

        def env_text(name: str) -> str:
            # An unset variable and a whitespace-only value both become "".
            return (os.getenv(name) or "").strip()

        # Scrape configuration; the watcher snapshots it under cfg_lock.
        self.cfg_lock = threading.Lock()
        self.cfg_url, self.cfg_cookie = url, cookie
        self.cfg_timeout_s, self.cfg_no_proxy = timeout_s, no_proxy
        self.cfg_watch_s = watch_s

        # UI popup defaults.
        self.cfg_ui_popup_side = "left"
        self.cfg_ui_popup_auto_open = True

        # Last scrape result / error shared with the handlers.
        self.state = _ServerState()

        # Admin settings taken from the environment.
        self.admin_password = env_text("SW_ADMIN_PASSWORD") or None
        self.admin_cookie_name = "sw_admin_session"
        self.admin_jwt_cookie_name = "sw_admin_jwt"

        secret_text = env_text("SW_ADMIN_JWT_SECRET")
        if secret_text:
            self.admin_jwt_secret = secret_text.encode("utf-8")
        else:
            # No configured secret: generate a random one for this process.
            self.admin_jwt_secret = secrets.token_bytes(32)

        default_ttl_s = 12 * 3600
        try:
            self.admin_jwt_ttl_s = int(env_text("SW_ADMIN_JWT_TTL_S") or default_ttl_s)
        except Exception:
            # Unparseable value: fall back to the default.
            self.admin_jwt_ttl_s = default_ttl_s

        # NOTE(review): admin_lock presumably guards admin_sessions — confirm
        # against the admin handlers.
        self.admin_lock = threading.Lock()
        self.admin_sessions: dict[str, int] = {}

        # Watcher control: the event ends _watch_loop, the thread is created lazily.
        self._watch_stop = threading.Event()
        self._watch_thread: threading.Thread | None = None

    def ensure_watcher_running(self) -> None:
        """Start the background watcher thread unless one is already alive."""
        existing = self._watch_thread
        if existing is not None and existing.is_alive():
            return
        worker = threading.Thread(target=_watch_loop, args=(self,), daemon=True)
        self._watch_thread = worker
        worker.start()


def _watch_loop(server: _APIServer) -> None:
    """Poll the configured URL in the background until ``server._watch_stop`` is set.

    Each round snapshots the ``cfg_*`` values under ``server.cfg_lock``. With
    a positive watch interval it scrapes once, stores the payload in
    ``server.state`` and passes it to ``_sw_ai_process_tracked``. Any
    exception raised by the scrape or by that processing step is stored as
    ``server.state.last_error``.

    Args:
        server: The API server whose configuration and state are used.
    """
    idle_poll_s = 0.5

    while True:
        if server._watch_stop.is_set():
            break

        # Copy the settings while holding the lock so one round uses a
        # consistent set of values.
        with server.cfg_lock:
            target_url, auth_cookie = server.cfg_url, server.cfg_cookie
            fetch_timeout_s, bypass_proxy = server.cfg_timeout_s, server.cfg_no_proxy
            interval_s = int(server.cfg_watch_s or 0)

        if interval_s <= 0:
            # Watching is disabled: check again shortly for a stop request
            # or a changed interval.
            server._watch_stop.wait(idle_poll_s)
            continue

        try:
            snapshot = _scrape_now(
                target_url,
                cookie=auth_cookie,
                timeout_s=fetch_timeout_s,
                no_proxy=bypass_proxy,
                limit=0,
            )
            with server.state.lock:
                server.state.last_ok = snapshot
                server.state.last_error = None
                server.state.last_updated_at = int(time.time())
            _sw_ai_process_tracked(snapshot)
        except Exception as exc:
            failure_text = str(exc)
            with server.state.lock:
                server.state.last_error = failure_text
                server.state.last_updated_at = int(time.time())

        # Sleep at least one second, waking early if a stop is requested.
        server._watch_stop.wait(max(1, interval_s))


def _cli_build_parser() -> argparse.ArgumentParser:
    """Build the argument parser holding every flag that main() understands."""
    ap = argparse.ArgumentParser()
    ap.add_argument(
        "--url",
        default="https://ninjasage.id/en/leaderboards/shadow-war/realtime",
    )
    ap.add_argument("--ws-url", default=None)
    ap.add_argument("--ws-origin", default="https://ninjasage.id")
    ap.add_argument("--ws-max-packets", type=int, default=0)
    ap.add_argument("--ws-max-seconds", type=int, default=0)
    ap.add_argument("--ws-out", default=None)
    ap.add_argument("--cookie", default=None)
    ap.add_argument("--timeout", type=int, default=30)
    ap.add_argument("--limit", type=int, default=0)
    ap.add_argument("--html-file", default=None)
    ap.add_argument("--stdin", action="store_true")
    ap.add_argument("--no-proxy", action="store_true")
    ap.add_argument("--out", default=None)
    ap.add_argument("--watch", type=int, default=0)
    ap.add_argument("--serve", action="store_true")
    ap.add_argument("--self-test", action="store_true")
    ap.add_argument("--host", default="127.0.0.1")
    ap.add_argument("--port", type=int, default=8787)
    return ap


def _cli_serve(args: argparse.Namespace) -> int:
    """Run the HTTP API until serve_forever() ends, then stop the watcher and close the socket."""
    server = _APIServer(
        host=args.host,
        port=int(args.port),
        url=args.url,
        cookie=args.cookie,
        timeout_s=int(args.timeout),
        no_proxy=bool(args.no_proxy),
        watch_s=int(args.watch),
    )
    try:
        server.ensure_watcher_running()
        server.serve_forever()
    finally:
        # Previously, when serve_forever() was interrupted (Ctrl-C or an
        # unexpected error) the watcher thread kept polling and the listening
        # socket stayed open for the rest of the process lifetime. The
        # exception itself still propagates unchanged.
        server._watch_stop.set()
        server.server_close()
    return 0


def _cli_scrape_loop(args: argparse.Namespace, html: str | None) -> int:
    """Scrape (or parse ``html``) once, or repeatedly when ``--watch`` is positive.

    Each result is written to ``--out`` or printed to stdout. A failure is
    reported as an error payload (to ``--out`` or stderr) and does not end
    the loop.
    """
    out_path = Path(args.out) if args.out else None
    watch_s = int(args.watch or 0)

    while True:
        try:
            if html is None:
                result = _scrape_now(
                    args.url,
                    cookie=args.cookie,
                    timeout_s=int(args.timeout),
                    no_proxy=bool(args.no_proxy),
                    limit=int(args.limit),
                )
            else:
                result = _scrape_from_html(args.url, html)
                if args.limit and args.limit > 0:
                    result["rows"] = result["rows"][: args.limit]
                    result["count"] = len(result["rows"])

            text = _dump_json(result, pretty=True)
            if out_path:
                _write_text_atomic(out_path, text + "\n")
            else:
                print(text)
        except Exception as e:
            error_text = _dump_json(_error_payload(args.url, str(e)), pretty=True)
            if out_path:
                _write_text_atomic(out_path, error_text + "\n")
            else:
                print(error_text, file=sys.stderr)

        if watch_s <= 0:
            break
        time.sleep(max(1, watch_s))

    # NOTE(review): the exit status is 0 even when the scrape failed; the
    # failure is only visible in the emitted error payload.
    return 0


def main() -> int:
    """Command-line entry point; dispatches to one of four modes.

    Modes, checked in this order:
      * ``--self-test``: run the built-in tests and return their status.
      * ``--ws-url``: listen on the socket via ``socketio_listen`` and return
        its result.
      * ``--serve``: run the HTTP API (cannot be combined with
        ``--stdin``/``--html-file``; that combination returns 2).
      * otherwise: scrape the URL, or parse HTML given by
        ``--html-file``/``--stdin``, once or on a ``--watch`` interval.

    Returns:
        The process exit status.
    """
    args = _cli_build_parser().parse_args()

    if args.self_test:
        return _sw_ai_run_self_test()

    if args.ws_url:
        out_path = Path(args.ws_out) if args.ws_out else None
        origin = (args.ws_origin or "").strip() or None
        return socketio_listen(
            args.ws_url,
            origin=origin,
            timeout_s=int(args.timeout),
            max_packets=int(args.ws_max_packets or 0),
            max_seconds=int(args.ws_max_seconds or 0),
            out=out_path,
        )

    # Offline HTML input is loaded before the --serve check, as before.
    html: str | None = None
    if args.html_file:
        html = Path(args.html_file).read_text(encoding="utf-8", errors="replace")
    elif args.stdin:
        html = sys.stdin.read()

    if args.serve:
        if html is not None:
            print("mode --serve tidak bisa digabung dengan --stdin/--html-file", file=sys.stderr)
            return 2
        return _cli_serve(args)

    return _cli_scrape_loop(args, html)


def _sw_ai_run_self_test() -> int:
    """Run the built-in unittest checks for ``sw_ai_command_system``.

    Returns:
        0 when every test passes, 1 otherwise.
    """
    import unittest

    fixed_now = 1700000000

    def build_payload(rows: list[dict[str, Any]]) -> dict[str, Any]:
        # Scrape-shaped payload with a fixed timestamp so runs are repeatable.
        return {
            "url": "test",
            "fetched_at": fixed_now,
            "season_remaining_s": 3600,
            "rows": rows,
        }

    class _SWAITest(unittest.TestCase):
        def test_schema_and_commands(self) -> None:
            # The output must expose the expected keys and only known
            # command / scenario values.
            payload = build_payload(
                [
                    {"rank": 1, "cid": "p1", "name": "Leader", "trophy": 120000},
                    {"rank": 2, "cid": "p2", "name": "Runner", "trophy": 118000, "gap": 2000},
                    {"rank": 3, "cid": "p3", "name": "Chaser", "trophy": 117500, "gap": 500},
                ]
            )
            _enrich_payload_rates(payload)
            out, explain = sw_ai_command_system(
                payload,
                player_key="p2",
                rank_i=None,
                name_q=None,
                burn_min=10,
                idle_min=0,
            )
            self.assertIsInstance(out, dict)

            required_keys = ("scenario", "command", "eta", "success_rate", "risk", "confidence", "timeline")
            for key in required_keys:
                self.assertIn(key, out)

            known_commands = {"BURN NOW", "HOLD", "DELAY BURN", "SECURE", "ABORT"}
            known_scenarios = {
                "IDLE",
                "NORMAL",
                "BURN",
                "REACTIVE_BURN",
                "COUNTER_BURN",
                "SECURE_RANK",
                "LAST_HOUR_PUSH",
            }
            self.assertIn(out["command"], known_commands)
            self.assertIn(out["scenario"], known_scenarios)

            eta = out["eta"]
            self.assertIsInstance(eta, dict)
            eta_modes = ("burn", "normal")
            for mode in eta_modes:
                self.assertIn(mode, out["eta"])
            # Each ETA must be a string containing ":".
            for mode in eta_modes:
                self.assertTrue(isinstance(out["eta"][mode], str) and ":" in out["eta"][mode])
            self.assertIsInstance(explain, dict)

        def test_eta_math(self) -> None:
            # With the rate state seeded below, the normal ETA for p2 is
            # expected to read "01:00".
            payload = build_payload(
                [
                    {"rank": 1, "cid": "p1", "name": "Leader", "trophy": 10000},
                    {"rank": 2, "cid": "p2", "name": "Runner", "trophy": 9000, "gap": 1000},
                ]
            )
            # The first enrichment pass is what creates the rate-state
            # entries that get overwritten next.
            _enrich_payload_rates(payload)
            seeded_rates = (("cid:p2", 1000.0), ("cid:p1", 800.0))
            with _RATE_LOCK:
                for state_key, per_hour in seeded_rates:
                    entry = _RATE_STATE[state_key]
                    entry["ema_per_hr"] = per_hour
                    entry["last_rate"] = per_hour
                    entry["last_rate_at"] = fixed_now
            _enrich_payload_rates(payload)

            out, _ = sw_ai_command_system(
                payload,
                player_key="p2",
                rank_i=None,
                name_q=None,
                burn_min=0,
                idle_min=0,
            )
            self.assertEqual(out["eta"]["normal"], "01:00")

    loader = unittest.defaultTestLoader
    runner = unittest.TextTestRunner(verbosity=2)
    outcome = runner.run(loader.loadTestsFromTestCase(_SWAITest))
    if outcome.wasSuccessful():
        return 0
    return 1


# Script entry point: run the CLI and use main()'s return value as the exit status.
if __name__ == "__main__":
    sys.exit(main())
