#!/usr/bin/env python3 """ Standalone shell harness for the surveillance-recording lookup. Mirrors what server/api/surveillance/shipment-recording/init.post.ts does but talks to Synology directly so we can verify the timezone shift and the listing window without going through the Nuxt request stack. Usage: python3 scripts/test-synology-recording.py --ts 2026-05-03T08:04:42Z python3 scripts/test-synology-recording.py --ts 2026-05-03T08:04:42Z --pre 120 --post 60 python3 scripts/test-synology-recording.py --ts 2026-05-03T08:04:42Z --no-shift python3 scripts/test-synology-recording.py --ts 2026-05-03T08:04:42Z --download /tmp/clip.mp4 Env (reads .env / .env-prod from the repo root automatically): SYNOLOGY_URL, SYNOLOGY_USER, SYNOLOGY_PASSWORD, SYNOLOGY_CAMERA_IDS Stdlib-only — no `requests`, no `python-dotenv`, no virtualenv needed. """ from __future__ import annotations import argparse import json import os import re import sys from datetime import datetime, timezone from pathlib import Path from urllib.parse import urlencode from urllib.request import urlopen try: # 3.9+ — use IANA tz database for Berlin / DST without hard-coding rules from zoneinfo import ZoneInfo except ImportError: # pragma: no cover print("Python 3.9+ is required (zoneinfo).", file=sys.stderr) sys.exit(2) REPO_ROOT = Path(__file__).resolve().parent.parent # --------- inline credentials ------------------------------------------ # Hard-coded defaults so the script runs out of the box without sourcing # .env. A real shell env var (e.g. `SYNOLOGY_URL=... python3 ...`) still # wins, since these are only applied via setdefault. 
# --------- environment defaults ----------------------------------------
# Credentials are deliberately NOT embedded in this file: a password that is
# committed to the repository is readable by everyone with repo access and
# has to be rotated. SYNOLOGY_USER / SYNOLOGY_PASSWORD must come from the
# shell environment or from a .env / .env-prod file in the repo root.
# Only non-secret settings keep a built-in default. Precedence is
#   shell env  >  .env  >  .env-prod  >  built-in default
# because every value is applied through os.environ.setdefault.
SYNOLOGY_URL = "http://192.168.1.56:5000"
SYNOLOGY_CAMERA_IDS = "2"
SYNOLOGY_PRE_SECONDS = "120"
SYNOLOGY_POST_SECONDS = "60"


def _load_env_files(names: tuple[str, ...] = (".env", ".env-prod")) -> None:
    """Copy SYNOLOGY_* entries from dotenv files in the repo root into os.environ.

    The repo root is taken to be the parent of this script's directory.
    Missing or unreadable files are skipped. Only ``KEY=VALUE`` lines whose
    key starts with ``SYNOLOGY_`` are used; an optional ``export `` prefix
    and one pair of surrounding quotes are stripped. Existing environment
    variables are never overwritten.
    """
    root = Path(__file__).resolve().parent.parent
    for name in names:
        try:
            text = (root / name).read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            continue
        for line in text.splitlines():
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            key, sep, value = line.removeprefix("export ").partition("=")
            key = key.strip()
            if not sep or not key.startswith("SYNOLOGY_"):
                continue
            value = value.strip()
            if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                value = value[1:-1]
            os.environ.setdefault(key, value)


_load_env_files()
os.environ.setdefault("SYNOLOGY_URL", SYNOLOGY_URL)
os.environ.setdefault("SYNOLOGY_CAMERA_IDS", SYNOLOGY_CAMERA_IDS)
os.environ.setdefault("SYNOLOGY_PRE_SECONDS", SYNOLOGY_PRE_SECONDS)
os.environ.setdefault("SYNOLOGY_POST_SECONDS", SYNOLOGY_POST_SECONDS)


# --------- small shared helpers ------------------------------------------

def _env_int(name: str, fallback: int) -> int:
    """Return env var *name* as an int, or *fallback* if unset or not numeric."""
    raw = (os.environ.get(name) or "").strip()
    if not raw:
        return fallback
    try:
        return int(raw)
    except ValueError:
        print(f"Ignoring non-numeric {name}={raw!r}; using {fallback}.", file=sys.stderr)
        return fallback


def _iso_to_epoch(value: str) -> float:
    """Convert an ISO-8601 string to epoch seconds.

    A trailing ``Z`` is accepted as UTC. A timestamp without any offset is
    treated as UTC as well, so the result never depends on the time zone of
    the machine running the script. Raises ValueError on malformed input.
    """
    text = value.strip()
    if text.endswith(("Z", "z")):
        text = text[:-1] + "+00:00"
    moment = datetime.fromisoformat(text)
    if moment.tzinfo is None:
        moment = moment.replace(tzinfo=timezone.utc)
    return moment.timestamp()


def _action_time_sort_key(value: str) -> tuple[int, float, str]:
    """Sort key ordering ActionTime strings by instant, not by spelling.

    Plain string comparison mis-orders fractional seconds
    ("…42.500Z" sorts before "…42Z"). Unparseable values sort before every
    parseable one and fall back to string order among themselves.
    """
    try:
        return (1, _iso_to_epoch(value), value)
    except ValueError:
        return (0, 0.0, value)


def _success_flag(body: object) -> object:
    """Return the ``success`` field of a Synology reply, or None if not a dict."""
    return body.get("success") if isinstance(body, dict) else None


def _error_of(body: object) -> object:
    """Return the ``error`` field of a Synology reply, or None if not a dict."""
    return body.get("error") if isinstance(body, dict) else None


def _clip_json(value: object, limit: int = 200) -> str:
    """JSON-encode *value* and truncate the text to *limit* characters."""
    text = json.dumps(value, ensure_ascii=False)
    return text[:limit] + "…" if len(text) > limit else text


def _ffprobe_format(path: str, entries: str) -> dict | None:
    """Return ffprobe's ``format`` section for *path*, or None if ffprobe is absent.

    *entries* is the comma-separated field list for ``-show_entries format=``.
    Errors from running ffprobe or decoding its output propagate to the caller.
    """
    import shutil
    import subprocess

    if not shutil.which("ffprobe"):
        return None
    out = subprocess.check_output(
        ["ffprobe", "-v", "error", "-show_entries", f"format={entries}",
         "-of", "json", path],
        timeout=15,
    )
    return json.loads(out).get("format", {})


# --------- arg parsing -------------------------------------------------

