Source code for hvl_ccb.dev.keysightb298xx.device

#  Copyright (c) ETH Zurich, SIS ID and HVL D-ITET
#
import logging
from datetime import datetime, timedelta
from time import sleep

from hvl_ccb.dev.visa import VisaDevice

from .base import KeysightB2985AConfig, KeysightB2985AError
from .comm import (
    KeysightB2985AVisaCommunication,
    KeysightB2985AVisaCommunicationConfig,
)
from .modules import (
    Acquisition,
    Data,
    Input,
    Output,
    Status,
    Transition,
)

logger = logging.getLogger(__name__)


[docs] class KeysightB2985A(VisaDevice): """ Device class for the KeysightB2985A. """
[docs] @staticmethod def config_cls(): return KeysightB2985AConfig
[docs] @staticmethod def default_com_cls(): return KeysightB2985AVisaCommunication
def __init__( self, com: ( KeysightB2985AVisaCommunication | KeysightB2985AVisaCommunicationConfig | dict ), dev_config: KeysightB2985AConfig | dict | None = None, ) -> None: super().__init__(com, dev_config) del self._notify_operation_complete # load the individual device modules. self._status: Status = Status(self.com) # type: ignore[assignment] self._acquire = Acquisition(self.com) self._data = Data(self.com) self._input = Input(self.com, self.config.module(self.com)) self._output = Output(self.com) self._transition = Transition(self.com)
[docs] def start(self) -> None: """ Start the KeysightB2985A and bring it into a defined state and remote mode. """ super().start() # reset device (RST) and clear status registers (CLS) self.com.write_multiple("*RST", "*CLS") # enable local display self._local_display_state(True)
[docs] def stop(self) -> None: """ Stop the KeysightB2985A and close communication. Resets the device to local operation mode. """ if self._spoll_thread is not None and self._spoll_thread.is_polling(): # clear status registers (CLS) self.com.write_multiple("*RST", "*CLS") else: logger.warning("KeysightB2985A was already stopped") super().stop()
@property def status(self) -> Status: # type: ignore[override] """ The status byte STB is defined in IEEE 488.2. It provides a rough overview of the instrument status. """ return self._status @property def acquire(self) -> Acquisition: return self._acquire @property def data(self) -> Data: return self._data @property def input(self) -> Input: return self._input @property def output(self) -> Output: return self._output @property def transition(self) -> Transition: return self._transition def _local_display_state(self, state: bool) -> None: """ Enable or disable local display. :param state: is the desired local display state """ self.com.write(f":DISP:ENAB {int(state)}")
[docs] def self_test(self) -> bool: """ Perform a device self-test to check for correct operation. :return: bool with result of the self test. """ logger.info("Running self-diagnostic.") self.com.write("*TST?") sleep(1) self.wait_operation_complete(20) result = not bool(int(self.com.read())) if not result: logger.warning("Self-test: FAIL") return result logger.info("Self-test: PASS") return result
[docs] def query_status(self) -> None: """ Queries the status byte and reads the error queue """ with self.com.access_lock: # Access lock: Block communication for second query of error queue self.status.status_summary.update_status() if self.status.status_summary.error_queue.condition: # error queue contains new error logger.debug("Error in error queue") self.com.write_bytes(b"*OPC") self.status.status_summary.update_status() if self.status.status_summary.standard_event_status.condition: msg = self.get_error_queue() raise KeysightB2985AError(msg) if ( self.status.status_summary.output_buffer.condition or not self.status.status_summary.standard_event_status.condition ): return if self.com.output_buffer: self.com.send_output_buffer()
[docs] def wait_operation_complete(self, timeout: float | None = None) -> bool: """ Wait until a task is completed. :param timeout: timeout in seconds, defaults to 0.1 s :raises TimeoutError: if the operation does not complete in time :return: operation complete or timeout """ timeout_time = datetime.now() + timedelta( seconds=(timeout if timeout is not None else 0) ) # wait until event is caught while not self.status.status_summary.standard_event_status.condition: sleep(0.1) if timeout is None or datetime.now() > timeout_time: msg = "Timeout waiting for operation complete" raise TimeoutError(msg) return self.status.status_summary.standard_event_status.condition