# Copyright (c) ETH Zurich, SIS ID and HVL D-ITET
#
"""
Communication protocol for Modbus communications. Makes use of the
`pymodbus <https://pymodbus.readthedocs.io/en/latest/>`_ library.
"""
import logging
from abc import ABC, abstractmethod
from ipaddress import IPv4Address, IPv6Address
from typing import cast
from pymodbus.client import ModbusBaseSyncClient, ModbusSerialClient, ModbusTcpClient
from pymodbus.exceptions import ConnectionException
from pymodbus.framer import FramerType
from hvl_ccb.comm import CommunicationError, CommunicationProtocol
from hvl_ccb.comm.serial import (
SerialCommunicationBytesize,
SerialCommunicationConfig,
SerialCommunicationParity,
SerialCommunicationStopbits,
)
from hvl_ccb.configuration import configdataclass
from hvl_ccb.utils.validation import (
validate_and_resolve_host,
validate_number,
validate_tcp_port,
)
logger = logging.getLogger(__name__)
[docs]
class ModbusConnectionError(ConnectionException, CommunicationError):
"""
Error raised when the connection failed.
"""
[docs]
@configdataclass
class ModbusBaseCommunicationConfig:
"""Base modbus communication config"""
# Unit number to be used when connecting with Modbus. Typically this is used
# when connecting to multiple Modbus devices.
unit: int = 1
[docs]
def clean_values(self) -> None:
validate_number("unit", self.unit, (0, 255), int, logger)
[docs]
@configdataclass
class ModbusTcpCommunicationConfig(ModbusBaseCommunicationConfig):
"""
Configuration dataclass for :class:`ModbusTcpCommunication`.
"""
# Host is the IP address or hostname of the connected device.
host: str | IPv4Address | IPv6Address | None = None
# TCP port
port: int = 502
[docs]
def clean_values(self) -> None:
super().clean_values()
self.force_value("host", validate_and_resolve_host(self.host, logger)) # type: ignore[attr-defined]
validate_tcp_port(self.port, logger)
[docs]
@configdataclass
class ModbusSerialCommunicationConfig(
ModbusBaseCommunicationConfig, SerialCommunicationConfig
):
"""
Configuration dataclass for :class:`ModbusRtuCommunication`.
"""
# Framer for the ModbusSerial communication
framer: str | FramerType = FramerType.RTU
[docs]
def clean_values(self) -> None:
SerialCommunicationConfig.clean_values(self)
ModbusBaseCommunicationConfig.clean_values(self)
if not isinstance(self.framer, FramerType):
self.force_value("framer", FramerType(self.framer)) # type: ignore[attr-defined]
if self.framer not in [FramerType.RTU, FramerType.ASCII]:
msg = "Framer must be 'RTU' or 'ASCII'"
logger.error(msg)
raise ModbusConnectionError(msg)
if not self.port:
msg = "Cannot generate a Modbus Serial connection without a COM-Port"
logger.error(msg)
raise ModbusConnectionError(msg)
if self.stopbits == SerialCommunicationStopbits.ONE_POINT_FIVE:
msg = "Cannot generate a Modbus Serial connection with 1.5 stopbits"
logger.error(msg)
raise ModbusConnectionError(msg)
[docs]
class ModbusBaseCommunication(CommunicationProtocol, ABC):
"""
Implements the Communication Protocol for modbus.
"""
def __init__(self, configuration) -> None:
"""Constructor for modbus"""
super().__init__(configuration)
# create the modbus port specified in the configuration
self.client: ModbusBaseSyncClient = self._generate_modbus_client()
@abstractmethod
def _generate_modbus_client(self) -> ModbusBaseSyncClient:
"""Generate the proper ModbusClient"""
[docs]
def open(self) -> None:
"""
Open the Modbus connection.
:raises ModbusConnectionFailedError: if the connection fails.
"""
# open the port
logger.debug("Open Modbus Port.")
with self.access_lock:
if not self.client.connect():
raise ModbusConnectionError
[docs]
def close(self) -> None:
"""
Close the Modbus connection.
"""
# close the port
logger.debug("Close Modbus Port.")
with self.access_lock:
self.client.close()
[docs]
def write_registers(self, address: int, values: list[int] | int) -> None:
"""
Write values from the specified address forward.
:param address: address of the first register
:param values: list with all values
"""
logger.debug(f"Write registers {address} with values {values}")
if isinstance(values, int):
values = [values]
with self.access_lock:
try:
self.client.write_registers(
address=address, values=values, device_id=self.config.unit
)
except ConnectionException as e:
logger.exception("Connection Error from Modbus", exc_info=e)
raise ModbusConnectionError from e
[docs]
def read_holding_registers(self, address: int, count: int) -> list[int]:
"""
Read specified number of register starting with given address and return
the values from each register.
:param address: address of the first register
:param count: count of registers to read
:return: list of `int` values
"""
logger.debug(f"Read holding registers {address} with count {count}.")
with self.access_lock:
try:
response = self.client.read_holding_registers(
address=address, count=count, device_id=self.config.unit
)
except ConnectionException as e:
logger.exception("Connection Error from Modbus", exc_info=e)
raise ModbusConnectionError from e
if response.isError():
msg = (
"ModbusResponse has error with function code: "
f"{response.function_code}"
)
logger.error(msg)
raise ModbusConnectionError(msg)
registers = response.registers
logger.debug(f"Returned holding registers {address}: {registers}")
return registers
[docs]
class ModbusTcpCommunication(ModbusBaseCommunication):
"""
Modbus with the TCP-Version
"""
config: ModbusTcpCommunicationConfig
[docs]
@staticmethod
def config_cls() -> type[ModbusTcpCommunicationConfig]:
return ModbusTcpCommunicationConfig
def _generate_modbus_client(self) -> ModbusTcpClient:
logger.debug(
f"Create ModbusTcpClient with host: {self.config.host}, "
f"Port: {self.config.port}, Unit: {self.config.unit}"
)
return ModbusTcpClient(cast("str", self.config.host), port=self.config.port)
[docs]
class ModbusSerialCommunication(ModbusBaseCommunication):
"""
Modbus with the Serial-Version, can be RTU or ASCII
"""
config: ModbusSerialCommunicationConfig
[docs]
@staticmethod
def config_cls() -> type[ModbusSerialCommunicationConfig]:
return ModbusSerialCommunicationConfig
def _generate_modbus_client(self) -> ModbusSerialClient:
logger.debug(
f"Create ModbusSerialClient with "
f"Port: {self.config.port}, Unit: {self.config.unit}"
)
return ModbusSerialClient(
port=cast("str", self.config.port),
framer=cast("FramerType", self.config.framer),
baudrate=self.config.baudrate,
bytesize=cast("SerialCommunicationBytesize", self.config.bytesize).value,
parity=cast("SerialCommunicationParity", self.config.parity).value,
stopbits=cast("SerialCommunicationStopbits", self.config.stopbits).value,
timeout=self.config.timeout,
)