def parse_args() -> argparse.Namespace:
    """Build the CLI parser and parse sys.argv.

    Exactly one of --ts / --activities is required. Defaults for --pre,
    --post and --cams are taken from the SYNOLOGY_* environment variables.
    """
    p = argparse.ArgumentParser(
        description="Probe Synology Surveillance Station for a shipment's commissioning recording."
    )
    src = p.add_mutually_exclusive_group(required=True)
    src.add_argument("--ts", "--anchor", dest="ts",
                     help='iDempiere "Z" timestamp (real UTC), e.g. 2026-05-03T08:04:42Z. '
                          'Use this for the shipping_date path.')
    src.add_argument("--activities", metavar="FILE",
                     help='Path to a JSON file containing the response from '
                          'GET /models/cust_commissionactivity?$filter=M_InOut_ID eq <id> '
                          '(or just `{"records":[…]}` / a bare array). The script will '
                          'pick the latest session_start.ActionTime as anchor — same as '
                          'the production resolveWindow().')
    p.add_argument("--pre", type=int, default=_env_int("SYNOLOGY_PRE_SECONDS", 120),
                   help="seconds before anchor (default: SYNOLOGY_PRE_SECONDS or 120)")
    p.add_argument("--post", type=int, default=_env_int("SYNOLOGY_POST_SECONDS", 60),
                   help="seconds after anchor (default: SYNOLOGY_POST_SECONDS or 60)")
    p.add_argument("--no-shift", action="store_true",
                   help="skip the Berlin-offset shift (raw UTC mode — for A/B testing)")
    p.add_argument("--cams", default=os.environ.get("SYNOLOGY_CAMERA_IDS") or "2",
                   help="comma-separated camera IDs")
    p.add_argument("--download", metavar="PATH", default=None,
                   help="if set, download the sliced clip to this path")
    return p.parse_args()


# --------- activity helpers ----------------------------------------------

def load_activities(path: str) -> list[dict]:
    """Read the activity JSON at *path* and return its records.

    Accepts ``{"records": [...]}``, a bare list, or a single record (a dict
    with a truthy ``ActionType``). Raises ValueError for any other shape.
    """
    with open(path, "r", encoding="utf-8") as fh:
        body = json.load(fh)
    if isinstance(body, list):
        return body
    if isinstance(body, dict):
        if isinstance(body.get("records"), list):
            return body["records"]
        if body.get("ActionType"):
            return [body]
    raise ValueError(
        f"Couldn't recognise activity JSON shape in {path}. "
        "Expected {records:[…]} or a list."
    )


def resolve_window_from_activities(records: list[dict]) -> dict:
    """Pick the anchor record: latest session_start, else latest session_complete.

    Mirrors the activity branch of resolveWindow in
    server/api/surveillance/shipment-recording/init.post.ts.

    IMPORTANT: anchor on `ActionTime`, NOT `Created`. Created is Berlin local
    clock time stamped with a misleading `Z` suffix; parsing it as UTC and
    adding the Berlin offset double-shifts the anchor 1-2 h past every
    recording. ActionTime is real UTC and goes through the same parse_ts
    that handles shipping_date.

    Returns a dict with keys anchor_record, action_time, source, all_starts
    and all_completes. Raises ValueError when no usable record exists.
    """
    rows = [r for r in records if isinstance(r, dict)]

    def of_type(action_type: str) -> list[dict]:
        return [r for r in rows if r.get("ActionType") == action_type]

    def latest_of(action_type: str) -> dict | None:
        candidates = [r for r in of_type(action_type)
                      if isinstance(r.get("ActionTime"), str)]
        if not candidates:
            return None
        # Compare instants, not strings: see _action_time_sort_key.
        return max(candidates, key=lambda r: _action_time_sort_key(r["ActionTime"]))

    start = latest_of("session_start")
    anchor = start or latest_of("session_complete")
    if not anchor:
        raise ValueError(
            "No session_start or session_complete with ActionTime found in activities."
        )
    return {
        "anchor_record": anchor,
        "action_time": anchor["ActionTime"],
        "source": "activity-start" if start else "activity-complete",
        "all_starts": of_type("session_start"),
        "all_completes": of_type("session_complete"),
    }


def dump_activity_summary(records: list[dict]) -> None:
    """Print Created vs ActionTime per record, sorted by ActionTime.

    The last column is Created minus ActionTime in seconds, which makes the
    Berlin-local value hiding in Created visible at a glance. It stays "?"
    when either timestamp is missing or unparseable.
    """
    print(f" {len(records)} activity record(s) loaded")
    print(f" {'ActionType':<22} {'Created (Berlin local!)':<26} {'ActionTime (real UTC)':<26} {'Δ'}")
    rows = [r for r in records if isinstance(r, dict)]
    for r in sorted(rows, key=lambda x: str(x.get("ActionTime") or "")):
        atype = str(r.get("ActionType", "?"))
        created = str(r.get("Created") or "—")
        atime = str(r.get("ActionTime") or "—")
        try:
            delta = f"{int(_iso_to_epoch(created) - _iso_to_epoch(atime)):+d}s"
        except ValueError:
            delta = "?"
        print(f" {atype:<22} {created:<26} {atime:<26} {delta}")


# --------- timezone helpers (must match init.post.ts) ------------------

def berlin_offset_seconds(unix_seconds: int) -> int:
    """Berlin's UTC offset at the given moment, in seconds (+7200 CEST, +3600 CET).

    Uses the IANA tz database so DST is handled automatically.
    """
    moment = datetime.fromtimestamp(unix_seconds, tz=timezone.utc)
    offset = moment.astimezone(ZoneInfo("Europe/Berlin")).utcoffset()
    return int(offset.total_seconds()) if offset else 0


def parse_ts(s: str, *, shift: bool) -> int:
    """Parse an iDempiere timestamp into whole epoch seconds.

    Accepts "...Z" and offset-aware ISO forms; a timestamp without an offset
    is treated as UTC. ActionTime and shipping_date are real UTC. Synology
    Surveillance Station, however, labels recordings using Berlin local time
    formatted as if it were UTC. With shift=True the result is moved forward
    by Berlin's offset for that instant (+1h CET / +2h CEST) so it lines up
    with what Recording.List returns.

    Raises ValueError on malformed input.
    """
    raw = int(_iso_to_epoch(s))
    return raw + berlin_offset_seconds(raw) if shift else raw


def fmt_utc(ts: int) -> str:
    """Format epoch seconds as ``YYYY-MM-DDTHH:MM:SSZ`` in UTC."""
    return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def fmt_berlin(ts: int) -> str:
    """Format epoch seconds as Berlin wall-clock time with the zone abbreviation."""
    return datetime.fromtimestamp(ts, tz=timezone.utc).astimezone(
        ZoneInfo("Europe/Berlin")
    ).strftime("%Y-%m-%d %H:%M:%S %Z")


