# Source code for hvl_ccb.dev.visa

#  Copyright (c) ETH Zurich, SIS ID and HVL D-ITET
#
import logging
import re
from datetime import datetime, timedelta
from time import monotonic, sleep
from typing import Optional, Union

from bitstring import BitArray

from hvl_ccb.comm.visa import VisaCommunication, VisaCommunicationConfig
from hvl_ccb.configuration import configdataclass
from hvl_ccb.dev import SingleCommDevice
from hvl_ccb.utils.poller import Poller
from hvl_ccb.utils.typing import Number

logger = logging.getLogger(__name__)


@configdataclass
class _VisaDeviceConfigBase:
    """
    Holder for the required (non-default) keys of the VISA device configuration.
    They are kept apart from the keys that have defaults so that the configuration
    can be extended with further required keys through inheritance.
    """

    # NOTE: there are currently no required keys, so this class is empty and not
    # strictly needed; it is kept only to illustrate the inheritance pattern. For a
    # detailed explanation of the underlying dataclass issue see:
    # https://stackoverflow.com/questions/51575931/class-inheritance-in-python-3-7-dataclasses/


@configdataclass
class _VisaDeviceConfigDefaultsBase:
    """
    Keys of the VISA device configuration that come with default values.
    """

    spoll_interval: Number = 0.5
    """
    Time between two consecutive status polls, in seconds.
    """

    spoll_start_delay: Number = 2
    """
    Time to wait before status polling begins, in seconds.
    """

    def clean_values(self):
        """
        Validate the polling settings.

        :raises ValueError: if `spoll_interval` is not strictly positive, or if
            `spoll_start_delay` is negative
        """
        interval = self.spoll_interval
        if interval <= 0:
            raise ValueError("Polling interval needs to be positive.")

        start_delay = self.spoll_start_delay
        if start_delay < 0:
            raise ValueError("Polling start delay needs to be non-negative.")


@configdataclass
class VisaDeviceConfig(_VisaDeviceConfigDefaultsBase, _VisaDeviceConfigBase):
    """
    Configdataclass for a VISA device. It combines the keys with defaults from
    `_VisaDeviceConfigDefaultsBase` with the required keys from
    `_VisaDeviceConfigBase` and adds none of its own.
    """
