# Copyright (c) ETH Zurich, SIS ID and HVL D-ITET
#
"""
Module with base classes for communication protocols.
"""
import logging
from abc import ABC, abstractmethod
from threading import RLock
from time import sleep
from typing import Optional
from typing_extensions import Self
from hvl_ccb.configuration import ConfigurationMixin, EmptyConfig, configdataclass
from hvl_ccb.error import CCBError
from hvl_ccb.utils.typing import Number
logger = logging.getLogger(__name__)
[docs]
class CommunicationError(CCBError):
pass
[docs]
class CommunicationProtocol(ConfigurationMixin, ABC):
"""
Communication protocol abstract base class.
Specifies the methods to implement for communication protocol, as well as
implements some default settings and checks.
"""
def __init__(self, config) -> None:
"""
Constructor for CommunicationProtocol. Takes a configuration dict or
configdataclass as the single parameter.
:param config: Configdataclass or dictionary to be used with the default
config dataclass.
"""
super().__init__(config)
#: Access lock to use with context manager when
#: accessing the communication protocol (thread safety)
self.access_lock = RLock()
# TECH: to be uncommented; pending for v1.0
# @property
# @abstractmethod
# def is_open(self) -> Optional[bool]:
# pass # pragma: no cover
[docs]
@abstractmethod
def open(self) -> None:
"""
Open communication protocol
"""
[docs]
@abstractmethod
def close(self) -> None:
"""
Close the communication protocol
"""
def __enter__(self) -> Self:
self.open()
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self.close()
[docs]
class NullCommunicationProtocol(CommunicationProtocol):
"""
Communication protocol that does nothing.
"""
[docs]
def open(self) -> None:
"""
Void open function.
"""
[docs]
def close(self) -> None:
"""
Void close function.
"""
[docs]
@staticmethod
def config_cls() -> type[EmptyConfig]:
"""
Empty configuration
:return: EmptyConfig
"""
return EmptyConfig
[docs]
@configdataclass
class AsyncCommunicationProtocolConfig:
"""
Base configuration data class for asynchronous communication protocols
"""
#: The terminator character. Typically this is ``b'\r\n'`` or ``b'\n'``, but can
#: also be ``b'\r'`` or other combinations. This defines the end of a single line.
terminator: bytes = b"\r\n"
#: Standard encoding of the connection. Typically this is ``utf-8``, but can also
#: be ``latin-1`` or something from here:
#: https://docs.python.org/3/library/codecs.html#standard-encodings
encoding: str = "utf-8"
#: Encoding error handling scheme as defined here:
#: https://docs.python.org/3/library/codecs.html#error-handlers
#: By default strict error handling that raises `UnicodeError`.
encoding_error_handling: str = "strict"
#: time to wait between attempts of reading a non-empty text
wait_sec_read_text_nonempty: Number = 0.5
#: default number of attempts to read a non-empty text
default_n_attempts_read_text_nonempty: int = 10
[docs]
def clean_values(self) -> None:
try:
"".encode(encoding=self.encoding)
except LookupError as exc:
raise ValueError(str(exc)) from exc
try:
"\ufffd".encode(encoding="ascii", errors=self.encoding_error_handling)
except LookupError as exc:
raise ValueError(str(exc)) from exc
except UnicodeEncodeError:
# expected error
pass
if self.wait_sec_read_text_nonempty <= 0:
msg = (
"Wait time between attempts to read a non-empty text must be be a "
"positive value (in seconds)."
)
raise ValueError(msg)
if self.default_n_attempts_read_text_nonempty <= 0:
msg = (
"Default number of attempts of reading a non-empty text must be a "
"positive integer."
)
raise ValueError(msg)
[docs]
class AsyncCommunicationProtocol(CommunicationProtocol):
"""
Abstract base class for asynchronous communication protocols
"""
[docs]
@staticmethod
def config_cls() -> type[AsyncCommunicationProtocolConfig]:
return AsyncCommunicationProtocolConfig
[docs]
@abstractmethod
def read_bytes(self) -> bytes:
"""
Read a single line as `bytes` from the communication.
This method uses `self.access_lock` to ensure thread-safety.
:return: a single line as `bytes` containing the terminator, which can also be
empty b""
"""
[docs]
@abstractmethod
def write_bytes(self, data: bytes) -> int:
"""
Write data as `bytes` to the communication.
This method uses `self.access_lock` to ensure thread-safety.
:param data: data as `bytes`-string to be written
:return: number of bytes written
"""
[docs]
def read(self) -> str:
"""
Read a single line of text as `str` from the communication.
:return: text as `str` including the terminator, which can also be empty ""
"""
return self.read_bytes().decode(
encoding=self.config.encoding, errors=self.config.encoding_error_handling
)
[docs]
def write(self, text: str) -> None:
"""
Write text as `str` to the communication.
:param text: test as a `str` to be written
"""
self.write_bytes(
text.encode(
encoding=self.config.encoding,
errors=self.config.encoding_error_handling,
)
+ self.config.terminator
)
[docs]
def read_nonempty(
self,
n_attempts_max: Optional[int] = None,
attempt_interval_sec: Optional[Number] = None,
) -> Optional[str]:
"""
Try to read a non-empty single line of text as `str` from the communication.
If the host does not reply or reply with white space only, it will return None.
:return: a non-empty text as a `str` or `None` in case of an empty string
:param n_attempts_max: Amount of attempts how often a non-empty text is tried to
be read
:param attempt_interval_sec: time between the reading attempts
"""
if n_attempts_max is None:
n_attempts_max = self.config.default_n_attempts_read_text_nonempty
if attempt_interval_sec is None:
attempt_interval_sec = self.config.wait_sec_read_text_nonempty
answer = self.read().strip()
while len(answer) == 0 and n_attempts_max > 0:
sleep(attempt_interval_sec)
answer = self.read().strip()
n_attempts_max -= 1
if answer == "":
return None # Return None for an empty String
return answer
[docs]
def read_all(
self,
n_attempts_max: Optional[int] = None,
attempt_interval_sec: Optional[Number] = None,
) -> Optional[str]:
"""
Read all lines of text from the connection till nothing is left to read.
:param n_attempts_max: Amount of attempts how often a non-empty text is tried to
be read
:param attempt_interval_sec: time between the reading attempts
:return: A multi-line `str` including the terminator internally
"""
answer = self.read_nonempty(
n_attempts_max=n_attempts_max,
attempt_interval_sec=attempt_interval_sec,
)
result = ""
newline = self.config.terminator.decode(
encoding=self.config.encoding, errors=self.config.encoding_error_handling
)
while answer:
result = f"{result}{newline}{answer}"
answer = self.read_nonempty(
n_attempts_max=n_attempts_max,
attempt_interval_sec=attempt_interval_sec,
)
if result == "":
return None
return f"{result.strip()}{self.config.terminator}"
[docs]
def read_text(self) -> str:
"""
Read one line of text from the serial port. The input buffer may
hold additional data afterwards, since only one line is read.
NOTE: backward-compatibility proxy for `read` method; to be removed in v1.0
:return: String read from the serial port; `''` if there was nothing to read.
:raises SerialCommunicationIOError: when communication port is not opened
"""
return self.read()
[docs]
def write_text(self, text: str) -> None:
"""
Write text to the serial port. The text is encoded and terminated by
the configured terminator.
NOTE: backward-compatibility proxy for `read` method; to be removed in v1.0
:param text: Text to send to the port.
:raises SerialCommunicationIOError: when communication port is not opened
"""
return self.write(text)
[docs]
def read_text_nonempty(
self,
n_attempts_max: Optional[int] = None,
attempt_interval_sec: Optional[Number] = None,
) -> Optional[str]:
"""
Reads from the serial port, until a non-empty line is found, or the number of
attempts is exceeded.
NOTE: backward-compatibility proxy for `read` method; to be removed in v1.0
Attention: in contrast to `read_text`, the returned answer will be stripped of
a whitespace newline terminator at the end, if such terminator is set in
the initial configuration (default).
:param n_attempts_max: maximum number of read attempts
:param attempt_interval_sec: time between the reading attempts
:return: String read from the serial port; `''` if number of attempts is
exceeded or serial port is not opened.
"""
return self.read_nonempty(
n_attempts_max=n_attempts_max,
attempt_interval_sec=attempt_interval_sec,
)
[docs]
class SyncCommunicationProtocolConfig(AsyncCommunicationProtocolConfig):
"""
Base configuration data class for synchronous communication protocols
"""
[docs]
class SyncCommunicationProtocol(AsyncCommunicationProtocol, ABC):
"""
Abstract base class for synchronous communication protocols with `query()`
"""
[docs]
@staticmethod
def config_cls() -> type[SyncCommunicationProtocolConfig]:
return SyncCommunicationProtocolConfig
[docs]
def query(
self,
command: str,
n_attempts_max: Optional[int] = None,
attempt_interval_sec: Optional[Number] = None,
) -> Optional[str]:
"""
Send a command to the interface and handle the status message.
Possibly raises an exception.
:param command: Command to send
:param n_attempts_max: Amount of attempts how often a non-empty text is tried to
be read as answer
:param attempt_interval_sec: time between the reading attempts
:return: Answer from the interface, which can be None instead of an empty reply
"""
with self.access_lock:
self.write(text=command)
answer: Optional[str] = self.read_nonempty(
n_attempts_max=n_attempts_max, attempt_interval_sec=attempt_interval_sec
)
# expects an answer string or None
return answer