import io
import itertools
from typing import TYPE_CHECKING
import numpy as np
from morphocut._optional import UnavailableObject
from morphocut.core import Node, Output, ReturnOutputs, Stream, closing_if_closable
[docs]def is_raspberrypi():
"""Check if the platform is a Raspberry Pi."""
try:
with io.open("/sys/firmware/devicetree/base/model", "r") as m:
if "raspberry pi" in m.read().lower():
return True
except Exception:
pass
return False
if TYPE_CHECKING:
import picamera.array as picamera_array
import picamera.camera as picamera_camera
elif is_raspberrypi():
try:
import picamera.array as picamera_array
import picamera.camera as picamera_camera
except ImportError:
msg = "See https://picamera.readthedocs.io/en/latest/install.html for installation instructions."
picamera_array = UnavailableObject("picamera.array", msg)
picamera_camera = UnavailableObject("picamera.camera", msg)
else:
# picamera is installable and importable on systems other than the Raspberry Pi (using READTHEDOCS=True during installation),
# but still not usable.
msg = "This only works on a Raspberry Pi."
picamera_array = UnavailableObject("picamera.array", msg)
picamera_camera = UnavailableObject("picamera.camera", msg)
[docs]@ReturnOutputs
@Output("frame")
class PiCameraReader(Node):
"""
|stream| Read frames from the Raspberry Pi's camera.
Args:
resolution (tuple (width, height), optional): The desired image resolution.
Example:
.. code-block:: python
frame = PiCameraReader()
"""
def __init__(self, **kwargs):
super().__init__()
kwargs.setdefault("resolution", picamera_camera.PiCameraMaxResolution)
self.kwargs = kwargs
def transform_stream(self, stream: Stream) -> Stream:
with picamera_camera.PiCamera(**self.kwargs) as cam:
resolution = picamera_array.raw_resolution(cam.resolution)
with closing_if_closable(stream):
for obj in stream:
output = np.empty(resolution[::-1] + (3,), dtype=np.uint8)
# Capture continously
for _ in cam.capture_continuous(output, format="rgb"):
self.prepare_output(obj, output.copy())
yield obj
self.after_stream()