Source code for hvl_ccb.dev.keysightb298xx.comm

#  Copyright (c) ETH Zurich, SIS ID and HVL D-ITET
#
import functools
import inspect
import logging
from collections.abc import Callable

from bitstring import BitArray

from hvl_ccb.comm.visa import (
    VisaCommunication,
    VisaCommunicationConfig,
    VisaCommunicationError,
)
from hvl_ccb.configuration import configdataclass
from hvl_ccb.utils.typing import Number

logger = logging.getLogger(__name__)


class KeysightB2985AVisaCommunicationError(VisaCommunicationError):
    """
    Error raised by the KeysightB2985A communication layer.

    Within this module it is raised when the device is not ready for a
    read, write or query, and when the output buffer is full.
    """
@configdataclass
class KeysightB2985AVisaCommunicationConfig(VisaCommunicationConfig):
    """
    VisaCommunication configuration specialised for the KeysightB2985A
    device class.

    The only value overridden here is the default of ``interface_type``,
    which is set to ``TCPIP_INSTR``.
    """

    # Default interface: VISA TCPIP-INSTR.
    interface_type: str | VisaCommunicationConfig.InterfaceType = (
        VisaCommunicationConfig.InterfaceType.TCPIP_INSTR  # type: ignore[assignment]
    )
def ready_to_read(func: Callable) -> Callable:
    """
    Decorator that lets ``func`` run only if the device is ready to be read.

    At call time the result of ``self.spoll()`` is interpreted as an 8-bit
    status byte. ``func`` is executed only when bit 4 and bit 5 (counted
    from the least significant bit) are both set.

    NOTE(review): in the IEEE 488.2 status byte layout bit 4 is MAV (message
    available) and bit 5 is ESB (event summary) -- confirm against the
    instrument manual.

    :param func: method to guard; its ``self`` must provide ``spoll()``
    :return: the guarded method
    :raises KeysightB2985AVisaCommunicationError: at call time, if the
        status byte does not have both bits set
    """

    @functools.wraps(func)
    def guarded_read(self, *args, **kwargs):
        status = BitArray(length=8, uint=self.spoll())
        # BitArray indexes from the most significant bit; after reversing,
        # index i addresses bit i counted from the least significant bit.
        status.reverse()
        bit4, bit5 = status[4], status[5]
        if not (bit4 and bit5):
            msg = "Device not ready to read"
            raise KeysightB2985AVisaCommunicationError(msg)
        return func(self, *args, **kwargs)

    return guarded_read
def ready_to_write(func: Callable) -> Callable:
    """
    Decorator that sends a write only while the device can accept it.

    At call time the result of ``self.spoll()`` is interpreted as an 8-bit
    status byte. ``func`` is executed when bit 4 is clear and bit 5 is set
    (bits counted from the least significant bit). Otherwise:

    * if ``func`` is ``send_output_buffer`` an error is raised, because
      there is nothing left to fall back to;
    * for every other decorated method the commands are handed to
      ``self._append_output_buffer`` instead of being sent.

    The commands are forwarded to the buffer as positional arguments, no
    matter whether the caller passed them positionally or by keyword
    (e.g. ``write(text="...")``). ``_append_output_buffer`` accepts only
    positional command strings, so forwarding keyword arguments unchanged
    would raise a ``TypeError`` whenever the device is busy.

    NOTE(review): in the IEEE 488.2 status byte layout bit 4 is MAV (message
    available) and bit 5 is ESB (event summary) -- confirm against the
    instrument manual.

    :param func: method to guard; its ``self`` must provide ``spoll()`` and
        ``_append_output_buffer()``
    :return: the guarded method
    :raises KeysightB2985AVisaCommunicationError: at call time, if the
        device is not ready and ``func`` is ``send_output_buffer``
    :raises TypeError: at call time, if the device is not ready and the
        arguments do not match the signature of ``func``
    """
    # Resolved once at decoration time; used to normalise the call arguments
    # before they are put into the output buffer.
    signature = inspect.signature(func)

    @functools.wraps(func)
    def wrapper_decorator(self, *args, **kwargs):
        bits = BitArray(length=8, uint=self.spoll())
        # BitArray indexes from the most significant bit; after reversing,
        # index i addresses bit i counted from the least significant bit.
        bits.reverse()
        if not bits[4] and bits[5]:
            return func(self, *args, **kwargs)
        if func.__name__ == "send_output_buffer":
            msg = "Device not ready to write"
            raise KeysightB2985AVisaCommunicationError(msg)
        # Bind against the real signature so that keyword-passed commands end
        # up as positional values; drop the leading ``self``.
        bound = signature.bind(self, *args, **kwargs)
        return self._append_output_buffer(*bound.args[1:])

    return wrapper_decorator
def ready_to_query(func: Callable) -> Callable:
    """
    Decorator that lets ``func`` run only if the device can take a query.

    At call time the result of ``self.spoll()`` is interpreted as an 8-bit
    status byte. ``func`` is executed only when bit 4 is clear and bit 5 is
    set (bits counted from the least significant bit).

    NOTE(review): in the IEEE 488.2 status byte layout bit 4 is MAV (message
    available) and bit 5 is ESB (event summary) -- confirm against the
    instrument manual.

    :param func: method to guard; its ``self`` must provide ``spoll()``
    :return: the guarded method
    :raises KeysightB2985AVisaCommunicationError: at call time, if the
        status byte does not match the required pattern
    """

    @functools.wraps(func)
    def guarded_query(self, *args, **kwargs):
        status = BitArray(length=8, uint=self.spoll())
        # BitArray indexes from the most significant bit; after reversing,
        # index i addresses bit i counted from the least significant bit.
        status.reverse()
        if status[4] or not status[5]:
            msg = "Device not ready to query"
            raise KeysightB2985AVisaCommunicationError(msg)
        return func(self, *args, **kwargs)

    return guarded_query