class VisaDevice(SingleCommDevice):
    """
    Device communicating over the VISA protocol using VisaCommunication.

    After `start()`, a `Poller` periodically calls `spoll_handler()`, which reads
    the status byte, logs reported errors and records "operation complete" events
    for `wait_operation_complete()`.
    """

    def __init__(
        self,
        com: Union[VisaCommunication, VisaCommunicationConfig, dict],
        dev_config: Union[VisaDeviceConfig, dict, None] = None,
    ) -> None:
        """
        Constructor for VisaDevice.

        :param com: communication protocol instance, its configuration, or a
            configuration dictionary
        :param dev_config: device configuration, a configuration dictionary, or
            `None`
        """

        super().__init__(com, dev_config)

        # status poller; created in `start()`
        self._spoll_thread: Union[Poller, None] = None

        # flag set by `spoll_handler()` when the operation complete bit is seen,
        # consumed by `wait_operation_complete()`
        self._notify_operation_complete: bool = False

    @staticmethod
    def default_com_cls() -> type[VisaCommunication]:
        """
        Return the default communication protocol for this device type, which is
        VisaCommunication.

        :return: the VisaCommunication class
        """

        return VisaCommunication

    @staticmethod
    def config_cls():
        """
        Return the configuration class for this device type.

        :return: the VisaDeviceConfig class
        """

        return VisaDeviceConfig

    def get_identification(self) -> str:
        """
        Queries `"*IDN?"` and returns the identification string of the connected
        device.

        :return: the identification string of the connected device
        """

        return self.com.query("*IDN?")

    def start(self) -> None:
        """
        Start the VisaDevice. Sets up the status poller and starts it.

        :return:
        """

        super().start()
        self._spoll_thread = Poller(
            polling_interval_sec=self.config.spoll_interval,
            polling_delay_sec=self.config.spoll_start_delay,
            spoll_handler=self.spoll_handler,
        )
        self._spoll_thread.start_polling()

    def stop(self) -> None:
        """
        Stop the VisaDevice. Stops the polling thread and closes the communication
        protocol.

        :return:
        """

        # the poller only exists once `start()` has been called
        if self._spoll_thread:
            self._spoll_thread.stop_polling()
        super().stop()

    def spoll_handler(self):
        """
        Reads the status byte and decodes it. The status byte STB is defined in
        IEEE 488.2. It provides a rough overview of the instrument status.

        Bits that are acted upon (bit 0 is the least significant bit):

        * bit 2: error queue contains a new error; the queue is read and logged
        * bit 3: Questionable Status QUES summary bit; logged only
        * bit 5: Event status byte ESB; the event status register is queried and,
          if its operation complete bit is set, the event is recorded for
          `wait_operation_complete()`

        :return:
        """

        stb = self.com.spoll()

        if not stb:
            return

        # The status byte is an unsigned 8-bit register. Decoding it as a signed
        # integer fails for values >= 128 (i.e. whenever bit 7 is set), so use the
        # unsigned interpretation of the low byte.
        bits = BitArray(length=8, uint=stb & 0xFF)
        # reverse, so that index n corresponds to bit n of the register
        bits.reverse()

        # Bits without any action here:
        #   0, 1: have no meaning, always zero
        #   4:    output buffer holds data (RTO 1024), MAV bit (Message available)
        #   6:    RQS/MSS bit (RTO 1024)
        #   7:    Operation Status OPER summary bit

        if bits[2]:
            # error queue contains new error
            logger.debug(f"Error bit set in STB: {stb}")
            self.get_error_queue()

        if bits[3]:
            # Questionable Status QUES summary bit
            logger.debug(f"Questionable status bit set in STB: {stb}")

        if bits[5]:
            # Event status byte ESB, summary of ESR register (RTO 1024)
            logger.debug(f"Operation status bit set in STB: {stb}")

            # read event status register; decoded unsigned for the same reason
            # as the status byte above
            esr = int(self.com.query("*ESR?"))
            esr_bits = BitArray(length=8, uint=esr & 0xFF)
            esr_bits.reverse()

            if esr_bits[0]:
                # Operation complete bit set. This bit is set on receipt of the
                # command *OPC exactly when all previous commands have been
                # executed.
                logger.debug(f"Operation complete bit set in ESR: {esr}")
                self._notify_operation_complete = True

    def wait_operation_complete(self, timeout: Optional[float] = None) -> bool:
        """
        Waits for a operation complete event. Returns after timeout [s] has expired
        or the operation complete event has been caught.

        :param timeout: Time in seconds to wait for the event; `None` for no
            timeout.
        :return: True, if OPC event is caught, False if timeout expired
        """

        # reset event bit
        self._notify_operation_complete = False

        # Compute the deadline on the monotonic clock, so that changes of the
        # system time (e.g. DST switch, clock synchronisation) cannot shorten or
        # prolong the wait.
        deadline = None if timeout is None else monotonic() + timeout

        # wait until event is caught
        while not self._notify_operation_complete:
            sleep(0.01)
            if deadline is not None and monotonic() > deadline:
                break

        # if event was caught, consume it and return true
        if self._notify_operation_complete:
            self._notify_operation_complete = False
            return True

        # if timeout expired, return false
        return False

    def get_error_queue(self) -> str:
        """
        Read out error queue and logs the error.

        :return: Error string
        """

        err_string = self.com.query("SYSTem:ERRor:ALL?")
        # log every "<first field>,<second field>" pair of the comma-separated
        # answer as one error entry
        for error in re.findall(r"[^,]+,[^,]+", err_string):
            logger.error(f"VISA Error from Device: {error}")
        return err_string

    def reset(self) -> None:
        """
        Send `"*RST"` and `"*CLS"` to the device. Typically sets a defined state.
        """

        self.com.write("*RST", "*CLS")