# Copyright (c) ETH Zurich, SIS ID and HVL D-ITET
#
"""
Communication protocol for serial ports. Makes use of the `pySerial
<https://pythonhosted.org/pyserial/index.html>`_ library.
"""
import logging
import warnings
from typing import cast
# Note: PyCharm does not recognize the dependency correctly, it is added as pyserial.
import serial
from hvl_ccb.comm import (
AsyncCommunicationProtocol,
AsyncCommunicationProtocolConfig,
CommunicationError,
)
from hvl_ccb.configuration import ConfigurationValueWarning, configdataclass
from hvl_ccb.utils.enum import ValueEnum, unique
from hvl_ccb.utils.typing import Number
logger = logging.getLogger(__name__)
[docs]
class SerialCommunicationIOError(IOError, CommunicationError):
"""Serial communication related I/O errors."""
[docs]
@unique
class SerialCommunicationParity(ValueEnum):
"""
Serial communication parity.
"""
EVEN = serial.PARITY_EVEN
MARK = serial.PARITY_MARK
NAMES = serial.PARITY_NAMES
NONE = serial.PARITY_NONE
ODD = serial.PARITY_ODD
SPACE = serial.PARITY_SPACE
[docs]
@unique
class SerialCommunicationStopbits(ValueEnum):
"""
Serial communication stopbits.
"""
ONE = serial.STOPBITS_ONE
ONE_POINT_FIVE = serial.STOPBITS_ONE_POINT_FIVE
TWO = serial.STOPBITS_TWO
[docs]
@unique
class SerialCommunicationBytesize(ValueEnum):
"""
Serial communication bytesize.
"""
FIVEBITS = serial.FIVEBITS
SIXBITS = serial.SIXBITS
SEVENBITS = serial.SEVENBITS
EIGHTBITS = serial.EIGHTBITS
[docs]
@configdataclass
class SerialCommunicationConfig(AsyncCommunicationProtocolConfig):
"""
Configuration dataclass for :class:`SerialCommunication`.
"""
Parity = SerialCommunicationParity
Stopbits = SerialCommunicationStopbits
Bytesize = SerialCommunicationBytesize
#: Port is a string referring to a COM-port (e.g. ``'COM3'``) or a URL.
#: The full list of capabilities is found `on the pyserial documentation
#: <https://pythonhosted.org/pyserial/url_handlers.html>`_.
port: str | None = None
#: Baudrate of the serial port
baudrate: int = 9600
#: Parity to be used for the connection.
parity: str | SerialCommunicationParity = Parity.NONE
#: Stopbits setting, can be 1, 1.5 or 2.
stopbits: Number | SerialCommunicationStopbits = Stopbits.ONE
#: Size of a byte, 5 to 8
bytesize: int | SerialCommunicationBytesize = Bytesize.EIGHTBITS
#: Timeout in seconds for the serial port
timeout: Number = 2
[docs]
def clean_values(self) -> None:
super().clean_values()
if not isinstance(self.parity, SerialCommunicationParity):
self.force_value("parity", SerialCommunicationParity(self.parity)) # type: ignore[attr-defined]
if not isinstance(self.stopbits, SerialCommunicationStopbits):
self.force_value("stopbits", SerialCommunicationStopbits(self.stopbits)) # type: ignore[attr-defined]
if not isinstance(self.bytesize, SerialCommunicationBytesize):
self.force_value("bytesize", SerialCommunicationBytesize(self.bytesize)) # type: ignore[attr-defined]
if self.timeout < 0:
msg = "Timeout has to be >= 0."
raise ValueError(msg)
if self.timeout < 1: # min. viable threshold as tested w/ Arduino Nano board
warnings.warn(
"Setting a too low timeout for a serial connection communication"
" may lead to random errors during a communication with a device.",
category=ConfigurationValueWarning,
stacklevel=2,
)
[docs]
def create_serial_port(self) -> serial.Serial:
"""
Create a serial port instance according to specification in this configuration
:return: Closed serial port instance
"""
ser = serial.serial_for_url(self.port, do_not_open=True)
ser.baudrate = self.baudrate
ser.parity = cast(SerialCommunicationParity, self.parity).value
ser.stopbits = cast(SerialCommunicationStopbits, self.stopbits).value
ser.bytesize = cast(SerialCommunicationBytesize, self.bytesize).value
ser.timeout = self.timeout
return ser
[docs]
def terminator_str(self) -> str:
return self.terminator.decode()
[docs]
class SerialCommunication(AsyncCommunicationProtocol):
"""
Implements the Communication Protocol for serial ports.
"""
def __init__(self, configuration) -> None:
"""
Constructor for SerialCommunication.
"""
super().__init__(configuration)
self._serial_port = self.config.create_serial_port()
[docs]
@staticmethod
def config_cls():
return SerialCommunicationConfig
[docs]
def open(self) -> None:
"""
Open the serial connection.
:raises SerialCommunicationIOError: when communication port cannot be opened.
"""
# open the port
with self.access_lock:
try:
self._serial_port.open()
except serial.SerialException as exc:
# ignore when port is already open
logger.exception(
"Error of Serial Connection, maybe it is already open.",
exc_info=exc,
)
if str(exc) != "Port is already open.":
raise SerialCommunicationIOError from exc
[docs]
def close(self) -> None:
"""
Close the serial connection.
"""
# close the port
with self.access_lock:
self._serial_port.close()
@property
def is_open(self) -> bool:
"""
Flag indicating if the serial port is open.
:return: `True` if the serial port is open, otherwise `False`
"""
return self._serial_port.is_open
[docs]
def read_bytes(self) -> bytes:
"""
Read the bytes from the serial port till the terminator is found.
The input buffer may hold additional lines afterwards.
This method uses `self.access_lock` to ensure thread-safety.
:return: Bytes read from the serial port; `b''` if there was nothing to read.
:raises SerialCommunicationIOError: when communication port is not opened
"""
with self.access_lock:
try:
return self._serial_port.read_until(self.config.terminator)
except serial.SerialException as exc:
logger.exception("Error of Serial Communication", exc_info=exc)
raise SerialCommunicationIOError from exc
[docs]
def write_bytes(self, data: bytes) -> int:
"""
Write bytes to the serial port.
This method uses `self.access_lock` to ensure thread-safety.
:param data: data to write to the serial port
:return: number of bytes written
:raises SerialCommunicationIOError: when communication port is not opened
"""
with self.access_lock:
try:
return self._serial_port.write(data)
except serial.SerialException as exc:
logger.exception("Error of Serial Communication", exc_info=exc)
raise SerialCommunicationIOError from exc
[docs]
def read_single_bytes(self, size: int = 1) -> bytes:
"""
Read the specified number of bytes from the serial port.
The input buffer may hold additional data afterwards.
:return: Bytes read from the serial port; `b''` if there was nothing to read.
"""
with self.access_lock:
try:
return self._serial_port.read(size)
except serial.SerialException as exc:
logger.exception("Error of Serial Communication", exc_info=exc)
raise SerialCommunicationIOError from exc