|  | 
|  | 1 | +from pathlib import Path | 
|  | 2 | + | 
|  | 3 | +import cv2 | 
|  | 4 | +import numpy as np | 
|  | 5 | +import pytest | 
|  | 6 | + | 
|  | 7 | +import supervision as sv | 
|  | 8 | + | 
|  | 9 | + | 
|  | 10 | +def make_video( | 
|  | 11 | +    path: Path, w: int = 160, h: int = 96, fps: int = 20, frames: int = 24 | 
|  | 12 | +) -> None: | 
|  | 13 | +    """Create a small synthetic test video with predictable frame-colors.""" | 
|  | 14 | +    fourcc = cv2.VideoWriter_fourcc(*"mp4v") | 
|  | 15 | +    writer = cv2.VideoWriter(str(path), fourcc, fps, (w, h)) | 
|  | 16 | +    assert writer.isOpened(), "Failed to open VideoWriter" | 
|  | 17 | +    for i in range(frames): | 
|  | 18 | +        v = (i * 11) % 250 | 
|  | 19 | +        frame = np.full((h, w, 3), (v, 255 - v, (2 * v) % 255), np.uint8) | 
|  | 20 | +        writer.write(frame) | 
|  | 21 | +    writer.release() | 
|  | 22 | + | 
|  | 23 | + | 
|  | 24 | +def read_frames(path: Path) -> list[np.ndarray]: | 
|  | 25 | +    """Read all frames from a video into memory.""" | 
|  | 26 | +    cap = cv2.VideoCapture(str(path)) | 
|  | 27 | +    assert cap.isOpened(), f"Cannot open video: {path}" | 
|  | 28 | +    out = [] | 
|  | 29 | +    while True: | 
|  | 30 | +        ok, frame = cap.read() | 
|  | 31 | +        if not ok: | 
|  | 32 | +            break | 
|  | 33 | +        out.append(frame) | 
|  | 34 | +    cap.release() | 
|  | 35 | +    return out | 
|  | 36 | + | 
|  | 37 | + | 
|  | 38 | +def frames_equal(a: np.ndarray, b: np.ndarray, max_abs_tol: int = 0) -> bool: | 
|  | 39 | +    """Return True if frames are the same within acertain tolerance.""" | 
|  | 40 | +    if a.shape != b.shape: | 
|  | 41 | +        return False | 
|  | 42 | +    diff = np.abs(a.astype(np.int16) - b.astype(np.int16)) | 
|  | 43 | +    return diff.max() <= max_abs_tol | 
|  | 44 | + | 
|  | 45 | + | 
|  | 46 | +def callback_noop(frame: np.ndarray, idx: int) -> np.ndarray: | 
|  | 47 | +    """No-op callback: validates pure pipeline correctness.""" | 
|  | 48 | +    return frame | 
|  | 49 | + | 
|  | 50 | + | 
|  | 51 | +def callbackb_opencv(frame: np.ndarray, idx: int) -> np.ndarray: | 
|  | 52 | +    """ | 
|  | 53 | +    Simulations some cv2 task... | 
|  | 54 | +    """ | 
|  | 55 | +    g = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) | 
|  | 56 | +    return cv2.cvtColor(g, cv2.COLOR_GRAY2BGR) | 
|  | 57 | + | 
|  | 58 | + | 
|  | 59 | +@pytest.mark.parametrize( | 
|  | 60 | +    "callback", [callback_noop, callbackb_opencv], ids=["identity", "opencv"] | 
|  | 61 | +) | 
|  | 62 | +def test_process_video_vs_threads_same_output(callback, tmp_path: Path): | 
|  | 63 | +    """ | 
|  | 64 | +    Ensure that process_video() and process_video_threads() produce identical | 
|  | 65 | +    results for the same synthetic source video and callback. | 
|  | 66 | +    """ | 
|  | 67 | +    name = callback.__name__ | 
|  | 68 | +    src = tmp_path / f"src_{name}.mp4" | 
|  | 69 | +    dst_single = tmp_path / f"out_single_{name}.mp4" | 
|  | 70 | +    dst_threads = tmp_path / f"out_threads_{name}.mp4" | 
|  | 71 | + | 
|  | 72 | +    make_video(src, frames=24) | 
|  | 73 | + | 
|  | 74 | +    sv.utils.video.process_video( | 
|  | 75 | +        source_path=str(src), | 
|  | 76 | +        target_path=str(dst_single), | 
|  | 77 | +        callback=callback, | 
|  | 78 | +        show_progress=False, | 
|  | 79 | +    ) | 
|  | 80 | +    sv.utils.video.process_video_threads( | 
|  | 81 | +        source_path=str(src), | 
|  | 82 | +        target_path=str(dst_threads), | 
|  | 83 | +        callback=callback, | 
|  | 84 | +        prefetch=4, | 
|  | 85 | +        writer_buffer=4, | 
|  | 86 | +        show_progress=False, | 
|  | 87 | +    ) | 
|  | 88 | + | 
|  | 89 | +    frames_single = read_frames(dst_single) | 
|  | 90 | +    frames_threads = read_frames(dst_threads) | 
|  | 91 | + | 
|  | 92 | +    assert len(frames_single) == len(frames_threads) != 0, "Frame count mismatch." | 
|  | 93 | + | 
|  | 94 | +    for i, (fs, ft) in enumerate(zip(frames_single, frames_threads)): | 
|  | 95 | +        assert frames_equal(fs, ft), f"Frame {i} is different." | 
0 commit comments