"""Camera interface using picamera2 library.
This module provides a Camera class that wraps the picamera2 library for
Raspberry Pi camera modules. It handles camera initialization, frame capture,
and automatic detection of full sensor resolution to avoid cropping.
The Camera class automatically detects and uses the full sensor resolution
(e.g., 3280x2464 for Camera Module 2) instead of cropping to a smaller
resolution, ensuring the full field of view is captured.
Key Features:
- Automatic full sensor resolution detection
- Frame rate control
- Camera validation
- Error handling and recovery
- Support for IR camera controls (exposure, gain)
Example:
Basic usage::
camera = Camera(device="0", resolution=(640, 480), fps=10)
if camera.open():
success, frame = camera.read_frame()
camera.release()
Using context manager::
with Camera() as cam:
for frame in cam.read_frames():
# Process frame (None is yielded when a read fails)
pass
"""
import logging
import numpy as np
import time
from typing import Generator, Optional, Tuple, Dict, Any
from config import DEFAULT_CAMERA_DEVICE, DEFAULT_FPS, DEFAULT_RESOLUTION, get_camera_controls
try:
from picamera2 import Picamera2
PICAMERA2_AVAILABLE = True
except ImportError:
PICAMERA2_AVAILABLE = False
Picamera2 = None
logger = logging.getLogger(__name__)
[docs]
class Camera:
"""Wrapper for picamera2 camera interface.
Provides a high-level interface for Raspberry Pi camera modules using
picamera2. Automatically detects full sensor resolution and handles frame
conversion to OpenCV-compatible BGR format.
The camera automatically uses the full sensor resolution (detected from
preview configuration) to ensure the complete field of view is captured,
matching the behavior of rpicam-hello.
Attributes:
device: Camera device identifier (string, typically "0" for first camera).
resolution: Requested resolution tuple (width, height). Note: Actual resolution may be full sensor size.
fps: Target frames per second.
picam2: Internal Picamera2 instance (None until opened).
frame_time: Calculated time per frame (1.0 / fps).
last_frame_time: Timestamp of last frame read.
Example:
>>> camera = Camera(device="0", resolution=(640, 480), fps=10)
>>> if camera.open():
... actual_res = camera.get_resolution()
... print(f"Actual resolution: {actual_res}")
>>> success, frame = camera.read_frame()
>>> camera.release()
"""
[docs]
def __init__(
    self,
    device: str = DEFAULT_CAMERA_DEVICE,
    resolution: Tuple[int, int] = DEFAULT_RESOLUTION,
    fps: int = DEFAULT_FPS,
):
    """Initialize camera instance.

    Creates a Camera instance but does not open the camera hardware.
    Call open() to actually start the camera.

    Args:
        device: Camera device identifier. Can be:
            - Camera index as string: "0", "1", etc. (recommended)
            - Device path: "/dev/video0" (converted to index, with a warning)
            Non-string values (for example the int ``0``) are converted
            with ``str()``. An identifier that cannot be parsed as an
            integer falls back to index 0 with a warning.
        resolution: Requested resolution tuple (width, height). Stored as
            given; open() may select a different capture size.
        fps: Target frames per second for frame capture. A non-positive
            value results in a fallback frame interval of 0.1 seconds.

    Raises:
        ImportError: If picamera2 is not installed or not available in the
            current Python environment.

    Note:
        The camera hardware is not accessed until open() is called. This
        allows creating Camera instances without immediately claiming the
        camera resource.
    """
    if not PICAMERA2_AVAILABLE:
        import sys
        venv_msg = ""
        # sys.real_prefix is set by legacy virtualenv; base_prefix != prefix
        # is how the stdlib venv module shows up.
        if hasattr(sys, 'real_prefix') or (hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix):
            venv_msg = (
                "\n\nNOTE: You are running in a virtual environment. "
                "System-installed picamera2 (via apt) is not available in venv by default.\n"
                "Options:\n"
                "  1. Install picamera2 in venv: pip install picamera2 (requires: sudo apt install libcap-dev first)\n"
                "  2. Use system Python instead of venv for this project\n"
                "  3. Create venv with --system-site-packages: python3 -m venv --system-site-packages venv\n"
            )
        raise ImportError(
            "picamera2 is not installed or not available in this Python environment.\n"
            "Install with: sudo apt install python3-picamera2"
            + venv_msg
        )

    # Normalise to text first so an int index (or any other type) cannot
    # crash on .startswith().
    video_prefix = "/dev/video"
    device_text = str(device).strip()
    is_device_path = device_text.startswith(video_prefix)
    index_text = device_text[len(video_prefix):] if is_device_path else device_text

    # Both the path form and the plain-index form share one parse step, so a
    # malformed path ("/dev/videoX") falls back to index 0 instead of raising.
    try:
        self.camera_index = int(index_text)
    except ValueError:
        logger.warning(f"Invalid camera device '{device}', using default camera (index 0)")
        self.camera_index = 0
    else:
        if is_device_path:
            logger.warning(
                f"Device path {device} converted to camera index {self.camera_index}. "
                "picamera2 uses camera indices."
            )

    self.device = str(self.camera_index)  # Keep for compatibility
    self.resolution = resolution
    self.fps = fps
    self.picam2: Optional[Picamera2] = None
    # Guard against division by zero / negative rates.
    self.frame_time = 1.0 / fps if fps > 0 else 0.1
    self.last_frame_time = 0.0
