# Source code for hvl_ccb.comm.tcp

#  Copyright (c) ETH Zurich, SIS ID and HVL D-ITET
#
"""
TCP communication protocol. Makes use of the socket library.
"""

import logging
import socket
from ipaddress import IPv4Address, IPv6Address
from typing import Union

from hvl_ccb.comm import CommunicationProtocol
from hvl_ccb.configuration import configdataclass
from hvl_ccb.utils.validation import validate_and_resolve_host, validate_tcp_port

logger = logging.getLogger(__name__)


@configdataclass
class TcpCommunicationConfig:
    """
    Settings consumed by the :class:`Tcp` communication protocol.
    """

    # Address of the remote device; normalised by ``clean_values`` through
    # ``validate_and_resolve_host``.
    host: Union[str, IPv4Address, IPv6Address]

    # TCP port of the remote device
    port: int = 54321

    # Maximum number of bytes requested per receive call
    bufsize: int = 1024

    def clean_values(self):
        """
        Validate the configured values and normalise the host.

        The host is passed through ``validate_and_resolve_host`` and the
        result is written back to the ``host`` field; the port is checked with
        ``validate_tcp_port``.

        :raises ValueError: if ``bufsize`` is smaller than 1
        """
        # if necessary, converts host to a valid IP address
        resolved_host = validate_and_resolve_host(self.host, logger)
        self.force_value("host", resolved_host)

        validate_tcp_port(self.port, logger)

        if self.bufsize < 1:
            raise ValueError("Buffer size has to be >= 1")


class Tcp(CommunicationProtocol):
    """
    Tcp Communication Protocol.

    Wraps a single IPv4 stream socket (``AF_INET`` / ``SOCK_STREAM``) that is
    created in the constructor, connected by :meth:`open` and released by
    :meth:`close`.

    NOTE(review): the socket object is created only once, in ``__init__``.
    A closed Python socket cannot be connected again, so calling
    :meth:`open` after :meth:`close` on the same instance will fail.
    """

    def __init__(self, configuration):
        """
        Constructor socket

        :param configuration: configuration for this protocol, forwarded to
            the base class; presumably a :class:`TcpCommunicationConfig` or a
            mapping accepted by it (see :meth:`config_cls`).
        """
        super().__init__(configuration)

        # create the communication port specified in the configuration
        logger.debug(
            'Create socket TcpClient with host: "%s", Port: "%s"',
            self.config.host,
            self.config.port,
        )
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    @staticmethod
    def config_cls() -> type[TcpCommunicationConfig]:
        """
        Return the configuration class used by this protocol.

        :return: :class:`TcpCommunicationConfig`
        """
        return TcpCommunicationConfig

    def write(self, command: str = "") -> None:
        """
        TCP write function

        :param command: command string to be sent; must be ASCII-encodable
        :return: none
        :raises UnicodeEncodeError: if ``command`` contains non-ASCII
            characters
        """
        # sendall() keeps sending until the whole buffer is transmitted (or an
        # error is raised); a plain send() may transmit only a part of the
        # data and would silently truncate the command.
        self.sock.sendall(bytes(command, "ascii"))
        logger.debug("Sent %s", command)

    def read(self) -> str:
        """
        TCP read function

        Performs a single ``recv`` of at most ``config.bufsize`` bytes, so a
        reply longer than the buffer size is not returned in one call.

        :return: information read from TCP buffer formatted as string
        :raises UnicodeDecodeError: if the received bytes are not valid ASCII
        """
        raw_reply = self.sock.recv(self.config.bufsize)
        reply = raw_reply.decode("ascii")
        logger.debug("Received via TCP: %s", reply)
        return reply

    def open(self) -> None:
        """
        Open TCP connection.

        Connects the socket to ``(config.host, config.port)``.
        """
        # open the port
        logger.debug("Open TCP Port.")
        self.sock.connect((self.config.host, self.config.port))

    def close(self) -> None:
        """
        Close TCP connection.
        """
        # close the port
        logger.debug("Close TCP Port.")
        self.sock.close()