class KeysightB2985AVisaCommunication(VisaCommunication):
    """
    VisaCommunication tailored to the KeysightB2985A.

    ``read``, ``write``, ``query`` and their ``*_multiple`` flavors are
    overridden so that every exchange follows the communication procedure
    used for this device: the status byte is checked first, ``*ESR?`` is
    queried before commands are sent, and ``*OPC`` is appended to them.

    An output buffer caches write operations issued while the device can
    not receive new commands; ``send_output_buffer`` sends them later.

    Can only be used with a Visa TCPIP-INSTR
    """

    # Upper bound used by ``_append_output_buffer`` when deciding whether the
    # output buffer can still take more commands.
    MULTI_COMMANDS_MAX = 15

    def __init__(self, configuration) -> None:
        """
        Create the empty output buffer, then initialise the parent class.

        :param configuration: configuration passed on to the parent class
        """
        self._output_buffer: list[str] = []
        super().__init__(configuration)

    def open(self) -> None:
        """
        Open the connection and send ``*ESE 1`` followed by ``*OPC``.

        NOTE(review): per IEEE 488.2, ``*ESE 1`` enables the
        operation-complete bit in the event status enable register --
        confirm against the instrument manual.
        """
        super().open()
        super().write_multiple("*ESE 1", "*OPC")

    @staticmethod
    def config_cls():
        """Return the configuration class belonging to this communication."""
        return KeysightB2985AVisaCommunicationConfig

    @property
    def output_buffer(self) -> list:
        """Commands currently cached for a later ``send_output_buffer``."""
        return self._output_buffer

    def reset(self) -> None:
        """
        Reset the device communication by sending ``*CLS`` and ``*OPC``.
        """
        super().write_multiple("*CLS", "*OPC")

    @ready_to_write
    def write(self, text: str) -> None:
        """
        Write a single command to the KeysightB2985A.

        ``*ESR?`` is queried first; the command is then sent together with
        the operation complete command ``*OPC`` to keep the communication
        controlled. If the device is not ready, the ``ready_to_write``
        decorator caches the command in the output buffer instead.

        :param text: command string to write
        """
        super().query("*ESR?")
        payload = (text, "*OPC")
        super().write_multiple(*payload)

    @ready_to_write
    def write_multiple(self, *commands: str) -> None:
        """
        Write several commands to the KeysightB2985A in one go.

        ``*ESR?`` is queried first; the commands are then sent followed by
        ``*OPC``. If the device is not ready, the ``ready_to_write``
        decorator caches the commands in the output buffer instead.

        :param commands: command strings to write
        """
        super().query("*ESR?")
        payload = (*commands, "*OPC")
        super().write_multiple(*payload)

    @ready_to_query
    def query(
        self,
        command: str,
        _n_attempts_max: int | None = None,
        _attempt_interval_sec: Number | None = None,
    ) -> str | None:
        """
        Send a single query and return its answer.

        The command is sent between ``*ESR?`` and ``*OPC``; the second
        element of the parent's result (the one following the ``*ESR?``
        answer) is returned.

        :param command: query string to send
        :param _n_attempts_max: accepted but not used by this implementation
        :param _attempt_interval_sec: accepted but not used by this
            implementation
        :return: answer to ``command``
        """
        answers = super().query_multiple("*ESR?", command, "*OPC")
        return answers[1]

    @ready_to_query
    def query_multiple(
        self,
        *commands: str,
        _n_attempts_max: int | None = None,
        _attempt_interval_sec: Number | None = None,
    ) -> str | tuple[str, ...]:
        """
        Send several queries and return their answers.

        The commands are sent between ``*ESR?`` and ``*OPC``; everything
        after the first element of the parent's result (the ``*ESR?``
        answer) is returned.

        :param commands: query strings to send
        :param _n_attempts_max: accepted but not used by this implementation
        :param _attempt_interval_sec: accepted but not used by this
            implementation
        :return: answers to ``commands``
        """
        answers = super().query_multiple("*ESR?", *commands, "*OPC")
        return answers[1:]

    @ready_to_read
    def read(self) -> str:
        """
        Read from the device; only executed if ``ready_to_read`` permits it.

        :return: the string returned by the parent's ``read``
        """
        return super().read()

    def _append_output_buffer(self, *command: str) -> None:
        """
        Cache commands in the output buffer, followed by ``*WAI``.

        :param command: command strings to cache
        :raises KeysightB2985AVisaCommunicationError: if the buffer can not
            take the commands without exceeding ``MULTI_COMMANDS_MAX``
        """
        # Two slots stay reserved: one for the "*WAI" appended below and one
        # for the "*OPC" that ``send_output_buffer`` adds when sending.
        highest_allowed_fill = self.MULTI_COMMANDS_MAX - 2 - len(command)
        if len(self._output_buffer) > highest_allowed_fill:
            msg = "Output buffer full"
            raise KeysightB2985AVisaCommunicationError(msg)
        self._output_buffer.extend(command)
        self._output_buffer.append("*WAI")

    @ready_to_write
    def send_output_buffer(self) -> None:
        """
        Send all cached commands followed by ``*OPC`` and empty the buffer.

        ``*ESR?`` is queried before sending. If the device is not ready,
        the ``ready_to_write`` decorator raises an error instead of
        buffering.
        """
        super().query("*ESR?")
        pending = tuple(self._output_buffer)
        super().write_multiple(*pending, "*OPC")
        # Rebind to a fresh list, exactly as before; lists handed out earlier
        # via ``output_buffer`` are left untouched.
        self._output_buffer = []