Source code for hvl_ccb.dev.crylas.crylas

#  Copyright (c) ETH Zurich, SIS ID and HVL D-ITET
#
"""
Device classes for a CryLas pulsed laser controller and a CryLas laser attenuator,
using serial communication.

There are three modes of operation for the laser
1. Laser-internal hardware trigger (default): fixed to 20 Hz and max energy per pulse.
2. Laser-internal software trigger (for diagnosis only).
3. External trigger: required for arbitrary pulse energy or repetition rate. Switch to
"external" on the front panel of laser controller for using option 3.

After switching on the laser with laser_on(), the system must stabilize
for some minutes. Do not apply abrupt changes of pulse energy or repetition rate.

Manufacturer homepage:
https://www.crylas.de/products/pulsed_laser.html

TODO: Fix typing
"""

import logging
import math
import re
import time
from collections.abc import Callable
from datetime import datetime
from threading import Event

from aenum import Enum, IntEnum

from hvl_ccb.comm.serial import (
    SerialCommunication,
    SerialCommunicationBytesize,
    SerialCommunicationConfig,
    SerialCommunicationParity,
    SerialCommunicationStopbits,
)
from hvl_ccb.configuration import configdataclass
from hvl_ccb.dev.base import DeviceError, SingleCommDevice
from hvl_ccb.utils.poller import Poller
from hvl_ccb.utils.typing import Number

logger = logging.getLogger(__name__)


