Source code for hvl_ccb.comm.tcp

#  Copyright (c) ETH Zurich, SIS ID and HVL D-ITET
#
"""
TCP communication protocol. Makes use of the socket library.
"""

import logging
import socket
import warnings
from ipaddress import IPv4Address, IPv6Address
from typing import cast

from hvl_ccb.configuration import configdataclass
from hvl_ccb.utils.validation import (
    validate_and_resolve_host,
    validate_number,
    validate_tcp_port,
)

from .base import (
    AsyncCommunicationProtocol,
    AsyncCommunicationProtocolConfig,
    CommunicationError,
)

logger = logging.getLogger(__name__)


[docs] class TcpCommunicationError(CommunicationError): """Error of the TcpCommunication"""
[docs] @configdataclass class TcpCommunicationConfig(AsyncCommunicationProtocolConfig): """ Configuration dataclass for :class:`TcpCommunication`. """ # Host is the IP address of the connected device. host: str | IPv4Address | IPv6Address | None = None # TCP port port: int = 54321 # TCP receiving buffersize bufsize: int = 1024 # TCP timeout timeout: float | None = 0.2
[docs] def clean_values(self) -> None: # if necessary, converts host to a valid IP address super().clean_values() # Host self.force_value("host", validate_and_resolve_host(self.host, logger)) # type: ignore[attr-defined] # Port validate_tcp_port(self.port, logger) # Buffer size validate_number("bufsize", self.bufsize, (1, None), int, logger=logger) # Timeout validate_number("timeout", self.timeout, (0, None), logger=logger)
[docs] class TcpCommunication(AsyncCommunicationProtocol): """ Tcp Communication Protocol. """ config: TcpCommunicationConfig def __init__(self, configuration) -> None: """Constructor socket""" super().__init__(configuration) # create the communication port specified in the configuration logger.debug( "Create socket TcpClient with host: " f'"{self.config.host}", Port: "{self.config.port}"' ) self._sock: socket.socket | None = None
[docs] @staticmethod def config_cls() -> type[TcpCommunicationConfig]: return TcpCommunicationConfig
@property def is_open(self) -> bool: """ Is the connection open? :return: True for an open connection """ return self._sock is not None
[docs] def open(self) -> None: """ Open TCP connection. """ if self.is_open: logger.warning( "Tried to open the TCP Communication, but it is already open" ) return # open the port logger.debug("Open TCP Port.") with self.access_lock: self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.connect((self.config.host, self.config.port)) self._sock.settimeout(self.config.timeout)
[docs] def close(self) -> None: """ Close TCP connection. """ if not self.is_open: logger.warning( "Tried to close the TCP Communication, but it is already closed" ) return # close the port logger.debug("Close TCP Port.") with self.access_lock: sock: socket.socket = cast("socket.socket", self._sock) sock.close() self._sock = None
[docs] def write_bytes(self, data: bytes) -> int: """ Write data as `bytes` to the TcpCommunication. This method uses `self.access_lock` to ensure thread-safety. :param data: data as `bytes`-string to be written :return: number of bytes written """ if not self.is_open: msg = "Tried to write to the TCP Communication, but it is closed" logger.error(msg) raise TcpCommunicationError(msg) with self.access_lock: sock: socket.socket = cast("socket.socket", self._sock) return sock.send(data)
[docs] def read_bytes(self) -> bytes: """ Read a single line as `bytes` from the TcpCommunication. This method uses `self.access_lock` to ensure thread-safety. :return: a single line as `bytes` containing the terminator, which can also be empty b"" """ if not self.is_open: msg = "Tried to read from the TCP Communication, but it is closed" logger.error(msg) raise TcpCommunicationError(msg) with self.access_lock: sock: socket.socket = cast("socket.socket", self._sock) try: return sock.recv(self.config.bufsize) except TimeoutError: msg = "Timeout during reading from TCP, return empty bytes" logger.debug(msg) return b""
[docs] class Tcp(TcpCommunication): """Old name :class:`Tcp` for :class:`TcpCommunication` for keeping this import backwards compatibile""" def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) warnings.warn( "The 'Tcp' class is deprecated, use 'TcpCommunication' instead", DeprecationWarning, stacklevel=2, )