[docs]
def open(self) -> bool:
    """Open and configure the camera using picamera2.

    Creates the Picamera2 instance, validates the optional controls returned
    by get_camera_controls() (ExposureTime, AnalogueGain, AeEnable), chooses
    a capture size, configures a video stream and starts the camera. The
    validated controls are passed in the video configuration and applied
    again with set_controls() once the camera is running.

    The capture size is the main-stream size of the default preview
    configuration when that can be queried; otherwise the resolution given
    to the constructor is used.

    Returns:
        True if the camera opened and was configured successfully (or was
        already open), False otherwise. On failure the partially created
        Picamera2 instance is closed and self.picam2 is reset to None.

    Note:
        This method must be called before reading frames. If it fails,
        check camera hardware connections and ensure no other process is
        using the camera.
    """
    if self.picam2 is not None:
        # Re-opening would fail on the busy device and the error path below
        # would then close the instance that is currently working.
        logger.warning("Camera is already open; ignoring repeated open() call")
        return True

    def is_positive_number(value: Any) -> bool:
        # bool is a subclass of int; a config value of true/false is not a
        # meaningful exposure time or gain.
        return (
            isinstance(value, (int, float))
            and not isinstance(value, bool)
            and value > 0
        )

    try:
        # Negative indices are clamped to the first camera.
        self.picam2 = Picamera2(camera_num=max(self.camera_index, 0))

        # Optional controls from config; only values that pass validation are
        # kept, and the same validated set is used before and after start.
        camera_controls = get_camera_controls()
        validated_controls: Dict[str, Any] = {}
        if camera_controls:
            if "ExposureTime" in camera_controls:
                exposure_time = camera_controls["ExposureTime"]
                if is_positive_number(exposure_time):
                    validated_controls["ExposureTime"] = int(exposure_time)
                    logger.info(f"Setting manual ExposureTime: {exposure_time} microseconds")
                else:
                    logger.warning(f"Invalid ExposureTime value: {exposure_time}. Ignoring.")

            if "AnalogueGain" in camera_controls:
                gain = camera_controls["AnalogueGain"]
                if is_positive_number(gain):
                    validated_controls["AnalogueGain"] = float(gain)
                    logger.info(f"Setting manual AnalogueGain: {gain}")
                else:
                    logger.warning(f"Invalid AnalogueGain value: {gain}. Ignoring.")

            if "AeEnable" in camera_controls:
                ae_enable = camera_controls["AeEnable"]
                if isinstance(ae_enable, bool):
                    validated_controls["AeEnable"] = ae_enable
                    logger.info(f"Setting AeEnable: {ae_enable}")
                else:
                    logger.warning(f"Invalid AeEnable value: {ae_enable}. Must be true/false. Ignoring.")

        controls_dict = {"FrameRate": self.fps, **validated_controls}

        # The size of the default preview configuration is treated as the
        # full sensor size so the stream is not cropped to self.resolution.
        # NOTE(review): confirm that create_preview_configuration() really
        # reports the sensor size on the target picamera2 version; if it
        # returns a fixed default instead, Picamera2.sensor_resolution may be
        # the intended source.
        try:
            preview_config = self.picam2.create_preview_configuration()
            full_sensor_size = preview_config['main']['size']
            logger.info(f"Detected full sensor resolution: {full_sensor_size[0]}x{full_sensor_size[1]}")
        except Exception as e:
            logger.warning(f"Could not detect full sensor size: {e}. Using configured resolution.")
            full_sensor_size = None

        if full_sensor_size:
            capture_resolution = full_sensor_size
            logger.info(f"Using full sensor resolution: {capture_resolution[0]}x{capture_resolution[1]} "
                        f"(configured was {self.resolution[0]}x{self.resolution[1]})")
        else:
            capture_resolution = self.resolution
            logger.info(f"Using configured resolution: {capture_resolution[0]}x{capture_resolution[1]}")

        video_config = self.picam2.create_video_configuration(
            main={"size": capture_resolution},
            controls=controls_dict
        )

        self.picam2.configure(video_config)
        self.picam2.start()

        # Re-apply the validated controls after start. A failure here is not
        # fatal: the camera keeps running with whatever settings took effect.
        if validated_controls:
            try:
                self.picam2.set_controls(validated_controls)
                logger.debug(f"Applied post-start camera controls: {validated_controls}")
            except Exception as e:
                logger.warning(f"Could not apply some camera controls: {e}. Camera will use default settings.")

        # Report the size of the stream that was actually configured.
        stream_config = self.picam2.camera_config['main']
        actual_width = stream_config['size'][0]
        actual_height = stream_config['size'][1]
        logger.info(f"Camera opened via picamera2 (index: {self.camera_index})")
        logger.info(f"Resolution: {actual_width}x{actual_height}, FPS: {self.fps}")
        return True
    except Exception as e:
        logger.error(f"Error opening camera: {e}")
        if self.picam2 is not None:
            try:
                self.picam2.close()
            except Exception as close_error:
                logger.debug(f"Error closing camera during cleanup: {close_error}")
            self.picam2 = None
        return False
