Source code for camera

"""Camera interface using picamera2 library.

This module provides a Camera class that wraps the picamera2 library for
Raspberry Pi camera modules. It handles camera initialization, frame capture,
and automatic detection of full sensor resolution to avoid cropping.

The Camera class automatically detects and uses the full sensor resolution
(e.g., 3280x2464 for Camera Module 2) instead of cropping to a smaller
resolution, ensuring the full field of view is captured.

Key Features:
    - Automatic full sensor resolution detection
    - Frame rate control
    - Camera validation
    - Error handling and recovery
    - Support for IR camera controls (exposure, gain)

Example:
    Basic usage::

        camera = Camera(device="0", resolution=(640, 480), fps=10)
        if camera.open():
            success, frame = camera.read_frame()
            camera.release()

    Using context manager::

        with Camera() as cam:
            for frame in cam.read_frames():
                if frame is None:
                    continue  # a failed read yields None
                # Process frame
                pass
"""

import logging
import numpy as np
import time
from typing import Generator, Optional, Tuple, Dict, Any
from config import DEFAULT_CAMERA_DEVICE, DEFAULT_FPS, DEFAULT_RESOLUTION, get_camera_controls

# Optional-dependency guard: import picamera2 if it is present, otherwise keep
# this module importable and record the absence. Camera.__init__ checks
# PICAMERA2_AVAILABLE and raises ImportError with install instructions.
try:
    from picamera2 import Picamera2
    PICAMERA2_AVAILABLE = True
