# Copyright (c) ETH Zurich, SIS ID and HVL D-ITET
#
import functools
import logging
from collections.abc import Callable
from bitstring import BitArray
from hvl_ccb.comm.visa import (
VisaCommunication,
VisaCommunicationConfig,
VisaCommunicationError,
)
from hvl_ccb.configuration import configdataclass
from hvl_ccb.utils.typing import Number
logger = logging.getLogger(__name__)
[docs]
class KeysightB2985AVisaCommunicationError(VisaCommunicationError):
    """
    Error of the VISA communication with the KeysightB2985A, e.g. raised when the
    device is not ready for the requested operation.
    """
[docs]
@configdataclass
class KeysightB2985AVisaCommunicationConfig(VisaCommunicationConfig):
    """
    Communication settings used by the KeysightB2985A device class.

    Based on the generic ``VisaCommunicationConfig``; the only field declared
    here is ``interface_type``, whose default is the TCPIP_INSTR interface.
    """

    # Interface type of the VISA connection, defaulting to TCPIP_INSTR.
    interface_type: str | VisaCommunicationConfig.InterfaceType = (
        VisaCommunicationConfig.InterfaceType.TCPIP_INSTR  # type: ignore[assignment]
    )
[docs]
def ready_to_read(func: Callable) -> Callable:
    """
    Decorator guarding a method that reads from the device.

    On every call the status byte is polled via ``self.spoll()``. The wrapped
    method is only executed if bit 4 and bit 5 (counted from the least
    significant bit) are both set.

    :param func: method to guard, called with the unchanged arguments
    :return: the guarding wrapper around ``func``
    :raises KeysightB2985AVisaCommunicationError: on calling the wrapped method
        while at least one of the two bits is cleared
    """

    @functools.wraps(func)
    def guarded(self, *args, **kwargs):
        status = BitArray(uint=self.spoll(), length=8)
        # BitArray stores the MSB first; reversed, index i addresses bit i
        status.reverse()
        # NOTE(review): bit 4 / bit 5 are presumably MAV / ESB of the
        # IEEE 488.2 status byte - confirm with the instrument manual
        if not (status[4] and status[5]):
            msg = "Device not ready to read"
            raise KeysightB2985AVisaCommunicationError(msg)
        return func(self, *args, **kwargs)

    return guarded
[docs]
def ready_to_write(func: Callable) -> Callable:
    """
    Decorator guarding a method that writes commands to the device.

    On every call the status byte is polled via ``self.spoll()``. If bit 4 is
    cleared and bit 5 is set (counted from the least significant bit), the
    wrapped method is executed. Otherwise the commands are not sent but handed
    to ``self._append_output_buffer`` to be cached; the only exception is a
    wrapped method named ``send_output_buffer``, for which an error is raised
    instead.

    :param func: method to guard, called with the unchanged arguments
    :return: the guarding wrapper around ``func``
    :raises KeysightB2985AVisaCommunicationError: if the device is not ready
        and the wrapped method is ``send_output_buffer``
    """

    @functools.wraps(func)
    def wrapper_decorator(self, *args, **kwargs):
        bits = BitArray(length=8, uint=self.spoll())
        # BitArray stores the MSB first; reversed, index i addresses bit i
        bits.reverse()
        # NOTE(review): bit 4 / bit 5 are presumably MAV / ESB of the
        # IEEE 488.2 status byte - confirm with the instrument manual
        if not bits[4] and bits[5]:
            return func(self, *args, **kwargs)
        if func.__name__ == "send_output_buffer":
            # flushing the buffer can not be deferred by buffering again
            msg = "Device not ready to write"
            raise KeysightB2985AVisaCommunicationError(msg)
        # _append_output_buffer only accepts positional commands. Forward the
        # values of keyword arguments positionally, so that e.g.
        # ``write(text="...")`` is buffered instead of failing with a TypeError.
        return self._append_output_buffer(*args, *kwargs.values())

    return wrapper_decorator
[docs]
def ready_to_query(func: Callable) -> Callable:
    """
    Decorator guarding a method that queries the device.

    On every call the status byte is polled via ``self.spoll()``. The wrapped
    method is only executed if bit 4 is cleared and bit 5 is set (counted from
    the least significant bit).

    :param func: method to guard, called with the unchanged arguments
    :return: the guarding wrapper around ``func``
    :raises KeysightB2985AVisaCommunicationError: on calling the wrapped method
        while bit 4 is set or bit 5 is cleared
    """

    @functools.wraps(func)
    def guarded(self, *args, **kwargs):
        status = BitArray(uint=self.spoll(), length=8)
        # BitArray stores the MSB first; reversed, index i addresses bit i
        status.reverse()
        # NOTE(review): bit 4 / bit 5 are presumably MAV / ESB of the
        # IEEE 488.2 status byte - confirm with the instrument manual
        if status[4] or not status[5]:
            msg = "Device not ready to query"
            raise KeysightB2985AVisaCommunicationError(msg)
        return func(self, *args, **kwargs)

    return guarded