def _convert_frame(self, frame: np.ndarray) -> np.ndarray:
"""Convert a frame from picamera2 format to OpenCV-compatible BGR format.
Converts frames from picamera2's RGB/RGBA format to OpenCV's BGR format.
Handles both 3-channel (RGB) and 4-channel (RGBA) input frames, ensuring
the output is always a writable, contiguous 3-channel BGR array.
Args:
frame: Raw frame from picamera2. Can be RGB (shape: [H, W, 3]) or
RGBA (shape: [H, W, 4]). May be read-only or non-contiguous.
Returns:
Converted frame in BGR format (shape: [H, W, 3], dtype: uint8).
The array is guaranteed to be contiguous and writable, suitable for
OpenCV operations like cv2.putText(), cv2.rectangle(), etc.
Note:
This is a private method used internally by read_frame() and
read_frames(). It ensures frames are compatible with OpenCV functions
that require writable arrays.
"""
# Ensure uint8 dtype (picamera2 may return float or other types)
frame = frame.astype(np.uint8)
# Handle 4-channel RGBA frames by taking only RGB channels
# YOLOv8 expects 3-channel input, and OpenCV BGR also uses 3 channels
if len(frame.shape) == 3 and frame.shape[2] == 4:
frame = frame[:, :, :3] # Take only RGB channels, drop alpha
# Convert RGB to BGR for OpenCV compatibility
# OpenCV uses BGR color order, picamera2 uses RGB
frame = frame[:, :, ::-1] # Reverse channel order: RGB -> BGR
# Always create a fresh contiguous copy to ensure OpenCV compatibility
# Some OpenCV functions (e.g., putText, rectangle) require writable arrays
# and may fail with read-only or non-contiguous arrays from picamera2
frame = np.ascontiguousarray(frame, dtype=np.uint8)
return frame
[docs]
def read_frame(self) -> Optional[Tuple[bool, Optional[np.ndarray]]]:
    """Capture one frame and return it in converted form.

    Returns:
        None when the camera has not been opened. Otherwise a
        ``(success, frame)`` tuple: ``(True, converted_frame)`` when the
        capture and conversion worked, ``(False, None)`` when either raised.
        Callers that unpack the result must handle the None case first.
    """
    camera = self.picam2
    if camera is None:
        return None

    try:
        # Raw capture followed by the shared conversion step.
        converted = self._convert_frame(camera.capture_array())
    except Exception as e:
        # Failures are reported through the return value; only a debug log
        # entry is written.
        logger.debug(f"Error reading frame: {e}")
        return (False, None)

    return (True, converted)