# --------- Synology calls ----------------------------------------------

class Syno:
    """Minimal Synology Surveillance Station client built on urllib.

    Every call is a GET against ``<url>/webapi/entry.cgi``. After login()
    the session id is kept in ``self.sid`` and sent as ``_sid``.
    """

    def __init__(self, url: str, user: str, password: str) -> None:
        self.url = url.rstrip("/")
        self.user = user
        self.password = password
        self.sid: str | None = None

    def entry(self, params: dict) -> str:
        """Build the entry.cgi URL for *params* (values are stringified)."""
        return f"{self.url}/webapi/entry.cgi?{urlencode({k: str(v) for k, v in params.items()})}"

    def _get_json(self, target: str) -> dict | None:
        """GET *target* and return the decoded JSON, or None on any failure."""
        try:
            with urlopen(target, timeout=30) as resp:
                return json.loads(resp.read().decode("utf-8"))
        except Exception as exc:  # noqa: BLE001 - best-effort probe
            print(f" ! request failed: {exc}", file=sys.stderr)
            return None

    def _get_with_raw(self, target: str) -> tuple[int | None, str, dict | None]:
        """GET *target* and return (HTTP status, raw text, parsed JSON or None).

        Used where the exact response needs to be dumped. On a transport
        failure the status is None and the raw text carries the error
        message, so callers that print it show why the request failed.
        """
        try:
            with urlopen(target, timeout=30) as resp:
                status = resp.status
                raw = resp.read().decode("utf-8", errors="replace")
        except Exception as exc:  # noqa: BLE001 - best-effort probe
            return None, f"<request failed: {exc}>", None
        try:
            body = json.loads(raw)
        except ValueError:
            body = None
        return status, raw, body

    @staticmethod
    def _mask_url(target: str) -> str:
        """Hide the _sid and passwd query params so the URL is safe to share."""
        return re.sub(r"((?:_sid|passwd)=)[^&]+", r"\1***", target)

    def login(self) -> str:
        """Log in to the SurveillanceStation session and return the sid.

        Raises RuntimeError when the NAS rejects the login or is unreachable.
        """
        target = self.entry({
            "api": "SYNO.API.Auth",
            "method": "login",
            "version": 7,
            "account": self.user,
            "passwd": self.password,
            "session": "SurveillanceStation",
            "format": "sid",
        })
        body = self._get_json(target) or {}
        sid = (body.get("data") or {}).get("sid") if isinstance(body, dict) else None
        if not _success_flag(body) or not sid:
            raise RuntimeError(f"Login failed: {_error_of(body) or body}")
        self.sid = sid
        return sid

    def logout(self) -> None:
        """End the session if one is open; failures are ignored (best effort)."""
        if not self.sid:
            return
        target = self.entry({
            "api": "SYNO.API.Auth",
            "method": "logout",
            "version": 7,
            "_sid": self.sid,
            "session": "SurveillanceStation",
        })
        try:
            with urlopen(target, timeout=10) as resp:
                resp.read()
        except Exception:  # noqa: BLE001 - nothing useful to do if logout fails
            pass
        finally:
            self.sid = None

    @staticmethod
    def _parse_start_from_filepath(path: str | None) -> int | None:
        """Extract the start time (epoch seconds) embedded in a recording filename.

        Some DSM builds omit startTime and embed it as a 13-digit ms number in
        the filename instead, e.g.
        Kommissionierung-1-20260503-002000-1777760400027-7.mp4.
        """
        if not path:
            return None
        m = re.search(r"([0-9]{13})-[0-9]+\.mp4$", path)
        if not m:
            return None
        ms = int(m.group(1))
        return ms // 1000 if ms > 0 else None

    @staticmethod
    def _dump_list_response(status: int | None, raw: str, body: object) -> None:
        """Print HTTP status and a shortened view of a Recording.List reply.

        Long ``recordings`` / ``events`` arrays are cut to two items in a
        display copy only; *body* itself is never modified.
        """
        print(f" HTTP: {status}")
        if body is None:
            print(" RAW: " + (raw[:2000] + ("…" if len(raw) > 2000 else "")))
            return
        data = body.get("data") if isinstance(body, dict) else None
        display = body
        if isinstance(data, dict):
            trimmed = dict(data)
            for key in ("recordings", "events"):
                arr = data.get(key)
                if isinstance(arr, list) and len(arr) > 2:
                    trimmed[key] = arr[:2] + [f"… (+{len(arr) - 2} more)"]
            display = {**body, "data": trimmed}
        pretty = json.dumps(display, indent=2, ensure_ascii=False)
        if len(pretty) > 6000:
            pretty = pretty[:6000] + "\n …(truncated)…"
        print(" BODY:")
        print("\n".join(" " + ln for ln in pretty.splitlines()))
        if isinstance(data, dict):
            print(f" data keys: {sorted(data.keys())}")
            counts = {
                key: len(data[key]) if isinstance(data.get(key), list) else 0
                for key in ("recordings", "events")
            }
            if any(isinstance(data.get(key), list) for key in counts):
                print(f" recordings: {counts['recordings']} events: {counts['events']}")

    def list_recordings(self, camera_ids: list[int], from_ts: int, to_ts: int,
                        *, verbose: bool = True) -> list[dict]:
        """List recordings for the window, normalised to a common shape.

        Three parameter formats are tried in order: ``cameraIds=1,2``,
        ``cameraIds=[1,2]``, then no camera filter with client-side filtering.
        The first attempt that answers success=true is used, even if it holds
        zero items. Each returned dict has id, cameraId, startTime, filePath
        and raw (the untouched item). With verbose=True the URL, HTTP status
        and a shortened JSON body of every attempt are printed.

        Returns [] when every attempt fails.
        """
        cam_csv = ",".join(str(c) for c in camera_ids)
        cam_set = set(camera_ids)
        attempts = [
            (f"cameraIds={cam_csv!r}", {"cameraIds": cam_csv}),
            (f"cameraIds=[{cam_csv}]", {"cameraIds": f"[{cam_csv}]"}),
            ("unfiltered (no cameraIds param)", {}),
        ]

        for idx, (label, extra) in enumerate(attempts, start=1):
            target = self.entry({
                "api": "SYNO.SurveillanceStation.Recording",
                "method": "List",
                "version": 6,
                "fromTime": from_ts,
                "toTime": to_ts,
                "_sid": self.sid,
                **extra,
            })
            if verbose:
                print(f"--- List attempt {idx}: {label} ---")
                print(f" URL: {self._mask_url(target)}")

            status, raw, body = self._get_with_raw(target)
            if verbose:
                self._dump_list_response(status, raw, body)

            if not _success_flag(body):
                if verbose:
                    print(" → skipped (success=False)")
                continue

            data = body.get("data")
            if not isinstance(data, dict):
                data = {}  # tolerate "data": null
            items = data.get("recordings") or data.get("events") or []

            normalised: list[dict] = []
            for r in items:
                if not isinstance(r, dict):
                    continue
                cam_id = r.get("camera_id") if "camera_id" in r else r.get("cameraId")
                # Only the unfiltered attempt needs filtering on our side.
                if not extra and cam_id is not None and int(cam_id) not in cam_set:
                    continue
                file_path = r.get("filePath") or r.get("file_path")
                start_time = (
                    r.get("startTime")
                    or r.get("start_time")
                    or r.get("recordingTime")
                    or r.get("recordTime")
                    or self._parse_start_from_filepath(file_path)
                )
                normalised.append({
                    "id": r.get("id") or r.get("recordId"),
                    "cameraId": cam_id,
                    "startTime": start_time,
                    "filePath": file_path,
                    "raw": r,
                })

            if verbose:
                print(f" → normalised {len(normalised)} item(s) (used this attempt)")
                if normalised:
                    first = normalised[0]
                    print(" first normalised item:")
                    for k in ("id", "cameraId", "startTime", "filePath"):
                        print(f" {k}: {first.get(k)}")
                    print(f" raw keys: {sorted(first.get('raw', {}).keys())}")
                print()
            return normalised

        if verbose:
            print("All List attempts failed or returned success=false.")
        return []

    # ---- diagnostic probes ------------------------------------------

    def api_info(self, query: str = "all") -> dict | None:
        """Query SYNO.API.Info for *query* (API names or "all")."""
        target = self.entry({
            "api": "SYNO.API.Info",
            "method": "Query",
            "version": 1,
            "query": query,
            "_sid": self.sid,
        })
        return self._get_json(target)

    def get_recording_info(self, recording_id, version: int = 6) -> tuple[int | None, str, dict | None]:
        """Call Recording.GetInfo for one recording id at the given version."""
        target = self.entry({
            "api": "SYNO.SurveillanceStation.Recording",
            "method": "GetInfo",
            "version": version,
            "id": str(recording_id),
            "_sid": self.sid,
        })
        return self._get_with_raw(target)

    def get_camera_info(self, camera_id, version: int = 8) -> tuple[int | None, str, dict | None]:
        """Call Camera.GetInfo for one camera id at the given version."""
        target = self.entry({
            "api": "SYNO.SurveillanceStation.Camera",
            "method": "GetInfo",
            "version": version,
            "cameraIds": str(camera_id),
            "_sid": self.sid,
        })
        return self._get_with_raw(target)

    def list_recordings_version(self, camera_ids: list[int], from_ts: int, to_ts: int,
                                version: int) -> tuple[int | None, str, dict | None]:
        """Call Recording.List at an arbitrary API version (raw reply)."""
        cam_csv = ",".join(str(c) for c in camera_ids)
        target = self.entry({
            "api": "SYNO.SurveillanceStation.Recording",
            "method": "List",
            "version": version,
            "fromTime": from_ts,
            "toTime": to_ts,
            "cameraIds": cam_csv,
            "_sid": self.sid,
        })
        return self._get_with_raw(target)

    def list_recordings_unfiltered(self, from_ts: int, to_ts: int) -> tuple[int | None, str, dict | None]:
        """Call Recording.List with no cameraIds filter (raw reply).

        Lets the caller see whether a chunk is attributed to a different
        camera id than the one that was queried.
        """
        target = self.entry({
            "api": "SYNO.SurveillanceStation.Recording",
            "method": "List",
            "version": 6,
            "fromTime": from_ts,
            "toTime": to_ts,
            "_sid": self.sid,
        })
        return self._get_with_raw(target)

    def camera_list(self, version: int = 9) -> tuple[int | None, str, dict | None]:
        """Call Camera.List at the given version (raw reply)."""
        target = self.entry({
            "api": "SYNO.SurveillanceStation.Camera",
            "method": "List",
            "version": version,
            "_sid": self.sid,
        })
        return self._get_with_raw(target)

    def api_info_all(self) -> tuple[int | None, str, dict | None]:
        """Query SYNO.API.Info with query=all and return the raw reply."""
        target = self.entry({
            "api": "SYNO.API.Info",
            "method": "Query",
            "version": 1,
            "query": "all",
            "_sid": self.sid,
        })
        return self._get_with_raw(target)

    def range_export_start(self, cam_id: int, from_ts: int, to_ts: int,
                           file_name: str = "video") -> tuple[int | None, str, dict | None]:
        """Step 1 of Recording.RangeExport: start the export.

        The reply is expected to carry a ``dlid`` that is then polled and
        finally downloaded from.
        """
        target = self.entry({
            "api": "SYNO.SurveillanceStation.Recording",
            "method": "RangeExport",
            "version": 6,
            "camId": cam_id,
            "fromTime": from_ts,
            "toTime": to_ts,
            "fileName": file_name,
            "_sid": self.sid,
        })
        return self._get_with_raw(target)

    def range_export_progress(self, dlid: int) -> tuple[int | None, str, dict | None]:
        """Step 2 of Recording.RangeExport: poll progress for *dlid*.

        Per the original author's notes: poll at least every 20 s to keep
        the task alive; progress=-1 means failure; progress=100 plus
        fileExt 'mp4' or 'zip' means ready.
        """
        target = self.entry({
            "api": "SYNO.SurveillanceStation.Recording",
            "method": "GetRangeExportProgress",
            "version": 6,
            "dlid": dlid,
            "_sid": self.sid,
        })
        return self._get_with_raw(target)

    @staticmethod
    def _stream_to_file(target, out_path: str, *, timeout: int = 300) -> int:
        """Stream the response of *target* into *out_path*; return bytes written."""
        written = 0
        with urlopen(target, timeout=timeout) as resp, open(out_path, "wb") as fh:
            while chunk := resp.read(64 * 1024):
                fh.write(chunk)
                written += len(chunk)
        return written

    def range_export_download(self, dlid: int, out_path: str,
                              file_name: str = "video") -> int:
        """Step 3 of Recording.RangeExport: download the merged file.

        Per the original author's notes this must happen within 1 min of
        progress=100. Returns the number of bytes written.
        """
        target = self.entry({
            "api": "SYNO.SurveillanceStation.Recording",
            "method": "OnRangeExportDone",
            "version": 6,
            "dlid": dlid,
            "fileName": file_name,
            "_sid": self.sid,
        })
        return self._stream_to_file(target, out_path)

    def head_download(self, recording_id, version: int = 6) -> tuple[int | None, dict, int | None]:
        """Probe Recording.Download with ``Range: bytes=0-0``.

        Returns (status, lower-cased headers, total size). The total comes
        from Content-Range ("bytes 0-0/12345678"), else from Content-Length.
        On failure returns (None, {"_error": message}, None).
        """
        from urllib.request import Request

        target = self.entry({
            "api": "SYNO.SurveillanceStation.Recording",
            "method": "Download",
            "version": version,
            "id": str(recording_id),
            "mountId": 0,
            "_sid": self.sid,
        })
        req = Request(target, headers={"Range": "bytes=0-0"})
        try:
            with urlopen(req, timeout=30) as resp:
                hdrs = {k.lower(): v for k, v in resp.headers.items()}
                m = re.search(r"/(\d+)\s*$", hdrs.get("content-range", ""))
                if m:
                    total = int(m.group(1))
                else:
                    cl = hdrs.get("content-length")
                    total = int(cl) if cl and cl.isdigit() else None
                return resp.status, hdrs, total
        except Exception as exc:  # noqa: BLE001 - best-effort probe
            return None, {"_error": str(exc)}, None

    def download(self, recording_id, out_path: str, *,
                 offset_ms: int | None, play_ms: int | None) -> int:
        """Download a recording to *out_path*; return bytes written.

        offsetTimeMs / playTimeMs are only sent when the matching argument
        is not None, so passing None for both fetches the whole chunk.
        """
        params: dict = {
            "api": "SYNO.SurveillanceStation.Recording",
            "method": "Download",
            "version": 6,
            "id": recording_id,
            "mountId": 0,
            "_sid": self.sid,
        }
        if offset_ms is not None:
            params["offsetTimeMs"] = offset_ms
        if play_ms is not None:
            params["playTimeMs"] = play_ms
        return self._stream_to_file(self.entry(params), out_path)


