# Copyright (c) ETH Zurich, SIS ID and HVL D-ITET
#
"""
Device class for controlling a Lauda PRO RP245E circulation chiller over TCP.
"""
import logging
import time
from enum import IntEnum
from typing import Union, cast
from hvl_ccb.comm.tcp import Tcp, TcpCommunicationConfig
from hvl_ccb.configuration import configdataclass
from hvl_ccb.dev.base import DeviceError, SingleCommDevice
from hvl_ccb.utils.enum import ValueEnum
from hvl_ccb.utils.typing import Number
logger = logging.getLogger(__name__)
[docs]
class LaudaProRp245eCommand(ValueEnum):
    """
    Command strings understood by the Lauda PRO RP245E chiller.

    Most command strings need to be complemented with a parameter (appended as
    a string) before being sent to the device; :meth:`build_str` assembles the
    final string.

    Commands implemented as defined in "Lauda Betriebsanleitung fuer
    PRO Badthermostate und Umwaelzthermostate" pages 42 - 49
    """

    # --- commands that write a value to the device -------------------------

    #: Pass on external controlled temperature
    EXTERNAL_TEMP = "OUT_PV_05_"

    #: Define temperature set point
    TEMP_SET_POINT = "OUT_SP_00_"

    #: Define pump level 1-8
    PUMP_LEVEL = "OUT_SP_01_"

    #: Define operation mode
    OPERATION_MODE = "OUT_SP_02_"

    #: Define upper temp limit
    UPPER_TEMP = "OUT_SP_04_"

    #: Define lower temp limit
    LOWER_TEMP = "OUT_SP_05_"

    #: Define communication time out
    COM_TIME_OUT = "OUT_SP_08_"

    #: Set control mode; values as listed in
    #: ``LaudaProRp245eConfig.ExtControlModeEnum``: 0=internal, 1=ext. Pt100,
    #: 2=ext. analog, 3=ext. serial, 4=USB, 5=ethernet
    CONT_MODE = "OUT_MODE_01_"

    # --- start / stop of the temperature control ---------------------------

    #: Start temp control (pump and heating/cooling)
    START = "START"

    #: Stop temp control (pump and heating/cooling)
    STOP = "STOP"

    # --- ramp programs ------------------------------------------------------

    #: Select a ramp program (target for all further ramp commands)
    RAMP_SELECT = "RMP_SELECT_"

    #: Start a selected ramp program
    RAMP_START = "RMP_START"

    #: Pause a selected ramp program
    RAMP_PAUSE = "RMP_PAUSE"

    #: Continue a paused ramp program
    RAMP_CONTINUE = "RMP_CONT"

    #: Stop a running ramp program
    RAMP_STOP = "RMP_STOP"

    #: Reset a selected ramp program
    RAMP_DELETE = "RMP_RESET"

    #: Define how often a ramp program should be iterated
    RAMP_ITERATIONS = "RMP_OUT_02_"

    #: Define parameters of a selected ramp program
    RAMP_SET = "RMP_OUT_00_"

    # --- commands that request a value from the device ---------------------

    #: Request internal bath temperature
    BATH_TEMP = "IN_PV_00"

    #: Request device type
    DEVICE_TYPE = "TYPE"

    def build_str(self, param: str = "", terminator: str = "\r\n") -> str:
        """
        Assemble the complete string to be sent to the device.

        The result is this command's value, directly followed by the parameter
        and the terminator; no separators are inserted.

        :param param: Command's parameter given as string
        :param terminator: Command's terminator
        :return: Command's string with a parameter and terminator
        """
        pieces = (self.value, param, terminator)
        return "{}{}{}".format(*pieces)
[docs]
class LaudaProRp245eCommandError(DeviceError):
    """
    Error raised when a command cannot be executed: the command is unknown, the
    device answers with an error, or the device-type check on opening the
    connection fails.
    """
