Source code for hvl_ccb.comm.base

#  Copyright (c) ETH Zurich, SIS ID and HVL D-ITET
#
"""
Module with base classes for communication protocols.
"""

import logging
from abc import ABC, abstractmethod
from threading import RLock
from time import sleep

from typing_extensions import Self

from hvl_ccb.configuration import ConfigurationMixin, EmptyConfig, configdataclass
from hvl_ccb.error import CCBError
from hvl_ccb.utils.typing import Number

logger = logging.getLogger(__name__)


[docs] class CommunicationError(CCBError): pass
[docs] class CommunicationProtocol(ConfigurationMixin, ABC): """ Communication protocol abstract base class. Specifies the methods to implement for communication protocol, as well as implements some default settings and checks. """ def __init__(self, config) -> None: """ Constructor for CommunicationProtocol. Takes a configuration dict or configdataclass as the single parameter. :param config: Configdataclass or dictionary to be used with the default config dataclass. """ super().__init__(config) #: Access lock to use with context manager when #: accessing the communication protocol (thread safety) self.access_lock = RLock() # TECH: to be uncommented; pending for v1.0 # @property # @abstractmethod # def is_open(self) -> Optional[bool]: # pass # pragma: no cover
[docs] @abstractmethod def open(self) -> None: """ Open communication protocol """
[docs] @abstractmethod def close(self) -> None: """ Close the communication protocol """
def __enter__(self) -> Self: self.open() return self def __exit__(self, exc_type, exc_val, exc_tb) -> None: self.close()
[docs] class NullCommunicationProtocol(CommunicationProtocol): """ Communication protocol that does nothing. """
[docs] def open(self) -> None: """ Void open function. """
[docs] def close(self) -> None: """ Void close function. """
[docs] @staticmethod def config_cls() -> type[EmptyConfig]: """ Empty configuration :return: EmptyConfig """ return EmptyConfig
[docs] @configdataclass class AsyncCommunicationProtocolConfig: """ Base configuration data class for asynchronous communication protocols """ #: The terminator character. Typically this is ``b'\r\n'`` or ``b'\n'``, but can #: also be ``b'\r'`` or other combinations. This defines the end of a single line. terminator: bytes = b"\r\n" #: Standard encoding of the connection. Typically this is ``utf-8``, but can also #: be ``latin-1`` or something from here: #: https://docs.python.org/3/library/codecs.html#standard-encodings encoding: str = "utf-8" #: Encoding error handling scheme as defined here: #: https://docs.python.org/3/library/codecs.html#error-handlers #: By default strict error handling that raises `UnicodeError`. encoding_error_handling: str = "strict" #: time to wait between attempts of reading a non-empty text wait_sec_read_text_nonempty: Number = 0.5 #: default number of attempts to read a non-empty text default_n_attempts_read_text_nonempty: int = 10
[docs] def clean_values(self) -> None: try: "".encode(encoding=self.encoding) except LookupError as exc: raise ValueError(str(exc)) from exc try: "\ufffd".encode(encoding="ascii", errors=self.encoding_error_handling) except LookupError as exc: raise ValueError(str(exc)) from exc except UnicodeEncodeError: # expected error pass if self.wait_sec_read_text_nonempty <= 0: msg = ( "Wait time between attempts to read a non-empty text must be be a " "positive value (in seconds)." ) raise ValueError(msg) if self.default_n_attempts_read_text_nonempty <= 0: msg = ( "Default number of attempts of reading a non-empty text must be a " "positive integer." ) raise ValueError(msg)
[docs] class AsyncCommunicationProtocol(CommunicationProtocol): """ Abstract base class for asynchronous communication protocols """
[docs] @staticmethod def config_cls() -> type[AsyncCommunicationProtocolConfig]: return AsyncCommunicationProtocolConfig
[docs] @abstractmethod def read_bytes(self) -> bytes: """ Read a single line as `bytes` from the communication. This method uses `self.access_lock` to ensure thread-safety. :return: a single line as `bytes` containing the terminator, which can also be empty b"" """
[docs] @abstractmethod def write_bytes(self, data: bytes) -> int: """ Write data as `bytes` to the communication. This method uses `self.access_lock` to ensure thread-safety. :param data: data as `bytes`-string to be written :return: number of bytes written """
[docs] def read(self) -> str: """ Read a single line of text as `str` from the communication. :return: text as `str` including the terminator, which can also be empty "" """ return self.read_bytes().decode( encoding=self.config.encoding, errors=self.config.encoding_error_handling )
[docs] def write(self, text: str) -> None: """ Write text as `str` to the communication. :param text: test as a `str` to be written """ self.write_bytes( text.encode( encoding=self.config.encoding, errors=self.config.encoding_error_handling, ) + self.config.terminator )
[docs] def read_nonempty( self, n_attempts_max: int | None = None, attempt_interval_sec: Number | None = None, ) -> str | None: """ Try to read a non-empty single line of text as `str` from the communication. If the host does not reply or reply with white space only, it will return None. :return: a non-empty text as a `str` or `None` in case of an empty string :param n_attempts_max: Amount of attempts how often a non-empty text is tried to be read :param attempt_interval_sec: time between the reading attempts """ if n_attempts_max is None: n_attempts_max = self.config.default_n_attempts_read_text_nonempty if attempt_interval_sec is None: attempt_interval_sec = self.config.wait_sec_read_text_nonempty answer = self.read().strip() while len(answer) == 0 and n_attempts_max > 0: sleep(attempt_interval_sec) answer = self.read().strip() n_attempts_max -= 1 if answer == "": return None # Return None for an empty String return answer
[docs] def read_all( self, n_attempts_max: int | None = None, attempt_interval_sec: Number | None = None, ) -> str | None: """ Read all lines of text from the connection till nothing is left to read. :param n_attempts_max: Amount of attempts how often a non-empty text is tried to be read :param attempt_interval_sec: time between the reading attempts :return: A multi-line `str` including the terminator internally """ answer = self.read_nonempty( n_attempts_max=n_attempts_max, attempt_interval_sec=attempt_interval_sec, ) result = "" newline = self.config.terminator.decode( encoding=self.config.encoding, errors=self.config.encoding_error_handling ) while answer: result = f"{result}{newline}{answer}" answer = self.read_nonempty( n_attempts_max=n_attempts_max, attempt_interval_sec=attempt_interval_sec, ) if result == "": return None return f"{result.strip()}{self.config.terminator}"
[docs] def read_text(self) -> str: """ Read one line of text from the serial port. The input buffer may hold additional data afterwards, since only one line is read. NOTE: backward-compatibility proxy for `read` method; to be removed in v1.0 :return: String read from the serial port; `''` if there was nothing to read. :raises SerialCommunicationIOError: when communication port is not opened """ return self.read()
[docs] def write_text(self, text: str) -> None: """ Write text to the serial port. The text is encoded and terminated by the configured terminator. NOTE: backward-compatibility proxy for `read` method; to be removed in v1.0 :param text: Text to send to the port. :raises SerialCommunicationIOError: when communication port is not opened """ return self.write(text)
[docs] def read_text_nonempty( self, n_attempts_max: int | None = None, attempt_interval_sec: Number | None = None, ) -> str | None: """ Reads from the serial port, until a non-empty line is found, or the number of attempts is exceeded. NOTE: backward-compatibility proxy for `read` method; to be removed in v1.0 Attention: in contrast to `read_text`, the returned answer will be stripped of a whitespace newline terminator at the end, if such terminator is set in the initial configuration (default). :param n_attempts_max: maximum number of read attempts :param attempt_interval_sec: time between the reading attempts :return: String read from the serial port; `''` if number of attempts is exceeded or serial port is not opened. """ return self.read_nonempty( n_attempts_max=n_attempts_max, attempt_interval_sec=attempt_interval_sec, )
[docs] class SyncCommunicationProtocolConfig(AsyncCommunicationProtocolConfig): """ Base configuration data class for synchronous communication protocols """
[docs] class SyncCommunicationProtocol(AsyncCommunicationProtocol, ABC): """ Abstract base class for synchronous communication protocols with `query()` """
[docs] @staticmethod def config_cls() -> type[SyncCommunicationProtocolConfig]: return SyncCommunicationProtocolConfig
[docs] def query( self, command: str, n_attempts_max: int | None = None, attempt_interval_sec: Number | None = None, ) -> str | None: """ Send a command to the interface and handle the status message. Possibly raises an exception. :param command: Command to send :param n_attempts_max: Amount of attempts how often a non-empty text is tried to be read as answer :param attempt_interval_sec: time between the reading attempts :return: Answer from the interface, which can be None instead of an empty reply """ with self.access_lock: self.write(text=command) answer: str | None = self.read_nonempty( n_attempts_max=n_attempts_max, attempt_interval_sec=attempt_interval_sec ) # expects an answer string or None return answer