# --------- main: reporting helpers ---------------------------------------

def _print_activity_resolution(path: str, records: list[dict],
                               resolution: dict, shift: bool) -> None:
    """Print how the anchor was derived from the activity file."""
    anchor_record = resolution["anchor_record"]
    action_time = resolution["action_time"]
    print("=== Activity input ===")
    print(f" --activities {path}")
    dump_activity_summary(records)
    print()
    print(f" Resolved source: {resolution['source']}")
    print(f" Anchor record: id={anchor_record.get('id')} "
          f"({anchor_record.get('ActionType')})")
    print(f" → ActionTime: {action_time} ◀── used as anchor (real UTC)")
    created = anchor_record.get("Created") or "—"
    print(f" → Created: {created} (Berlin-local-with-Z, "
          "would be wrong if used)")
    # Show the broken anchor an old (or copy-pasted) implementation would
    # have produced, so the difference is concrete.
    if isinstance(created, str) and created != "—":
        try:
            wrong = parse_ts(created, shift=shift)
            right = parse_ts(action_time, shift=shift)
        except ValueError:
            pass  # Created is display-only here; skip the comparison
        else:
            drift = wrong - right
            print(f" parseTs(Created) → {wrong} ({fmt_utc(wrong)})")
            print(f" parseTs(ActionTime)→ {right} ({fmt_utc(right)})")
            print(f" drift if Created had been used: {drift:+d}s "
                  f"({drift/3600:+.1f}h) ← magnitude of the old bug")
    print()


