Source code for main

#!/usr/bin/env python3
"""Main entry point for cat detection and recording system.

This module provides the command-line interface and main monitoring loop for
a Raspberry Pi-based cat detection and video recording system. It uses an IR
camera (picamera2), YOLOv8 object detection model, and OpenCV for video
processing and recording.

The system continuously monitors camera input, detects cats using a pre-trained
YOLOv8 model, and automatically records video clips when cats are detected.

Example:
    Basic usage with default settings::

        python3 main.py

    With custom output directory::

        python3 main.py --output-dir /mnt/nfs

    Test camera recording (10 seconds, no detection)::

        python3 main.py --test-record

    With preview window::

        python3 main.py --show-preview
"""

import argparse
import logging
import sys
import time
from pathlib import Path
from typing import Tuple, List, TYPE_CHECKING

if TYPE_CHECKING:
    from detector import Detection

import cv2
import numpy as np

from camera import Camera
from detector import CatDetector
from recorder import VideoRecorder
import config


def parse_resolution(resolution_str: str) -> Tuple[int, int]:
    """Parse a resolution string in the format 'WIDTHxHEIGHT'.

    Converts a resolution string like "640x480" into a tuple of integers.
    The separator is matched case-insensitively, so both 'x' and 'X' work.

    Args:
        resolution_str: Resolution string in format 'WIDTHxHEIGHT', e.g.,
            '640x480', '1920x1080', or '3280x2464'.

    Returns:
        Tuple of (width, height) as integers.

    Raises:
        ValueError: If the string does not split into exactly two parts,
            if either part is not an integer, or if width/height are not
            positive. The underlying error is attached as ``__cause__``.

    Example:
        >>> parse_resolution("640x480")
        (640, 480)
        >>> parse_resolution("1920X1080")
        (1920, 1080)
    """
    try:
        parts = resolution_str.lower().split("x")
        if len(parts) != 2:
            raise ValueError("Resolution must be in format WIDTHxHEIGHT (e.g., 640x480)")

        width = int(parts[0])
        height = int(parts[1])

        if width <= 0 or height <= 0:
            raise ValueError("Width and height must be positive integers")
    except ValueError as e:
        # Every failure is reported with one uniform prefix; "from e" keeps
        # the specific reason reachable as the explicit cause of the error.
        raise ValueError(f"Invalid resolution format '{resolution_str}': {e}") from e

    return (width, height)
def draw_detections(frame: np.ndarray, detections: List["Detection"]) -> None:
    """Annotate a frame in place with a box and caption per detection.

    For every detection a green outline is drawn around its bounding box,
    followed by a filled green caption strip holding the detection's label
    and its confidence (two decimals) in black text.

    Args:
        frame: Image to draw on (BGR numpy array). It is modified in place,
            so it must be writable.
        detections: Detection objects; each one is read for its ``bbox``
            (four coordinates: x1, y1, x2, y2), ``confidence`` and ``label``.

    Note:
        The frame must be writable. If using frames from picamera2, ensure
        they are converted to writable format using camera._convert_frame().

    Example:
        >>> detections = [Detection(label="cat", confidence=0.85, bbox=(100, 50, 200, 150))]
        >>> draw_detections(frame, detections)
        # Frame now has green bounding box and label drawn
    """
    for detection in detections:
        left, top, right, bottom = map(int, detection.bbox)
        score = detection.confidence
        green = (0, 255, 0)

        # Outline around the detected object
        cv2.rectangle(frame, (left, top), (right, bottom), green, 2)

        # Caption: "<label> <confidence>"
        caption = f"{detection.label} {score:.2f}"
        font = cv2.FONT_HERSHEY_SIMPLEX
        (text_width, text_height), _ = cv2.getTextSize(caption, font, 0.6, 2)

        # Prefer a spot just above the box, but never so high that the
        # caption would be clipped by the top edge of the frame.
        text_y = max(top - 10, text_height + 10)

        strip_top_left = (left, text_y - text_height - 5)
        strip_bottom_right = (left + text_width, text_y + 5)
        cv2.rectangle(frame, strip_top_left, strip_bottom_right, green, -1)
        cv2.putText(frame, caption, (left, text_y), font, 0.6, (0, 0, 0), 2)
