from types import EllipsisType
from typing import Iterable, Sequence, Optional
import cv2
import numpy as np
from astropy.units import Quantity
from ..core import Processor, Detector
from ..utilities import project, Transform
from ..utilities.patterns import disk, gaussian
[docs]
class Roi:
"""
Represents a Region of Interest (ROI) to compute a (weighted) average over.
This class defines an ROI with specified properties such as coordinates,
radius, mask type, and parameters specific to the mask type.
"""
def __init__(self, pos, radius=0.1, mask_type: str = "disk", waist=None, source_shape=None):
"""
Initialize the Roi object.
Args:
pos (int, int): y,x coordinates of the center of the ROI, measured in pixels from the top-left corner.
when omitted, the default value of source_shape // 2 is used.
note: non-integer positions for the ROI are currently not supported.
radius (float): Radius of the ROI. Default is 0.1.
mask_type: Type of the mask.
Options are 'disk' (default), 'gaussian', or 'square'.
waist (float): Defines the width of the Gaussian distribution in pixels.
Default is 0.5 * radius.
source_shape (int, int): Shape of the source image.
Used to compute a default value for `pos`, and to check if the ROI is fully inside the image.
"""
if pos is None:
pos = (source_shape[0] // 2, source_shape[1] // 2)
if (
round(pos[0] - radius) < 0
or round(pos[1] - radius) < 0
or source_shape is not None
and (round(pos[0] + radius) >= source_shape[0] or round(pos[1] + radius) >= source_shape[1])
):
raise ValueError("ROI does not fit inside source image")
self._pos = pos
self._radius = radius
self._mask_type = mask_type
self._waist = waist if waist is not None else radius * 0.5
self._mask = None
self._mask_sum = 0.0
@property
def pos(self) -> tuple[int, int]:
"""Get the position of the ROI center.
Returns:
tuple[int, int]: y,x coordinates of the center of the ROI, measured in pixels from the top-left corner.
"""
return self._pos
@pos.setter
def pos(self, value: tuple[int, int]):
self._pos = value
self._mask = None # need to re-compute mask
@property
def x(self) -> int:
"""x-coordinate of the center of the ROI, relative to the center of the image."""
return self.pos[1]
@x.setter
def x(self, value: int):
self.pos = (self.pos[0], int(value))
@property
def y(self) -> int:
"""y-coordinate of the center of the ROI, relative to the center of the image."""
return self.pos[0]
@y.setter
def y(self, value: int):
self.pos = (int(value), self.pos[1])
@property
def radius(self) -> float:
"""Radius of the ROI in pixels."""
return self._radius
@radius.setter
def radius(self, value: float):
self._radius = float(value)
self._mask = None # need to re-compute mask
@property
def waist(self) -> float:
"""Width of the Gaussian distribution in pixels."""
return self._waist
@waist.setter
def waist(self, value: float):
self._waist = float(value)
self._mask = None # need to re-compute mask
@property
def mask_type(self) -> str:
"""The type of mask used for the ROI.
Must be one of 'disk', 'gaussian', or 'square'.
"""
return self._mask_type
@mask_type.setter
def mask_type(self, value: str):
if value not in ["disk", "gaussian", "square"]:
raise ValueError("mask_type must be 'disk', 'gaussian', or 'square'")
self._mask_type = value
self._mask = None # need to re-compute mask
[docs]
def apply(self, image: np.ndarray, order: float = 1.0):
"""
Applies the mask to the frame data by computing the weighted average.
Optionally, the image data can be raised to a power before the mask is applied.
This is useful for simulating multi-photon excitation, or for computing
weighted contrast over the mask.
Args:
image (np.ndarray): The source image data.
order (float): The order of the mask. Default is 1.0.
"""
# if any of the variables changed, we need to re-compute the mask
if self._mask is None:
# clip the radius so that it corresponds to at least 1 pixel
r = np.maximum(self._radius, 0.1)
# for circular masks, always use an odd number of pixels so that we have a clearly
# defined center.
# for square masks, instead use the actual size
if self.mask_type == "disk":
d = round(self._radius) * 2 + 1
self._mask = disk(d, radius=r)
elif self.mask_type == "gaussian":
d = round(self._radius) * 2 + 1
self._mask = gaussian(d, self._waist)
else: # square
d = round(self._radius * 2.0)
self._mask = np.ones((d, d))
self._mask_sum = np.sum(self._mask)
image_start = np.array(self.pos) - int(0.5 * self._mask.shape[0] - 0.5)
image_cropped = image[
image_start[0] : image_start[0] + self._mask.shape[0],
image_start[1] : image_start[1] + self._mask.shape[1],
]
if image_cropped.shape != self._mask.shape:
raise ValueError(
f"ROI is larger than the possible area. ROI shape: {self._mask.shape}, "
+ f"Cropped image shape: {image_cropped.shape}"
)
if order != 1.0:
image_cropped = np.power(image_cropped, order)
return np.sum(image_cropped * self._mask) / self._mask_sum
[docs]
class MultipleRoi(Processor):
"""
Processor that averages signals over multiple regions of interest (ROIs).
"""
def __init__(self, source, rois: Sequence[Roi], multi_threaded: bool = True):
"""
Initialize the MultipleRoi processor with a source and multiple ROIs.
Note: changing parameters of the ROIs between triggering and fetching causes a race condition.
Args:
source (Detector): Source detector object to process the data from.
rois (Sequence[Roi]): Sequence of Roi objects defining the regions of interest.
"""
self._rois = np.asarray(rois)
self._source = source
super().__init__(source, multi_threaded=multi_threaded, pixel_size=None, data_shape=self._rois.shape)
def _fetch(self, image: np.ndarray) -> np.ndarray: # noqa
"""
Fetches and processes the data for each ROI from the image.
This method crops the image according to each ROI's position and
calculates the average value within the ROI. If an ROI is larger than
the possible area of the image, a ValueError is raised.
Args:
image (np.ndarray): The source image data.
Returns:
np.ndarray: Array containing the processed data for each ROI.
"""
def apply_mask(mask: Roi):
return mask.apply(image)
return np.vectorize(apply_mask)(self._rois)
[docs]
class SingleRoi(MultipleRoi):
"""Processor that averages a signal over a single region of interest (ROI).
This is a specialized version of MultipleRoi that works with a single ROI.
It provides direct access to the ROI properties (pos, radius, mask_type, waist).
"""
def __init__(
self,
source,
pos=None,
radius=0.1,
mask_type: str = "disk",
waist=0.5,
**kwargs,
):
"""
Processor that averages a signal over a single region of interest (ROI).
Args:
source (Detector): Source detector object to process the data from.
pos (int, int): y,x coordinates of the center of the ROI, measured in pixels from the top-left corner.
when omitted, the default value of source.data_shape // 2 is used.
note: non-integer positions for the ROI are currently not supported.
radius (float): Radius of the ROI in pixels. Default is 0.1.
mask_type: Type of the mask. Options are 'disk', 'gaussian', or 'square'. Default is 'disk'.
waist (float): Defines the width of the Gaussian distribution. Default is 0.5.
"""
single_roi = Roi(pos, radius, mask_type, waist, source.data_shape)
rois = np.array([single_roi]).reshape(())
super().__init__(source, rois=rois, **kwargs)
[docs]
class CropProcessor(Processor):
"""Processor to crop data from the source to some region of interest.
Works on any number of dimensions.
If the cropped area extends beyond the size of the source data,
the data is padded with 'padding_value'
"""
def __init__(
self,
source: Detector,
shape: Sequence[int | None] | EllipsisType = ...,
pos: Optional[Sequence[int]] = None,
padding_value=0.0,
multi_threaded: bool = False,
):
"""
Args:
source (object): The data source to process.
shape (tuple): Size of the cropped region (this is the data_shape property)
may be a tuple holding one or more None values.
These values are then replaced by the size of the source in that dimension.
This only works if the source has a defined data_shape.
pos (tuple): Coordinates of the start of the cropped region.
For 2-D data, this is the top-left corner.
padding_value (float): Value to use if the cropped area extends beyond the original data.
"""
# replace the None values in shape with the corresponding size of the source
# Note: this only works if the source has a specified data shape.
if shape is ...:
data_shape = source.data_shape
else:
try:
data_shape = tuple(s if s is not None else auto for s, auto in zip(shape, source.data_shape))
except AttributeError:
raise ValueError("If `shape` contains None values, `source` must have a defined `data_shape`")
super().__init__(source, data_shape=data_shape, multi_threaded=multi_threaded)
self._pos = np.array(pos) if pos is not None else np.zeros((len(self.data_shape),), dtype=int)
self._padding_value = padding_value
@property
def pos(self) -> tuple:
"""Start ('top-left' corner) of the cropped region."""
return tuple(self._pos)
@pos.setter
def pos(self, value):
self._pos = np.array(value, ndmin=1)
@property
def data_shape(self) -> tuple:
"""Size of the cropped data"""
return self._data_shape
@data_shape.setter
def data_shape(self, value):
self._data_shape = tuple(np.array(value, ndmin=1))
def _fetch(self, image: np.ndarray) -> np.ndarray: # noqa
"""
Args:
image (ndarray): source image
Returns: the out array containing the cropped image.
"""
src_start = np.maximum(self._pos, 0).astype("int32")
src_end = np.minimum(self._pos + self._data_shape, image.shape).astype("int32")
dst_start = np.maximum(-self._pos, 0).astype("int32")
dst_end = dst_start + src_end - src_start
src_select = tuple(slice(start, end) for (start, end) in zip(src_start, src_end))
src = image.__getitem__(src_select)
if any(dst_start != 0) or any(dst_end != self._data_shape):
dst = np.zeros(self._data_shape) + self._padding_value
dst_select = tuple(slice(start, end) for (start, end) in zip(dst_start, dst_end))
dst.__setitem__(dst_select, src)
else:
dst = src
return dst
[docs]
def select_roi(source: Detector, mask_type: str):
"""Opens a window that allows the user to select a region of interest.
This function displays an image from the source detector and allows the user
to interactively select a region of interest by clicking and dragging.
Args:
source: The detector to read the image from.
mask_type: Type of the mask to create. Options are 'disk', 'gaussian', or 'square'.
Returns:
Roi: A Roi object representing the selected region, or None if the selection was cancelled.
Raises:
ValueError: If the mask_type is not one of the supported types.
"""
if mask_type not in ["disk", "gaussian", "square"]:
raise ValueError("mask_type must be 'disk', 'gaussian', or 'square'")
image = cv2.normalize(
source.read(),
None,
alpha=0,
beta=255,
norm_type=cv2.NORM_MINMAX,
dtype=cv2.CV_8U,
)
title = "Select ROI and press c to continue or ESC to cancel"
cv2.namedWindow(title)
cv2.imshow(title, image)
roi_start = np.array((0, 0))
roi_size = 0.0
def mouse_callback(event, x, y, flags, _param):
nonlocal roi_start, roi_size, image
if event == cv2.EVENT_LBUTTONDOWN: # mouse down: select start
roi_start = np.array((x, y))
elif event == cv2.EVENT_MOUSEMOVE and cv2.EVENT_FLAG_LBUTTON & flags:
roi_size = np.minimum(x - roi_start[0], y - roi_start[1])
rect_image = image.copy()
if mask_type == "square":
cv2.rectangle(rect_image, roi_start, roi_start + roi_size, (0.0, 0.0, 255.0), 2)
else:
cv2.circle(
rect_image,
roi_start + roi_size // 2,
abs(roi_size) // 2,
(0.0, 0.0, 255.0),
2,
)
cv2.imshow(title, rect_image)
cv2.setMouseCallback(title, mouse_callback)
while True:
key = cv2.waitKey(1) & 0xFF
if key == ord("c"):
if roi_size is not None:
cv2.destroyWindow(title)
if roi_size < 0.0:
roi_start = roi_start + roi_size
roi_size = -roi_size
return Roi(pos=(roi_start[1], roi_start[0]), radius=0.5 * roi_size)
elif key == 27:
cv2.destroyWindow(title)
return None
[docs]
class FunctionProcessor(Processor):
"""Processor that applies a user-defined function to the data from the source."""
def __init__(self, source: Detector, func, **kwargs):
"""
Processor that applies a user-defined function to the data from the source.
Args:
source (Detector): The data source to process.
func (callable): A function that takes the data from source.read() as input and return a processed version of the data. The output of the function must be a numpy array.
data_shape: Sequence[int] | None: The shape of the output data if known. When omitted, the data_shape of the source is used.
pixel_size: Quantity | None: The pixel size of the output data if known. When omitted, the pixel_size of the source is used.
multi_threaded (bool): Whether to perform processing in a worker thread. Default is True.
"""
super().__init__(source, **kwargs)
self.func = func
def _fetch(self, image):
return self.func(image)
[docs]
def busy(self):
super().busy()