except ImportError:
    PICAMERA2_AVAILABLE = False
    # Bind the name anyway so references to Picamera2 elsewhere in the module
    # (e.g. annotations) do not raise NameError.
    Picamera2 = None

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class Camera:
    """Wrapper for the picamera2 camera interface.

    Provides a high-level interface for Raspberry Pi camera modules using
    picamera2 and converts captured frames to OpenCV-compatible BGR arrays.

    When opened, the video stream is configured at the size reported by
    picamera2's default preview configuration (intended to be the full
    sensor resolution so the field of view is not cropped); if that lookup
    fails, the requested ``resolution`` is used instead.

    Attributes:
        device: Camera index as a string (kept for compatibility).
        camera_index: Camera index as an integer.
        resolution: Requested resolution tuple (width, height). The actual
            stream size may differ; see ``get_resolution()``.
        fps: Target frames per second.
        picam2: Internal Picamera2 instance (None until opened).
        frame_time: Minimum time between frames in seconds (1.0 / fps, or
            0.1 when fps is not positive).
        last_frame_time: ``time.time()`` timestamp of the last capture
            attempt made by ``read_frames()``.

    Example:
        >>> camera = Camera(device="0", resolution=(640, 480), fps=10)
        >>> if camera.open():
        ...     actual_res = camera.get_resolution()
        ...     print(f"Actual resolution: {actual_res}")
        ...     success, frame = camera.read_frame()
        ...     camera.release()
    """

    def __init__(
        self,
        device: str = DEFAULT_CAMERA_DEVICE,
        resolution: Tuple[int, int] = DEFAULT_RESOLUTION,
        fps: int = DEFAULT_FPS,
    ):
        """Initialize a camera instance without touching the hardware.

        Call ``open()`` to actually start the camera.

        Args:
            device: Camera device identifier. Either a camera index
                ("0", "1", ...; an int is also accepted) or a device path
                of the form "/dev/videoN", which is converted to index N.
                Anything that cannot be parsed falls back to index 0 with
                a warning.
            resolution: Requested resolution tuple (width, height).
            fps: Target frames per second. Should be positive; a
                non-positive value is kept as given but frames are paced
                at a 0.1 second interval.

        Raises:
            ImportError: If picamera2 could not be imported in the current
                Python environment.
        """
        if not PICAMERA2_AVAILABLE:
            import sys

            venv_msg = ""
            # real_prefix (virtualenv) or base_prefix != prefix (venv)
            # indicates we are running inside a virtual environment.
            if hasattr(sys, 'real_prefix') or (hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix):
                venv_msg = (
                    "\n\nNOTE: You are running in a virtual environment. "
                    "System-installed picamera2 (via apt) is not available in venv by default.\n"
                    "Options:\n"
                    " 1. Install picamera2 in venv: pip install picamera2 (requires: sudo apt install libcap-dev first)\n"
                    " 2. Use system Python instead of venv for this project\n"
                    " 3. Create venv with --system-site-packages: python3 -m venv --system-site-packages venv\n"
                )
            raise ImportError(
                "picamera2 is not installed or not available in this Python environment.\n"
                "Install with: sudo apt install python3-picamera2" + venv_msg
            )

        self.camera_index = self._parse_camera_index(device)
        self.device = str(self.camera_index)  # Keep for compatibility
        self.resolution = resolution
        self.fps = fps
        self.picam2: Optional[Picamera2] = None
        if fps > 0:
            self.frame_time = 1.0 / fps
        else:
            logger.warning(f"Non-positive fps ({fps}); pacing frames at a 0.1s interval")
            self.frame_time = 0.1
        self.last_frame_time = 0.0

    @staticmethod
    def _parse_camera_index(device: Any) -> int:
        """Translate a device identifier into a picamera2 camera index.

        Accepts an index ("0", 0) or a "/dev/videoN" path. Any value that
        cannot be parsed as an integer yields index 0 and a warning.
        """
        device = str(device)  # tolerate callers passing an int index
        prefix = "/dev/video"
        is_path = device.startswith(prefix)
        try:
            index = int(device[len(prefix):]) if is_path else int(device)
        except ValueError:
            logger.warning(f"Invalid camera device '{device}', using default camera (index 0)")
            return 0
        if is_path:
            logger.warning(
                f"Device path {device} converted to camera index {index}. "
                "picamera2 uses camera indices."
            )
        return index

    @staticmethod
    def _validated_controls(camera_controls: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Return only the well-formed optional controls from the config.

        ExposureTime and AnalogueGain must be positive numbers (bools are
        rejected even though ``bool`` subclasses ``int``); AeEnable must be
        a bool. Invalid entries are logged and dropped.
        """
        validated: Dict[str, Any] = {}
        if not camera_controls:
            return validated

        def is_positive_number(value: Any) -> bool:
            return (
                isinstance(value, (int, float))
                and not isinstance(value, bool)
                and value > 0
            )

        if "ExposureTime" in camera_controls:
            exposure_time = camera_controls["ExposureTime"]
            if is_positive_number(exposure_time):
                validated["ExposureTime"] = int(exposure_time)
                logger.info(f"Setting manual ExposureTime: {exposure_time} microseconds")
            else:
                logger.warning(f"Invalid ExposureTime value: {exposure_time}. Ignoring.")

        if "AnalogueGain" in camera_controls:
            gain = camera_controls["AnalogueGain"]
            if is_positive_number(gain):
                validated["AnalogueGain"] = float(gain)
                logger.info(f"Setting manual AnalogueGain: {gain}")
            else:
                logger.warning(f"Invalid AnalogueGain value: {gain}. Ignoring.")

        if "AeEnable" in camera_controls:
            ae_enable = camera_controls["AeEnable"]
            if isinstance(ae_enable, bool):
                validated["AeEnable"] = ae_enable
                logger.info(f"Setting AeEnable: {ae_enable}")
            else:
                logger.warning(f"Invalid AeEnable value: {ae_enable}. Must be true/false. Ignoring.")

        return validated

    def open(self) -> bool:
        """Open, configure and start the camera using picamera2.

        Builds a video configuration with the target frame rate plus any
        valid optional controls from ``get_camera_controls()``
        (ExposureTime, AnalogueGain, AeEnable), starts the camera, and then
        re-applies those same validated controls via ``set_controls``.

        If the camera is already open it is released first so the previous
        Picamera2 instance is not leaked.

        Returns:
            True if the camera opened and started successfully, False
            otherwise (the error is logged and ``picam2`` is reset to None).
        """
        if self.picam2 is not None:
            logger.warning("Camera already open; releasing it before re-opening")
            self.release()

        try:
            # Negative indices are clamped to the first camera.
            self.picam2 = Picamera2(camera_num=max(self.camera_index, 0))

            # Validate once and reuse for both the configuration and the
            # post-start call, so a value rejected here is never applied.
            optional_controls = self._validated_controls(get_camera_controls())
            controls_dict = {"FrameRate": self.fps, **optional_controls}

            # Use the size of the default preview configuration as the
            # capture size, to avoid cropping to the requested resolution.
            # NOTE(review): this assumes the default preview configuration's
            # main stream is full-sensor sized. If picamera2's default is
            # smaller, `Picamera2.sensor_resolution` may be the right source
            # -- TODO confirm against the picamera2 manual / on hardware.
            try:
                preview_config = self.picam2.create_preview_configuration()
                full_sensor_size = preview_config['main']['size']
                logger.info(f"Detected full sensor resolution: {full_sensor_size[0]}x{full_sensor_size[1]}")
            except Exception as e:
                logger.warning(f"Could not detect full sensor size: {e}. Using configured resolution.")
                full_sensor_size = None

            if full_sensor_size:
                capture_resolution = full_sensor_size
                logger.info(f"Using full sensor resolution: {capture_resolution[0]}x{capture_resolution[1]} "
                            f"(configured was {self.resolution[0]}x{self.resolution[1]})")
            else:
                capture_resolution = self.resolution
                logger.info(f"Using configured resolution: {capture_resolution[0]}x{capture_resolution[1]}")

            video_config = self.picam2.create_video_configuration(
                main={"size": capture_resolution},
                controls=controls_dict,
            )
            self.picam2.configure(video_config)
            self.picam2.start()

            # Re-apply the optional controls after start; failure here is
            # non-fatal and leaves the camera running with what it has.
            if optional_controls:
                try:
                    post_start_controls = dict(optional_controls)
                    self.picam2.set_controls(post_start_controls)
                    logger.debug(f"Applied post-start camera controls: {post_start_controls}")
                except Exception as e:
                    logger.warning(f"Could not apply some camera controls: {e}. Camera will use default settings.")

            # Report the size of the stream that was actually configured.
            stream_config = self.picam2.camera_config['main']
            actual_width = stream_config['size'][0]
            actual_height = stream_config['size'][1]
            logger.info(f"Camera opened via picamera2 (index: {self.camera_index})")
            logger.info(f"Resolution: {actual_width}x{actual_height}, FPS: {self.fps}")
            return True
        except Exception as e:
            logger.error(f"Error opening camera: {e}")
            if self.picam2 is not None:
                try:
                    self.picam2.close()
                except Exception as close_err:
                    logger.debug(f"Error closing camera during cleanup: {close_err}")
            self.picam2 = None
            return False

    def _convert_frame(self, frame: np.ndarray) -> np.ndarray:
        """Convert a captured RGB/RGBA frame to a fresh uint8 BGR array.

        Args:
            frame: Array of shape (H, W, C). A fourth channel, if present,
                is dropped before the channel order is reversed.

        Returns:
            A newly allocated, C-contiguous, writable uint8 array with the
            channel order reversed (RGB -> BGR).

        Raises:
            ValueError: If ``frame`` is not 3-dimensional.
        """
        if frame.ndim != 3:
            raise ValueError(f"Expected a 3-D (H, W, C) frame, got shape {frame.shape}")

        # Drop the alpha channel of 4-channel frames.
        if frame.shape[2] == 4:
            frame = frame[:, :, :3]

        # np.array(..., copy defaults to True) performs the dtype cast, the
        # channel reversal and the contiguous copy in a single allocation,
        # and always returns a new writable array even for read-only input.
        return np.array(frame[:, :, ::-1], dtype=np.uint8, order="C")

    def read_frame(self) -> Optional[Tuple[bool, Optional[np.ndarray]]]:
        """Read a single frame from the camera.

        Returns:
            None if the camera is not opened; otherwise a tuple
            ``(success, frame)`` where ``frame`` is a BGR array on success
            and ``(False, None)`` is returned if capture or conversion
            raised. Callers that unpack the result must handle the None
            case first.
        """
        if self.picam2 is None:
            return None

        try:
            frame = self._convert_frame(self.picam2.capture_array())
        except Exception as e:
            logger.debug(f"Error reading frame: {e}")
            return (False, None)
        return (True, frame)

    def read_frames(self) -> Generator[Optional[np.ndarray], None, None]:
        """Yield frames paced to the configured FPS.

        Yields:
            A BGR frame, or None when a capture attempt failed.

        Raises:
            RuntimeError: If the camera is not opened, or after 50
                consecutive failed capture attempts.
        """
        if self.picam2 is None:
            raise RuntimeError("Camera not opened. Call open() first.")

        # Throttling state to prevent log spam during camera issues
        consecutive_failures = 0
        last_warning_time = 0.0
        last_error_time = 0.0
        warning_interval = 5.0  # Only warn every 5 seconds
        error_interval = 10.0  # Only error every 10 seconds
        error_threshold = 30  # Start logging errors after this many failures
        max_failures = 50  # Consecutive failures before raising

        while True:
            current_time = time.time()
            elapsed = current_time - self.last_frame_time

            # Not due yet: sleep out the remainder of the frame interval.
            # A negative elapsed means the wall clock stepped backwards;
            # capture immediately instead of sleeping for the clock delta.
            if 0 <= elapsed < self.frame_time:
                time.sleep(self.frame_time - elapsed)
                continue

            self.last_frame_time = current_time
            try:
                frame = self._convert_frame(self.picam2.capture_array())
            except Exception as e:
                consecutive_failures += 1

                if consecutive_failures >= max_failures:
                    error_msg = (
                        f"Camera failure: {consecutive_failures} consecutive frame read failures. "
                        f"Camera (index: {self.camera_index}) may be disconnected or malfunctioning.\n"
                        f"Please check:\n"
                        f" 1. Camera is detected: libcamera-hello --list-cameras\n"
                        f" 2. Camera is not being used by another process\n"
                        f" 3. Try a different camera index: --camera-device 1\n"
                        f" 4. Verify picamera2 is working: python3 -c 'from picamera2 import Picamera2; Picamera2()'"
                    )
                    logger.error(error_msg)
                    raise RuntimeError(error_msg) from e

                if consecutive_failures >= error_threshold:
                    if current_time - last_error_time >= error_interval:
                        logger.error(
                            f"Too many consecutive frame read failures ({consecutive_failures}). "
                            "Camera may be disconnected or malfunctioning."
                        )
                        last_error_time = current_time
                elif current_time - last_warning_time >= warning_interval:
                    logger.warning(
                        f"Failed to read frame from camera (consecutive failures: {consecutive_failures}): {e}"
                    )
                    last_warning_time = current_time

                frame = None
            else:
                consecutive_failures = 0

            # Yield outside the try block so the handler above only ever
            # sees capture/conversion errors.
            yield frame

    def get_resolution(self) -> Tuple[int, int]:
        """Return the configured stream size as (width, height).

        Falls back to the requested ``resolution`` when the camera is not
        opened or the stream configuration cannot be read.
        """
        if self.picam2 is None:
            return self.resolution
        try:
            size = self.picam2.camera_config['main']['size']
            return (size[0], size[1])
        except (KeyError, AttributeError, TypeError):
            return self.resolution

    def validate_camera(self, test_frames: int = 5, success_threshold: int = 3, timeout: float = 5.0) -> bool:
        """Check that the camera can actually deliver frames.

        Args:
            test_frames: Number of capture attempts.
            success_threshold: Minimum number of non-empty frames required.
            timeout: Overall time budget in seconds; checked before each
                attempt.

        Returns:
            True if at least ``success_threshold`` attempts returned a
            non-empty frame, False otherwise (including when the camera is
            not opened).
        """
        if self.picam2 is None:
            logger.error("Camera not opened. Cannot validate.")
            return False

        logger.info(f"Validating camera: attempting to read {test_frames} frames...")

        successful_reads = 0
        start_time = time.monotonic()  # immune to wall-clock adjustments

        for i in range(test_frames):
            if time.monotonic() - start_time > timeout:
                logger.warning(f"Camera validation timeout after {timeout}s")
                break

            try:
                frame = self.picam2.capture_array()
                if frame is not None and frame.size > 0:
                    successful_reads += 1
                    logger.debug(f"Frame {i+1}/{test_frames}: Success")
                else:
                    logger.debug(f"Frame {i+1}/{test_frames}: Failed (empty frame)")
            except Exception as e:
                logger.debug(f"Frame {i+1}/{test_frames}: Failed ({e})")

            # Small delay between attempts (not needed after the last one)
            if i < test_frames - 1:
                time.sleep(0.1)

        if successful_reads >= success_threshold:
            logger.info(f"Camera validation passed: {successful_reads}/{test_frames} frames read successfully")
            return True

        logger.error(
            f"Camera validation failed: Only {successful_reads}/{test_frames} frames read successfully. "
            f"Required: {success_threshold}+ successful reads."
        )
        return False

    def release(self) -> None:
        """Stop and close the camera, if it is open.

        ``close()`` is attempted even when ``stop()`` raises, so a failed
        stop does not leave the device claimed. Errors are logged, not
        raised.
        """
        if self.picam2 is None:
            return

        picam2, self.picam2 = self.picam2, None
        for step in (picam2.stop, picam2.close):
            try:
                step()
            except Exception as e:
                logger.warning(f"Error releasing camera: {e}")
        logger.info("Camera released")

    def __enter__(self) -> "Camera":
        """Context manager entry: open the camera and return self.

        The result of ``open()`` is not checked; if opening failed,
        ``picam2`` stays None and ``read_frames()`` raises RuntimeError.
        """
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Context manager exit: release the camera; exceptions propagate."""
        self.release()