import logging
import threading
import time
from abc import ABC, abstractmethod
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Set, final, Tuple, Optional
from weakref import WeakSet
import astropy.units as u
import numpy as np
from astropy.units import Quantity
from numpy.typing import ArrayLike
from .utilities import set_pixel_size
[docs]
class Device(ABC):
"""Base class for detectors and actuators
See :ref:`key_concepts` for more information.
"""
__slots__ = (
"_end_time_ns",
"_timeout_margin",
"_locking_thread",
"_error",
"__weakref__",
"_latency",
"_duration",
"_multi_threaded",
)
_workers = ThreadPoolExecutor(thread_name_prefix="Device._workers")
_moving = False
_state_lock = threading.Lock()
_devices: "Set[Device]" = WeakSet()
def __init__(self, *, duration: Quantity[u.ms], latency: Quantity[u.ms]):
"""Constructs a new Device object"""
self._latency = latency
self._duration = duration
self._end_time_ns = 0
self._timeout_margin = 5 * u.s
self._locking_thread = None
self._error = None
Device._devices.add(self)
@property
@abstractmethod
def _is_actuator(self):
"""True for actuators, False for detectors"""
...
def _start(self):
"""Switches the state to 'moving' (for actuators) or 'measuring' (for detectors).
This function changes the global state to 'moving' or 'measuring' if needed,
and it may block until this state switch is completed.
After switching, stores the time at which the operation will have ended in the ``_end_time_ns``
field (i.e., ``time.time_ns() + self.latency + self.duration``).
"""
# acquire a global lock, to prevent multiple threads to switch moving/measuring state simultaneously
with Device._state_lock:
# check if transition from moving/measuring or vice versa is needed
if Device._moving != self._is_actuator():
if Device._moving:
logging.debug("switch to MEASURING requested by %s.", self)
else:
logging.debug("switch to MOVING requested by %s.", self)
same_type = [device for device in Device._devices if device._is_actuator == self._is_actuator]
other_type = [device for device in Device._devices if device._is_actuator != self._is_actuator]
# compute the minimum latency of same_type
# for instance, when switching to 'measuring', this number tells us how long it takes before any of the
# detectors actually starts a measurement.
# If this is a positive number, we can make the switch to 'measuring' slightly _before_
# all actuators have stabilized.
latency = min([device.latency for device in same_type], default=0.0 * u.ns) # noqa - incorrect warning
# wait until all devices of the other type have (almost) finished
for device in other_type:
device.wait(up_to=latency)
# changes the state from moving to measuring and vice versa
Device._moving = not Device._moving
if Device._moving:
logging.debug("state is now MOVING.")
else:
logging.debug("state is now MEASURING.")
# also store the time we expect the operation to finish
# note: it may finish slightly earlier since (latency + duration) is a maximum value
self._end_time_ns = time.time_ns() + self.latency.to_value(u.ns) + self.duration.to_value(u.ns)
@property
def latency(self) -> Quantity[u.ms]:
"""latency (Quantity[u.ms]): minimum amount of time between sending a command or trigger to the device and the
moment the device starts responding.
The default value is 0.0 ms.
Note:
The latency is used to compute when it is safe to switch the global state from 'moving'
to 'measuring' or vice versa.
Devices that report a non-zero latency promise not to do anything before the latency has passed.
This allows making the state switch even before the devices of the other type have all finished.
This construct is used for spatial light modulators, which typically have a long latency (1-2 frames).
Due to this latency, we may send an `update` command to the SLM even before the camera has finished reading
the previous frame.
Note:
A device is allowed to report a different latency every time `latency` is called.
For example, for a spatial light modulator that only refreshes
at a fixed rate, we can add the remaining time until the next refresh to the latency.
"""
return self._latency
@property
def duration(self) -> Quantity[u.ms]:
"""duration (Quantity[u.ms]): maximum amount of time it takes to perform the measurement or for the
actuator to stabilize.
This value *does not* include the latency.
For a detector, this is the maximum amount of time that elapses between returning from `trigger()` and the
end of the measurement.
For an actuator, this is the maximum amount of time that elapses from returning from a command like `update(
)` and the stabilization of the device.
If the duration of an operation is not known in advance,
(e.g., when waiting for a hardware trigger), this function should return `np.inf * u.ms`.
Note: A device may update the duration dynamically.
For example, a stage may compute the required time to
move to the target position and update the duration accordingly.
Note: If `latency` is a (lower) estimate, the duration should be high enough to guarantee that `latency +
duration` is at least as large as the time between starting the operation and finishing it.
"""
return self._duration
[docs]
def wait(self, up_to: Optional[Quantity[u.ms]] = None) -> None:
"""Waits until the device is (almost) in the `ready` state, i.e., has finished measuring or moving.
This function is called by `_start` automatically to ensure proper synchronization between detectors and
actuators, and it is called by `__del__` to ensure the device is not active when it is destroyed.
The only time to call `wait` explicitly is when using pipelined measurements, see `Detector.trigger()`.
For devices that report a duration (`duration ≠ ∞`), this function waits until
`current_time - up_to >= self._end_time_ns`,
where `_end_time_ns` was set by the last call to `_start`.
For devices that report no duration `duration = ∞`, this function repeatedly calls `busy` until `busy`
returns `False`. In this case, `up_to` is ignored.
Args:
up_to(Quantity[u.ms]):
when specified, specifies that this function may return 'up_to' milliseconds
*before* the device is finished.
Raises:
Any other exception raised by the device in another thread (e.g., during `_fetch`).
TimeoutError: if the device has `duration = ∞`, and `busy` does not return `True` within
`self.timeout`
RuntimeError: if `wait` is called from inside a setter or from inside `_fetch`.
This would cause a deadlock.
"""
if self._error is not None:
e = self._error
self._error = None
raise e
# If duration = ∞, poll busy until it returns False or a timeout occurs.
if np.isfinite(self._end_time_ns):
start = time.time_ns()
timeout = self.timeout.to_value(u.ns)
while self.busy():
time.sleep(0.01)
if time.time_ns() - start > timeout:
raise TimeoutError("Timeout in %s (tid %i)", self, threading.get_ident())
else:
time_to_wait = self._end_time_ns - time.time_ns()
if up_to is not None:
time_to_wait -= up_to.to_value(u.ns)
if time_to_wait > 0:
time.sleep(time_to_wait / 1.0e9)
[docs]
def busy(self) -> bool:
"""Returns true if the device is measuring or moving (see `wait()`).
Note: if a device does not define a finite `duration`, it must override this function to poll for
finalization."""
return time.time_ns() < self._end_time_ns
@property
def timeout(self) -> Quantity[u.ms]:
"""Time after which a timeout error is raised when waiting for the device.
The timeout is automatically adjusted if the `duration` changes.
The default value is `duration + 5 s`."""
duration = self.duration
if np.isfinite(duration):
return self._timeout_margin + duration
else:
return self._timeout_margin
@timeout.setter
def timeout(self, value):
duration = self.duration
if not np.isinf(duration):
self._timeout_margin = value - duration
else:
self._timeout_margin = value.to(u.ms)
[docs]
class Actuator(Device, ABC):
"""Base class for all actuators"""
__slots__ = ()
@final
def _is_actuator(self):
return True
[docs]
class Detector(Device, ABC):
"""Base class for all detectors, cameras and other data sources with possible dynamic behavior.
See :numref:`Detectors` in the documentation for more information.
"""
__slots__ = (
"_measurements_pending",
"_lock_condition",
"_pixel_size",
"_data_shape",
)
def __init__(
self,
*,
data_shape: Optional[tuple[int, ...]],
pixel_size: Optional[Quantity],
duration: Optional[Quantity[u.ms]],
latency: Optional[Quantity[u.ms]],
multi_threaded: bool = True
):
"""
Constructor for the Detector class.
Args:
data_shape: The shape of the data array that `read()` will return. When None is passed,
the subclass should override the `data_shape` property to return the actual shape.
pixel_size: The pixel size (in astropy length units). None if the pixels do not have a size.
Subclassed can override the `pixel_size` property to return the actual pixel size.
duration: The maximum amount of time that elapses between returning from `trigger()`
and the end of the measurement. If the duration of an operation is not known in advance,
(e.g., when waiting for a hardware trigger), this value should be `np.inf * u.ms`
and the `busy` method should be overridden to return `False` when the measurement is finished.
If None is passed, the subclass should override the `duration` property to return the actual duration.
latency: The minimum amount of time between sending a command or trigger to the device
and the moment the device starts responding. If None is passed, the subclass should override
the `latency` property to return the actual latency.
multi_threaded: If True, `_fetch` is called from a worker thread. Otherwise, `_fetch` is called
directly from `trigger`. If the device is not thread-safe, or threading provides no benefit,
or for easy debugging, set this to False.
"""
super().__init__(duration=duration, latency=latency)
self._measurements_pending = 0
self._lock_condition = threading.Condition()
self._error = None
self._data_shape = data_shape
self._pixel_size = pixel_size
self._multi_threaded = multi_threaded
@final
def _is_actuator(self):
return False
def _increase_measurements_pending(self):
with self._lock_condition:
self._measurements_pending += 1
def _decrease_measurements_pending(self):
with self._lock_condition:
self._measurements_pending -= 1
if self._measurements_pending == 0:
self._lock_condition.notify_all()
[docs]
def wait(self, up_to: Quantity[u.ms] = None) -> None:
"""Waits until the hardware has (almost) finished measuring
Due to the automatic synchronization between detectors and actuators, this function only needs to be called
explicitly when waiting for data to be stored in the `out` argument of :meth:`~.Detector.trigger()`.
Args:
up_to: if specified, this function may return `up_to` milliseconds *before* the hardware
has finished measurements.
If None, this function waits until the hardware has finished all measurements *and* all data is fetched,
and stored in the `out` array if that was passed to trigger().
"""
super().wait(up_to)
if up_to is None:
# wait until all pending measurements are processed.
with self._lock_condition:
while self._measurements_pending > 0:
self._lock_condition.wait()
[docs]
def trigger(self, *args, out=None, immediate=False, **kwargs) -> Future:
"""Triggers the detector to start acquisition of the data.
This function does not wait for the measurement to complete.
Instead, it returns a ``concurrent.futures.Future``.
Call ``.result()`` on the returned object to wait for the data.
Here is a typical usage pattern:
.. code-block:: python
# Trigger the detector, which starts the data capture process
future = detector.trigger()
# Do some other work, perhaps trigger other detectors to capture
# data simultaneously...
# Now read the data from the detector. If the data is not ready yet,
# this will block until it is.
data = future.result()
An alternative method for asynchronous data capture is to use
the `out` parameter to specify a location where to store the data:
.. code-block:: python
out = np.zeros((2,), dtype='float32')
detector.trigger(out=out[0]) # start the first measurement
detector.trigger(out=out[1]) # queue the second measurement
detector.wait() # wait for both measurements to complete
# Now the data is stored in the `out` array.
All input parameters are passed to the _fetch function of the detector.
Child classes may override trigger() to call `super().trigger()` with additional parameters.
If any of these parameters is a Future, it is awaited before calling _fetch.
This way, data from multiple sources can be combined (see Processor).
Note:
To implement hardware triggering, do not override this function.
Instead, override `_do_trigger()` to ensure proper synchronization and locking.
Args:
out: If specified, the data is stored in this array once it is available.
immediate: If True, the data is fetched in the current thread. This is useful for debugging,
and for cases where the data is needed immediately. It avoids the overhead (and debugging complications)
of dispatching the call to _fetch to a worker thread.
*args: Additional arguments passed to the _fetch function.
If any of these arguments is a `concurrent.futures.Future`, the data is awaited before calling _fetch,
and the data is passed instead of the `Future`.
This is useful for combining data from multiple sources (see `Processor`).
**kwargs: Additional keyword arguments passed to the _fetch function. Any `concurrent.futures.Future`
in the keyword arguments is awaited before calling _fetch,
and the data is passed instead of the `Future`.
"""
self._increase_measurements_pending()
try:
self._start()
self._do_trigger()
except: # noqa - ok, we are not really catching the exception, just making sure the lock gets released
self._decrease_measurements_pending()
raise
logging.debug("triggering %s (tid: %i).", self, threading.get_ident())
if immediate or not self._multi_threaded:
result = Future()
result.set_result(self.__do_fetch(out, *args, **kwargs)) # noqa
return result
else:
return Device._workers.submit(self.__do_fetch, out, *args, **kwargs)
def __do_fetch(self, out_, *args_, **kwargs_):
"""Helper function that awaits all futures in the keyword argument list, and then calls _fetch"""
try:
if len(args_) > 0 or len(kwargs_) > 0:
logging.debug("awaiting inputs for %s (tid: %i).", self, threading.get_ident())
awaited_args = [(arg.result() if isinstance(arg, Future) else arg) for arg in args_]
awaited_kwargs = {key: (arg.result() if isinstance(arg, Future) else arg) for (key, arg) in kwargs_.items()}
logging.debug("fetching data of %s ((tid: %i)).", self, threading.get_ident())
data = self._fetch(*awaited_args, **awaited_kwargs)
data = set_pixel_size(data, self.pixel_size)
assert data.shape == self.data_shape
if out_ is not None:
out_[...] = data # store data in the location specified during trigger
return data
except Exception as e:
# if we are storing the result in an `out` array,
# the user may never call result() on the returned future object,
# and the error may be lost.
# Therefore, store it so that it can be raised on the next call to wait()
if out_ is not None:
self._error = e
raise e # raise the error again, it will be stored in the 'Future' object that was returned by trigger()
finally:
self._decrease_measurements_pending()
def __new__(cls, *args, **kwargs):
"""This method is called before __init__ to create a new instance of the class.
We need to override this to add attributes that will be used in __setattr__
"""
instance = super().__new__(cls)
instance._multi_threaded = False
return instance
def __setattr__(self, key, value):
"""Prevents modification of public attributes and properties while the device is locked.
For detectors, this prevents modification of the detector settings while a measurement is in progress.
For all devices, it prevents concurrent modification in a multi-threading context.
Private attributes can be set without locking.
Note that this is not thread-safe and should be done with care!!
"""
# note: the check needs to be in this order, otherwise we cannot initialize set _multi_threaded
if not key.startswith("_") and self._multi_threaded:
with self._lock_condition:
while self._measurements_pending > 0:
self._lock_condition.wait()
super().__setattr__(key, value)
else:
super().__setattr__(key, value)
def _do_trigger(self) -> None:
"""Override this function to perform the actual hardware trigger."""
pass
@abstractmethod
def _fetch(self, *args, **kwargs) -> np.ndarray:
"""Read the data from the detector
Args:
The args and kwargs are passed from the call to trigger()
Note:
After reading the data, the Detector attaches the pixel_size metadata.
Note:
Child classes must implement this function, and store the data in `out[...]` if `out` is not None.
"""
...
[docs]
@final
def read(self, *args, immediate=True, **kwargs) -> np.ndarray:
"""Triggers the detector and waits for the data to arrive.
Shortcut for trigger().result().
"""
return self.trigger(*args, immediate=immediate, **kwargs).result()
@property
def data_shape(self) -> Tuple[int, ...]:
"""The shape of the data array that `read()` will return.
For some detectors this property may be mutable, for example, for a camera it
represents the height and width of the ROI, which can be changed.
"""
return self._data_shape
@property
def pixel_size(self) -> Optional[Quantity]:
"""Physical dimension of one element in the returned data array.
For cameras, this is the pixel size (in astropy length units).
For detectors returning a time trace, this value is specified in astropy time units.
The pixel_size is a 1-D array with an element for each dimension of the returned data.
By default, the pixel size cannot be set.
However, in some cases (such as when the `pixel_size` is actually a sampling interval),
it makes sense for the child class to implement a setter.
"""
return self._pixel_size
[docs]
@final
def coordinates(self, dimension: int) -> Quantity:
"""Returns an array with the coordinate values along the d-th axis.
The coordinates represent the _centers_ of the grid points. For example,
for an array of shape ``(2,)`` the coordinates are `[0.5, 1.5] * pixel_size`
and not `[0, 1] * pixel_size`. If `self.pixel_size is None`, a pixel size
of 1.0 is used.
The coordinates are returned as an array with the same number of
dimensions as `data_shape`, with the d-th dimension holding the coordinates.
This facilitates meshgrid-like computations, e.g.
`cam.coordinates(0) + cam.coordinates(1)` gives a 2-dimensional array of coordinates.
Args:
dimension: Dimension for which to return the coordinates.
"""
unit = u.dimensionless_unscaled if self.pixel_size is None else self.pixel_size[dimension]
shape = np.ones_like(self.data_shape)
shape[dimension] = self.data_shape[dimension]
return np.arange(0.5, 0.5 + self.data_shape[dimension], 1.0).reshape(shape) * unit
@final
@property
def extent(self) -> Quantity:
"""Physical size of the data array
If a `pixel_size` is set, this function returns `data_shape * pixel_size`
as an `astropy.units.Quantity`.
If no `pixel_size` is set, this function uses the
`dimensionless_unscaled` unit.
"""
unit = u.dimensionless_unscaled if self.pixel_size is None else self.pixel_size
return np.array(self.data_shape) * unit
[docs]
class Processor(Detector, ABC):
"""Base class for all Processors.
Processors can be used to build data processing graphs, where each Processor takes input from one or
more input Detectors and processes that data (e.g., cropping an image, averaging over an ROI, etc.).
A processor, itself, is a Detector to allow chaining multiple processors together to combine functionality.
To implement a processor, implement `_fetch`, and optionally override `data_shape`, `pixel_size`, and `__init__`.
The `latency` and `duration` properties are computed from the latency and duration of the inputs and cannot be set.
By default, the `pixel_size` and `data_shape` are the same as the `pixel_size` and `data_shape` of the first input.
To override this behavior, override the `pixel_size` and `data_shape` properties.
Args:
multi_threaded: If True, `_fetch` is called from a worker thread. Otherwise, `_fetch` is called
directly from `trigger`. If the device is not thread-safe, or threading provides no benefit,
or for easy debugging, set this to False.
"""
def __init__(self, *args, multi_threaded: bool):
self._sources = args
# data_shape, duration, latency and pixel_size all may change dynamically
# when the settings of one of the source detectors is changed.
# Therefore, we pass 'None' for all parameters, and override
# data_shape, pixel_size, duration and latency in the properties.
super().__init__(
data_shape=None,
pixel_size=None,
duration=None,
latency=None,
multi_threaded=multi_threaded,
)
[docs]
def trigger(self, *args, immediate=False, **kwargs):
"""Triggers all sources at the same time (regardless of latency), and schedules a call to `_fetch()`"""
future_data = [
(source.trigger(immediate=immediate) if source is not None else None) for source in self._sources
]
return super().trigger(*future_data, *args, **kwargs)
@final
@property
def latency(self) -> Quantity[u.ms]:
"""Returns the shortest latency for all detectors."""
return min(
(source.latency for source in self._sources if source is not None),
default=0.0 * u.ms,
)
@final
@property
def duration(self) -> Quantity[u.ms]:
"""Returns the last end time minus the first start time for all detectors
i.e., max (duration + latency) - min(latency).
Note that `latency` is allowed to vary over time for devices that can only be triggered periodically,
so this `duration` may also vary over time.
"""
times = [(source.duration, source.latency) for source in self._sources if source is not None]
if len(times) == 0:
return 0.0 * u.ms
return max([duration + latency for (duration, latency) in times]) - min(
[latency for (duration, latency) in times]
)
@property
def data_shape(self):
"""This default implementation returns the data shape of the first source."""
return self._sources[0].data_shape
@property
def pixel_size(self) -> Optional[Quantity]:
"""This default implementation returns the pixel size of the first source."""
return self._sources[0].pixel_size
[docs]
class PhaseSLM(ABC):
"""Base class for phase-only SLMs"""
__slots__ = ()
[docs]
@abstractmethod
def update(self) -> None:
"""Sends the new phase pattern to be displayed on the SLM.
Implementations should call _start() before triggering the SLM.
Note:
This function *does not* wait for the image to appear on the SLM.
To wait for the image stabilization explicitly, use 'wait()'.
However, this should rarely be needed since all Detectors
already wait for the image to stabilize before starting a measurement.
"""
[docs]
@abstractmethod
def set_phases(self, values: ArrayLike, update: bool = True) -> None:
"""Sets the phase pattern on the SLM.
Args:
values(ArrayLike): phase pattern, in radians.
The pattern is automatically stretched to fill the full SLM.
update: when True, calls `update` after setting the phase pattern.
Set to `False` to suppress the call to `update`.
This is useful in advanced scenarios where multiple parameters of the SLM need to be changed
before updating the displayed image.
"""
...