Source code for hvl_ccb.comm.modbus

#  Copyright (c) ETH Zurich, SIS ID and HVL D-ITET
#
"""
Communication protocol for Modbus communications. Makes use of the
`pymodbus <https://pymodbus.readthedocs.io/en/latest/>`_ library.
"""

import logging
from abc import ABC, abstractmethod
from ipaddress import IPv4Address, IPv6Address
from typing import cast

from pymodbus.client import ModbusBaseSyncClient, ModbusSerialClient, ModbusTcpClient
from pymodbus.exceptions import ConnectionException
from pymodbus.framer import FramerType

from hvl_ccb.comm import CommunicationError, CommunicationProtocol
from hvl_ccb.comm.serial import (
    SerialCommunicationBytesize,
    SerialCommunicationConfig,
    SerialCommunicationParity,
    SerialCommunicationStopbits,
)
from hvl_ccb.configuration import configdataclass
from hvl_ccb.utils.validation import (
    validate_and_resolve_host,
    validate_number,
    validate_tcp_port,
)

logger = logging.getLogger(__name__)


[docs] class ModbusConnectionError(ConnectionException, CommunicationError): """ Error raised when the connection failed. """
[docs] @configdataclass class ModbusBaseCommunicationConfig: """Base modbus communication config""" # Unit number to be used when connecting with Modbus. Typically this is used # when connecting to multiple Modbus devices. unit: int = 1
[docs] def clean_values(self) -> None: validate_number("unit", self.unit, (0, 255), int, logger)
[docs] @configdataclass class ModbusTcpCommunicationConfig(ModbusBaseCommunicationConfig): """ Configuration dataclass for :class:`ModbusTcpCommunication`. """ # Host is the IP address or hostname of the connected device. host: str | IPv4Address | IPv6Address | None = None # TCP port port: int = 502
[docs] def clean_values(self) -> None: super().clean_values() self.force_value("host", validate_and_resolve_host(self.host, logger)) # type: ignore[attr-defined] validate_tcp_port(self.port, logger)
[docs] @configdataclass class ModbusSerialCommunicationConfig( ModbusBaseCommunicationConfig, SerialCommunicationConfig ): """ Configuration dataclass for :class:`ModbusRtuCommunication`. """ # Framer for the ModbusSerial communication framer: str | FramerType = FramerType.RTU
[docs] def clean_values(self) -> None: SerialCommunicationConfig.clean_values(self) ModbusBaseCommunicationConfig.clean_values(self) if not isinstance(self.framer, FramerType): self.force_value("framer", FramerType(self.framer)) # type: ignore[attr-defined] if self.framer not in [FramerType.RTU, FramerType.ASCII]: msg = "Framer must be 'RTU' or 'ASCII'" logger.error(msg) raise ModbusConnectionError(msg) if not self.port: msg = "Cannot generate a Modbus Serial connection without a COM-Port" logger.error(msg) raise ModbusConnectionError(msg) if self.stopbits == SerialCommunicationStopbits.ONE_POINT_FIVE: msg = "Cannot generate a Modbus Serial connection with 1.5 stopbits" logger.error(msg) raise ModbusConnectionError(msg)
[docs] class ModbusBaseCommunication(CommunicationProtocol, ABC): """ Implements the Communication Protocol for modbus. """ def __init__(self, configuration) -> None: """Constructor for modbus""" super().__init__(configuration) # create the modbus port specified in the configuration self.client: ModbusBaseSyncClient = self._generate_modbus_client() @abstractmethod def _generate_modbus_client(self) -> ModbusBaseSyncClient: """Generate the proper ModbusClient"""
[docs] def open(self) -> None: """ Open the Modbus connection. :raises ModbusConnectionFailedError: if the connection fails. """ # open the port logger.debug("Open Modbus Port.") with self.access_lock: if not self.client.connect(): raise ModbusConnectionError
[docs] def close(self) -> None: """ Close the Modbus connection. """ # close the port logger.debug("Close Modbus Port.") with self.access_lock: self.client.close()
[docs] def write_registers(self, address: int, values: list[int] | int) -> None: """ Write values from the specified address forward. :param address: address of the first register :param values: list with all values """ logger.debug(f"Write registers {address} with values {values}") if isinstance(values, int): values = [values] with self.access_lock: try: self.client.write_registers( address=address, values=values, device_id=self.config.unit ) except ConnectionException as e: logger.exception("Connection Error from Modbus", exc_info=e) raise ModbusConnectionError from e
[docs] def read_holding_registers(self, address: int, count: int) -> list[int]: """ Read specified number of register starting with given address and return the values from each register. :param address: address of the first register :param count: count of registers to read :return: list of `int` values """ logger.debug(f"Read holding registers {address} with count {count}.") with self.access_lock: try: response = self.client.read_holding_registers( address=address, count=count, device_id=self.config.unit ) except ConnectionException as e: logger.exception("Connection Error from Modbus", exc_info=e) raise ModbusConnectionError from e if response.isError(): msg = ( "ModbusResponse has error with function code: " f"{response.function_code}" ) logger.error(msg) raise ModbusConnectionError(msg) registers = response.registers logger.debug(f"Returned holding registers {address}: {registers}") return registers
[docs] def read_input_registers(self, address: int, count: int) -> list[int]: """ Read specified number of register starting with given address and return the values from each register in a list. :param address: address of the first register :param count: count of registers to read :return: list of `int` values """ logger.debug(f"Read input registers {address} with count {count}.") with self.access_lock: try: registers = self.client.read_input_registers( address=address, count=count, device_id=self.config.unit ).registers except ConnectionException as e: logger.exception("Connection Error from Modbus", exc_info=e) raise ModbusConnectionError from e logger.debug(f"Returned input registers {address}: {registers}") return registers
[docs] class ModbusTcpCommunication(ModbusBaseCommunication): """ Modbus with the TCP-Version """ config: ModbusTcpCommunicationConfig
[docs] @staticmethod def config_cls() -> type[ModbusTcpCommunicationConfig]: return ModbusTcpCommunicationConfig
def _generate_modbus_client(self) -> ModbusTcpClient: logger.debug( f"Create ModbusTcpClient with host: {self.config.host}, " f"Port: {self.config.port}, Unit: {self.config.unit}" ) return ModbusTcpClient(cast("str", self.config.host), port=self.config.port)
[docs] class ModbusSerialCommunication(ModbusBaseCommunication): """ Modbus with the Serial-Version, can be RTU or ASCII """ config: ModbusSerialCommunicationConfig
[docs] @staticmethod def config_cls() -> type[ModbusSerialCommunicationConfig]: return ModbusSerialCommunicationConfig
def _generate_modbus_client(self) -> ModbusSerialClient: logger.debug( f"Create ModbusSerialClient with " f"Port: {self.config.port}, Unit: {self.config.unit}" ) return ModbusSerialClient( port=cast("str", self.config.port), framer=cast("FramerType", self.config.framer), baudrate=self.config.baudrate, bytesize=cast("SerialCommunicationBytesize", self.config.bytesize).value, parity=cast("SerialCommunicationParity", self.config.parity).value, stopbits=cast("SerialCommunicationStopbits", self.config.stopbits).value, timeout=self.config.timeout, )