Source code for hvl_ccb.dev.technix.base

#  Copyright (c) ETH Zurich, SIS ID and HVL D-ITET
#
"""
Communication and auxiliary classes for Technix
"""

import logging
from abc import ABC
from typing import NamedTuple, Optional, Union

from hvl_ccb import configdataclass
from hvl_ccb.comm import SyncCommunicationProtocol, SyncCommunicationProtocolConfig
from hvl_ccb.comm.serial import SerialCommunication, SerialCommunicationConfig
from hvl_ccb.comm.telnet import TelnetCommunication, TelnetCommunicationConfig
from hvl_ccb.dev import DeviceError
from hvl_ccb.utils.enum import ValueEnum
from hvl_ccb.utils.typing import Number

logger = logging.getLogger(__name__)


class TechnixError(DeviceError):
    """
    Error related to a Technix device.
    """
class TechnixFaultError(TechnixError):
    """
    Signals that the fault flag was detected while the interlock is closed.
    """
# Configuration part common to all Technix communication variants.
@configdataclass
class _TechnixCommunicationConfig(SyncCommunicationProtocolConfig):
    #: The terminator is CR
    terminator: bytes = b"\r"


class _TechnixCommunication(SyncCommunicationProtocol, ABC):
    """
    Abstract, transport-independent communication layer for Technix.

    Concrete transports are provided by `TechnixSerialCommunication` and
    `TechnixTelnetCommunication`.
    """

    def query(
        self,
        command: str,
        n_attempts_max: Optional[int] = None,
        attempt_interval_sec: Optional[Number] = None,
    ) -> str:
        """
        Send a command to the interface and return its answer.

        The exchange runs while `access_lock` is held. Both the command and
        the answer are logged at debug level.

        :param command: Command to send
        :param n_attempts_max: Maximum number of read attempts; forwarded
            unchanged to the parent `query`
        :param attempt_interval_sec: Time between two read attempts;
            forwarded unchanged to the parent `query`
        :raises TechnixError: if the parent `query` returned no answer (`None`)
        :return: Answer from the interface
        """
        # Collect the pass-through options once; they are not interpreted here.
        retry_options = {
            "n_attempts_max": n_attempts_max,
            "attempt_interval_sec": attempt_interval_sec,
        }

        with self.access_lock:
            logger.debug(f"TechnixCommunication, send: '{command}'")
            # The parent implementation yields a string or None.
            answer: Optional[str] = super().query(command, **retry_options)
            logger.debug(f"TechnixCommunication, receive: '{answer}'")

            if answer is not None:
                return answer

            # No answer at all: log the failure and report it to the caller.
            msg = f"TechnixCommunication did get no answer on command: '{command}'"
            logger.error(msg)
            raise TechnixError(msg)
@configdataclass
class TechnixSerialCommunicationConfig(
    _TechnixCommunicationConfig,
    SerialCommunicationConfig,
):
    """
    Configuration of the serial communication for Technix.

    Adds no fields of its own; everything comes from the two base
    configurations.
    """
class TechnixSerialCommunication(_TechnixCommunication, SerialCommunication):
    """
    Technix communication over a serial connection.
    """

    @staticmethod
    def config_cls():
        """
        Return the configuration class belonging to this communication class.

        :return: `TechnixSerialCommunicationConfig`
        """
        return TechnixSerialCommunicationConfig
@configdataclass
class TechnixTelnetCommunicationConfig(
    _TechnixCommunicationConfig,
    TelnetCommunicationConfig,
):
    """
    Configuration of the telnet communication for Technix.
    """

    #: Port at which Technix is listening
    port: int = 4660
# NOTE(review): the base order here (transport class first) is the reverse of
# TechnixSerialCommunication - confirm that the resulting MRO is intended.
class TechnixTelnetCommunication(TelnetCommunication, _TechnixCommunication):
    """
    Technix communication over a telnet connection.
    """

    @staticmethod
    def config_cls():
        """
        Return the configuration class belonging to this communication class.

        :return: `TechnixTelnetCommunicationConfig`
        """
        return TechnixTelnetCommunicationConfig
# Either of the two concrete communication classes (the class objects
# themselves, not instances).
_TechnixCommunicationClasses = Union[
    type[TechnixSerialCommunication],
    type[TechnixTelnetCommunication],
]


class _SetRegisters(ValueEnum):
    """
    Register names used for programming / commanding the device.
    """

    # Output Voltage programming
    VOLTAGE = "d1"
    # Output Current programming
    CURRENT = "d2"
    # HV on
    HVON = "P5"
    # HV off
    HVOFF = "P6"
    # Local/remote mode
    LOCAL = "P7"
    # Inhibit
    INHIBIT = "P8"


class _GetRegisters(ValueEnum):
    """
    Register names used for reading values back from the device.
    """

    # Output Voltage Monitor
    VOLTAGE = "a1"
    # Output Current Monitor
    CURRENT = "a2"
    # Image of the power supply logical status
    STATUS = "E"


class _Status(NamedTuple):
    """
    Bundle of the individual status flags of the device.

    In addition to the flags, it can carry the most recent reading of the
    voltage and the current at the output.
    """

    # Status flags
    inhibit: bool
    remote: bool
    hv_off: bool
    hv_on: bool
    output: bool
    open_interlock: bool
    fault: bool
    voltage_regulation: bool

    # Most recent output readings; presumably None when no reading is
    # available - TODO confirm against the code that fills this tuple.
    voltage: Optional[Number]
    current: Optional[Number]