# Copyright (c) ETH Zurich, SIS ID and HVL D-ITET
#
"""
SerialCommunication for FuG
"""
import logging
from typing import Optional, Union, cast
from hvl_ccb.comm.base import SyncCommunicationProtocol
from hvl_ccb.comm.serial import (
SerialCommunication,
SerialCommunicationBytesize,
SerialCommunicationConfig,
SerialCommunicationParity,
SerialCommunicationStopbits,
)
from hvl_ccb.configuration import configdataclass
from hvl_ccb.utils.typing import Number
from .errors import FuGError, FuGErrorcodes
logger = logging.getLogger(__name__)
[docs]
@configdataclass
class FuGSerialCommunicationConfig(SerialCommunicationConfig):
"""
Configuration dataclass for :class:`FuGSerialCommunication`.
"""
#: Baudrate for FuG power supplies is 9600 baud
baudrate: int = 9600
#: FuG does not use parity
parity: Union[str, SerialCommunicationParity] = SerialCommunicationParity.NONE
#: FuG uses one stop bit
stopbits: Union[int, SerialCommunicationStopbits] = SerialCommunicationStopbits.ONE
#: One byte is eight bits long
bytesize: Union[int, SerialCommunicationBytesize] = (
SerialCommunicationBytesize.EIGHTBITS
)
#: The terminator is LF
terminator: bytes = b"\n"
#: use 3 seconds timeout as default
timeout: Number = 3
#: default time to wait between attempts of reading a non-empty text
wait_sec_read_text_nonempty: Number = 0.5
#: default number of attempts to read a non-empty text
default_n_attempts_read_text_nonempty: int = 10
[docs]
class FuGSerialCommunication(SerialCommunication, SyncCommunicationProtocol):
"""
Specific communication protocol implementation for
FuG power supplies.
Already predefines device-specific protocol parameters in config.
"""
[docs]
@staticmethod
def config_cls():
return FuGSerialCommunicationConfig
[docs]
def query(
self,
command: str,
n_attempts_max: Optional[int] = None,
attempt_interval_sec: Optional[Number] = None,
) -> Optional[str]:
"""
Send a command to the interface and handle the status message.
Raises an error, if the answer starts with "E".
:param command: Command to send
:raises FuGError: if the connection is broken or the error from the power
source itself
:return: Answer from the interface or empty string
"""
with self.access_lock:
logger.debug(f"FuG communication, send: {command}")
answer: Optional[str] = super().query(
command, n_attempts_max, attempt_interval_sec
)
logger.debug(f"FuG communication, receive: {answer}")
if not answer:
cast(FuGErrorcodes, FuGErrorcodes.E504).raise_()
try:
FuGErrorcodes(cast(str, answer)).raise_()
except ValueError as exc:
logger.exception(
"ValueError at finding the correct FuGErrorcode", exc_info=exc
)
if cast(str, answer).startswith("E"):
msg = f'The unknown errorcode "{answer}" was detected.'
raise FuGError(msg) from exc
return answer
else:
return ""