def _choose_chunk(recordings: list[dict], anchor: int) -> tuple[dict | None, str]:
    """Pick the chunk most likely to contain *anchor*; return (chunk, reason).

    Cameras are motion-triggered, so chunks are short and start at irregular
    times. The chunk containing the anchor is the one with the LATEST start
    at or before it; closest-by-absolute-distance would prefer a chunk that
    started just after the anchor. If nothing starts before the anchor the
    earliest later chunk is taken. Records without a start time are never
    chosen, because no slice offset can be computed for them.
    """
    timed = [r for r in recordings if r["startTime"]]
    before = [r for r in timed if int(r["startTime"]) <= anchor]
    if before:
        return max(before, key=lambda r: int(r["startTime"])), "latest start ≤ anchor"
    if timed:
        return (min(timed, key=lambda r: int(r["startTime"])),
                "no chunk starts before anchor — taking earliest forward")
    return None, ""


def _diag_chunk_gaps(narrow: list[dict], anchor: int) -> None:
    """(a) Print start-to-next-start gaps as an estimate of chunk durations."""
    print()
    print("--- (a) Inferred chunk durations from start-to-next-start gaps ---")
    print(" (Last chunk's duration is unknown without metadata.)")
    ordered = sorted((r for r in narrow if r["startTime"] is not None),
                     key=lambda r: int(r["startTime"]))
    starts = [int(r["startTime"]) for r in ordered]
    for i, (r, st) in enumerate(zip(ordered, starts)):
        nxt = starts[i + 1] if i + 1 < len(starts) else None
        note = f"gap_to_next={nxt - st}s" if nxt is not None else "gap_to_next=? (last in window)"
        contains_anchor = st <= anchor and (nxt is None or nxt > anchor)
        marker = " ← contains anchor" if contains_anchor else ""
        print(f" id={r['id']} start={fmt_utc(st)} {note}{marker}")


def _diag_api_info(syno: Syno) -> None:
    """(b) Print which Recording/Camera/Playback/Event APIs the NAS exposes."""
    print()
    print("--- (b) SYNO.API.Info Query ---")
    info = syno.api_info(
        "SYNO.SurveillanceStation.Recording,"
        "SYNO.SurveillanceStation.Camera,"
        "SYNO.SurveillanceStation.RecordingPlayback,"
        "SYNO.SurveillanceStation.Event"
    ) or {}
    lines = json.dumps(info, indent=2, ensure_ascii=False).splitlines()
    print("\n".join(" " + ln for ln in lines[:60]))
    if len(lines) > 60:
        print(" …(truncated)…")