[docs]
@configdataclass
class LaudaProRp245eTcpCommunicationConfig(TcpCommunicationConfig):
    """
    Configuration dataclass for :class:`LaudaProRp245eTcpCommunication`.
    """

    #: Delay time between commands in seconds
    wait_sec_pre_read_or_write: Number = 0.005

    #: The terminator character
    terminator: str = "\r\n"

    def clean_values(self) -> None:
        """
        Validate the configuration values.

        :raises ValueError: if the waiting time is negative or the terminator is
            not one of the accepted line endings (empty, LF, CR, CR+LF); the
            parent class may raise it as well for its own fields.
        """
        # Fields of the parent class (e.g. host) are validated there first;
        # it raises ValueError on its own if they are not suitable.
        super().clean_values()

        delay = self.wait_sec_pre_read_or_write
        if delay < 0:
            raise ValueError("communication waiting time has to be >= 0")

        allowed_terminators = ("", "\n", "\r", "\r\n")
        if self.terminator in allowed_terminators:
            return
        raise ValueError("Unknown terminator.")
[docs]
class LaudaProRp245eTcpCommunication(Tcp):
    """
    Implements the Communication Protocol for Lauda PRO RP245E TCP connection.

    Commands are built from :class:`LaudaProRp245eCommand` members, replies are
    returned with the terminator and the ``OK`` acknowledgement removed.
    """

    def __init__(self, configuration):
        """
        Constructor for socket.

        :param configuration: communication configuration, passed on unchanged
            to the parent class constructor
        """
        super().__init__(configuration)

    @staticmethod
    def config_cls() -> type[LaudaProRp245eTcpCommunicationConfig]:
        """
        :return: the configuration class of this communication protocol
        """
        return LaudaProRp245eTcpCommunicationConfig

    def write_command(self, command: LaudaProRp245eCommand, param: str = "") -> None:
        """
        Send command function.

        :param command: first part of command string, defined in
            `LaudaProRp245eCommand` (a member, or one of the members' values)
        :param param: second part of command string, parameter (by default '')
        :raises LaudaProRp245eCommandError: if `command` is not a known command
        :return: None
        """
        try:
            # Keep the coerced member: a plain value string (e.g. "START") passes
            # this validation too and must not fail later on `.build_str`.
            command = LaudaProRp245eCommand(command)
        except ValueError as exc:
            err_msg = f"Unknown command: '{command}///{param}'"
            logger.error(err_msg)
            raise LaudaProRp245eCommandError(err_msg) from exc

        # Wait according to the configuration of *this* connection; reading the
        # attribute from the config class would always yield the default value.
        time.sleep(self.config.wait_sec_pre_read_or_write)

        with self.access_lock:
            self.write(
                command.build_str(
                    param=param,
                    terminator=self.config.terminator,
                )
            )

    def read(self) -> str:
        """
        Receive value function.

        :return: reply from device as a string, the terminator, as well as the 'OK'
            stripped from the reply to make it directly useful as a value (e.g. in
            case the internal bath temperature is requested)
        """
        time.sleep(self.config.wait_sec_pre_read_or_write)
        with self.access_lock:
            reply = super().read()
        return reply.replace(self.config.terminator, "").replace("OK", "")

    def query_command(self, command: LaudaProRp245eCommand, param: str = "") -> str:
        """
        Send and receive function. E.g. to be used when setting/changing device
        setting.

        :param command: first part of command string, defined in
            `LaudaProRp245eCommand`
        :param param: second part of command string, parameter (by default '')
        :raises LaudaProRp245eCommandError: if the command is unknown or the reply
            contains 'ERR'
        :return: reply of the device, as returned by :meth:`read`
        """
        # The lock is held over write and read so that the reply belongs to this
        # command. write_command() and read() acquire the lock again - assumes
        # `access_lock` is re-entrant (TODO confirm against the Tcp base class).
        with self.access_lock:
            self.write_command(command, param)
            reply = self.read()

        if "ERR" in reply:
            err_msg = f"Error in reply to a command: '{command}///{param}' => '{reply}'"
            logger.error(err_msg)
            raise LaudaProRp245eCommandError(err_msg)
        return reply

    def open(self) -> None:
        """
        Open the Lauda PRO RP245E TCP connection.

        After opening, the device type is queried; the reply has to contain
        'PRO'.

        :raises LaudaProRp245eCommandError: if the connection fails, i.e. the
            device type query returns an error or a type without 'PRO'.
        """
        with self.access_lock:
            # open the port
            super().open()
            device_type = self.query_command(
                cast(LaudaProRp245eCommand, LaudaProRp245eCommand.DEVICE_TYPE)
            )

        if "PRO" not in device_type:
            err_msg = "Could not connect to Lauda RP 245 E PRO. Check IP address."
            logger.error(err_msg)
            raise LaudaProRp245eCommandError(err_msg)

    def close(self) -> None:
        """
        Close the Lauda PRO RP245E TCP connection.

        Before closing, the control mode is set back to internal and the
        temperature control is stopped. The connection is closed even if one of
        these commands fails; the failure is still propagated to the caller.
        """
        with self.access_lock:
            try:
                # Set Lauda control mode to internal
                self.write_command(
                    cast(LaudaProRp245eCommand, LaudaProRp245eCommand.CONT_MODE),
                    str(LaudaProRp245eConfig.ExtControlModeEnum.INTERNAL.value),
                )
                # Stop currently running processes
                self.write_command(
                    cast(LaudaProRp245eCommand, LaudaProRp245eCommand.STOP)
                )
            finally:
                # Close connection to Lauda, also when a command above raised,
                # so that the socket is not leaked.
                super().close()
