Source code for hvl_ccb.dev.tiepie.base

#  Copyright (c) ETH Zurich, SIS ID and HVL D-ITET
#
"""

"""

import logging
import time
from functools import wraps
from typing import Any, Callable, Optional, TypeVar, Union

import libtiepie as ltp

# from libtiepie import i2chost as ltp_i2c
from libtiepie import generator as ltp_gen
from libtiepie import oscilloscope as ltp_osc
from libtiepie.exceptions import InvalidDeviceSerialNumberError, LibTiePieException

from hvl_ccb.configuration import configdataclass
from hvl_ccb.dev import DeviceError
from hvl_ccb.utils.enum import NameEnum
from hvl_ccb.utils.typing import Number

logger = logging.getLogger(__name__)


@configdataclass
class TiePieDeviceConfig:
    """
    Configuration dataclass for TiePie

    Only `serial_number` is mandatory; the remaining fields have defaults.
    """

    # serial number of the TiePie device; must be positive
    serial_number: int
    # NOTE(review): not used in this module; presumably checked by the
    # device classes when opening an oscilloscope - confirm against callers
    require_block_measurement_support: bool = True
    # same default as `n_max_try_get_device` of `get_device_by_serial_number`
    n_max_try_get_device: int = 10
    # same default as `wait_sec_retry_get_device` of
    # `get_device_by_serial_number`
    wait_sec_retry_get_device: Number = 1.0
    # NOTE(review): not used in this module; presumably the sleep time between
    # "is data ready" polls - confirm against callers
    is_data_ready_polling_interval_sec: Number = 0.01

    def clean_values(self):
        """
        Check that all numeric configuration values are strictly positive.

        The fields are checked in declaration order and the first offending
        one is reported.

        :raises ValueError: when a checked field is zero or negative
        """
        positivity_checks = (
            (
                "serial_number",
                "serial_number must be a positive integer.",
            ),
            (
                "n_max_try_get_device",
                "n_max_try_get_device must be an positive integer.",
            ),
            (
                "wait_sec_retry_get_device",
                "wait_sec_retry_get_device must be a positive number.",
            ),
            (
                "is_data_ready_polling_interval_sec",
                "is_data_ready_polling_interval_sec must be a positive number.",
            ),
        )
        for field_name, error_message in positivity_checks:
            if getattr(self, field_name) <= 0:
                raise ValueError(error_message)
class TiePieDeviceType(NameEnum, init="value ltp_class"):  # type:ignore
    """
    TiePie device type.

    Each member is declared as a pair: the `libtiepie` device type constant
    (available as `value`) and the corresponding `libtiepie` device class
    (available as `ltp_class`).
    """

    OSCILLOSCOPE = ltp.DEVICETYPE_OSCILLOSCOPE, ltp_osc.Oscilloscope
    # The I2C host device type is currently disabled:
    # I2C = ltp.DEVICETYPE_I2CHOST, ltp_i2c.I2CHost
    GENERATOR = ltp.DEVICETYPE_GENERATOR, ltp_gen.Generator
class TiePieError(DeviceError):
    """
    Error of the class TiePie

    Raised by the helpers in this module, e.g. when a device cannot be found
    or opened, when a required device handle is missing, or in place of a
    `libtiepie` exception.
    """
def wrap_libtiepie_exception(func: Callable) -> Callable:
    """
    Decorator translating `libtiepie` errors into `TiePieError`.

    Intended for `libtiepie` methods that use
    `libtiepie.library.check_last_status_raise_on_error()` calls.

    :param func: Function or method to be wrapped
    :raises TiePieError: instead of `LibTiePieException` or one of its subtypes.
    :return: whatever `func` returns
    """

    @wraps(func)
    def translating_call(*args, **kwargs):
        try:
            result = func(*args, **kwargs)
        except LibTiePieException as ltp_error:
            # log the library error with its traceback, then re-raise it as
            # the package's own error type, chained to the original
            logger.error(str(ltp_error), exc_info=ltp_error)
            raise TiePieError from ltp_error
        return result

    return translating_call
