Source code for hvl_ccb.dev.fug.comm

#  Copyright (c) ETH Zurich, SIS ID and HVL D-ITET
#
"""
SerialCommunication for FuG
"""

import logging
from typing import Optional, Union, cast

from hvl_ccb.comm.base import SyncCommunicationProtocol
from hvl_ccb.comm.serial import (
    SerialCommunication,
    SerialCommunicationBytesize,
    SerialCommunicationConfig,
    SerialCommunicationParity,
    SerialCommunicationStopbits,
)
from hvl_ccb.configuration import configdataclass
from hvl_ccb.utils.typing import Number

from .errors import FuGError, FuGErrorcodes

logger = logging.getLogger(__name__)


@configdataclass
class FuGSerialCommunicationConfig(SerialCommunicationConfig):
    """
    Serial port settings used by :class:`FuGSerialCommunication`.

    Every field overrides the generic serial configuration with the default
    value used for FuG power supplies.
    """

    #: FuG power supplies communicate at 9600 baud
    baudrate: int = 9600

    #: no parity bit is used by FuG
    parity: Union[str, SerialCommunicationParity] = SerialCommunicationParity.NONE

    #: a single stop bit is used by FuG
    stopbits: Union[int, SerialCommunicationStopbits] = SerialCommunicationStopbits.ONE

    #: each byte consists of eight data bits
    bytesize: Union[int, SerialCommunicationBytesize] = (
        SerialCommunicationBytesize.EIGHTBITS
    )

    #: messages are terminated with a line feed (LF)
    terminator: bytes = b"\n"

    #: default timeout of 3 seconds
    timeout: Number = 3

    #: default waiting time between two attempts of reading a non-empty text
    wait_sec_read_text_nonempty: Number = 0.5

    #: default number of attempts made to read a non-empty text
    default_n_attempts_read_text_nonempty: int = 10
class FuGSerialCommunication(SerialCommunication, SyncCommunicationProtocol):
    """
    Specific communication protocol implementation for FuG power supplies.
    Already predefines device-specific protocol parameters in config.
    """

    @staticmethod
    def config_cls():
        """
        Return the configuration dataclass of this communication protocol.

        :return: the class :class:`FuGSerialCommunicationConfig`
        """
        return FuGSerialCommunicationConfig

    def query(
        self,
        command: str,
        n_attempts_max: Optional[int] = None,
        attempt_interval_sec: Optional[Number] = None,
    ) -> Optional[str]:
        """
        Send a command to the interface and handle the status message.
        Raises an error, if the answer starts with "E".

        :param command: Command to send
        :param n_attempts_max: forwarded unchanged to the ``query`` of the
            parent class; by its name, the maximal number of read attempts
        :param attempt_interval_sec: forwarded unchanged to the ``query`` of
            the parent class; by its name, the waiting time between two
            attempts in seconds
        :raises FuGError: if the connection is broken or the error from the power
            source itself
        :return: Answer from the interface or empty string
        """
        # The lock is held for the complete send/receive/evaluate cycle.
        with self.access_lock:
            logger.debug(f"FuG communication, send: {command}")
            answer: Optional[str] = super().query(
                command, n_attempts_max, attempt_interval_sec
            )
            logger.debug(f"FuG communication, receive: {answer}")

            if not answer:
                # An empty or missing answer is reported through errorcode E504.
                cast(FuGErrorcodes, FuGErrorcodes.E504).raise_()

            try:
                # A known errorcode is handled by its own ``raise_``.
                FuGErrorcodes(cast(str, answer)).raise_()
            except ValueError as exc:
                # The answer is not a known errorcode: it is either an unknown
                # errorcode or a regular answer of the device.
                if cast(str, answer).startswith("E"):
                    # Log only in the real error case; a regular answer also
                    # ends up in this handler and must not produce an
                    # error-level log entry with a traceback.
                    logger.exception(
                        "ValueError at finding the correct FuGErrorcode", exc_info=exc
                    )
                    msg = f'The unknown errorcode "{answer}" was detected.'
                    raise FuGError(msg) from exc
                return answer
            else:
                # ``raise_`` returned without raising (presumably the
                # "no error" status code), hence there is no payload to return.
                return ""