#!/usr/bin/env python3
"""Main entry point for cat detection and recording system.
This module provides the command-line interface and main monitoring loop for
a Raspberry Pi-based cat detection and video recording system. It uses an IR
camera (picamera2), YOLOv8 object detection model, and OpenCV for video
processing and recording.
The system continuously monitors camera input, detects cats using a pre-trained
YOLOv8 model, and automatically records video clips when cats are detected.
Example:
Basic usage with default settings::
python3 main.py
With custom output directory::
python3 main.py --output-dir /mnt/nfs
Test camera recording (10 seconds, no detection)::
python3 main.py --test-record
With preview window::
python3 main.py --show-preview
"""
import argparse
import logging
import sys
import time
from pathlib import Path
from typing import Tuple, List, TYPE_CHECKING
if TYPE_CHECKING:
from detector import Detection
import cv2
import numpy as np
from camera import Camera
from detector import CatDetector
from recorder import VideoRecorder
import config
[docs]
def parse_resolution(resolution_str: str) -> Tuple[int, int]:
    """Parse a resolution string in the form ``WIDTHxHEIGHT``.

    The separator may be a lower- or upper-case ``x``; the string is
    lower-cased before it is split.

    Args:
        resolution_str: Resolution text such as ``"640x480"`` or
            ``"1920X1080"``.

    Returns:
        Tuple of ``(width, height)`` as integers.

    Raises:
        ValueError: If the string does not contain exactly two ``x``-separated
            parts, if a part is not an integer, or if width/height is not
            positive. The message always starts with
            ``Invalid resolution format '<input>': `` and the underlying
            error is attached as ``__cause__``.

    Example:
        >>> parse_resolution("640x480")
        (640, 480)
        >>> parse_resolution("1920X1080")
        (1920, 1080)
    """
    try:
        parts = resolution_str.lower().split("x")
        if len(parts) != 2:
            raise ValueError("Resolution must be in format WIDTHxHEIGHT (e.g., 640x480)")
        width, height = (int(part) for part in parts)
        if width <= 0 or height <= 0:
            raise ValueError("Width and height must be positive integers")
    except ValueError as e:
        # Every failure above is funnelled through one uniform message; chain
        # the original error so the root cause stays visible in tracebacks.
        raise ValueError(f"Invalid resolution format '{resolution_str}': {e}") from e
    return (width, height)
[docs]
def draw_detections(frame: np.ndarray, detections: List["Detection"]) -> None:
    """Annotate ``frame`` in place with a box and caption per detection.

    For every detection a green rectangle is drawn around its bounding box,
    and a filled green caption strip with black text (``"<label> <conf>"``,
    confidence formatted to two decimals) is placed above the box. When the
    box is too close to the top edge the caption is pushed down so it stays
    inside the image.

    Args:
        frame: Image to draw on; it is passed straight to OpenCV drawing
            calls, so it must be a writable array OpenCV accepts.
        detections: Objects exposing ``bbox`` (four coordinates
            ``x1, y1, x2, y2``), ``confidence`` and ``label``.

    Note:
        Nothing is returned; the caller's ``frame`` is modified. The original
        documentation advises converting picamera2 frames to a writable
        array first (via ``camera._convert_frame()``).

    Example:
        >>> detections = [Detection(label="cat", confidence=0.85, bbox=(100, 50, 200, 150))]
        >>> draw_detections(frame, detections)
        # Frame now has green bounding box and label drawn
    """
    box_color = (0, 255, 0)
    text_color = (0, 0, 0)
    font_scale = 0.6
    thickness = 2

    for detection in detections:
        left, top, right, bottom = map(int, detection.bbox)
        score = detection.confidence

        # Outline of the detected object.
        cv2.rectangle(frame, (left, top), (right, bottom), box_color, 2)

        # Caption: measure the text first so the background strip fits it.
        caption = f"{detection.label} {score:.2f}"
        font = cv2.FONT_HERSHEY_SIMPLEX
        (text_width, text_height), _ = cv2.getTextSize(caption, font, font_scale, thickness)

        # Text baseline sits 10px above the box, but never so high that the
        # caption would leave the top of the image.
        baseline_y = max(top - 10, text_height + 10)
        strip_top_left = (left, baseline_y - text_height - 5)
        strip_bottom_right = (left + text_width, baseline_y + 5)
        cv2.rectangle(frame, strip_top_left, strip_bottom_right, box_color, -1)
        cv2.putText(frame, caption, (left, baseline_y), font, font_scale, text_color, thickness)