def _diag_camera_info(syno: Syno, cam_id: int) -> None:
    """(c) Print recording-related config of one camera; stop at first working version."""
    print()
    print(f"--- (c) Camera.GetInfo for cam {cam_id} ---")
    for version in (8, 9, 7, 6):
        status, _raw, body = syno.get_camera_info(cam_id, version=version)
        print(f" version={version} HTTP={status} success={_success_flag(body)}")
        if _success_flag(body):
            cams_data = (body.get("data") or {}).get("cameras") or []
            if cams_data:
                cam0 = cams_data[0]
                print(f" keys: {sorted(cam0.keys())[:30]}")
                record_keys = {
                    k: cam0[k] for k in cam0
                    if any(s in k.lower() for s in ("record", "schedule", "motion"))
                }
                if record_keys:
                    print(" recording-related fields:")
                    for k, val in record_keys.items():
                        print(f" {k}: {_clip_json(val)}")
            break  # stop on first version that worked
        if isinstance(body, dict) and body:
            print(f" error: {body.get('error')}")


def _diag_recording_info(syno: Syno, narrow: list[dict]) -> None:
    """(d) Print time/size-like fields from Recording.GetInfo for up to 7 chunks."""
    print()
    print("--- (d) Recording.GetInfo per chunk in narrow window ---")
    print(" (looking for fields like startTime, recordingTime, eventVideoLength, size)")
    wanted = ("time", "length", "duration", "size", "schedule", "type")
    for r in narrow[:7]:  # cap at 7 to keep output reasonable
        print(f" id={r['id']} ({r['filePath']}):")
        found = False
        for version in (6, 7, 8):
            _status, _raw, body = syno.get_recording_info(r["id"], version=version)
            if _success_flag(body):
                rec_data = body.get("data") or {}
                item = None
                if isinstance(rec_data, dict):
                    # Could be {recording: {...}} or {recordings: [{...}]}
                    item = (rec_data.get("recording")
                            or (rec_data.get("recordings") or [None])[0]
                            or rec_data)
                if isinstance(item, dict):
                    print(f" version={version} keys: {sorted(item.keys())}")
                    for k, val in item.items():
                        if any(s in k.lower() for s in wanted):
                            print(f" {k}: {_clip_json(val)}")
                else:
                    print(f" version={version} unexpected shape: {str(rec_data)[:200]}")
                found = True
                break
            if isinstance(body, dict) and body:
                err = body.get("error") or {}
                code = err.get("code") if isinstance(err, dict) else None
                if code not in (102, 103, 105):  # method-not-found-ish
                    print(f" version={version} error: {err}")
        if not found:
            print(" no version returned success — GetInfo may not be exposed")


def _diag_list_versions(syno: Syno, cameras: list[int], from_ts: int, to_ts: int) -> None:
    """(e) Show the first item Recording.List returns at API v7 and v8."""
    print()
    print("--- (e) Recording.List at v7 and v8 (does newer API expose more fields?) ---")
    for version in (7, 8):
        status, _raw, body = syno.list_recordings_version(cameras, from_ts, to_ts, version=version)
        print(f" version={version} HTTP={status} success={_success_flag(body)}")
        if _success_flag(body):
            items = (body.get("data") or {}).get("recordings") or []
            if items:
                print(f" first item keys: {sorted(items[0].keys())}")
                sample = json.dumps(items[0], indent=2, ensure_ascii=False)
                print("\n".join(" " + ln for ln in sample.splitlines()))
            else:
                print(" (zero items)")
        elif isinstance(body, dict) and body:
            print(f" error: {body.get('error')}")


def _diag_head_probe(syno: Syno, chosen: dict) -> None:
    """(f) Read the chosen chunk's byte size via a ranged GET on Download."""
    shown = ("content-type", "content-length", "content-range",
             "content-disposition", "accept-ranges")
    print()
    print(f"--- (f) HEAD probe of Download for chosen id={chosen['id']} ---")
    for version in (6, 7, 8):
        status, hdrs, total = syno.head_download(chosen["id"], version=version)
        print(f" version={version} HTTP={status} total_bytes={total}")
        if total:
            for k, val in hdrs.items():
                if k in shown:
                    print(f" {k}: {val}")
            break
        if "_error" in hdrs:
            print(f" error: {hdrs['_error']}")


def _diag_full_download(syno: Syno, chosen: dict) -> None:
    """(g) Download the whole chosen chunk and report its size and duration."""
    print()
    print("--- (g) Download chosen chunk WITHOUT slicing (offsetMs/playMs both omitted) ---")
    tmp_path = "/tmp/synology-test-chunk.mp4"
    try:
        full_bytes = syno.download(chosen["id"], tmp_path, offset_ms=None, play_ms=None)
    except Exception as exc:  # noqa: BLE001 - diagnostics must keep going
        print(f" download failed: {exc}")
        return
    print(f" ✓ {full_bytes} bytes → {tmp_path}")
    # rough duration estimate: 1080p h264 ≈ 0.5–1 MB/s
    print(f" rough duration estimate: {full_bytes / 1_000_000:.0f}–{full_bytes / 500_000:.0f} s "
          f"(at h264 1080p ~0.5–1 MB/s)")
    try:
        meta = _ffprobe_format(tmp_path, "duration,size,bit_rate")
    except Exception as exc:  # noqa: BLE001
        print(f" ffprobe failed: {exc}")
        return
    if meta is None:
        print(" (ffprobe not installed — skipping precise duration)")
    else:
        print(f" ffprobe: duration={meta.get('duration')}s "
              f"size={meta.get('size')}B "
              f"bit_rate={meta.get('bit_rate')}bps")


def _diag_sliced_download(syno: Syno, chosen: dict, offset_ms: int, play_ms: int) -> None:
    """(h) Download the chosen chunk with the same slicing parameters prod sends."""
    print()
    print("--- (h) Download chosen chunk WITH slicing (what prod sends) ---")
    print(f" offsetMs={offset_ms} playMs={play_ms} (asks for {play_ms/1000:.0f}s)")
    tmp_path = "/tmp/synology-test-sliced.mp4"
    try:
        sliced_bytes = syno.download(chosen["id"], tmp_path,
                                     offset_ms=offset_ms, play_ms=play_ms)
    except Exception as exc:  # noqa: BLE001 - diagnostics must keep going
        print(f" download failed: {exc}")
        return
    print(f" ✓ {sliced_bytes} bytes → {tmp_path}")
    try:
        meta = _ffprobe_format(tmp_path, "duration")
    except Exception as exc:  # noqa: BLE001
        print(f" ffprobe failed: {exc}")
        return
    if meta is not None:
        print(f" ffprobe duration: {meta.get('duration')}s "
              f"(asked {play_ms/1000:.0f}s)")


