# Copyright (c) ETH Zurich, SIS ID and HVL D-ITET
#
import logging
from datetime import datetime, timedelta
from time import sleep
from hvl_ccb.dev.visa import VisaDevice
from .base import KeysightB2985AConfig, KeysightB2985AError
from .comm import (
KeysightB2985AVisaCommunication,
KeysightB2985AVisaCommunicationConfig,
)
from .modules import (
Acquisition,
Data,
Input,
Output,
Status,
Transition,
)
logger = logging.getLogger(__name__)
[docs]
class KeysightB2985A(VisaDevice):
"""
Device class for the KeysightB2985A.
"""
[docs]
@staticmethod
def config_cls():
return KeysightB2985AConfig
[docs]
@staticmethod
def default_com_cls():
return KeysightB2985AVisaCommunication
def __init__(
self,
com: (
KeysightB2985AVisaCommunication
| KeysightB2985AVisaCommunicationConfig
| dict
),
dev_config: KeysightB2985AConfig | dict | None = None,
) -> None:
super().__init__(com, dev_config)
del self._notify_operation_complete
# load the individual device modules.
self._status: Status = Status(self.com) # type: ignore[assignment]
self._acquire = Acquisition(self.com)
self._data = Data(self.com)
self._input = Input(self.com, self.config.module(self.com))
self._output = Output(self.com)
self._transition = Transition(self.com)
[docs]
def start(self) -> None:
"""
Start the KeysightB2985A and bring it into a defined state and remote
mode.
"""
super().start()
# reset device (RST) and clear status registers (CLS)
self.com.write_multiple("*RST", "*CLS")
# enable local display
self._local_display_state(True)
[docs]
def stop(self) -> None:
"""
Stop the KeysightB2985A and close communication. Resets the device to
local operation mode.
"""
if self._spoll_thread is not None and self._spoll_thread.is_polling():
# clear status registers (CLS)
self.com.write_multiple("*RST", "*CLS")
else:
logger.warning("KeysightB2985A was already stopped")
super().stop()
@property
def status(self) -> Status: # type: ignore[override]
"""
The status byte STB is defined in IEEE 488.2. It provides a rough overview of
the instrument status.
"""
return self._status
@property
def acquire(self) -> Acquisition:
return self._acquire
@property
def data(self) -> Data:
return self._data
@property
def input(self) -> Input:
return self._input
@property
def output(self) -> Output:
return self._output
@property
def transition(self) -> Transition:
return self._transition
def _local_display_state(self, state: bool) -> None:
"""
Enable or disable local display.
:param state: is the desired local display state
"""
self.com.write(f":DISP:ENAB {int(state)}")
[docs]
def self_test(self) -> bool:
"""
Perform a device self-test to check for correct operation.
:return: bool with result of the self test.
"""
logger.info("Running self-diagnostic.")
self.com.write("*TST?")
sleep(1)
self.wait_operation_complete(20)
result = not bool(int(self.com.read()))
if not result:
logger.warning("Self-test: FAIL")
return result
logger.info("Self-test: PASS")
return result
[docs]
def query_status(self) -> None:
"""
Queries the status byte and reads the error queue
"""
with self.com.access_lock:
# Access lock: Block communication for second query of error queue
self.status.status_summary.update_status()
if self.status.status_summary.error_queue.condition:
# error queue contains new error
logger.debug("Error in error queue")
self.com.write_bytes(b"*OPC")
self.status.status_summary.update_status()
if self.status.status_summary.standard_event_status.condition:
msg = self.get_error_queue()
raise KeysightB2985AError(msg)
if (
self.status.status_summary.output_buffer.condition
or not self.status.status_summary.standard_event_status.condition
):
return
if self.com.output_buffer:
self.com.send_output_buffer()
[docs]
def wait_operation_complete(self, timeout: float | None = None) -> bool:
"""
Wait until a task is completed.
:param timeout: timeout in seconds, defaults to 0.1 s
:raises TimeoutError: if the operation does not complete in time
:return: operation complete or timeout
"""
timeout_time = datetime.now() + timedelta(
seconds=(timeout if timeout is not None else 0)
)
# wait until event is caught
while not self.status.status_summary.standard_event_status.condition:
sleep(0.1)
if timeout is None or datetime.now() > timeout_time:
msg = "Timeout waiting for operation complete"
raise TimeoutError(msg)
return self.status.status_summary.standard_event_status.condition