[docs]
def read_frames(self) -> Generator[Optional[np.ndarray], None, None]:
    """Yield converted frames, paced to at most one per ``self.frame_time``.

    The generator runs until it is closed by the consumer or until too many
    consecutive captures fail.

    Yields:
        The converted frame as a numpy array, or None for an iteration in
        which capture or conversion raised.

    Raises:
        RuntimeError: If the camera has not been opened, or after 50
            consecutive failed reads (chained to the last capture error).
    """
    if self.picam2 is None:
        raise RuntimeError("Camera not opened. Call open() first.")

    # Throttling state to prevent log spam during camera issues.
    consecutive_failures = 0
    last_warning_time = 0.0
    last_error_time = 0.0
    warning_interval = 5.0  # Only warn every 5 seconds
    error_interval = 10.0  # Only error every 10 seconds
    error_threshold = 30  # Start logging errors after this many failures
    max_failures = 50  # Maximum consecutive failures before raising exception

    while True:
        current_time = time.time()
        elapsed = current_time - self.last_frame_time

        if elapsed < 0:
            # The wall clock stepped backwards (e.g. an NTP correction).
            # Re-anchor the pacing timestamp; otherwise the sleep below
            # would last for the whole size of the clock jump.
            self.last_frame_time = current_time
            elapsed = 0.0

        if elapsed < self.frame_time:
            # Not due yet: sleep out the remainder of the frame interval.
            time.sleep(self.frame_time - elapsed)
            continue

        try:
            frame = self._convert_frame(self.picam2.capture_array())
        except Exception as e:
            consecutive_failures += 1
            self.last_frame_time = current_time

            # Give up once the failure streak is long enough.
            if consecutive_failures >= max_failures:
                error_msg = (
                    f"Camera failure: {consecutive_failures} consecutive frame read failures. "
                    f"Camera (index: {self.camera_index}) may be disconnected or malfunctioning.\n"
                    f"Please check:\n"
                    f"  1. Camera is detected: libcamera-hello --list-cameras\n"
                    f"  2. Camera is not being used by another process\n"
                    f"  3. Try a different camera index: --camera-device 1\n"
                    f"  4. Verify picamera2 is working: python3 -c 'from picamera2 import Picamera2; Picamera2()'"
                )
                logger.error(error_msg)
                # Chain the underlying capture error for easier diagnosis.
                raise RuntimeError(error_msg) from e

            if consecutive_failures >= error_threshold:
                # Long streak: rate-limited error-level message.
                if current_time - last_error_time >= error_interval:
                    logger.error(
                        f"Too many consecutive frame read failures ({consecutive_failures}). "
                        "Camera may be disconnected or malfunctioning."
                    )
                    last_error_time = current_time
            elif current_time - last_warning_time >= warning_interval:
                # Short streak: rate-limited warning including the cause.
                logger.warning(
                    f"Failed to read frame from camera (consecutive failures: {consecutive_failures}): {e}"
                )
                last_warning_time = current_time

            yield None
        else:
            self.last_frame_time = current_time
            consecutive_failures = 0
            # Yielding outside the try block keeps exceptions thrown into
            # the generator by the consumer from being counted as capture
            # failures.
            yield frame