[docs]
class KeysightB2985AVisaCommunication(VisaCommunication):
    """
    Specialization of VisaCommunication for the KeysightB2985A.

    Overrides read, write, query and their flavors to follow the communication
    procedure of the device: the overrides are guarded by the ``ready_to_*``
    decorators, and the write and query flavors terminate their commands with
    ``*OPC``. An output buffer caches write operations issued while the device
    cannot receive new commands. Can only be used with a Visa TCPIP-INSTR.
    """

    # Limit used by ``_append_output_buffer`` when checking the buffer capacity
    MULTI_COMMANDS_MAX = 15

    def __init__(self, configuration) -> None:
        # the buffer is created before the base class initializer runs
        self._output_buffer: list[str] = []
        super().__init__(configuration)

    def open(self) -> None:
        """
        Open the connection and send ``*ESE 1`` followed by ``*OPC``.
        """
        super().open()
        enable_command = f"*ESE {1}"
        super().write_multiple(enable_command, "*OPC")

    @staticmethod
    def config_cls():
        """
        Return the configuration class of this communication protocol.

        :return: KeysightB2985AVisaCommunicationConfig
        """
        return KeysightB2985AVisaCommunicationConfig

    @property
    def output_buffer(self) -> list:
        """
        Commands that are currently cached in the output buffer.

        :return: the internal buffer list itself, not a copy
        """
        return self._output_buffer

    def reset(self) -> None:
        """
        Reset the device communication by sending ``*CLS`` followed by ``*OPC``.
        """
        super().write_multiple("*CLS", "*OPC")

    def _write_with_opc(self, *commands: str) -> None:
        # Common sequence of all write flavors: query ``*ESR?`` (the answer is
        # discarded), then send the commands terminated by ``*OPC``.
        # NOTE(review): the ``*ESR?`` query presumably clears the event status
        # register before the new commands are sent - confirm
        super().query("*ESR?")
        super().write_multiple(*commands, "*OPC")

    @ready_to_write
    def write(self, text: str) -> None:
        """
        Specialization of the write method to satisfy the KeysightB2985A
        communication. The operation complete command is always written after
        the command to have a controlled communication.

        :param text: command string to write
        """
        self._write_with_opc(text)

    @ready_to_write
    def write_multiple(self, *commands: str) -> None:
        """
        Write several commands at once, followed by the operation complete
        command.

        :param commands: command strings to write
        """
        self._write_with_opc(*commands)

    @ready_to_query
    def query(
        self,
        command: str,
        _n_attempts_max: int | None = None,
        _attempt_interval_sec: Number | None = None,
    ) -> str | None:
        """
        Query a single command. The command is sent between ``*ESR?`` and
        ``*OPC``; only the answer to ``command`` is returned.

        :param command: query string to send
        :param _n_attempts_max: not used by this implementation
        :param _attempt_interval_sec: not used by this implementation
        :return: answer of the device to ``command``
        """
        answers = super().query_multiple("*ESR?", command, "*OPC")
        # index 0 holds the answer to the leading ``*ESR?``
        return answers[1]

    @ready_to_query
    def query_multiple(
        self,
        *commands: str,
        _n_attempts_max: int | None = None,
        _attempt_interval_sec: Number | None = None,
    ) -> str | tuple[str, ...]:
        """
        Query several commands at once. The commands are sent between ``*ESR?``
        and ``*OPC``; the answer to the leading ``*ESR?`` is dropped.

        :param commands: query strings to send
        :param _n_attempts_max: not used by this implementation
        :param _attempt_interval_sec: not used by this implementation
        :return: answers of the device without the one to ``*ESR?``
        """
        answers = super().query_multiple("*ESR?", *commands, "*OPC")
        return answers[1:]

    @ready_to_read
    def read(self) -> str:
        """
        Read from the device.

        :return: the string returned by the read of the base class
        """
        return super().read()

    def _append_output_buffer(self, *command: str) -> None:
        # Cache the commands, followed by a ``*WAI``, in the output buffer.
        # Two entries are reserved on top of the new commands: the ``*WAI``
        # appended below and, presumably, the ``*OPC`` that
        # ``send_output_buffer`` adds when transmitting the buffer.
        required_length = len(self._output_buffer) + len(command) + 2
        if required_length > self.MULTI_COMMANDS_MAX:
            msg = "Output buffer full"
            raise KeysightB2985AVisaCommunicationError(msg)
        self._output_buffer.extend(command)
        self._output_buffer.append("*WAI")

    @ready_to_write
    def send_output_buffer(self) -> None:
        """
        Send all cached commands to the device, followed by the operation
        complete command, and start over with an empty output buffer.
        """
        self._write_with_opc(*self._output_buffer)
        # rebind instead of clearing in place, so a list handed out earlier
        # keeps its content
        self._output_buffer = []