import warnings
from typing import Union, Optional, Sequence
from weakref import WeakSet
import astropy.units as u
import numpy as np
from astropy.units import Quantity
from numpy.typing import ArrayLike
from .context import Context
from .. import safe_import
from ...simulation import PhaseToField
GL = safe_import("OpenGL.GL", "opengl")
glfw = safe_import("glfw", "opengl")
from .patch import FrameBufferPatch, Patch, VertexArray
from ...core import PhaseSLM, Actuator, Device, Detector
from ...utilities import Transform
TimeType = Union[Quantity[u.ms], int]
[docs]
class SLM(Actuator, PhaseSLM):
"""
An OpenGL object to control a spatial light modulator connected to a graphics card.
See :numref:`section-slms` for more information.
"""
__slots__ = [
"_vertex_array",
"_frame_buffer",
"_monitor_id",
"_position",
"_refresh_rate",
"_transform",
"_shape",
"_window",
"_globals",
"_frame_buffer",
"patches",
"primary_patch",
"_coordinate_system",
"_pixel_reader",
"_phase_reader",
"_field_reader",
"_context",
"_clones",
]
_active_slms = WeakSet()
"""Keep track of all active SLMs. This is done for two reasons. First, to check if we are not putting two
full-screen SLMs on the same monitor. Second, to allow sharing the OpenGL context between all SLM windows,
so that we can use the same Patch and Texture objects on multiple SLMs simultaneously."""
WINDOWED = 0
patches: list[Patch]
def __init__(
self,
monitor_id: int = WINDOWED,
shape: Optional[tuple[int, int]] = None,
pos: tuple[int, int] = (0, 0),
refresh_rate: Optional[Quantity[u.Hz]] = None,
latency: TimeType = 2,
duration: TimeType = 1,
coordinate_system: str = "short",
transform: Optional[Transform] = None,
):
"""
Constructs a new SLM window.
Args:
monitor_id (int): Monitor id, see :py:attr:`~monitor_id`
shape (tuple[int,int]): Size of the window (height, width) or resolution of the fullscreen mode.
Default value is None (recommended for full screen windows), which will use the current resolution
of the monitor, or a standard size of 300x300 for windowed modes.
pos (tuple[int,int]): Windowed-mode only: (y,x)-coordinate of the top-left corner of the window.
Default value is (0,0).
refresh_rate (Quantity[u.Hz]): Refresh rate of the SLM.
Ignored for windowed SLMs.
When omitted, the current refresh rate of the monitor will be used.
Note that OpenGL does not seem to support non-integer refresh rates.
In these cases, it is better to set the refresh rate in the OS, and not
explicitly specify a refresh rate.
latency (int): Time between the vertical retrace and the start of the SLM response to the new frame,
specified in milliseconds (u.ms) or multiples of the frame period (unitless).
see :py:attr:`~latency`
duration (int): Time between the start of the SLM response to the newly presented frame, and the point
where the SLM has stabilized.
Specified in milliseconds (u.ms) or multiples of the frame period (unitless).
see :py:attr:`~duration`
transform (Transform): Global transformation matrix, see :py:attr:`~transform`.
The `transform` determines how these vertex coordinates that make up the shape of a Patch (see
:class:`Patch`) are mapped to the SLM window.
By default, 'short' is used (see :attr:`transform`)
Attributes:
patches (list[Patch]): List of patches that are drawn on the SLM.
"""
# construct window for displaying the SLM pattern
SLM._init_glfw()
self._assert_window_available(monitor_id)
self._monitor_id = monitor_id
self._position = pos
(default_shape, default_rate, _) = SLM._current_mode(monitor_id)
self._shape = default_shape if shape is None else shape
self._refresh_rate = default_rate if refresh_rate is None else refresh_rate.to_value(u.Hz)
self._frame_buffer = None
self._window = None
self._globals = -1
self.patches = []
self._context = None
self._create_window() # sets self._context, self._window and self._globals and self._frame_patch
self._coordinate_system = coordinate_system
self.transform = Transform() if transform is None else transform
self._vertex_array = VertexArray()
self._clones = WeakSet()
# Create a single patch for displaying phase.
# this default patch is square 1.0, and can be accessed through the 'primary_phase_patch' attribute
# In advanced scenarios, the geometry of this patch may be modified, or it may be replaced altogether.
self.patches.append(Patch(self._context))
self.primary_patch = self.patches[0]
SLM._active_slms.add(self)
if not isinstance(duration, Quantity):
duration = duration * self.period
if not isinstance(latency, Quantity):
latency = latency * self.period
super().__init__(duration=duration, latency=latency)
self.update()
def _assert_window_available(self, monitor_id) -> None:
"""
Checks if the target monitor is available for displaying an SLM window.
Raises:
Exception: If a full screen SLM is already present on the target monitor.
"""
if monitor_id == SLM.WINDOWED:
if any([slm.monitor_id == 1 for slm in SLM._active_slms if slm is not self]):
raise RuntimeError(
f"Cannot create an SLM window because a full-screen SLM is already active on monitor 1"
)
else:
# we cannot have multiple full screen windows on the same monitor. Also, we cannot have
# a full screen window on monitor 1 if there are already windowed SLMs.
if any(
[
slm.monitor_id == monitor_id or (monitor_id == 1 and slm.monitor_id == SLM.WINDOWED)
for slm in SLM._active_slms
if slm is not self
]
):
raise RuntimeError(
f"Cannot create a full-screen SLM window on monitor {monitor_id} because a "
f"window is already displayed on that monitor"
)
if monitor_id > len(glfw.get_monitors()):
raise IndexError(
f"Monitor {monitor_id} not found, only {len(glfw.get_monitors())} monitor(s) " f"are connected."
)
@staticmethod
def _current_mode(monitor_id: int):
"""Returns the current video mode resolution (height, width), refresh rate, and bit-depth of the specified
monitor.
For monitor_id == SLM.WINDOWED (windowed mode SLM), always returns the default window size
of (300, 300)
"""
if monitor_id == SLM.WINDOWED:
monitor = glfw.get_primary_monitor()
mode = glfw.get_video_mode(monitor)
shape = (300, 300)
else:
monitor = glfw.get_monitors()[monitor_id - 1]
mode = glfw.get_video_mode(monitor)
shape = (mode.size[1], mode.size[0])
return (
shape,
mode.refresh_rate,
min([mode.bits.red, mode.bits.green, mode.bits.blue]),
)
def _on_resize(self):
"""Updates shape and refresh rate to the actual values of the window.
Note that these values are in pixels, which may be different from the window size because the window size is
in screen coordinates, which may not always the same as pixels (e.g. on a retina display).
For windowed SLMs, the refresh rate property is set to the refresh rate of the primary monitor.
If the width, height or refresh rate differ from the requested values, or if the bit depth is less than 8,
a warning is issued.
This function also sets the viewport to the full window size and creates a frame buffer.
"""
(current_size, current_rate, current_bit_depth) = SLM._current_mode(self._monitor_id)
# verify that the bit depth is at least 8 bit
if current_bit_depth < 8:
warnings.warn(
f"Bit depth is less than 8 bits " f"You may not be able to use the full phase resolution of your SLM."
)
# verify the refresh rate is correct
# Then update the refresh rate to the actual value
if int(self._refresh_rate) != current_rate:
warnings.warn(
f"Actual refresh rate of {current_rate} Hz does not match set rate " f"of {self._refresh_rate} Hz"
)
self._refresh_rate = current_rate
# create a new frame buffer
# re-use the lookup table if possible, otherwise create a default one ranging from 0 to 2 ** bit_depth-1.
old_lut = self._frame_buffer.lookup_table if self._frame_buffer is not None else None
self._frame_buffer = FrameBufferPatch(self, old_lut, current_bit_depth)
GL.glViewport(0, 0, self._shape[1], self._shape[0])
# tell openGL to wait for the vertical retrace when swapping buffers (it appears need to do this
# after creating the frame buffer)
glfw.swap_interval(1)
# update the shape property to match the actual value of the window
(fb_width, fb_height) = glfw.get_framebuffer_size(self._window)
fb_shape = (fb_height, fb_width)
if self._shape != fb_shape:
warnings.warn(f"Actual resolution {fb_shape} does not match requested resolution {self._shape}.")
self._shape = fb_shape
@staticmethod
def _init_glfw():
"""Initializes the GLFW library and sets global configuration.
Note:
We never de-initialize the library. This should be fine because each slm window releases its resources
when it is destroyed. If we were to de-initialize the GLFW library (using glfw.terminate()) we run into
trouble if the user of our library also uses glfw for something else.
"""
glfw.init()
glfw.window_hint(glfw.OPENGL_PROFILE, glfw.OPENGL_CORE_PROFILE) # Required on Mac. Doesn't hurt on Windows
glfw.window_hint(glfw.OPENGL_FORWARD_COMPAT, glfw.TRUE) # Required on Mac. Useless on Windows
glfw.window_hint(glfw.CONTEXT_VERSION_MAJOR, 4) # request at least opengl 4.2
glfw.window_hint(glfw.CONTEXT_VERSION_MINOR, 2)
glfw.window_hint(glfw.FLOATING, glfw.TRUE) # Keep window on top
glfw.window_hint(glfw.DECORATED, glfw.FALSE) # Disable window border
glfw.window_hint(glfw.AUTO_ICONIFY, glfw.FALSE) # Prevent window minimization during task switch
glfw.window_hint(glfw.FOCUSED, glfw.FALSE)
glfw.window_hint(glfw.DOUBLEBUFFER, glfw.TRUE)
glfw.window_hint(glfw.RED_BITS, 8) # require at least 8 bits per color channel (256 gray values)
glfw.window_hint(glfw.GREEN_BITS, 8)
glfw.window_hint(glfw.BLUE_BITS, 8)
glfw.window_hint(glfw.COCOA_RETINA_FRAMEBUFFER, glfw.FALSE) # disable retina multisampling on Mac (untested)
glfw.window_hint(glfw.SAMPLES, 0) # disable multisampling
def _create_window(self):
"""Constructs a new window and associated OpenGL context. Called by SLM.__init__()"""
# Try to re-use an already existing OpenGL context, so that we can share textures and shaders between
# SLM objects.
self._context = Context(self)
other = next(iter(SLM._active_slms), None)
shared = other._window if other is not None else None # noqa: ok to use _window
SLM._active_slms.add(self)
monitor = glfw.get_monitors()[self._monitor_id - 1] if self._monitor_id != SLM.WINDOWED else None
glfw.window_hint(glfw.REFRESH_RATE, int(self._refresh_rate))
self._window = glfw.create_window(self._shape[1], self._shape[0], "OpenWFS SLM", monitor, shared)
glfw.set_input_mode(self._window, glfw.CURSOR, glfw.CURSOR_HIDDEN) # disable cursor
if monitor: # full screen mode
glfw.set_gamma(monitor, 1.0)
else: # windowed mode
glfw.set_window_pos(self._window, self._position[1], self._position[0])
with self._context:
self._globals = GL.glGenBuffers(1) # create buffer for storing globals
GL.glClearColor(0.0, 0.0, 0.0, 1.0) # set clear color to black
self._on_resize()
@property
def shape(self) -> tuple[int, int]:
"""Shape (height × width) of the window in pixels.
Limitations :
- For windowed-mode SLMs, the size cannot be modified.
- When moving the SLM to a different monitor (see `monitor_id`), the SLM is sized to match the current
resolution on that monitor. Note that this value may differ from the value passed as input, because the
input value is specified in screen coordinates, whereas the reported width is in pixels.
In this case, the original value of shape will be lost.
- The `transform` property is not updated automatically, so if the aspect ratio changes
the transform needs to be set again.
"""
return self._shape
@shape.setter
def shape(self, value: tuple[int, int]):
if self.monitor_id == SLM.WINDOWED and self._shape != value:
glfw.set_window_size(self._window, value[1], value[0])
self._shape = value
self._on_resize()
@property
def position(self) -> tuple[int, int]:
"""
The position of the top-left corner of the SLM window as (y, x) screen coordinates.
Note:
This property is ignored for full-screen SLMs.
"""
return self._position
@position.setter
def position(self, value: tuple[int, int]):
if self.monitor_id == SLM.WINDOWED and self._position != value:
glfw.set_window_pos(self._window, value[1], value[0])
self._position = value
@property
def refresh_rate(self) -> Quantity[u.Hz]:
"""
Refresh rate of the SLM in Hz (read only).
Note:
The refresh rate cannot be modified after the SLM is created. When moving
the SLM to a different monitor (see `monitor_id`), the refresh rate is changed to the current video
mode on that monitor. Note that OpenGL specifies this value as an integer, whereas some SLMs support
non-integer refresh rates. It is always best to not specify the refresh rate and set the video mode
in the operating system before creating the SLM object.
"""
return self._refresh_rate * u.Hz
@property
def period(self) -> Quantity[u.ms]:
"""The period of the refresh rate in milliseconds (read only)."""
return (1000 / self._refresh_rate) * u.ms
@property
def monitor_id(self) -> int:
"""
Number of the monitor (1 for primary screen, 2 for secondary screen, etc.) for the SLM.
Each monitor can only hold a single full-screen window.
Use monitor_id=SLM.WINDOWED to show a windowed SLM on the primary screen
(for debugging and monitoring purposes).
There can be multiple windowed SLMs on the
primary screen, but there cannot also be a fullscreen SLM on the primary screen at the same time.
`monitor_id` can be modified at run time, in which case the current SLM window is replaced by a new window
on a different monitor. When moving the SLM to a different window, width, height and refresh_rate are set
"""
return self._monitor_id
@monitor_id.setter
def monitor_id(self, value):
if value == self._monitor_id:
return
self._assert_window_available(value)
self._monitor_id = value
(self._shape, self._refresh_rate, _) = SLM._current_mode(value)
monitor = glfw.get_monitors()[value - 1] if value != SLM.WINDOWED else None
# move window to new monitor
glfw.set_window_monitor(
self._window,
monitor,
self._position[1],
self._position[0],
self._shape[1],
self._shape[0],
int(self._refresh_rate),
)
self._on_resize()
def __del__(self):
"""Destructor for the SLM object. This function destroys the window and releases all resources."""
glfw.destroy_window(self._window)
[docs]
def update(self):
"""Sends the new phase pattern to be displayed on the SLM.
This function waits for the vsync, and returns directly after it.
Therefore, it can be used as software synchronization to the SLM.
Note:
This function *does not* wait for the image to appear on the SLM.
To wait for the image stabilization explicitly, use 'wait()'.
However, this should rarely be needed since all Detectors
already call wait_finished before starting a measurement.
Note:
At the moment, :meth:`~.SLM.update` blocks until all OpenGL commands are processed,
and a vertical retrace occurs (i.e., the hardware signals the start of a new frame).
This behavior may change in the future and should not be relied on.
Instead, use the automatic synchronization mechanism to synchronize detectors with
the SLM hardware.
"""
with self._context:
# first draw all patches into the frame buffer
GL.glBindFramebuffer(
GL.GL_FRAMEBUFFER, self._frame_buffer._frame_buffer
) # noqa - ok to access 'friend class'
GL.glClear(GL.GL_COLOR_BUFFER_BIT)
for patch in self.patches:
patch._draw() # noqa - ok to access 'friend class'
# then draw the frame buffer to the screen
GL.glBindFramebuffer(GL.GL_FRAMEBUFFER, 0)
self._frame_buffer._draw() # noqa - ok to access 'friend class'
glfw.poll_events() # process window messages
if len(self._clones) > 0:
self._context.__exit__(None, None, None) # release context before updating clones
for clone in self._clones:
with clone.slm._context: # noqa
self._frame_buffer._draw() # noqa - ok to access 'friend class'
glfw.swap_buffers(clone.slm._window) # noqa
self._context.__enter__() # re-enter context
# start 'moving' phase, then display the newly rendered image
self._start()
glfw.swap_buffers(self._window)
# wait for buffer swap to complete (this should be directly after a vsync, so returning from this
# function _should_ be synced with the vsync)
GL.glFinish()
# call _start again to update the _end_time_ns property,
# since some time has passed waiting for the vsync
self._start()
@property
def latency(self) -> Quantity[u.ms]:
"""Latency (a.k.a. 'idle time')
Represents the time delay between the vertical retrace
and the start of the SLM response to then new frame.
"""
return self._latency
@Device.latency.setter
def latency(self, value: Quantity[u.ms]):
self._latency = value.to(u.ms)
@Device.duration.setter
def duration(self, value: Quantity[u.ms]):
self._duration = value.to(u.ms)
@property
def coordinate_system(self) -> str:
"""Specifies the base coordinate system that is used to map vertex coordinates to the SLM window.
Possible values are 'full', 'short' and 'long'.
'full' means that the coordinate range (-1,-1) to (1,1) is mapped to the entire SLM window.
If the window is not square, this means that the coordinates are anisotropic.
'short' and 'long' map the coordinate range (-1,-1) to (1,1) to a square.
'short' means that the square is scaled to fill the short side of the SLM (introducing zero-padding at the
edges).
'long' means that the square is scaled to fill the long side of the SLM
(causing part of the coordinate range to be cropped because these coordinates correspond to points outside
the SLM window).
For a square SLM, 'full', 'short' and 'long' are all equivalent.
In all three cases, (-1,-1) corresponds to the top-left corner of the screen, and (1,-1) to the
bottom-left corner. This convention is consistent with that used in numpy/matplotlib
To further modify the mapping system, use the `transform` property.
"""
return self._coordinate_system
@coordinate_system.setter
def coordinate_system(self, value: str):
if value not in ["full", "short", "long"]:
raise ValueError(f"Unsupported coordinate system {value}")
self._coordinate_system = value
self.transform = self._transform # trigger update of transform matrix on gpu
@property
def transform(self) -> Transform:
"""Global transformation matrix
The transform determines how the vertex coordinates that make up the shape of a Patch (see
:class:`Patch`) are mapped to the standard coordinate system.
In turn, the `coordinate_system` property determines how this coordinate system is mapped to the SLM window.
By default, this value is just the identity transformation `Transform()`.
"""
return self._transform
@transform.setter
def transform(self, value: Transform):
# first compute the basic coordinate transform
width = self._shape[1]
height = self._shape[0]
if not isinstance(value, Transform):
raise ValueError("Transform must be a Transform object")
self._transform = value
self._pixel_reader = None
self._phase_reader = None
self._field_reader = None
# update matrix stored on the gpu
if self._coordinate_system == "full":
transform = self._transform
else:
scale_width = (width > height) == (self._coordinate_system == "short")
if scale_width:
root_transform = Transform(np.array(((1.0, 0.0), (0.0, height / width))))
else:
root_transform = Transform(np.array(((width / height, 0.0), (0.0, 1.0))))
transform = self._transform @ root_transform
padded = transform.opencl_matrix()
with self._context:
GL.glBindBuffer(GL.GL_UNIFORM_BUFFER, self._globals)
GL.glBufferData(GL.GL_UNIFORM_BUFFER, padded.size * 4, padded, GL.GL_STATIC_DRAW)
GL.glBindBufferBase(GL.GL_UNIFORM_BUFFER, 1, self._globals) # connect buffer to binding point 1
@property
def lookup_table(self) -> Sequence[int]:
"""Lookup table that is used to map the wrapped phase range of 0-2pi to gray values
The gray values are represented in the range from 0 to 2**bit_depth - 1). For an 8-bit video mode, this is 0-255.
By default, a linear lookup table is set: range(2**bit_depth - 1).
Note: lookup table need not contain 2**bit_depth elements.
A typical scenario is to use something like `slm.lookup_table=range(142)` to map the 0-2pi range
to only the first 142 gray values.
"""
return self._frame_buffer.lookup_table
@lookup_table.setter
def lookup_table(self, value: Sequence[int]):
self._frame_buffer.lookup_table = value[:]
[docs]
def set_phases(self, values: ArrayLike, update=True):
self.primary_patch.set_phases(values, update)
@property
def pixels(self) -> Detector:
"""Returns a 'camera' to monitor the current value of the pixels displayed on the SLM."""
if self._pixel_reader is None:
self._pixel_reader = FrontBufferReader(self)
return self._pixel_reader
@property
def field(self) -> Detector:
"""Returns a 'camera' to monitor the current field coming from the SLM.
Returns:
a detector that returns `self.amplitude * exp(1.0j * self.phases.read()`
"""
if self._field_reader is None:
self._field_reader = PhaseToField(self.phases)
return self._field_reader
@property
def phases(self) -> Detector:
"""Returns an object to monitor the phase pattern last sent to the SLM.
This is the pattern before applying the lookup table.
"""
if self._phase_reader is None:
self._phase_reader = FrameBufferReader(self)
return self._phase_reader
[docs]
def clone(
self,
monitor_id: int = WINDOWED,
shape: Optional[tuple[int, int]] = None,
pos: tuple[int, int] = (0, 0),
):
"""Creates a new SLM window that mirrors the content of this SLM window.
This is useful for demonstration and debugging purposes.
The image in the clone window is updated automatically when the SLM
is updated.
Args:
monitor_id: ID of the monitor to display the window on. Defaults to WINDOWED (0) mode
shape: shape (height, width) of the window.
pos: position (y, x) of the window.
Returns:
Returns an object that should be stored in a variable to keep the SLM window open.
When the variable is cleared or leaves the current scope, the SLM window is closed.
See `slm_demo.py` for an example.
"""
clone = _Clone(slm=SLM(monitor_id=monitor_id, shape=shape, pos=pos))
self._clones.add(clone)
return clone
class _Clone:
slm: SLM
def __init__(self, slm: SLM):
self.slm = slm
class FrontBufferReader(Detector):
def __init__(self, slm):
self._context = Context(slm)
super().__init__(
data_shape=None,
pixel_size=None,
duration=0.0 * u.ms,
latency=0.0 * u.ms,
multi_threaded=False,
)
@property
def data_shape(self):
return self._context.slm.shape
def _fetch(self, *args, **kwargs) -> np.ndarray:
with self._context:
GL.glReadBuffer(GL.GL_FRONT)
shape = self.data_shape
data = np.empty(shape, dtype="uint8")
GL.glReadPixels(0, 0, shape[1], shape[0], GL.GL_RED, GL.GL_UNSIGNED_BYTE, data)
# flip data upside down, because the OpenGL convention is to have the origin at the bottom left,
# but we want it at the top left (like in numpy)
return data[::-1, :]
class FrameBufferReader(Detector):
def __init__(self, slm):
self._context = Context(slm)
super().__init__(
data_shape=None,
pixel_size=None,
duration=0.0 * u.ms,
latency=0.0 * u.ms,
multi_threaded=False,
)
@property
def data_shape(self):
return self._context.slm.shape
def _fetch(self, *args, **kwargs) -> np.ndarray:
with self._context as slm:
return slm._frame_buffer.get_pixels() # noqa - ok to access 'friend class'