[docs] @configdataclass class CryLasLaserSerialCommunicationConfig(SerialCommunicationConfig): #: Baudrate for CryLas laser is 19200 baud baudrate: int = 19200 #: CryLas laser does not use parity parity: str | SerialCommunicationParity = SerialCommunicationParity.NONE #: CryLas laser uses one stop bit stopbits: int | SerialCommunicationStopbits = SerialCommunicationStopbits.ONE #: One byte is eight bits long bytesize: int | SerialCommunicationBytesize = SerialCommunicationBytesize.EIGHTBITS #: The terminator is LF terminator: bytes = b"\n" #: use 10 seconds timeout as default (a long timeout is needed!) timeout: Number = 10
[docs] class CryLasLaserSerialCommunication(SerialCommunication): """ Specific communication protocol implementation for the CryLas laser controller. Already predefines device-specific protocol parameters in config. """
[docs] @staticmethod def config_cls(): return CryLasLaserSerialCommunicationConfig
READ_TEXT_SKIP_PREFIXES = (">", "MODE:") """Prefixes of lines that are skipped when read from the serial port."""
[docs] def read( self, ) -> str: """ Read first line of text from the serial port that does not start with any of `self.READ_TEXT_SKIP_PREFIXES`. :return: String read from the serial port; `''` if there was nothing to read. :raises SerialCommunicationIOError: when communication port is not opened """ with self.access_lock: line = super().read() while line.startswith(self.READ_TEXT_SKIP_PREFIXES): logger.debug(f'Laser com: "{line.rstrip()}" SKIPPED') line = super().read_text() logger.debug(f'Laser com: "{line.rstrip()}"') return line
[docs] def query(self, cmd: str, prefix: str, post_cmd: str | None = None) -> str: """ Send a command, then read the com until a line starting with prefix, or an empty line, is found. Returns the line in question. :param cmd: query message to send to the device :param prefix: start of the line to look for in the device answer :param post_cmd: optional additional command to send after the query :return: line in question as a string :raises SerialCommunicationIOError: when communication port is not opened """ # TODO: Make use of SyncCommunicationProtocol # noqa: FIX002 # https://gitlab.com/ethz_hvl/hvl_ccb/-/issues/372 with self.access_lock: # send the command self.write(cmd) # read the com until the prefix or an empty line is found line = self.read().strip() while line and not line.startswith(prefix): line = self.read().strip() if post_cmd is not None: # send an optional post-command message to send to the device self.write(post_cmd) return line
[docs] def query_all(self, cmd: str, prefix: str) -> list: """ Send a command, then read the com until a line starting with prefix, or an empty line, is found. Returns a list of successive lines starting with prefix. :param cmd: query message to send to the device :param prefix: start of the line to look for in the device answer :return: line in question as a string :raises SerialCommunicationIOError: when communication port is not opened """ # TODO: Make use of SyncCommunicationProtocol # noqa: FIX002 # https://gitlab.com/ethz_hvl/hvl_ccb/-/issues/372 with self.access_lock: # send the command self.write(cmd) # read the com until the prefix or an empty line is found line = self.read().strip() while line and not line.startswith(prefix): line = self.read().strip() answer = [] while line.startswith(prefix): answer.append(line) line = self.read().strip() return answer
[docs] class CryLasLaserShutterStatus(Enum): """ Status of the CryLas laser shutter """ CLOSED = 0 OPENED = 1
[docs] @configdataclass class CryLasLaserConfig: """ Device configuration dataclass for the CryLas laser controller. """ # status of the shutter ShutterStatus = CryLasLaserShutterStatus # calibration factor for the pulse energy calibration_factor: Number = 4.35 # polling period (s) to check back on the laser status if not ready # it should be longer than the communication timeout polling_period: int | float = 12 # timeout (s) when polling the laser status, CryLasLaserError is raised on timeout polling_timeout: Number = 300 # automatically turn on the laser when ready auto_laser_on: bool = True # status of the shutter to be set on start init_shutter_status: int | CryLasLaserShutterStatus = ( CryLasLaserShutterStatus.CLOSED )
[docs] def clean_values(self) -> None: if self.calibration_factor <= 0: msg = "The calibration factor should be positive." raise ValueError(msg) if self.polling_period <= 0: msg = "The polling period should be positive." raise ValueError(msg) if self.polling_timeout <= 0: msg = "The polling timeout should be positive." raise ValueError(msg) self.force_value( # type: ignore[attr-defined] "init_shutter_status", self.ShutterStatus(self.init_shutter_status) )
[docs] class CryLasLaserPoller(Poller): """ Poller class for polling the laser status until the laser is ready. :raises CryLasLaserError: if the timeout is reached before the laser is ready :raises SerialCommunicationIOError: when communication port is closed. """ def __init__( # noqa: ANN204 self, spoll_handler: Callable, check_handler: Callable, check_laser_status_handler: Callable, polling_delay_sec: Number = 0, polling_interval_sec: Number = 1, polling_timeout_sec: Number | None = None, ): """ Initialize the polling helper. :param spoll_handler: Polling function. :param check_handler: Check polling results. :param check_laser_status_handler: Check laser status. :param polling_delay_sec: Delay before starting the polling, in seconds. :param polling_interval_sec: Polling interval, in seconds. """ super().__init__( spoll_handler, polling_delay_sec, polling_interval_sec, polling_timeout_sec ) self._check_handler = check_handler self._check_laser_status_handler = check_laser_status_handler def _if_poll_again( self, stop_event: Event, delay_sec: Number, stop_time: Number | None ) -> bool: """ Check if to poll again. :param stop_event: Polling stop event. :param delay_sec: Delay time (in seconds). :param stop_time: Absolute stop time. :return: `True` if another polling handler call is due, `False` otherwise. """ if_poll_again = super()._if_poll_again(stop_event, delay_sec, stop_time) is_laser_ready = self._check_laser_status_handler() return if_poll_again and not is_laser_ready def _poll_until_stop_or_timeout(self, stop_event: Event) -> object | None: """ Thread for polling until stopped, timed-out or the laser is ready. :param stop_event: Event used to stop the polling :return: Last result of the polling function call """ last_result = super()._poll_until_stop_or_timeout(stop_event) if stop_event.wait(0): logger.info(f"[{datetime.now()}] Polling status: ... STOPPED") return last_result self._check_handler() return last_result
[docs] class CryLasLaser(SingleCommDevice): """ CryLas laser controller device class. """
[docs] class LaserStatus(Enum): """ Status of the CryLas laser """ UNREADY_INACTIVE = 0 READY_INACTIVE = 1 READY_ACTIVE = 2 @property def is_ready(self) -> bool: return self is not CryLasLaser.LaserStatus.UNREADY_INACTIVE @property def is_inactive(self) -> bool: return self is not CryLasLaser.LaserStatus.READY_ACTIVE
# status of the shutter ShutterStatus = CryLasLaserShutterStatus
[docs] class AnswersShutter(Enum): """ Standard answers of the CryLas laser controller to `'Shutter'` command passed via `com`. """ OPENED = "Shutter aktiv" CLOSED = "Shutter inaktiv"
[docs] class AnswersStatus(Enum): """ Standard answers of the CryLas laser controller to `'STATUS'` command passed via `com`. """ TEC1 = "STATUS: TEC1 Regulation ok" TEC2 = "STATUS: TEC2 Regulation ok" HEAD = "STATUS: Head ok" READY = "STATUS: System ready" ACTIVE = "STATUS: Laser active" INACTIVE = "STATUS: Laser inactive"
[docs] class RepetitionRates(IntEnum, init="value send_value"): # type: ignore[call-arg] """ Repetition rates for the internal software trigger in Hz """ HARDWARE = 0, 0 # software trigger is disabled, hardware trigger is used # instead (hardware trigger may be internal or external). SOFTWARE_INTERNAL_TEN = 10, 1 SOFTWARE_INTERNAL_TWENTY = 20, 2 SOFTWARE_INTERNAL_SIXTY = 60, 3
def __init__(self, com, dev_config=None): # noqa: ANN204 # Call superclass constructor super().__init__(com, dev_config) # laser status self.laser_status = self.LaserStatus.UNREADY_INACTIVE # shutter status self.shutter_status = None # command repetition rate self.repetition_rate = None # command pulse energy (micro joule) self._target_pulse_energy = None # thread that polls the laser status until it is ready self._status_poller = None @property def target_pulse_energy(self) -> int | None: return self._target_pulse_energy
[docs] @staticmethod def default_com_cls(): return CryLasLaserSerialCommunication
[docs] @staticmethod def config_cls(): return CryLasLaserConfig
[docs] def start(self) -> None: """ Opens the communication protocol and configures the device. :raises SerialCommunicationIOError: when communication port cannot be opened """ logger.info(f"Starting device {self}") # open the com super().start() # set the init shutter status self.set_init_shutter_status() # check if the laser is ready to be turned on self.update_laser_status() if not self.laser_status.is_ready: logger.info("Laser not ready yet.") # optionally, block execution until laser is ready if self.config.auto_laser_on and not self._is_polling(): self._start_polling() elif self.config.auto_laser_on: # turn on the laser self.laser_on()
def _start_polling(self): """ Start polling laser status. """ logger.info("Start polling laser status.") self._status_poller = CryLasLaserPoller( spoll_handler=self.update_laser_status, check_handler=self._after_polling_check_status, check_laser_status_handler=self._check_laser_status_handler, polling_delay_sec=self.config.polling_period, polling_interval_sec=self.config.polling_period, polling_timeout_sec=self.config.polling_timeout, ) self._status_poller.start_polling() def _check_laser_status_handler(self) -> bool: """ Checks whether the laser status is ready. """ return self.laser_status.is_ready def _after_polling_check_status(self) -> None: """ Thread for polling the laser status until the laser is ready. :raises CryLasLaserError: if the timeout is reached before the laser is ready :raises SerialCommunicationIOError: when communication port is closed. """ if not self.laser_status.is_ready: err_msg = "Laser is not yet ready but status polling timed out." logger.error(err_msg) raise CryLasLaserError(err_msg) logger.info(f"[{datetime.now()}] Polling status: ... DONE") if self.config.auto_laser_on: self.laser_on() def _wait_for_polling_result(self): """ Wait for until polling function returns a result as well as any exception that might have been raised within a thread. :return: polling function result :raises: polling function errors """ return self._status_poller.wait_for_polling_result() def _stop_polling(self): """ Stop polling laser status. :return: polling function result :raises: polling function errors """ # use the event to stop the thread logger.info("Stop polling laser status.") self._status_poller.stop_polling() return self._wait_for_polling_result() def _is_polling(self) -> bool: """ Check if device status is being polled. :return: `True` when polling thread is set and alive """ return self._status_poller is not None and self._status_poller.is_polling()
[docs] def wait_until_ready(self) -> None: """ Block execution until the laser is ready :raises CryLasLaserError: if the polling thread stops before the laser is ready """ if not self.laser_status.is_ready: if not self._is_polling(): self._start_polling() logger.info("Waiting until the laser is ready...") self._wait_for_polling_result() else: logger.info("No need waiting, the laser is already ready.")
[docs] def stop(self) -> None: """ Stops the device and closes the communication protocol. :raises SerialCommunicationIOError: if com port is closed unexpectedly :raises CryLasLaserError: if laser_off() or close_shutter() fail """ if self.com.is_open: # turn off the laser self.laser_off() # close the laser shutter self.close_shutter() # cancel the polling thread in case still running if self._is_polling(): self._stop_polling() logger.info("The laser polling thread was stopped.") # close the com super().stop() else: logger.warning("Could not turn off the laser, com was already closed.")
[docs] def update_laser_status(self) -> None: """ Update the laser status to `LaserStatus.NOT_READY` or `LaserStatus.INACTIVE` or `LaserStatus.ACTIVE`. Note: laser never explicitly says that it is not ready ( `LaserStatus.NOT_READY`) in response to `'STATUS'` command. It only says that it is ready (heated-up and implicitly inactive/off) or active (on). If it's not either of these then the answer is `Answers.HEAD`. Moreover, the only time the laser explicitly says that its status is inactive ( `Answers.INACTIVE`) is after issuing a 'LASER OFF' command. :raises SerialCommunicationIOError: when communication port is not opened """ # query the status answer_list = self.com.query_all("STATUS", "STATUS:") # analyze the answer if not answer_list: err_msg = "Command to query the laser status did not return an answer." logger.error(err_msg) raise CryLasLaserError(err_msg) # at least one line in answer_list for line in answer_list: answer = self.AnswersStatus(line) if answer in ( self.AnswersStatus.TEC1, self.AnswersStatus.TEC2, self.AnswersStatus.HEAD, ): new_status = self.LaserStatus.UNREADY_INACTIVE info_msg = "The laser is not ready." elif answer is self.AnswersStatus.READY: new_status = self.LaserStatus.READY_INACTIVE info_msg = "The laser is ready." elif answer is self.AnswersStatus.ACTIVE: new_status = self.LaserStatus.READY_ACTIVE info_msg = "The laser is on." else: err_msg = f'Unexpected "STATUS" query response {answer.value}.' logger.error(err_msg) raise CryLasLaserError(err_msg) self.laser_status = new_status logger.info(info_msg)
[docs] def update_shutter_status(self) -> None: """ Update the shutter status (OPENED or CLOSED) :raises SerialCommunicationIOError: when communication port is not opened :raises CryLasLaserError: if success is not confirmed by the device """ # query the status answer = self.com.query("Shutter", "Shutter") # analyse the answer if not answer: err_msg = "Command to query the shutter status did not return an answer." logger.error(err_msg) raise CryLasLaserError(err_msg) answer = self.AnswersShutter(answer) if answer is self.AnswersShutter.CLOSED: logger.info("The laser shutter is currently CLOSED.") self.shutter_status = self.ShutterStatus.CLOSED elif answer is self.AnswersShutter.OPENED: logger.info("The laser shutter is currently OPENED.") self.shutter_status = self.ShutterStatus.OPENED
[docs] def update_repetition_rate(self) -> None: """ Query the laser repetition rate. :raises SerialCommunicationIOError: when communication port is not opened :raises CryLasLaserError: if success is not confirmed by the device """ # query the repetition rate answer = self.com.query("BOO IP", "Impuls=") # analyse the answer if not answer: err_msg = "Querying the repetition rate did not return an answer." logger.error(err_msg) raise CryLasLaserError(err_msg) if answer == "Impuls=disabled, extern Trigger": logger.info( "The laser is using a hardware trigger (may be internal or external)." ) self.repetition_rate = self.RepetitionRates.HARDWARE elif answer.startswith("Impuls=enabled"): match = re.search(r"\d+", answer) if match is None: err_msg = ( "Expected rate integer to follow 'Impuls=enabled' answer " f"in {answer} while querying the repetition." ) logger.error(err_msg) raise CryLasLaserError(err_msg) rate = int(match.group(0)) self.repetition_rate = self.RepetitionRates(rate) logger.info( f"The laser is using the internal software trigger at {rate} Hz." )
[docs] def update_target_pulse_energy(self) -> None: """ Query the laser pulse energy. :raises SerialCommunicationIOError: when communication port is not opened :raises CryLasLaserError: if success is not confirmed by the device """ # query the repetition rate answer = self.com.query("BOO SE", "PD-Sollwert=") # analyse the answer if not answer: err_msg = ( "Command to query the target pulse energy did not return an answer." ) logger.error(err_msg) raise CryLasLaserError(err_msg) current_value = int(answer.split("=")[1]) current_energy = int(current_value * self.config.calibration_factor / 1000) # update the pulse energy attribute self._target_pulse_energy = current_energy logger.info(f"The target pulse energy is {current_energy} uJ.")
[docs] def laser_on(self) -> None: """ Turn the laser on. :raises SerialCommunicationIOError: when communication port is not opened :raises CryLasLaserNotReadyError: if the laser is not ready to be turned on :raises CryLasLaserError: if success is not confirmed by the device """ if not self.laser_status.is_ready: msg = "Laser not ready, cannot be turned on yet." raise CryLasLaserNotReadyError(msg) # send the command answer = self.com.query("LASER ON", "STATUS: Laser") # analyse the answer if not answer or self.AnswersStatus(answer) != self.AnswersStatus.ACTIVE: if not self.laser_status.is_inactive: logger.info("Laser is already on.") else: err_msg = ( "Command to turn on the laser " f"{'failed' if answer else 'did not return an answer'}." ) logger.error(err_msg) raise CryLasLaserError(err_msg) else: self.laser_status = self.LaserStatus.READY_ACTIVE logger.info("Laser is turned on.")
[docs] def laser_off(self) -> None: """ Turn the laser off. :raises SerialCommunicationIOError: when communication port is not opened :raises CryLasLaserError: if success is not confirmed by the device """ # send the command answer = self.com.query("LASER OFF", "STATUS: Laser") # analyse the answer if not answer or self.AnswersStatus(answer) != self.AnswersStatus.INACTIVE: if self.laser_status.is_inactive: logger.info("Laser is already off.") else: err_msg = ( "Command to turn off the laser " f"{'failed' if answer else 'did not return an answer'}." ) logger.error(err_msg) raise CryLasLaserError(err_msg) else: self.laser_status = self.LaserStatus.READY_INACTIVE logger.info("Laser is turned off.")
[docs] def open_shutter(self) -> None: """ Open the laser shutter. :raises SerialCommunicationIOError: when communication port is not opened :raises CryLasLaserError: if success is not confirmed by the device """ # send the command answer = self.com.query( f"Shutter {self.config.ShutterStatus.OPENED.value}", "Shutter" ) # analyse the answer if not answer or self.AnswersShutter(answer) != self.AnswersShutter.OPENED: err_msg = ( "Opening laser shutter " f"{'failed' if answer else 'did not return an answer'}." ) logger.error(err_msg) raise CryLasLaserError(err_msg) logger.info("Opening laser shutter succeeded.") self.shutter_status = self.ShutterStatus.OPENED
[docs] def close_shutter(self) -> None: """ Close the laser shutter. :raises SerialCommunicationIOError: when communication port is not opened :raises CryLasLaserError: if success is not confirmed by the device """ # send the command answer = self.com.query( f"Shutter {self.config.ShutterStatus.CLOSED.value}", "Shutter" ) # analyse the answer if not answer or self.AnswersShutter(answer) != self.AnswersShutter.CLOSED: err_msg = ( "Closing laser shutter " f"{'failed' if answer else 'did not return an answer'}." ) logger.error(err_msg) raise CryLasLaserError(err_msg) logger.info("Closing laser shutter succeeded.") self.shutter_status = self.ShutterStatus.CLOSED
[docs] def set_init_shutter_status(self) -> None: """ Open or close the shutter, to match the configured shutter_status. :raises SerialCommunicationIOError: when communication port is not opened :raises CryLasLaserError: if success is not confirmed by the device """ self.update_shutter_status() if self.config.init_shutter_status != self.shutter_status: if self.config.init_shutter_status == self.ShutterStatus.CLOSED: self.close_shutter() elif self.config.init_shutter_status == self.ShutterStatus.OPENED: self.open_shutter()
[docs] def get_pulse_energy_and_rate(self) -> tuple[int, int]: """ Use the debug mode, return the measured pulse energy and rate. :return: (energy in micro joule, rate in Hz) :raises SerialCommunicationIOError: when communication port is not opened :raises CryLasLaserError: if the device does not answer the query """ # send the command (enter the debug mode) answer = self.com.query("DB1", "IST:", "DB0") if not answer: err_msg = "Command to find laser energy and rate did not return an answer." logger.error(err_msg) raise CryLasLaserError(err_msg) # the answer should look like 'IST: LDTemp=x, NLOTemp=x, CaseTemp=x, # PD=x, D_I= x, freq=x, LDE=x, RegOFF=x' where x are integers # find the values of interest values = re.findall(r"\d+", answer) energy = int(int(values[3]) * self.config.calibration_factor / 1000) rate = int(values[5]) logger.info(f"Laser energy: {energy} uJ, repetition rate: {rate} Hz.") return energy, rate
[docs] def set_repetition_rate(self, rate: int | RepetitionRates) -> None: """ Sets the repetition rate of the internal software trigger. :param rate: frequency (Hz) as an integer :raises ValueError: if rate is not an accepted value in RepetitionRates Enum :raises SerialCommunicationIOError: when communication port is not opened :raises CryLasLaserError: if success is not confirmed by the device """ # check if the value is ok if not isinstance(rate, self.RepetitionRates): try: rate = self.RepetitionRates(rate) except ValueError as e: logger.exception("Error of Crylas: Wrong RepetitionRate", exc_info=e) raise # send the corresponding value to the controller answer = self.com.query(f"BOO IP{rate.send_value}", "Impuls=") # check the success of the command if rate != self.RepetitionRates.HARDWARE: # check whether the expected answer is obtained if answer != f"Impuls=enabled {rate}Hz": err_msg = ( "Setting the repetition rate failed. Controller answered:" f" {answer}." ) logger.error(err_msg) raise CryLasLaserError(err_msg) logger.info(f"Laser internal software trigger set to {rate.value} Hz") else: # For the internal hardware trigger rate, there is no way to check success logger.info("Using laser internal hardware trigger.") # update the repetition rate attribute self.repetition_rate = rate
[docs] def set_pulse_energy(self, energy: int) -> None: """ Sets the energy of pulses (works only with external hardware trigger). Proceed with small energy steps, or the regulation may fail. :param energy: energy in micro joule :raises SerialCommunicationIOError: when communication port is not opened :raises CryLasLaserError: if the device does not confirm success """ # convert the input value into the required command value command_value = int(energy * 1000 / self.config.calibration_factor) # send the command answer = self.com.query(f"BOO SE {command_value}", "PD-Sollwert=") # analyse the answer if not answer: err_msg = "Command to set the pulse energy did not return an answer." logger.error(err_msg) raise CryLasLaserError(err_msg) current_value = int(answer.split("=")[1]) current_energy = int(current_value * self.config.calibration_factor / 1000) # update the pulse energy attribute self._target_pulse_energy = current_energy if current_value != command_value: message = ( "Command to set the pulse energy failed. " f"The pulse energy is currently set to {current_energy} " "micro joule. Setting a different value is only possible " "when using an external hardware trigger." ) logger.error(message) raise CryLasLaserError(message) logger.info(f"Successfully set pulse energy to {energy}")
[docs] class CryLasLaserError(DeviceError): """ General error with the CryLas Laser. """
[docs] class CryLasLaserNotReadyError(CryLasLaserError): """ Error when trying to turn on the CryLas Laser before it is ready. """
[docs] @configdataclass class CryLasAttenuatorSerialCommunicationConfig(SerialCommunicationConfig): #: Baudrate for CryLas attenuator is 9600 baud baudrate: int = 9600 #: CryLas attenuator does not use parity parity: str | SerialCommunicationParity = SerialCommunicationParity.NONE #: CryLas attenuator uses one stop bit stopbits: int | SerialCommunicationStopbits = SerialCommunicationStopbits.ONE #: One byte is eight bits long bytesize: int | SerialCommunicationBytesize = SerialCommunicationBytesize.EIGHTBITS #: No terminator terminator: bytes = b"" #: use 3 seconds timeout as default timeout: Number = 3
[docs] class CryLasAttenuatorSerialCommunication(SerialCommunication): """ Specific communication protocol implementation for the CryLas attenuator. Already predefines device-specific protocol parameters in config. """
[docs] @staticmethod def config_cls(): return CryLasAttenuatorSerialCommunicationConfig
[docs] @configdataclass class CryLasAttenuatorConfig: """ Device configuration dataclass for CryLas attenuator. """ # initial/default attenuation value which is set on start() init_attenuation: Number = 0 response_sleep_time: Number = 1
[docs] def clean_values(self) -> None: if not 0 <= self.init_attenuation <= 100: msg = "Attenuation should be between 0 and 100 included." raise ValueError(msg) if self.response_sleep_time <= 0: msg = "Response sleep time should be positive." raise ValueError(msg)
[docs] class CryLasAttenuator(SingleCommDevice): """ Device class for the CryLas laser attenuator. """ def __init__(self, com, dev_config=None): # noqa: ANN204 # Call superclass constructor super().__init__(com, dev_config) # attenuation of the laser light in percent (not determined yet) self._attenuation = None
[docs] @staticmethod def default_com_cls(): return CryLasAttenuatorSerialCommunication
[docs] @staticmethod def config_cls(): return CryLasAttenuatorConfig
@property def attenuation(self) -> Number: return self._attenuation @property def transmission(self) -> Number: return 100 - self._attenuation
[docs] def start(self) -> None: """ Open the com, apply the config value 'init_attenuation' :raises SerialCommunicationIOError: when communication port cannot be opened """ super().start() self.set_init_attenuation()
[docs] def set_init_attenuation(self) -> None: """ Sets the attenuation to its configured initial/default value :raises SerialCommunicationIOError: when communication port is not opened """ self.set_attenuation(self.config.init_attenuation)
[docs] def set_attenuation(self, percent: Number) -> None: """ Set the percentage of attenuated light (inverse of set_transmission). :param percent: percentage of attenuation, number between 0 and 100 :raises ValueError: if param percent not between 0 and 100 :raises SerialCommunicationIOError: when communication port is not opened :raises CryLasAttenuatorError: if the device does not confirm success """ if not 0 <= percent <= 100: msg = "Attenuation should be between 0 and 100 included." raise ValueError(msg) pulse = int(math.asin((50 - percent) * 0.02) * 536.5 + 6000) prepulse = pulse - 400 # prepare the values to send with the motor protocol lsb = prepulse % 128 msb = math.floor(prepulse / 128) # send the values self.com.write_bytes(bytes([132, 0, lsb, msb])) time.sleep(self.config.response_sleep_time) lsb = pulse % 128 msb = math.floor(pulse / 128) self.com.write_bytes(bytes([132, 0, lsb, msb])) time.sleep(self.config.response_sleep_time / 10) self.com.write_bytes(bytes([161])) time.sleep(self.config.response_sleep_time / 10) b1 = self.com.read_single_bytes() b2 = self.com.read_single_bytes() logger.debug(f"b1 = {b1}, b2 = {b2}") if b1 != b"\x00" or b2 != b"\x00": err_msg = f"Setting laser attenuation to {percent} percents failed." logger.error(err_msg) raise CryLasAttenuatorError(err_msg) logger.info(f"Successfully set laser attenuation to {percent} Cpercents.") self._attenuation = percent
[docs] def set_transmission(self, percent: Number) -> None: """ Set the percentage of transmitted light (inverse of set_attenuation). :param percent: percentage of transmitted light :raises ValueError: if param percent not between 0 and 100 :raises SerialCommunicationIOError: when communication port is not opened :raises CryLasAttenuatorError: if the device does not confirm success """ self.set_attenuation(100 - percent)
[docs] class CryLasAttenuatorError(DeviceError): """ General error with the CryLas Attenuator. """