def setup_logging(log_level: str = "INFO") -> None:
    """Initialise the root logging configuration for the application.

    The level name is upper-cased and looked up as an attribute of the
    ``logging`` module; the resulting integer is handed to
    ``logging.basicConfig`` together with a format that shows timestamp,
    logger name, level and message.

    Args:
        log_level: Logging level name such as "DEBUG", "INFO", "WARNING",
            "ERROR" or "CRITICAL". Case-insensitive.

    Raises:
        ValueError: If the name does not resolve to an integer level.

    Example:
        >>> setup_logging("DEBUG")
        # All modules can now use logging.getLogger(__name__)
    """
    level_name = log_level.upper()
    resolved_level = getattr(logging, level_name, None)

    if isinstance(resolved_level, int):
        logging.basicConfig(
            level=resolved_level,
            format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
        return

    raise ValueError(f"Invalid log level: {log_level}")
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the monitoring application."""
    parser = argparse.ArgumentParser(
        description="Monitor IR camera and record video when cats are detected"
    )
    parser.add_argument(
        "--config-file",
        type=str,
        default="config.yaml",
        help="Path to YAML configuration file (default: config.yaml)",
    )
    parser.add_argument(
        "--camera-device",
        type=str,
        default=None,
        help="Camera device path (overrides config file)",
    )
    parser.add_argument(
        "--fps",
        type=int,
        default=None,
        help="Target frames per second (overrides config file)",
    )
    parser.add_argument(
        "--resolution",
        type=str,
        default=None,
        help="Video resolution WIDTHxHEIGHT (overrides config file)",
    )
    parser.add_argument(
        "--threshold",
        type=float,
        default=None,
        help="Detection confidence threshold (overrides config file)",
    )
    parser.add_argument(
        "--record-seconds",
        type=int,
        default=None,
        help="Recording duration in seconds (overrides config file)",
    )
    parser.add_argument(
        "--output-dir",
        type=str,
        default=None,
        help="Output directory for recordings (overrides config file)",
    )
    parser.add_argument(
        "--show-preview",
        action="store_true",
        help="Show preview window with bounding boxes",
    )
    parser.add_argument(
        "--test-record",
        action="store_true",
        help="Record 10 seconds of camera footage for testing (bypasses cat detection)",
    )
    parser.add_argument(
        "--log-level",
        type=str,
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        help="Set the logging level (default: INFO)",
    )
    return parser


def _log_camera_failure(title, camera_device, hints, detail=None) -> None:
    """Log a banner-framed camera error followed by a troubleshooting checklist."""
    logger = logging.getLogger(__name__)
    banner = "=" * 60
    logger.error(banner)
    logger.error(title)
    logger.error(banner)
    logger.error(f"Camera index: {camera_device}")
    if detail is not None:
        logger.error(detail)
    logger.error("")
    logger.error("Please check:")
    for hint in hints:
        logger.error(hint)
    logger.error(banner)


def _close_preview_windows() -> None:
    """Close any OpenCV windows, logging (not raising) if that fails."""
    try:
        cv2.destroyAllWindows()
    except Exception as e:
        logging.getLogger(__name__).debug(f"Error closing preview windows: {e}")


def _preview_is_available() -> bool:
    """Return True if an OpenCV preview window can be opened on this host."""
    import os

    logger = logging.getLogger(__name__)

    if not os.environ.get("DISPLAY"):
        logger.warning("No DISPLAY environment variable set. Preview window disabled.")
        logger.warning("To enable preview, use X11 forwarding: ssh -X user@host")
        return False

    try:
        # Open and immediately close a throwaway window to probe the display.
        test_frame = np.zeros((100, 100, 3), dtype=np.uint8)
        cv2.imshow("Test", test_frame)
        cv2.waitKey(1)
        cv2.destroyAllWindows()
    except Exception as e:
        logger.warning(f"Preview window not available: {e}")
        logger.warning("Continuing without preview window...")
        return False

    logger.info("Preview window available")
    return True


def _run_test_recording(args, camera, camera_device, resolution, fps, output_dir) -> int:
    """Record roughly 10 seconds of footage without detection; return an exit code."""
    logger = logging.getLogger(__name__)

    # Suppress verbose camera warnings during test mode (only errors shown).
    camera_logger = logging.getLogger("camera")
    original_level = camera_logger.level
    camera_logger.setLevel(logging.ERROR)

    logger.info("=" * 60)
    logger.info("TEST MODE: Recording 10 seconds of camera footage")
    logger.info("=" * 60)
    logger.info(f"Camera device: {camera_device}")
    logger.info(f"Resolution: {resolution[0]}x{resolution[1]}, FPS: {fps}")
    logger.info(f"Output directory: {output_dir}")

    recorder = VideoRecorder(output_dir=output_dir, record_seconds=10)
    output_path = recorder.generate_test_filename()
    actual_resolution = camera.get_resolution()

    if not recorder.start_recording(output_path, actual_resolution, fps):
        logger.error("Failed to start test recording")
        camera_logger.setLevel(original_level)
        camera.release()
        return 1

    logger.info(f"Recording to: {output_path.name}")
    logger.info("Starting 10-second test recording...")

    start_time = time.time()
    frame_count = 0
    elapsed = 0.0
    exit_code = 0
    preview_available = args.show_preview and _preview_is_available()

    try:
        for frame in camera.read_frames():
            if frame is None:
                continue

            frame_count += 1
            elapsed = time.time() - start_time

            # Write before drawing so the status overlay is not recorded.
            recorder.write_frame(frame)

            if preview_available:
                try:
                    cv2.putText(
                        frame,
                        f"TEST RECORDING | Time: {elapsed:.1f}s / 10.0s | Frame: {frame_count}",
                        (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        0.7,
                        (0, 255, 0),
                        2,
                    )
                    cv2.imshow("Test Recording", frame)

                    # Handle window close or 'q' key
                    key = cv2.waitKey(1) & 0xFF
                    if key == ord("q"):
                        logger.info("Preview window closed. Stopping test recording...")
                        break
                except Exception as e:
                    # A broken preview must not abort the recording itself.
                    logger.warning(f"Preview window error: {e}. Disabling preview.")
                    preview_available = False

            if elapsed >= 10.0:
                break

        recorder.stop_recording()
        logger.info("=" * 60)
        logger.info("Test recording completed successfully!")
        logger.info(f"File: {output_path.name}")
        logger.info(f"Frames recorded: {frame_count}")
        # Report the measured duration: the loop may end early on 'q'.
        logger.info(f"Duration: {elapsed:.1f} seconds")
        logger.info("=" * 60)
    except KeyboardInterrupt:
        logger.info("Test recording interrupted by user")
        recorder.stop_recording()
    except RuntimeError:
        # Camera failure - already logged, just exit
        logger.error("Stopping test recording due to camera failure.")
        recorder.stop_recording()
        exit_code = 1
    except Exception as e:
        logger.exception(f"Error during test recording: {e}")
        recorder.stop_recording()
        exit_code = 1
    finally:
        # Single cleanup point, so the camera is released exactly once.
        camera_logger.setLevel(original_level)
        camera.release()
        if args.show_preview:
            _close_preview_windows()

    return exit_code


def _run_monitoring(args, camera, detector, recorder, threshold, fps) -> int:
    """Run the detect-and-record loop until stopped; return an exit code."""
    logger = logging.getLogger(__name__)
    frame_count = 0
    exit_code = 0

    try:
        for frame in camera.read_frames():
            if frame is None:
                continue

            frame_count += 1
            detections = detector.detect_cats(frame, threshold=threshold)

            # Start a new clip only when idle and at least one cat is seen.
            if not recorder.is_recording() and len(detections) > 0:
                output_path = recorder.generate_filename()
                # Ask the camera for its resolution rather than reusing the
                # requested one, since the two may differ.
                actual_resolution = camera.get_resolution()

                if recorder.start_recording(output_path, actual_resolution, fps):
                    logger.info(
                        f"Cat detected! (confidence: {detections[0].confidence:.2f}) - Starting recording"
                    )
                    # The detection frame is written by the block below;
                    # writing it here as well would duplicate it in the clip.
                else:
                    logger.warning("Failed to start recording")

            # While recording, every frame is written regardless of detections.
            if recorder.is_recording():
                recorder.write_frame(frame)
                if not recorder.should_continue_recording():
                    recorder.stop_recording()

            if args.show_preview:
                if detections:
                    draw_detections(frame, detections)

                status = "RECORDING" if recorder.is_recording() else "MONITORING"
                cv2.putText(
                    frame,
                    f"Status: {status} | Frame: {frame_count}",
                    (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.7,
                    (255, 255, 255),
                    2,
                )
                cv2.imshow("Cat Detection Monitor", frame)

                # Handle window close or 'q' key
                key = cv2.waitKey(1) & 0xFF
                if key == ord("q"):
                    logger.info("Preview window closed. Stopping...")
                    break

            if frame_count % 100 == 0:
                logger.debug(f"Processed {frame_count} frames")
    except KeyboardInterrupt:
        logger.info("Interrupted by user. Stopping...")
    except RuntimeError:
        # Camera failure - already logged with details
        logger.error("Stopping monitoring due to camera failure.")
        exit_code = 1
    except Exception as e:
        logger.exception(f"Error in monitoring loop: {e}")
        exit_code = 1
    finally:
        # Single cleanup point, so the camera is released exactly once.
        if recorder.is_recording():
            recorder.stop_recording()
        camera.release()
        if args.show_preview:
            _close_preview_windows()
        logger.info("Shutdown complete.")

    return exit_code


def main() -> None:
    """Main entry point and monitoring loop for cat detection system.

    This function:
        1. Parses command-line arguments (with config file fallback)
        2. Initializes camera, detector, and recorder
        3. Runs the main monitoring loop:
           - Continuously reads frames from camera
           - Detects cats using the detector
           - Starts recording when cats are detected
           - Records for configured duration
        4. Handles cleanup and error conditions

    The function supports two modes:
        - Normal mode: Continuous monitoring with cat detection
        - Test mode: Record 10 seconds of camera footage (--test-record)

    Command-line arguments override values from the configuration file.
    See --help for all available options.

    Exit codes:
        0: Normal exit (Ctrl+C or successful completion)
        1: Error (camera failure, invalid arguments, unexpected exception)

    Raises:
        SystemExit: On invalid arguments or critical errors.
    """
    args = _build_arg_parser().parse_args()

    # Setup logging first
    setup_logging(args.log_level)
    logger = logging.getLogger(__name__)

    # Load YAML configuration
    config.initialize_config(Path(args.config_file))

    default_camera_device = config.get_camera_device()
    default_fps = config.get_fps()
    default_resolution = config.get_resolution()
    default_threshold = config.get_detection_threshold()
    default_record_seconds = config.get_record_seconds()
    default_output_dir = str(config.get_output_dir())

    # Use CLI args if provided, otherwise use config values
    camera_device = args.camera_device if args.camera_device is not None else default_camera_device
    fps = args.fps if args.fps is not None else default_fps
    resolution_str = (
        args.resolution
        if args.resolution is not None
        else f"{default_resolution[0]}x{default_resolution[1]}"
    )
    threshold = args.threshold if args.threshold is not None else default_threshold
    record_seconds = (
        args.record_seconds if args.record_seconds is not None else default_record_seconds
    )
    output_dir_str = args.output_dir if args.output_dir is not None else default_output_dir

    # Validate arguments
    try:
        resolution = parse_resolution(resolution_str)
    except ValueError as e:
        logger.error(f"Invalid resolution: {e}")
        sys.exit(1)

    if threshold < 0 or threshold > 1:
        logger.error("Threshold must be between 0 and 1")
        sys.exit(1)

    if fps <= 0:
        logger.error("FPS must be positive")
        sys.exit(1)

    if record_seconds <= 0:
        logger.error("Record seconds must be positive")
        sys.exit(1)

    output_dir = Path(output_dir_str)

    # Initialize camera
    camera = Camera(device=camera_device, resolution=resolution, fps=fps)
    if not camera.open():
        _log_camera_failure(
            "FAILED TO OPEN CAMERA",
            camera_device,
            [
                " 1. Camera is detected: libcamera-hello --list-cameras",
                " 2. Camera is not being used by another process",
                " 3. Try a different camera index: python main.py --camera-device 1",
                " 4. Verify picamera2 is installed: sudo apt install python3-picamera2",
                " 5. Test picamera2: python3 -c 'from picamera2 import Picamera2; Picamera2()'",
            ],
        )
        sys.exit(1)

    # Validate camera can read frames
    if not camera.validate_camera():
        _log_camera_failure(
            "CAMERA VALIDATION FAILED",
            camera_device,
            [
                " 1. Camera is detected: libcamera-hello --list-cameras",
                " 2. Camera is not being used by another process",
                " 3. Camera hardware is functioning: libcamera-hello -t 0",
                " 4. Try a different camera index: --camera-device 1",
                " 5. Verify picamera2 is working: python3 -c 'from picamera2 import Picamera2; Picamera2()'",
            ],
            detail="Camera opened successfully but cannot read frames.",
        )
        camera.release()
        sys.exit(1)

    # Handle test recording mode
    if args.test_record:
        exit_code = _run_test_recording(
            args, camera, camera_device, resolution, fps, output_dir
        )
        if exit_code != 0:
            sys.exit(exit_code)
        return

    # Normal cat detection mode
    logger.info("Initializing cat detection system...")
    logger.info(f"Camera: {camera_device}")
    logger.info(f"Resolution: {resolution[0]}x{resolution[1]}, FPS: {fps}")
    logger.info(f"Detection threshold: {threshold}")
    logger.info(f"Recording duration: {record_seconds}s")
    logger.info(f"Output directory: {output_dir}")

    try:
        detector = CatDetector()
    except Exception as e:
        logger.error(f"Failed to initialize detector: {e}")
        camera.release()
        sys.exit(1)

    recorder = VideoRecorder(output_dir=output_dir, record_seconds=record_seconds)
    logger.info(f"VideoRecorder initialized with record_seconds={record_seconds}")

    logger.info("Starting monitoring loop. Press Ctrl+C to stop.")

    exit_code = _run_monitoring(args, camera, detector, recorder, threshold, fps)
    if exit_code != 0:
        sys.exit(exit_code)
# Run the application only when executed as a script, not when imported.
if __name__ == "__main__":
    main()