def _diag_wide_listing(syno: Syno, cameras: list[int], anchor: int,
                       pre: int, post: int) -> None:
    """(i) List everything the cameras hold within at least ±30 min of the anchor.

    Useful when the GUI shows a recording but the narrow listing returned 0.
    """
    wide_pre = max(pre, 1800)
    wide_post = max(post, 1800)
    wide_from = anchor - wide_pre
    wide_to = anchor + wide_post
    print()
    print(f"--- (i) Wide listing on cam {','.join(map(str, cameras))} ±{wide_pre}/{wide_post}s ---")
    print(f" window {fmt_utc(wide_from)} → {fmt_utc(wide_to)}")
    wide = syno.list_recordings(cameras, wide_from, wide_to, verbose=False)
    print(f" {len(wide)} hit(s)")
    timed = sorted((w for w in wide if w.get("startTime") is not None),
                   key=lambda w: int(w["startTime"]))
    for w in timed:
        st = int(w["startTime"])
        delta = st - anchor
        label = f" ({delta:+d}s vs anchor)"
        if abs(delta) < 60:
            label += " ← within 1 min of anchor"
        print(f" id={w['id']} start={fmt_utc(st)}{label} file={w['filePath']}")


def _diag_unfiltered_listing(syno: Syno, from_ts: int, to_ts: int) -> None:
    """(j) List recordings of every camera in the narrow window, grouped by camera.

    Tells us whether a different camera id holds the chunk we want.
    """
    print()
    print("--- (j) Unfiltered Recording.List in narrow window (all cameras) ---")
    status, _raw, body = syno.list_recordings_unfiltered(from_ts, to_ts)
    print(f" HTTP={status} success={_success_flag(body)}")
    if not _success_flag(body):
        if isinstance(body, dict) and body:
            print(f" error: {body.get('error')}")
        return
    items = (body.get("data") or {}).get("recordings") or []
    print(f" total recordings (any cam): {len(items)}")
    by_cam: dict = {}
    for it in items:
        cid = it.get("camera_id") if "camera_id" in it else it.get("cameraId")
        by_cam.setdefault(cid, []).append(it)
    for cid in sorted(by_cam, key=lambda x: (x is None, x)):
        cam_items = by_cam[cid]
        print(f" cameraId={cid} ({len(cam_items)} chunk(s)):")
        for it in cam_items[:5]:
            fp = it.get("filePath") or it.get("file_path") or "?"
            rid = it.get("id") or it.get("recordId")
            name = it.get("cameraName") or it.get("camera_name")
            print(f" id={rid} name={name!r} file={fp}")
        if len(cam_items) > 5:
            print(f" …(+{len(cam_items) - 5} more)")


def _diag_range_export(syno: Syno, cam_id: int, from_ts: int, to_ts: int) -> None:
    """(l) Exercise the 3-step Recording.RangeExport flow for the window.

    RangeExport → GetRangeExportProgress → OnRangeExportDone. Download and
    the "reached progress=100" line only happen when the poll loop really
    saw progress=100; a failed poll, progress=-1 or a timeout end the probe.
    """
    import time

    print()
    print(f"--- (l) Recording.RangeExport for cam {cam_id} {fmt_utc(from_ts)} → {fmt_utc(to_ts)} ---")
    status, _raw, body = syno.range_export_start(cam_id, from_ts, to_ts, "video")
    print(f" Step 1 RangeExport HTTP={status} success={_success_flag(body)}")
    if not _success_flag(body):
        print(f" error: {_error_of(body) or '(none)'}")
        return
    dlid = (body.get("data") or {}).get("dlid")
    print(f" dlid={dlid}")
    if dlid is None:
        print(" ✗ no dlid in response — cannot poll progress")
        return

    file_ext = ""
    ready = False
    t0 = time.monotonic()
    for attempt in range(1, 41):  # ~80 s budget at 2 s polls
        _st, _r, reply = syno.range_export_progress(dlid)
        if not _success_flag(reply):
            print(f" Step 2 progress poll #{attempt} FAILED: {_error_of(reply)}")
            break
        d = reply.get("data") or {}
        progress = d.get("progress")
        file_ext = d.get("fileExt") or ""
        print(f" Step 2 progress poll #{attempt} progress={progress} fileExt={file_ext!r}")
        if progress == -1:
            print(" ✗ export failed")
            break
        if progress == 100:
            ready = True
            break
        time.sleep(2)
    else:
        print(" Step 2 timed out before progress=100")
    if not ready:
        return

    print(f" → reached progress=100 after {time.monotonic() - t0:.1f}s")
    if file_ext.lower() == "zip":
        print(" → Result is a ZIP (codec/resolution mismatch across range)")
        print(" would need to handle multi-file fallback if we adopt this API")
        return
    if file_ext.lower() != "mp4":
        return

    out = "/tmp/synology-rangeexport.mp4"
    try:
        n = syno.range_export_download(dlid, out, "video")
    except Exception as exc:  # noqa: BLE001 - diagnostics must keep going
        print(f" Step 3 failed: {exc}")
        return
    print(f" Step 3 download {n} bytes → {out}")
    try:
        meta = _ffprobe_format(out, "duration,bit_rate")
        if meta is None:
            return
        dur = float(meta.get("duration") or 0)
    except Exception as exc:  # noqa: BLE001
        print(f" ffprobe failed: {exc}")
        return
    window_dur = to_ts - from_ts
    print(f" ffprobe: duration={dur:.2f}s bit_rate={meta.get('bit_rate')}bps")
    print(f" window asked: {window_dur}s ratio={dur/max(1,window_dur):.2f}")
    if dur > window_dur * 0.95:
        print(" → timeline-aligned (gaps appear filled or merged continuously)")
    else:
        print(" → motion-only (gaps removed, just like our ffmpeg path)")


def _diag_camera_list(syno: Syno) -> None:
    """(k) Print the camera id ↔ name mapping; stop at first working version."""
    print()
    print("--- (k) Camera.List (id ↔ name mapping) ---")
    for version in (9, 8, 7, 6):
        status, _raw, body = syno.camera_list(version=version)
        print(f" version={version} HTTP={status} success={_success_flag(body)}")
        if _success_flag(body):
            for c in (body.get("data") or {}).get("cameras") or []:
                cid = c.get("id") or c.get("camera_id")
                name = c.get("name") or c.get("newName") or c.get("camera_name")
                rec = c.get("recStatus") or c.get("recordStatus")
                print(f" id={cid} name={name!r} enabled={c.get('enabled')} rec={rec}")
            break
        if isinstance(body, dict) and body:
            print(f" error: {body.get('error')}")


# --------- main --------------------------------------------------------

