# Copyright (c) ETH Zurich, SIS ID and HVL D-ITET
#
import logging
from aenum import StrEnum
from hvl_ccb.dev.keysightb298xx.comm import KeysightB2985AVisaCommunication
from hvl_ccb.dev.keysightb298xx.modules.submodules.base import _BaseModule
from hvl_ccb.utils.validation import validate_number
logger = logging.getLogger(__name__)
[docs]
class EventSource(StrEnum):
"""Enumeration of event sources."""
INSTANT = "INST"
AUTO = "AINT"
REMOTE = "BUS"
TIMER = "TIM"
INTERNAL_1 = "INT1"
INTERNAL_2 = "INT2"
LAN = "LAN"
GPIO_1 = "EXT1"
GPIO_2 = "EXT2"
GPIO_3 = "EXT3"
GPIO_4 = "EXT4"
GPIO_5 = "EXT5"
GPIO_6 = "EXT6"
GPIO_7 = "EXT7"
BNC_IN = "TIN"
[docs]
class EventOut(StrEnum):
"""Enumeration of event outputs."""
OFF = "OFF"
INTERNAL_1 = "INT1"
INTERNAL_2 = "INT2"
LAN = "LAN"
GPIO_1 = "EXT1"
GPIO_2 = "EXT2"
GPIO_3 = "EXT3"
GPIO_4 = "EXT4"
GPIO_5 = "EXT5"
GPIO_6 = "EXT6"
GPIO_7 = "EXT7"
BNC_OUT = "TOUT"
[docs]
class LanTrigger(StrEnum):
"""Enumeration of LAN triggers for event generation."""
LAN_0 = "LAN0"
LAN_1 = "LAN1"
LAN_2 = "LAN2"
LAN_3 = "LAN3"
LAN_4 = "LAN4"
LAN_5 = "LAN5"
LAN_6 = "LAN6"
LAN_7 = "LAN7"
class _TrigArm(_BaseModule):
"""
Class to set the arm and trigger settings.
"""
def __init__(
self, com: KeysightB2985AVisaCommunication, base_command: str, name: str
) -> None:
super().__init__(com, base_command, name)
self._event_instant: bool = False
@property
def count(self) -> int:
"""
Get the set counter.
:return: The set counter.
"""
return int(self._com.query(f"{self._base_command}:COUN?"))
@count.setter
def count(self, value: int) -> None:
"""
Set the counter.
:param value: The counter value to set.
"""
validate_number(f"{self._name}-counter", value, (1, 100_000), int, logger)
logger.info(f"{self._name}-counter: {value}")
self._com.write(f"{self._base_command}:COUN {value}")
@property
def delay(self) -> float:
"""
Get the delay setting.
:return: The delay setting in seconds.
"""
return float(self._com.query(f"{self._base_command}:DEL?"))
@delay.setter
def delay(self, value: float) -> None:
"""
Set the delay setting.
:param value: The delay setting in seconds.
"""
validate_number(
f"{self._name}-delay", value, (0, 100_000), (int, float), logger
)
logger.info(f"{self._name}-delay: {value} s")
self._com.write(f"{self._base_command}:DEL {value}")
@property
def event_bypass(self) -> bool:
"""
Get the event bypass setting.
:return: True if event bypass is enabled, False otherwise.
"""
answer = self._com.query(f"{self._base_command}:BYP?")
return answer != "OFF"
@event_bypass.setter
def event_bypass(self, value: bool) -> None:
"""
Set the event bypass setting.
:param value: True to enable event bypass, False to disable.
"""
command: str = "ONCE" if value else "OFF"
logger.info(f"{self._name}-bypass: {command}")
self._com.write(f"{self._base_command}:BYP {command}")
@property
def event_source(self) -> EventSource:
"""
Get the set event source.
:return: The set event source.
"""
if self._event_instant:
return EventSource.INSTANT # type: ignore[return-value]
return EventSource(self._com.query(f"{self._base_command}:SOUR?"))
@event_source.setter
def event_source(self, value: EventSource) -> None:
"""
Set the event source.
:param value: The event source to set.
"""
value = EventSource(value)
logger.info(f"{self._name}-event source: {value}")
if value == EventSource.INSTANT:
self._com.write(f"{self._base_command}:IMM")
self._event_instant = True
return
self._com.write(f"{self._base_command}:SOUR {value}")
@property
def event_out(self) -> EventOut:
"""
Get the set event output.
:return: The set event output.
"""
status, channel = self._com.query_multiple(
f"{self._base_command}:TOUT:STAT?", f"{self._base_command}:TOUT:SIGN?"
)
if not int(status):
return EventOut.OFF # type: ignore[return-value]
return EventOut(channel)
@event_out.setter
def event_out(self, value: EventOut) -> None:
"""
Set the event output.
:param value: The event output to set.
"""
value = EventOut(value)
logger.info(f"{self._name}-event output: {value}")
if value == EventOut.OFF:
self._com.write(f"{self._base_command}:TOUT:STAT 0")
return
self._com.write(f"{self._base_command}:TOUT:STAT 1")
self._com.write(f"{self._base_command}:TOUT:SIGN {value}")
@property
def event_timer_interval(self) -> float:
"""
Get the event timer interval.
:return: The event timer interval in seconds.
"""
return float(self._com.query(f"{self._base_command}:TIM?"))
@event_timer_interval.setter
def event_timer_interval(self, value: float) -> None:
"""
Set the event timer interval.
:param value: The event timer interval in seconds.
"""
validate_number(
f"{self._name}-timer-interval", value, (1e-4, 1e5), (int, float), logger
)
logger.info(f"{self._name}-event timer interval: {value} s")
self._com.write(f"{self._base_command}:TIM {value}")
@property
def event_lan_source(self) -> list[LanTrigger]:
"""
Get the set LAN event source.
:return: The set LAN event source.
"""
answer = self._com.query(f"{self._base_command}:SOUR:LAN?").split(",")
return [LanTrigger(x) for x in answer]
@event_lan_source.setter
def event_lan_source(self, value: list[LanTrigger]) -> None:
"""
Set the LAN event source.
:param value: The LAN event source to set.
"""
if not isinstance(value, list):
msg = "The LAN event source must be of type list"
raise TypeError(msg)
source_str = ",".join(LanTrigger(x) for x in value)
logger.info(f"{self._name}-LAN event source: {source_str}")
self._com.write(f"{self._base_command}:SOUR:LAN {source_str}")
class _AcqTrans(_BaseModule):
"""
Class to control the acquisition and transition arm and trigger settings.
"""
def __init__(
self, com: KeysightB2985AVisaCommunication, base_command: str, name: str
) -> None:
super().__init__(com, base_command, name)
self.arm: _TrigArm = _TrigArm(self._com, f":ARM{self._base_command}", "arm")
self.trigger: _TrigArm = _TrigArm(
self._com, f":TRIG{self._base_command}", "trigger"
)
def start(self) -> None:
"""
Start the acquisition process.
"""
logger.info(f"{self._name} start")
self._com.write(f":INIT{self._base_command}")
def abort(self) -> None:
"""
Abort the acquisition process.
"""
logger.info(f"{self._name} abort")
self._com.write(f":ABORT{self._base_command}")
@property
def idle(self) -> bool:
"""
Get the idle status of the acquisition process.
:return: True if the acquisition process is idle, False otherwise.
"""
return bool(int(self._com.query(f":IDLE{self._base_command}?")))