[docs]
@configdataclass
class LaudaProRp245eConfig:
    """
    Configuration for the Lauda RP245E circulation chiller.
    """

    class OperationModeEnum(IntEnum):
        """Operation Mode (Cooling OFF/Cooling On/AUTO - set to AUTO)"""

        COOLOFF = 0
        COOLON = 1
        #: Automatically select heating/cooling
        AUTO = 2

    class ExtControlModeEnum(IntEnum):
        """
        Source for definition of external, controlled temperature (option 2, 3 and 4
        are not available with current configuration of the Lauda RP245E,
        add-on hardware would be required)
        """

        INTERNAL = 0
        EXPT100 = 1
        ANALOG = 2
        SERIAL = 3
        USB = 4
        ETH = 5

    #: Default temperature set point
    temp_set_point_init: Number = 20.0

    #: Default pump Level
    pump_init: int = 6

    #: Upper temperature limit (safe for Galden HT135 cooling liquid)
    upper_temp: Number = 80.0

    #: Lower temperature limit (safe for Galden HT135 cooling liquid)
    lower_temp: Number = -55.0

    #: Communication time out (0 = OFF)
    com_time_out: Number = 0

    #: Highest pump level of the chiller
    max_pump_level: int = 8

    #: Maximum number of ramp programs that can be stored in the memory of the chiller
    max_pr_number: int = 5

    #: Operation mode, given as enum member or as its integer value
    operation_mode: Union[int, OperationModeEnum] = OperationModeEnum.AUTO

    #: Control mode, given as enum member or as its integer value
    control_mode: Union[int, ExtControlModeEnum] = ExtControlModeEnum.INTERNAL

    def clean_values(self) -> None:
        """
        Convert `operation_mode` and `control_mode` to their enum types.

        A value that is not yet a member of the respective enum is passed to the
        enum constructor, which raises ValueError for a value that is not defined.
        """
        enum_fields = (
            ("operation_mode", self.OperationModeEnum),
            ("control_mode", self.ExtControlModeEnum),
        )
        for field_name, enum_cls in enum_fields:
            current = getattr(self, field_name)
            if isinstance(current, enum_cls):
                continue
            self.force_value(field_name, enum_cls(current))  # type: ignore
