Source code for recorder

"""Video recording management for cat detection events.

This module provides a VideoRecorder class for managing video recording
sessions. It handles file creation, frame writing, timing control, and
automatic filename generation with timestamps.

The recorder uses OpenCV's VideoWriter with the mp4v codec for compatibility
on Raspberry Pi. Recordings are saved as MP4 files with timestamps in the
filename.

Example:
    Basic usage::

        recorder = VideoRecorder(output_dir=Path("recordings"), record_seconds=60)
        output_path = recorder.generate_filename()
        recorder.start_recording(output_path, (1920, 1080), fps=10)
        recorder.write_frame(frame)
        recorder.stop_recording()
"""

import logging
import cv2
import time
from datetime import datetime
from pathlib import Path
from typing import Optional, Tuple
from config import DEFAULT_RECORD_SECONDS, DEFAULT_OUTPUT_DIR

logger = logging.getLogger(__name__)


class VideoRecorder:
    """Manages video recording with timing control.

    One instance handles at most one recording at a time. A recording is
    considered active while both ``writer`` and ``recording_start_time`` are
    set; ``stop_recording`` clears all per-session state.

    Attributes:
        output_dir: Directory used by the ``generate_*filename`` helpers.
        record_seconds: Time limit checked by ``should_continue_recording``.
        writer: Active ``cv2.VideoWriter``, or None when idle.
        recording_start_time: ``time.time()`` value captured when the current
            recording started, or None when idle.
        output_path: Path of the file currently being written, or None.
        frame_size: ``(width, height)`` of the current recording, or None.
        fps: Frame rate of the current recording, or None.
    """

    def __init__(self, output_dir: Path = DEFAULT_OUTPUT_DIR, record_seconds: int = DEFAULT_RECORD_SECONDS):
        """
        Initialize the video recorder.

        Args:
            output_dir: Directory to save recordings (created if missing)
            record_seconds: Duration to record in seconds
        """
        self.output_dir = Path(output_dir)
        self.record_seconds = record_seconds
        self.writer: Optional[cv2.VideoWriter] = None
        self.recording_start_time: Optional[float] = None
        self.output_path: Optional[Path] = None
        self.frame_size: Optional[Tuple[int, int]] = None
        self.fps: Optional[int] = None

        # Create output directory if it doesn't exist
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def is_recording(self) -> bool:
        """
        Check if currently recording.

        Returns:
            True if recording, False otherwise
        """
        return self.writer is not None and self.recording_start_time is not None

    def should_continue_recording(self) -> bool:
        """
        Check if recording should continue based on elapsed time.

        Returns:
            True if recording should continue, False if the time limit has
            been reached or no recording is active
        """
        if not self.is_recording():
            return False

        # NOTE(review): elapsed time is measured with the wall clock
        # (time.time()), so a system clock adjustment during a recording
        # shifts the limit. recording_start_time is a public attribute, so
        # the clock source is left unchanged here.
        elapsed = time.time() - self.recording_start_time
        should_continue = elapsed < self.record_seconds

        # Log once the duration limit has been reached
        if not should_continue:
            logger.debug(
                "Recording duration limit reached: elapsed=%.1fs, limit=%ss",
                elapsed,
                self.record_seconds,
            )

        return should_continue

    def start_recording(self, output_path: Path, frame_size: Tuple[int, int], fps: int) -> bool:
        """
        Start a new recording session.

        Args:
            output_path: Full path to output video file (a str is accepted
                and converted to a Path)
            frame_size: Tuple of (width, height)
            fps: Frames per second for the video

        Returns:
            True if recording started successfully, False otherwise
        """
        if self.is_recording():
            logger.warning("Already recording, cannot start new recording")
            return False

        writer = None
        try:
            # Normalize inputs up front so later code (logging via .name, the
            # tuple comparison in write_frame) cannot fail or misbehave after
            # the writer has already been created.
            output_path = Path(output_path)
            frame_size = (int(frame_size[0]), int(frame_size[1]))

            # The 'mp4v' fourcc is used (not H.264) for compatibility on
            # Raspberry Pi; 'XVID' is a possible alternative.
            fourcc = cv2.VideoWriter_fourcc(*"mp4v")
            writer = cv2.VideoWriter(str(output_path), fourcc, fps, frame_size)

            if not writer.isOpened():
                logger.error("Failed to open video writer for %s", output_path)
                self._release_writer(writer)
                self.writer = None
                return False
        except Exception as e:
            logger.error("Error starting recording: %s", e)
            # Do not leak a writer that was created before the failure.
            self._release_writer(writer)
            self.writer = None
            return False

        # Only publish session state once the writer is known to be usable.
        self.writer = writer
        self.output_path = output_path
        self.frame_size = frame_size
        self.fps = fps
        self.recording_start_time = time.time()

        logger.info(
            "Recording started: %s (duration: %ss)",
            output_path.name,
            self.record_seconds,
        )
        return True

    def write_frame(self, frame) -> bool:
        """
        Write a frame to the current recording.

        The frame is resized to the recording's frame size when its own
        (width, height) differs.

        Args:
            frame: Frame to write (numpy array)

        Returns:
            True if frame written successfully, False otherwise
        """
        if not self.is_recording():
            return False

        if frame is None or frame.size == 0:
            return False

        try:
            # Resize frame if necessary to match recording resolution
            if self.frame_size:
                h, w = frame.shape[:2]
                if (w, h) != self.frame_size:
                    frame = cv2.resize(frame, self.frame_size)

            self.writer.write(frame)
            return True
        except Exception as e:
            logger.error("Error writing frame: %s", e)
            return False

    def stop_recording(self) -> Optional[Path]:
        """
        Stop the current recording.

        Session state is always cleared, even when releasing the writer
        fails, so the recorder can start a new recording afterwards.

        Returns:
            Path to the recorded file, or None if no recording was active
            or releasing the writer raised an error
        """
        if not self.is_recording():
            return None

        # Snapshot what the log line and return value need before clearing.
        output_path = self.output_path
        started_at = self.recording_start_time

        try:
            self.writer.release()
        except Exception as e:
            logger.error("Error stopping recording: %s", e)
            return None
        finally:
            # Runs on both paths: a failed release must not leave the
            # recorder stuck in the "recording" state.
            self.writer = None
            self.recording_start_time = None
            self.output_path = None
            self.frame_size = None
            self.fps = None

        elapsed = time.time() - started_at
        logger.info(
            "Recording stopped: %s (elapsed: %.1fs, expected: %ss)",
            output_path.name if output_path else "unknown",
            elapsed,
            self.record_seconds,
        )
        return output_path

    def generate_filename(self) -> Path:
        """
        Generate a timestamped filename for a new recording.

        Returns:
            Path to the output file (``cat_YYYYMMDD_HHMMSS.mp4`` in output_dir)
        """
        return self._timestamped_path("cat")

    def generate_test_filename(self) -> Path:
        """
        Generate a timestamped filename for a test recording.

        Returns:
            Path to the output file (``test_YYYYMMDD_HHMMSS.mp4`` in output_dir)
        """
        return self._timestamped_path("test")

    def _timestamped_path(self, prefix: str) -> Path:
        """Build ``<output_dir>/<prefix>_<local timestamp>.mp4``."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        return self.output_dir / f"{prefix}_{timestamp}.mp4"

    @staticmethod
    def _release_writer(writer) -> None:
        """Release *writer* if one exists, logging instead of raising."""
        if writer is None:
            return
        try:
            writer.release()
        except Exception as e:
            logger.error("Error releasing video writer: %s", e)