"""Video recording management for cat detection events.
This module provides a VideoRecorder class for managing video recording
sessions. It handles file creation, frame writing, timing control, and
automatic filename generation with timestamps.
The recorder uses OpenCV's VideoWriter with the mp4v codec for compatibility
on Raspberry Pi. Recordings are saved as MP4 files with timestamps in the
filename.
Example:
Basic usage::
recorder = VideoRecorder(output_dir=Path("recordings"), record_seconds=60)
output_path = recorder.generate_filename()
recorder.start_recording(output_path, (1920, 1080), fps=10)
recorder.write_frame(frame)
recorder.stop_recording()
"""
import logging
import cv2
import time
from datetime import datetime
from pathlib import Path
from typing import Optional, Tuple
from config import DEFAULT_RECORD_SECONDS, DEFAULT_OUTPUT_DIR
logger = logging.getLogger(__name__)
[docs]
class VideoRecorder:
"""Manages video recording with timing control."""
[docs]
def __init__(self, output_dir: Path = DEFAULT_OUTPUT_DIR, record_seconds: int = DEFAULT_RECORD_SECONDS):
"""
Initialize the video recorder.
Args:
output_dir: Directory to save recordings
record_seconds: Duration to record in seconds
"""
self.output_dir = Path(output_dir)
self.record_seconds = record_seconds
self.writer: Optional[cv2.VideoWriter] = None
self.recording_start_time: Optional[float] = None
self.output_path: Optional[Path] = None
self.frame_size: Optional[Tuple[int, int]] = None
self.fps: Optional[int] = None
# Create output directory if it doesn't exist
self.output_dir.mkdir(parents=True, exist_ok=True)
[docs]
def is_recording(self) -> bool:
"""
Check if currently recording.
Returns:
True if recording, False otherwise
"""
return self.writer is not None and self.recording_start_time is not None
[docs]
def should_continue_recording(self) -> bool:
"""
Check if recording should continue based on elapsed time.
Returns:
True if recording should continue, False if time limit reached
"""
if not self.is_recording():
return False
elapsed = time.time() - self.recording_start_time
should_continue = elapsed < self.record_seconds
# Log when recording is about to stop (within 1 second of limit)
if not should_continue:
logger.debug(f"Recording duration limit reached: elapsed={elapsed:.1f}s, limit={self.record_seconds}s")
return should_continue
[docs]
def start_recording(self, output_path: Path, frame_size: Tuple[int, int], fps: int) -> bool:
"""
Start a new recording session.
Args:
output_path: Full path to output video file
frame_size: Tuple of (width, height)
fps: Frames per second for the video
Returns:
True if recording started successfully, False otherwise
"""
if self.is_recording():
logger.warning("Already recording, cannot start new recording")
return False
try:
# Use H.264 codec (mp4v is more compatible but larger files)
# For Raspberry Pi, use 'mp4v' or 'XVID' codec
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
self.writer = cv2.VideoWriter(str(output_path), fourcc, fps, frame_size)
if not self.writer.isOpened():
logger.error(f"Failed to open video writer for {output_path}")
self.writer = None
return False
self.output_path = output_path
self.frame_size = frame_size
self.fps = fps
self.recording_start_time = time.time()
logger.info(f"Recording started: {output_path.name} (duration: {self.record_seconds}s)")
return True
except Exception as e:
logger.error(f"Error starting recording: {e}")
self.writer = None
return False
[docs]
def write_frame(self, frame) -> bool:
"""
Write a frame to the current recording.
Args:
frame: Frame to write (numpy array)
Returns:
True if frame written successfully, False otherwise
"""
if not self.is_recording():
return False
if frame is None or frame.size == 0:
return False
try:
# Resize frame if necessary to match recording resolution
if self.frame_size:
h, w = frame.shape[:2]
if (w, h) != self.frame_size:
frame = cv2.resize(frame, self.frame_size)
self.writer.write(frame)
return True
except Exception as e:
logger.error(f"Error writing frame: {e}")
return False
[docs]
def stop_recording(self) -> Optional[Path]:
"""
Stop the current recording.
Returns:
Path to the recorded file, or None if no recording was active
"""
if not self.is_recording():
return None
try:
if self.writer:
self.writer.release()
self.writer = None
elapsed = time.time() - self.recording_start_time if self.recording_start_time else 0
output_path = self.output_path
logger.info(f"Recording stopped: {output_path.name if output_path else 'unknown'} (elapsed: {elapsed:.1f}s, expected: {self.record_seconds}s)")
self.recording_start_time = None
self.output_path = None
self.frame_size = None
self.fps = None
return output_path
except Exception as e:
logger.error(f"Error stopping recording: {e}")
return None
[docs]
def generate_filename(self) -> Path:
"""
Generate a timestamped filename for a new recording.
Returns:
Path to the output file
"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"cat_{timestamp}.mp4"
return self.output_dir / filename
[docs]
def generate_test_filename(self) -> Path:
"""
Generate a timestamped filename for a test recording.
Returns:
Path to the output file
"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"test_{timestamp}.mp4"
return self.output_dir / filename