def main() -> int:
    """Run the probe end to end and return the process exit code.

    0 on success, 1 when login or the requested --download fails, 2 for
    invalid input or missing configuration. The session is always logged
    out, even when a diagnostic raises.
    """
    args = parse_args()
    shift = not args.no_shift

    url = (os.environ.get("SYNOLOGY_URL") or "").rstrip("/")
    user = os.environ.get("SYNOLOGY_USER") or ""
    password = os.environ.get("SYNOLOGY_PASSWORD") or ""
    if not url or not user or not password:
        print("Missing SYNOLOGY_URL / SYNOLOGY_USER / SYNOLOGY_PASSWORD in env.",
              file=sys.stderr)
        return 2

    try:
        cameras = [int(c) for c in re.split(r"[,\s]+", args.cams.strip()) if c]
    except ValueError:
        print(f"--cams must be comma-separated integers, got {args.cams!r}.",
              file=sys.stderr)
        return 2
    if not cameras:
        print("--cams must name at least one camera ID.", file=sys.stderr)
        return 2
    if args.pre < 0 or args.post < 0:
        print("--pre and --post must not be negative.", file=sys.stderr)
        return 2

    # If --activities is given, derive the anchor from the JSON the same
    # way init.post.ts:resolveWindow does (latest session_start.ActionTime,
    # else session_complete.ActionTime). Otherwise use --ts directly.
    activity_resolution: dict | None = None
    if args.activities:
        try:
            records = load_activities(args.activities)
        except Exception as exc:  # noqa: BLE001 - any load error is a usage error
            print(f"Failed to load activities: {exc}", file=sys.stderr)
            return 2
        try:
            activity_resolution = resolve_window_from_activities(records)
        except ValueError as exc:
            print(f"Couldn't resolve window: {exc}", file=sys.stderr)
            return 2
        ts_arg = activity_resolution["action_time"]
        _print_activity_resolution(args.activities, records, activity_resolution, shift)
    else:
        ts_arg = args.ts

    try:
        raw_anchor = parse_ts(ts_arg, shift=False)
    except ValueError as exc:
        print(f"Can't parse timestamp {ts_arg!r}: {exc}", file=sys.stderr)
        return 2
    offset = berlin_offset_seconds(raw_anchor)
    anchor = raw_anchor + offset if shift else raw_anchor
    from_ts = anchor - args.pre
    to_ts = anchor + args.post

    print("=== Input ===")
    if activity_resolution:
        print(f" source --activities ({activity_resolution['source']})")
        print(f" anchor (UTC) {ts_arg} (from session_*.ActionTime)")
    else:
        print(f" --ts {ts_arg}")
    print(f" pre/post {args.pre}s / {args.post}s")
    print(f" cameras {','.join(map(str, cameras))}")
    if shift:
        print(f" shift ON (+{offset}s = {offset / 3600:.0f}h Berlin)")
    else:
        print(" shift OFF (raw UTC)")
    print()
    print("=== Anchors ===")
    print(f" iDempiere \"Z\" {ts_arg} (parsed as UTC: {raw_anchor})")
    print(f" Berlin clock {fmt_berlin(raw_anchor)} (real Berlin local for that UTC moment)")
    print(f" Query anchor {anchor} ({fmt_utc(anchor)} interpreted as UTC)")
    print(f" Query window [{from_ts}, {to_ts}] → {fmt_utc(from_ts)} → {fmt_utc(to_ts)}")
    print()

    syno = Syno(url, user, password)
    try:
        sid = syno.login()
    except RuntimeError as exc:
        print(str(exc), file=sys.stderr)
        return 1
    print(f"✓ Logged in (sid {sid[:8]}…)")

    exit_code = 0
    try:
        print()
        print("=== Narrow listing (fromTs/toTs as-is) ===")
        narrow = syno.list_recordings(cameras, from_ts, to_ts)
        print(f" {len(narrow)} hit(s)")
        for r in narrow:
            st = r["startTime"]
            print(f" id={r['id']} cam={r['cameraId']} start={st} "
                  f"({fmt_utc(int(st)) if st else '?'}) file={r['filePath']}")

        chosen, reason = _choose_chunk(narrow, anchor)
        offset_ms: int | None = None
        play_ms: int | None = None
        if chosen:
            print()
            print(f"=== Chosen chunk ({reason}) ===")
            st = int(chosen["startTime"])
            print(f" id={chosen['id']} start={fmt_utc(st)} anchor is {anchor - st}s into chunk")
            slice_start = max(from_ts, st)
            offset_ms = max(0, (slice_start - st) * 1000)
            play_ms = max(1000, (to_ts - slice_start) * 1000)
            print(f" sliceStart={fmt_utc(slice_start)} offsetMs={offset_ms} playMs={play_ms}")
        elif narrow:
            print()
            print(" No listed chunk carries a start time — cannot choose one.")

        # ============================================================
        # DIAGNOSTICS — capture everything needed to settle whether
        # chunks are short motion fragments vs continuous recordings,
        # and which Synology APIs expose duration/end-time metadata.
        # The output of this block is what to paste back for analysis.
        # ============================================================
        print()
        print("=" * 64)
        print("DIAGNOSTICS")
        print("=" * 64)

        _diag_chunk_gaps(narrow, anchor)
        _diag_api_info(syno)
        _diag_camera_info(syno, cameras[0])
        _diag_recording_info(syno, narrow)
        _diag_list_versions(syno, cameras, from_ts, to_ts)
        if chosen:
            _diag_head_probe(syno, chosen)
            _diag_full_download(syno, chosen)
            if offset_ms is not None and play_ms is not None:
                _diag_sliced_download(syno, chosen, offset_ms, play_ms)
        _diag_wide_listing(syno, cameras, anchor, args.pre, args.post)
        _diag_unfiltered_listing(syno, from_ts, to_ts)
        _diag_range_export(syno, cameras[0], from_ts, to_ts)
        _diag_camera_list(syno)

        if args.download:
            print()
            if chosen:
                print(f"=== Downloading id={chosen['id']} → {args.download} ===")
                try:
                    written = syno.download(chosen["id"], args.download,
                                            offset_ms=offset_ms, play_ms=play_ms)
                except Exception as exc:  # noqa: BLE001 - report, then log out
                    print(f" download failed: {exc}", file=sys.stderr)
                    exit_code = 1
                else:
                    print(f" ✓ {written} bytes")
            else:
                print(f"=== Nothing to download to {args.download}: no chunk was chosen ===")
    finally:
        syno.logout()
        print()
        print("✓ Logged out")
    return exit_code


if __name__ == "__main__":
    sys.exit(main())