[docs]
def setup_logging(log_level: str = "INFO") -> None:
    """Initialise the root logger with the application's standard format.

    The level name is upper-cased and looked up as an attribute of the
    ``logging`` module; only integer-valued attributes are accepted. Records
    are formatted as ``timestamp - logger name - level - message`` with a
    ``YYYY-MM-DD HH:MM:SS`` timestamp.

    Args:
        log_level: Name of the logging level, e.g. "DEBUG", "INFO",
            "WARNING", "ERROR" or "CRITICAL". Case-insensitive.

    Raises:
        ValueError: If ``log_level`` does not name an integer level constant
            in the ``logging`` module.

    Example:
        >>> setup_logging("DEBUG")
        # All modules can now use logging.getLogger(__name__)
    """
    level = getattr(logging, log_level.upper(), None)
    if isinstance(level, int):
        logging.basicConfig(
            level=level,
            format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
        return
    raise ValueError(f"Invalid log level: {log_level}")
[docs]
# Fixed length of the clip produced by --test-record, in seconds.
_TEST_RECORD_SECONDS = 10


def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the CLI parser; a ``None`` default means "fall back to the config value"."""
    parser = argparse.ArgumentParser(
        description="Monitor IR camera and record video when cats are detected"
    )
    parser.add_argument(
        "--config-file",
        type=str,
        default="config.yaml",
        help="Path to YAML configuration file (default: config.yaml)",
    )
    parser.add_argument(
        "--camera-device",
        type=str,
        default=None,
        help="Camera device path (overrides config file)",
    )
    parser.add_argument(
        "--fps",
        type=int,
        default=None,
        help="Target frames per second (overrides config file)",
    )
    parser.add_argument(
        "--resolution",
        type=str,
        default=None,
        help="Video resolution WIDTHxHEIGHT (overrides config file)",
    )
    parser.add_argument(
        "--threshold",
        type=float,
        default=None,
        help="Detection confidence threshold (overrides config file)",
    )
    parser.add_argument(
        "--record-seconds",
        type=int,
        default=None,
        help="Recording duration in seconds (overrides config file)",
    )
    parser.add_argument(
        "--output-dir",
        type=str,
        default=None,
        help="Output directory for recordings (overrides config file)",
    )
    parser.add_argument(
        "--show-preview",
        action="store_true",
        help="Show preview window with bounding boxes",
    )
    parser.add_argument(
        "--test-record",
        action="store_true",
        help="Record 10 seconds of camera footage for testing (bypasses cat detection)",
    )
    parser.add_argument(
        "--log-level",
        # Normalise case before the ``choices`` check so "debug" works too,
        # matching the case-insensitive lookup done by setup_logging().
        type=str.upper,
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        help="Set the logging level (default: INFO)",
    )
    return parser


def _log_camera_failure(
    logger: logging.Logger,
    title: str,
    camera_device,
    checks: List[str],
    detail: str = "",
) -> None:
    """Log a framed camera-failure banner followed by a numbered troubleshooting list."""
    banner = "=" * 60
    logger.error(banner)
    logger.error(title)
    logger.error(banner)
    logger.error(f"Camera index: {camera_device}")
    if detail:
        logger.error(detail)
    logger.error("")
    logger.error("Please check:")
    for number, check in enumerate(checks, start=1):
        logger.error(f" {number}. {check}")
    logger.error(banner)


def _preview_is_available(logger: logging.Logger) -> bool:
    """Return True if an OpenCV window can be opened; log the reason when it cannot."""
    import os  # Local import: only this display probe needs it.

    if not os.environ.get("DISPLAY"):
        logger.warning("No DISPLAY environment variable set. Preview window disabled.")
        logger.warning("To enable preview, use X11 forwarding: ssh -X user@host")
        return False
    try:
        # Open and immediately close a throw-away window as a capability probe.
        probe = np.zeros((100, 100, 3), dtype=np.uint8)
        cv2.imshow("Test", probe)
        cv2.waitKey(1)
        cv2.destroyAllWindows()
    except Exception as e:
        logger.warning(f"Preview window not available: {e}")
        logger.warning("Continuing without preview window...")
        return False
    logger.info("Preview window available")
    return True


def _close_preview_windows(logger: logging.Logger) -> None:
    """Close all OpenCV windows, downgrading any failure to a debug message."""
    try:
        cv2.destroyAllWindows()
    except Exception as e:
        logger.debug(f"Error closing preview windows: {e}")


def _run_test_recording(
    camera: "Camera",
    *,
    camera_device,
    resolution: Tuple[int, int],
    fps: int,
    output_dir: Path,
    show_preview: bool,
    logger: logging.Logger,
) -> int:
    """Record a fixed-length test clip without detection and return the exit code.

    The camera is always released, the "camera" logger level restored and
    preview windows closed before returning, whichever way the run ends.
    """
    # Suppress verbose camera warnings during test mode (errors still show).
    camera_logger = logging.getLogger("camera")
    original_level = camera_logger.level
    camera_logger.setLevel(logging.ERROR)

    banner = "=" * 60
    exit_code = 0
    recorder = None
    try:
        logger.info(banner)
        logger.info(f"TEST MODE: Recording {_TEST_RECORD_SECONDS} seconds of camera footage")
        logger.info(banner)
        logger.info(f"Camera device: {camera_device}")
        logger.info(f"Resolution: {resolution[0]}x{resolution[1]}, FPS: {fps}")
        logger.info(f"Output directory: {output_dir}")

        recorder = VideoRecorder(output_dir=output_dir, record_seconds=_TEST_RECORD_SECONDS)
        output_path = recorder.generate_test_filename()
        actual_resolution = camera.get_resolution()
        if not recorder.start_recording(output_path, actual_resolution, fps):
            logger.error("Failed to start test recording")
            return 1  # Cleanup still happens in ``finally``.

        logger.info(f"Recording to: {output_path.name}")
        logger.info(f"Starting {_TEST_RECORD_SECONDS}-second test recording...")

        preview_available = show_preview and _preview_is_available(logger)

        # Start timing only after the display probe so it does not eat into
        # the clip; monotonic time is immune to wall-clock adjustments.
        start_time = time.monotonic()
        elapsed = 0.0
        frame_count = 0
        for frame in camera.read_frames():
            if frame is None:
                continue
            frame_count += 1
            elapsed = time.monotonic() - start_time

            # Write before drawing the overlay so the saved clip stays clean.
            recorder.write_frame(frame)

            if preview_available:
                try:
                    cv2.putText(
                        frame,
                        f"TEST RECORDING | Time: {elapsed:.1f}s / "
                        f"{_TEST_RECORD_SECONDS:.1f}s | Frame: {frame_count}",
                        (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        0.7,
                        (0, 255, 0),
                        2,
                    )
                    cv2.imshow("Test Recording", frame)
                    if cv2.waitKey(1) & 0xFF == ord("q"):
                        logger.info("Preview window closed. Stopping test recording...")
                        break
                except Exception as e:
                    # A broken preview must not abort the recording itself.
                    logger.warning(f"Preview window error: {e}. Disabling preview.")
                    preview_available = False

            if elapsed >= _TEST_RECORD_SECONDS:
                break

        recorder.stop_recording()
        logger.info(banner)
        logger.info("Test recording completed successfully!")
        logger.info(f"File: {output_path.name}")
        logger.info(f"Frames recorded: {frame_count}")
        # Report the measured duration: the clip is shorter when stopped with 'q'.
        logger.info(f"Duration: {elapsed:.1f} seconds")
        logger.info(banner)
    except KeyboardInterrupt:
        logger.info("Test recording interrupted by user")
    except RuntimeError:
        # Camera failure - details are expected to be logged by the camera module.
        logger.error("Stopping test recording due to camera failure.")
        exit_code = 1
    except Exception as e:
        logger.exception(f"Error during test recording: {e}")
        exit_code = 1
    finally:
        # Single cleanup path for every outcome (no double release).
        if recorder is not None and recorder.is_recording():
            recorder.stop_recording()
        camera_logger.setLevel(original_level)
        camera.release()
        if show_preview:
            _close_preview_windows(logger)
    return exit_code


def _run_monitoring(
    camera: "Camera",
    detector: "CatDetector",
    recorder: "VideoRecorder",
    *,
    threshold: float,
    fps: int,
    show_preview: bool,
    logger: logging.Logger,
) -> int:
    """Run the detect-and-record loop until interrupted and return the exit code.

    The recorder is stopped, the camera released and preview windows closed
    exactly once before returning, whichever way the loop ends.
    """
    exit_code = 0
    frame_count = 0
    try:
        for frame in camera.read_frames():
            if frame is None:
                continue
            frame_count += 1

            detections = detector.detect_cats(frame, threshold=threshold)

            # Start a new clip only when idle and at least one cat was found.
            if not recorder.is_recording() and detections:
                output_path = recorder.generate_filename()
                # Ask the camera for its real resolution rather than trusting
                # the requested one.
                actual_resolution = camera.get_resolution()
                if recorder.start_recording(output_path, actual_resolution, fps):
                    logger.info(
                        f"Cat detected! (confidence: {detections[0].confidence:.2f}) - Starting recording"
                    )
                else:
                    logger.warning("Failed to start recording")

            # While recording, every frame is written regardless of detections.
            # The frame that triggered the recording is written here as well,
            # exactly once (it used to be written twice). This relies on
            # is_recording() being true right after a successful start.
            if recorder.is_recording():
                recorder.write_frame(frame)
                if not recorder.should_continue_recording():
                    recorder.stop_recording()

            if show_preview:
                # Overlays are drawn after the frame was written, so they do
                # not end up in the recording.
                if detections:
                    draw_detections(frame, detections)
                status = "RECORDING" if recorder.is_recording() else "MONITORING"
                cv2.putText(
                    frame,
                    f"Status: {status} | Frame: {frame_count}",
                    (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.7,
                    (255, 255, 255),
                    2,
                )
                cv2.imshow("Cat Detection Monitor", frame)
                if cv2.waitKey(1) & 0xFF == ord("q"):
                    logger.info("Preview window closed. Stopping...")
                    break

            if frame_count % 100 == 0:
                logger.debug(f"Processed {frame_count} frames")
    except KeyboardInterrupt:
        logger.info("Interrupted by user. Stopping...")
    except RuntimeError:
        # Camera failure - details are expected to be logged by the camera module.
        logger.error("Stopping monitoring due to camera failure.")
        exit_code = 1
    except Exception as e:
        logger.exception(f"Error in monitoring loop: {e}")
        exit_code = 1
    finally:
        # Single cleanup path for every outcome (no double release).
        if recorder.is_recording():
            recorder.stop_recording()
        camera.release()
        if show_preview:
            _close_preview_windows(logger)
        logger.info("Shutdown complete.")
    return exit_code


def main() -> None:
    """Main entry point and monitoring loop for cat detection system.

    This function:
        1. Parses command-line arguments (with config file fallback).
        2. Opens and validates the camera.
        3. Runs one of two modes:
           - Normal mode: reads frames continuously, runs cat detection on
             each one, starts a recording when a cat is detected and keeps
             writing frames until the recorder reports the clip is complete.
           - Test mode (``--test-record``): records 10 seconds of camera
             footage without running detection.
        4. Releases the camera and closes preview windows on every exit path.

    Command-line arguments override values from the configuration file. See
    ``--help`` for all available options.

    Exit codes:
        0: Normal exit (Ctrl+C, 'q' in the preview window, or successful
           completion).
        1: Error (camera failure, invalid arguments, failure to initialise
           the detector, or an unexpected exception in a processing loop).

    Raises:
        SystemExit: On invalid arguments or critical errors.
    """
    args = _build_arg_parser().parse_args()

    # Logging must be configured before anything else emits messages.
    setup_logging(args.log_level)
    logger = logging.getLogger(__name__)

    config.initialize_config(Path(args.config_file))

    # Configuration values act as defaults for options not given on the CLI.
    default_camera_device = config.get_camera_device()
    default_fps = config.get_fps()
    default_resolution = config.get_resolution()
    default_threshold = config.get_detection_threshold()
    default_record_seconds = config.get_record_seconds()
    default_output_dir = str(config.get_output_dir())

    camera_device = args.camera_device if args.camera_device is not None else default_camera_device
    fps = args.fps if args.fps is not None else default_fps
    resolution_str = (
        args.resolution
        if args.resolution is not None
        else f"{default_resolution[0]}x{default_resolution[1]}"
    )
    threshold = args.threshold if args.threshold is not None else default_threshold
    record_seconds = (
        args.record_seconds if args.record_seconds is not None else default_record_seconds
    )
    output_dir_str = args.output_dir if args.output_dir is not None else default_output_dir

    # Validate the effective settings.
    try:
        resolution = parse_resolution(resolution_str)
    except ValueError as e:
        logger.error(f"Invalid resolution: {e}")
        sys.exit(1)
    # Written as a chained comparison so that NaN is rejected as well.
    if not 0 <= threshold <= 1:
        logger.error("Threshold must be between 0 and 1")
        sys.exit(1)
    if fps <= 0:
        logger.error("FPS must be positive")
        sys.exit(1)
    if record_seconds <= 0:
        logger.error("Record seconds must be positive")
        sys.exit(1)
    output_dir = Path(output_dir_str)

    camera = Camera(device=camera_device, resolution=resolution, fps=fps)
    if not camera.open():
        _log_camera_failure(
            logger,
            "FAILED TO OPEN CAMERA",
            camera_device,
            [
                "Camera is detected: libcamera-hello --list-cameras",
                "Camera is not being used by another process",
                "Try a different camera index: python main.py --camera-device 1",
                "Verify picamera2 is installed: sudo apt install python3-picamera2",
                "Test picamera2: python3 -c 'from picamera2 import Picamera2; Picamera2()'",
            ],
        )
        sys.exit(1)

    if not camera.validate_camera():
        _log_camera_failure(
            logger,
            "CAMERA VALIDATION FAILED",
            camera_device,
            [
                "Camera is detected: libcamera-hello --list-cameras",
                "Camera is not being used by another process",
                "Camera hardware is functioning: libcamera-hello -t 0",
                "Try a different camera index: --camera-device 1",
                "Verify picamera2 is working: python3 -c 'from picamera2 import Picamera2; Picamera2()'",
            ],
            detail="Camera opened successfully but cannot read frames.",
        )
        camera.release()
        sys.exit(1)

    if args.test_record:
        exit_code = _run_test_recording(
            camera,
            camera_device=camera_device,
            resolution=resolution,
            fps=fps,
            output_dir=output_dir,
            show_preview=args.show_preview,
            logger=logger,
        )
        if exit_code:
            sys.exit(exit_code)
        return

    # Normal cat detection mode
    logger.info("Initializing cat detection system...")
    logger.info(f"Camera: {camera_device}")
    logger.info(f"Resolution: {resolution[0]}x{resolution[1]}, FPS: {fps}")
    logger.info(f"Detection threshold: {threshold}")
    logger.info(f"Recording duration: {record_seconds}s")
    logger.info(f"Output directory: {output_dir}")

    try:
        detector = CatDetector()
    except Exception as e:
        logger.error(f"Failed to initialize detector: {e}")
        camera.release()
        sys.exit(1)

    recorder = VideoRecorder(output_dir=output_dir, record_seconds=record_seconds)
    logger.info(f"VideoRecorder initialized with record_seconds={record_seconds}")
    logger.info("Starting monitoring loop. Press Ctrl+C to stop.")

    exit_code = _run_monitoring(
        camera,
        detector,
        recorder,
        threshold=threshold,
        fps=fps,
        show_preview=args.show_preview,
        logger=logger,
    )
    if exit_code:
        sys.exit(exit_code)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()