# Unconstrained type variable; it ties the class given in `device_type` to
# the return type of `get_device_by_serial_number`.
_LtpDeviceReturnType = TypeVar("_LtpDeviceReturnType")
"""
An auxiliary typing hint of a `libtiepie` device type for return value of
the `get_device_by_serial_number` function and the wrapper methods using it.
"""
@wrap_libtiepie_exception
def get_device_by_serial_number(
    serial_number: int,
    # Note: TiePieDeviceType aenum as a tuple to define a return value type
    device_type: Union[str, tuple[int, _LtpDeviceReturnType]],
    n_max_try_get_device: int = 10,
    wait_sec_retry_get_device: float = 1.0,
) -> _LtpDeviceReturnType:
    """
    Open and return handle of TiePie device with a given serial number

    The device list is refreshed up to `n_max_try_get_device` times, sleeping
    `wait_sec_retry_get_device` seconds between attempts, until the device
    with the requested serial number shows up.

    :param serial_number: int serial number of the device
    :param device_type: a `TiePieDeviceType` instance containing device
        identifier (int number) and its corresponding class, both from
        `libtiepie`, or a string name of such instance
    :param n_max_try_get_device: maximal number of device list updates (int
        number)
    :param wait_sec_retry_get_device: waiting time in seconds between retries
        (float number)
    :return: Instance of a `libtiepie` device class according to the specified
        `device_type`
    :raises TiePieError: when there is no device with given serial number,
        when the device cannot be opened as the requested `device_type`, or
        when no device could be looked up at all (e.g.
        `n_max_try_get_device` is not positive)
    :raises ValueError: when `device_type` is not an instance of
        `TiePieDeviceType`
    """

    device_type = TiePieDeviceType(device_type)

    # include network search with ltp.device_list.update()
    ltp.network.auto_detect_enabled = True

    n_try = 0
    device_list_item: Optional[ltp.devicelistitem.DeviceListItem] = None
    while device_list_item is None and n_try < n_max_try_get_device:
        n_try += 1
        ltp.device_list.update()
        if not ltp.device_list:
            # no device at all is visible yet
            msg = f"Searching for device... (attempt #{n_try}/{n_max_try_get_device})"
            if n_try < n_max_try_get_device:
                logger.warning(msg)
                time.sleep(wait_sec_retry_get_device)
                continue
            msg = f"No devices found to start (attempt #{n_try}/{n_max_try_get_device})"
            logger.error(msg)
            raise TiePieError(msg)

        # if a device is found
        try:
            device_list_item = ltp.device_list.get_item_by_serial_number(serial_number)
        except InvalidDeviceSerialNumberError as e:
            msg = (
                f"The device with serial number {serial_number} is not "
                f"available; attempt #{n_try}/{n_max_try_get_device}."
            )
            if n_try < n_max_try_get_device:
                logger.warning(msg)
                time.sleep(wait_sec_retry_get_device)
                continue
            logger.error(str(e), exc_info=e)
            # carry the human-readable message instead of an empty error
            raise TiePieError(msg) from e

    if device_list_item is None:
        # Reached when no lookup attempt was made (non-positive
        # `n_max_try_get_device`) or when the lookup yielded nothing. An
        # explicit error is used instead of `assert`, which is stripped
        # under `python -O`.
        msg = (
            f"The device with serial number {serial_number} could not be "
            f"found after {n_try} attempt(s)."
        )
        logger.error(msg)
        raise TiePieError(msg)

    if not device_list_item.can_open(device_type.value):
        msg = (
            f"The device with serial number {serial_number} has no "
            f"{device_type} available."
        )
        logger.error(msg)
        raise TiePieError(msg)

    return device_list_item.open_device(device_type.value)
def _verify_via_libtiepie(
    dev_obj: ltp.device.Device, verify_method_suffix: str, value: Number
) -> Number:
    """
    Call the `verify_SOMETHING` method of a `libtiepie` device and report
    the value the device would really use.

    A warning is logged whenever the verified value differs from the
    requested one.

    :param dev_obj: TiePie device object, which has the verify_SOMETHING method
    :param verify_method_suffix: the SOMETHING part of the name of the
        `libtiepie` device's verify_SOMETHING method
    :param value: numeric value
    :returns: Value that will be actually set instead of `value`.
    :raises TiePieError: when status of underlying device gives an error
    """
    effective_value = getattr(dev_obj, f"verify_{verify_method_suffix}")(value)

    if effective_value != value:
        # record lengths are printed as integers, everything else with three
        # decimals; both use "_" as thousands separator
        number_format = "_d" if verify_method_suffix == "record_length" else "_.3f"
        requested_str = format(value, number_format)
        effective_str = format(effective_value, number_format)
        logger.warning(
            f"Can't set {verify_method_suffix} to "
            f"{requested_str}; instead {effective_str} will be set."
        )

    return effective_value


def _require_dev_handle(device_type) -> Callable[[Callable], Callable]:
    """
    Build a method decorator that checks if the TiePie device handle of the
    given type is available before the method runs.

    :param device_type: the TiePie device type which device handle is required
    :raises ValueError: when `device_type` is not an instance of
        `TiePieDeviceType`
    """

    required_type = TiePieDeviceType(device_type)

    def decorator(method) -> Callable:
        """
        Method decorator to check if a TiePie device handle is available;
        raises `TiePieError` if the handle is not available.

        :param method: `TiePieDevice` instance method to wrap
        :return: Whatever wrapped `method` returns
        """

        @wraps(method)
        def guarded_method(self, *args, **kwargs) -> Any:
            missing_handle = None
            if required_type is TiePieDeviceType.OSCILLOSCOPE:
                if self._osc is None:
                    missing_handle = "oscilloscope"
            elif required_type is TiePieDeviceType.GENERATOR:
                if self._gen is None:
                    missing_handle = "generator"
            # an I2C host check (`self._i2c`) is not active, like the
            # corresponding device type

            if missing_handle is None:
                return method(self, *args, **kwargs)

            msg = (
                f"The {missing_handle} handle is not available; "
                "call `.start()` first."
            )
            logger.error(msg)
            raise TiePieError(msg)

        return guarded_method

    return decorator