[docs]
def get_resolution(self) -> Tuple[int, int]:
    """Return the resolution of the configured main stream.

    Returns:
        ``(width, height)`` read from ``picam2.camera_config['main']['size']``
        when the camera is open and that entry is readable; otherwise the
        resolution passed to the constructor.
    """
    if self.picam2 is None:
        return self.resolution

    try:
        size = self.picam2.camera_config['main']['size']
        return (size[0], size[1])
    except (KeyError, AttributeError, TypeError, IndexError):
        # Missing/None configuration or a malformed size entry (including
        # one with fewer than two elements): fall back to the requested size.
        return self.resolution
[docs]
def validate_camera(self, test_frames: int = 5, success_threshold: int = 3, timeout: float = 5.0) -> bool:
    """Check that the opened camera can deliver frames.

    Attempts up to ``test_frames`` raw captures, pausing 0.1 s after each
    attempt, and counts those that return a non-empty array.

    Args:
        test_frames: Number of frames to attempt reading.
        success_threshold: Minimum number of successful reads required.
        timeout: Time budget in seconds. It is checked before each attempt,
            so a single capture that blocks can still overrun it.

    Returns:
        True if at least ``success_threshold`` captures succeeded, False
        otherwise (including when the camera has not been opened).
    """
    if self.picam2 is None:
        logger.error("Camera not opened. Cannot validate.")
        return False

    logger.info(f"Validating camera: attempting to read {test_frames} frames...")

    successful_reads = 0
    # Monotonic clock: the timeout must not be affected by wall-clock changes.
    start_time = time.monotonic()

    for i in range(test_frames):
        if time.monotonic() - start_time > timeout:
            logger.warning(f"Camera validation timeout after {timeout}s")
            break

        try:
            frame = self.picam2.capture_array()
            # Only the presence of data is checked; the frame itself is not
            # kept or converted.
            if frame is not None and frame.size > 0:
                successful_reads += 1
                logger.debug(f"Frame {i+1}/{test_frames}: Success")
            else:
                logger.debug(f"Frame {i+1}/{test_frames}: Failed (empty frame)")
        except Exception as e:
            logger.debug(f"Frame {i+1}/{test_frames}: Failed ({e})")

        # Small delay between attempts
        time.sleep(0.1)

    if successful_reads >= success_threshold:
        logger.info(f"Camera validation passed: {successful_reads}/{test_frames} frames read successfully")
        return True

    logger.error(
        f"Camera validation failed: Only {successful_reads}/{test_frames} frames read successfully. "
        f"Required: {success_threshold}+ successful reads."
    )
    return False
[docs]
def release(self) -> None:
    """Stop and close the camera, if one is open.

    Does nothing when no camera is open. Errors from stop() or close() are
    logged as warnings and not propagated, and ``self.picam2`` is always
    reset to None afterwards.
    """
    camera = self.picam2
    if camera is None:
        return

    # Drop the reference first so the instance reads as closed even if the
    # shutdown calls below misbehave.
    self.picam2 = None

    # stop() and close() are attempted independently: a failure while
    # stopping must not skip close(), or the device would stay claimed.
    try:
        camera.stop()
    except Exception as e:
        logger.warning(f"Error releasing camera: {e}")

    try:
        camera.close()
    except Exception as e:
        logger.warning(f"Error releasing camera: {e}")

    logger.info("Camera released")
[docs]
def __enter__(self):
"""Context manager entry."""
self.open()
return self
[docs]
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.release()