#!/usr/bin/env python3 import argparse import shutil import subprocess import sys import tempfile import time from pathlib import Path try: from tqdm import tqdm HAS_TQDM = True except ImportError: HAS_TQDM = False def run(cmd: list[str]) -> None: print("\n$", " ".join(cmd)) completed = subprocess.run(cmd) if completed.returncode != 0: raise RuntimeError(f"Command failed ({completed.returncode}): {' '.join(cmd)}") def command_exists(name: str) -> bool: return shutil.which(name) is not None def assert_prerequisites(realesrgan_bin: str) -> None: missing = [] if not command_exists("ffmpeg"): missing.append("ffmpeg") if not command_exists("ffprobe"): missing.append("ffprobe") real_esrgan_ok = Path(realesrgan_bin).exists() or command_exists(realesrgan_bin) if not real_esrgan_ok: missing.append(realesrgan_bin) if missing: items = ", ".join(missing) raise RuntimeError( f"Missing required tools: {items}.\n" "Install them first (see README.md)." ) def has_audio_stream(input_video: Path) -> bool: cmd = [ "ffprobe", "-v", "error", "-select_streams", "a:0", "-show_entries", "stream=codec_type", "-of", "csv=p=0", str(input_video), ] result = subprocess.run(cmd, capture_output=True, text=True) return result.returncode == 0 and "audio" in result.stdout def count_png_frames(folder: Path) -> int: return sum(1 for _ in folder.glob("*.png")) def run_upscale_with_progress(cmd: list[str], input_frames: Path, output_frames: Path) -> None: total_frames = count_png_frames(input_frames) if total_frames == 0: raise RuntimeError("No extracted frames found before upscaling.") started = time.time() # Suppress Real-ESRGAN's verbose output by redirecting stdout/stderr process = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) if HAS_TQDM: pbar = tqdm(total=total_frames, unit="frames", desc="Upscaling", bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]") last_count = 0 else: print(f"Upscaling: 0/{total_frames} frames (0.0%) | ETA --:--:--") last_print = 0.0 while True: return_code = process.poll() now = time.time() done_frames = count_png_frames(output_frames) if HAS_TQDM: delta = done_frames - last_count if delta > 0: pbar.update(delta) last_count = done_frames else: if now - last_print >= 2.0: progress = min(100.0, (done_frames / total_frames) * 100) elapsed = max(now - started, 1e-6) fps = done_frames / elapsed if done_frames > 0 and fps > 0: remaining_frames = max(total_frames - done_frames, 0) eta_seconds = int(remaining_frames / fps) eta_h, rem = divmod(eta_seconds, 3600) eta_m, eta_s = divmod(rem, 60) eta_str = f"{eta_h:02d}:{eta_m:02d}:{eta_s:02d}" else: eta_str = "--:--:--" print( f"Upscaling: {done_frames}/{total_frames} " f"({progress:.1f}%) | {fps:.2f} fps | ETA {eta_str}" ) last_print = now if return_code is not None: done_frames = count_png_frames(output_frames) if HAS_TQDM: delta = done_frames - last_count if delta > 0: pbar.update(delta) pbar.close() elapsed = max(time.time() - started, 1e-6) fps = done_frames / elapsed print( f"Upscaling complete: {done_frames}/{total_frames} frames | " f"avg {fps:.2f} fps | total time {elapsed:.1f}s" ) if return_code != 0: raise RuntimeError(f"Command failed ({return_code}): {' '.join(cmd)}") break time.sleep(0.2) def upscale_video( input_video: Path, output_video: Path, realesrgan_bin: str, model: str, model_path: str | None, scale: int, tile_size: int, jobs: str, fps: str | None, codec: str, crf: int, preset: str, keep_temp: bool, temp_root: Path, gpu_id: str, test_seconds: float | None, pre_vf: str | None, ) -> None: if not input_video.exists(): raise FileNotFoundError(f"Input video does not exist: {input_video}") output_video.parent.mkdir(parents=True, exist_ok=True) temp_root.mkdir(parents=True, exist_ok=True) with tempfile.TemporaryDirectory(prefix="video_upscale_", dir=str(temp_root)) as tmp_dir_str: tmp_dir = Path(tmp_dir_str) frames_in = tmp_dir / "frames_in" frames_out = tmp_dir / "frames_out" audio_file = tmp_dir / "audio.m4a" frames_in.mkdir(parents=True, exist_ok=True) frames_out.mkdir(parents=True, exist_ok=True) print(f"Working directory: {tmp_dir}") if test_seconds is not None: print(f"Test mode: processing first {test_seconds:.2f} seconds") test_duration_args = ["-t", str(test_seconds)] if test_seconds is not None else [] filter_chain = [] if pre_vf: filter_chain.append(pre_vf) filter_chain.extend([ "scale=ceil(iw*sar/2)*2:ih", "setsar=1", ]) extract_cmd = [ "ffmpeg", "-y", "-i", str(input_video), *test_duration_args, "-vf", ",".join(filter_chain), str(frames_in / "%08d.png"), ] run(extract_cmd) audio_present = has_audio_stream(input_video) if audio_present: extract_audio_cmd = [ "ffmpeg", "-y", "-i", str(input_video), *test_duration_args, "-vn", "-c:a", "aac", "-b:a", "192k", str(audio_file), ] run(extract_audio_cmd) upscale_cmd = [ realesrgan_bin, "-i", str(frames_in), "-o", str(frames_out), "-n", model, "-s", str(scale), "-f", "png", "-t", str(tile_size), "-j", jobs, "-g", gpu_id, ] if model_path: upscale_cmd.extend(["-m", model_path]) run_upscale_with_progress(upscale_cmd, frames_in, frames_out) fps_args = ["-r", fps] if fps else [] encode_cmd = [ "ffmpeg", "-y", *fps_args, "-i", str(frames_out / "%08d.png"), ] if audio_present and audio_file.exists(): encode_cmd.extend(["-i", str(audio_file), "-c:a", "copy"]) encode_cmd.extend( [ "-c:v", codec, "-crf", str(crf), "-preset", preset, "-pix_fmt", "yuv420p", str(output_video), ] ) run(encode_cmd) if keep_temp: kept = output_video.parent / f"{output_video.stem}_tmp" if kept.exists(): shutil.rmtree(kept) shutil.copytree(tmp_dir, kept) print(f"Temporary files copied to: {kept}") def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Upscale a video locally with Real-ESRGAN (RTX GPU via Vulkan)." ) parser.add_argument("-i", "--input", required=True, help="Input video path") parser.add_argument("-o", "--output", required=True, help="Output video path") parser.add_argument( "--realesrgan-bin", default="realesrgan-ncnn-vulkan", help="Path or command name of realesrgan-ncnn-vulkan", ) parser.add_argument( "--model", default="realesr-animevideov3", help="Model name (e.g. realesr-animevideov3, realesrgan-x4plus)", ) parser.add_argument( "--model-path", default=None, help="Path to models directory (required if models not in default location)", ) parser.add_argument("--scale", type=int, default=2, choices=[2, 3, 4]) parser.add_argument( "--tile-size", type=int, default=0, help="Tile size for VRAM-limited cases (0 = auto)", ) parser.add_argument( "--jobs", default="2:2:2", help="NCNN worker threads as load:proc:save", ) parser.add_argument( "--fps", default=None, help="Override output FPS (default: keep source timing)", ) parser.add_argument("--codec", default="libx264", help="Output video codec") parser.add_argument("--crf", type=int, default=16, help="Quality (lower = better)") parser.add_argument("--preset", default="medium", help="Encoder preset") parser.add_argument( "--keep-temp", action="store_true", help="Keep extracted/upscaled frame files next to output", ) parser.add_argument( "--temp-root", default="/mnt/winsteam", help="Directory used for temporary working files (default: /mnt/winsteam)", ) parser.add_argument( "--gpu-id", default="auto", help="Vulkan GPU id for Real-ESRGAN (e.g. 0, 1, 0,1). Use 'auto' by default", ) parser.add_argument( "--test-seconds", type=float, default=None, help="Only process first N seconds (for quick test runs)", ) parser.add_argument( "--pre-vf", default=None, help="Optional ffmpeg video filter(s) applied before upscaling (e.g. hqdn3d=1.5:1.5:6:6)", ) return parser.parse_args() def main() -> int: args = parse_args() try: assert_prerequisites(args.realesrgan_bin) upscale_video( input_video=Path(args.input), output_video=Path(args.output), realesrgan_bin=args.realesrgan_bin, model=args.model, model_path=args.model_path, scale=args.scale, tile_size=args.tile_size, jobs=args.jobs, fps=args.fps, codec=args.codec, crf=args.crf, preset=args.preset, keep_temp=args.keep_temp, temp_root=Path(args.temp_root), gpu_id=args.gpu_id, test_seconds=args.test_seconds, pre_vf=args.pre_vf, ) except Exception as exc: print(f"Error: {exc}", file=sys.stderr) return 1 print("\nDone.") return 0 if __name__ == "__main__": raise SystemExit(main())