[docs]
class LaudaProRp245e(SingleCommDevice):
    """
    Lauda RP245E circulation chiller class.

    All setter methods send one command to the chiller through the communication
    protocol's ``query_command`` and return the device's reply.
    """

    #: Lowest valid pump level (the pump level command takes levels 1-8)
    _MIN_PUMP_LEVEL = 1

    def __init__(self, com, dev_config=None) -> None:
        """
        Constructor for Lauda.

        :param com: object to use as communication protocol.
        :param dev_config: device configuration, passed on unchanged to the
            parent class constructor
        """
        # Call superclass constructor
        super().__init__(com, dev_config)

    @staticmethod
    def default_com_cls() -> type[LaudaProRp245eTcpCommunication]:
        """
        :return: the default communication protocol class of this device
        """
        return LaudaProRp245eTcpCommunication

    @staticmethod
    def config_cls() -> type[LaudaProRp245eConfig]:
        """
        :return: the configuration class of this device
        """
        return LaudaProRp245eConfig

    def validate_pump_level(self, level: int):
        """
        Validates pump level. Raises ValueError, if pump level is incorrect.

        :param level: pump level, integer
        :raises ValueError: if the level is above the configured
            `max_pump_level` or below the lowest valid level (1)
        """
        if level > self.config.max_pump_level:
            err_msg = f"maximum pump level is {self.config.max_pump_level}"
            raise ValueError(err_msg)
        # Levels below the lowest one (0, negative numbers) are invalid as well
        # and must not be sent to the device.
        if level < self._MIN_PUMP_LEVEL:
            err_msg = f"minimum pump level is {self._MIN_PUMP_LEVEL}"
            raise ValueError(err_msg)

    def start(self) -> None:
        """
        Start this device.

        Opens the communication protocol and writes the configured defaults
        (temperature set point, pump level, operation mode, temperature limits
        and communication time out) to the chiller.

        :raises LaudaProRp245eCommandError: if opening fails or a command is
            answered with an error
        """
        logger.info("Starting device %s", self)
        try:
            # try opening the port
            super().start()
        except LaudaProRp245eCommandError as exc:
            logger.error(str(exc), exc_info=exc)
            raise

        # Defaults of things to be changed at runtime
        # safe temperature set point
        self.set_temp_set_point(self.config.temp_set_point_init)
        # standard pump level
        self.set_pump_level(self.config.pump_init)

        # Defaults set only here, not required at runtime
        self.com.query_command(
            LaudaProRp245eCommand.OPERATION_MODE, str(self.config.operation_mode.value)
        )
        self.com.query_command(
            LaudaProRp245eCommand.UPPER_TEMP, str(self.config.upper_temp)
        )
        self.com.query_command(
            LaudaProRp245eCommand.LOWER_TEMP, str(self.config.lower_temp)
        )
        self.com.query_command(
            LaudaProRp245eCommand.COM_TIME_OUT, str(self.config.com_time_out)
        )

    def stop(self) -> None:
        """
        Stop this device. Disables access and
        closes the communication protocol.
        """
        logger.info("Stopping device %s", self)
        super().stop()

    def pause(self) -> str:
        """
        Stop temperature control and pump.

        :return: reply of the device to the last call of "query"
        """
        return self.com.query_command(LaudaProRp245eCommand.STOP)

    def run(self) -> str:
        """
        Start temperature control & pump.

        :return: reply of the device to the last call of "query"
        """
        return self.com.query_command(LaudaProRp245eCommand.START)

    def set_external_temp(self, external_temp: float = 20.00) -> str:
        """
        Pass value of external controlled temperature. Should be done every second,
        when control of external temperature is active. Has to be done right before
        control of external temperature is activated.

        :param external_temp: current value of external temperature to be
            controlled; sent with two decimal places.
        :return: reply of the device to the last call of "query"
        """
        return self.com.query_command(
            LaudaProRp245eCommand.EXTERNAL_TEMP, f"{external_temp:.2f}"
        )

    def set_temp_set_point(self, temp_set_point: float = 20.00) -> str:
        """
        Define temperature set point

        :param temp_set_point: temperature set point; sent with two decimal places.
        :return: reply of the device to the last call of "query"
        """
        return self.com.query_command(
            LaudaProRp245eCommand.TEMP_SET_POINT, f"{temp_set_point:.2f}"
        )

    def set_pump_level(self, pump_level: int = 6) -> str:
        """
        Set pump level

        :param pump_level: pump level.
        :raises ValueError: if pump level is invalid.
        :return: reply of the device to the last call of "query"
        """
        self.validate_pump_level(pump_level)
        return self.com.query_command(LaudaProRp245eCommand.PUMP_LEVEL, str(pump_level))

    def set_control_mode(
        self,
        mod: Union[
            int, LaudaProRp245eConfig.ExtControlModeEnum
        ] = LaudaProRp245eConfig.ExtControlModeEnum.INTERNAL,
    ) -> str:
        """
        Define control mode. 0 = INTERNAL (control bath temp), 1 = EXPT100 (pt100
        attached to chiller), 2 = ANALOG, 3 = SERIAL, 4 = USB, 5 = ETH (to be used
        when passing the ext. temp. via ethernet) (temperature then needs to be
        passed every second, when not using options 3, 4, or 5)

        :param mod: temp control mode (control internal temp or external temp),
            given as enum member or as its integer value.
        :raises ValueError: if `mod` is not a value of `ExtControlModeEnum`
        :return: reply of the device to the last call of "query"
            ("OK", if command was recognized")
        """
        mod_enum = LaudaProRp245eConfig.ExtControlModeEnum(mod)
        return self.com.query_command(
            LaudaProRp245eCommand.CONT_MODE, str(mod_enum.value)
        )

    def set_ramp_program(self, program: int = 1) -> str:
        """
        Define ramp program for following ramp commands.

        :param program: Number of ramp program to be activated for following
            commands.
        :raises ValueError: if the configured maximum number of ramp programs
            (`max_pr_number`, 5 by default) is exceeded.
        :return: reply of the device to the last call of "query"
        """
        # NOTE(review): only the upper bound is checked here; program numbers
        # below 1 are passed on to the device unchanged.
        if program > self.config.max_pr_number:
            err_msg = f"Maximum number of ramp programs is {self.config.max_pr_number}."
            raise ValueError(err_msg)
        return self.com.query_command(LaudaProRp245eCommand.RAMP_SELECT, str(program))

    def start_ramp(self) -> str:
        """
        Start current ramp program.

        :return: reply of the device to the last call of "query"
        """
        return self.com.query_command(LaudaProRp245eCommand.RAMP_START)

    def pause_ramp(self) -> str:
        """
        Pause current ramp program.

        :return: reply of the device to the last call of "query"
        """
        return self.com.query_command(LaudaProRp245eCommand.RAMP_PAUSE)

    def continue_ramp(self) -> str:
        """
        Continue current ramp program.

        :return: reply of the device to the last call of "query"
        """
        return self.com.query_command(LaudaProRp245eCommand.RAMP_CONTINUE)

    def stop_ramp(self) -> str:
        """
        Stop current ramp program.

        :return: reply of the device to the last call of "query"
        """
        return self.com.query_command(LaudaProRp245eCommand.RAMP_STOP)

    def reset_ramp(self) -> str:
        """
        Delete all segments from current ramp program.

        :return: reply of the device to the last call of "query"
        """
        return self.com.query_command(LaudaProRp245eCommand.RAMP_DELETE)

    def set_ramp_iterations(self, num: int = 1) -> str:
        """
        Define number of ramp program cycles.

        :param num: number of program cycles to be performed.
        :return: reply of the device to the last call of "query"
        """
        return self.com.query_command(LaudaProRp245eCommand.RAMP_ITERATIONS, str(num))

    def set_ramp_segment(
        self, temp: float = 20.00, dur: int = 0, tol: float = 0.00, pump: int = 6
    ) -> str:
        """
        Define segment of current ramp program - will be attached to current program.

        :param temp: target temperature of current ramp segment
        :param dur: duration in minutes, in which target temperature should be reached
        :param tol: tolerance at which target temperature should be reached (for 0.00,
            next segment is started after dur has passed).
        :param pump: pump level to be used for this program segment.
        :raises ValueError: if pump level is invalid.
        :return: reply of the device to the last call of "query"
        """
        self.validate_pump_level(pump)
        # The four values are joined with underscores into a single parameter
        segment = f"{temp:.2f}_{dur}_{tol:.2f}_{pump}"
        return self.com.query_command(LaudaProRp245eCommand.RAMP_SET, segment)

    def get_bath_temp(self) -> float:
        """
        Request the internal bath temperature.

        :return: float value of measured lauda bath temp in °C
        """
        rep = self.com.query_command(LaudaProRp245eCommand.BATH_TEMP)
        return float(rep)

    def get_device_type(self) -> str:
        """
        Request the device type.

        :return: Connected Lauda device type (for connection/com test)
        """
        return self.com.query_command(LaudaProRp245eCommand.DEVICE_TYPE)