# Copyright (c) ETH Zurich, SIS ID and HVL D-ITET
#
"""
Classes for the BaseCube device.
"""
import logging
from collections import deque
from datetime import datetime, timedelta
from itertools import cycle, product
from time import sleep, time
from typing import Optional, Sequence, Union, cast
from asyncua.sync import SyncNode
from hvl_ccb.comm.opc import (
OpcUaCommunication,
OpcUaCommunicationConfig,
OpcUaSubHandler,
)
from hvl_ccb.configuration import configdataclass
from hvl_ccb.dev import SingleCommDevice
from hvl_ccb.utils.poller import Poller
from hvl_ccb.utils.typing import Number
from hvl_ccb.utils.validation import validate_bool, validate_number
from . import alarms, constants, earthing_stick
from .constants import (
_CubeOpcEndpoint,
_Door,
_EarthingRod,
_MeasurementChannel,
_T13Socket,
)
from .errors import CubeRemoteControlError, CubeStatusChangeError, CubeStopError
from .support import _SupportPort
logger = logging.getLogger(__name__)
class _BaseCubeSubscriptionHandler(OpcUaSubHandler):
    """
    OPC UA subscription handler of the BaseCube devices. In addition to the
    handling of the parent class it keeps an overview of the alarm states and
    logs changes of alarms, of the safety status and of the earthing sticks.
    """

    def __init__(self):
        # one `alarm_<number>` attribute per alarm is updated on datachange events
        self.alarm_status = alarms._AlarmsOverview()

    def datachange_notification(self, node: SyncNode, val, data):
        """
        Handle a datachange event. The event is first forwarded to the parent
        class. Afterwards the node is interpreted, in this order, as an alarm node,
        as the safety status node or as an earthing stick status node; the first
        match is logged and ends the handling.

        :param node: the node object that triggered the datachange event
        :param val: the new value
        :param data: event data, only forwarded to the parent class
        """
        super().datachange_notification(node, val, data)

        # First guess: an alarm node. Any `ValueError` raised while treating the
        # node as an alarm means "not an alarm" and falls through to the checks
        # below.
        try:
            number = alarms._Alarms(node.nodeid.Identifier).number
            setattr(self.alarm_status, f"alarm_{number}", alarms._AlarmStatus(val))
            level = alarms._AlarmText.get_level(number)
            text = (
                alarms._AlarmText.get_coming_message(number)
                if val
                else alarms._AlarmText.get_going_message(number)
            )

            # the alarm number is only appended if the root logger is in debug mode
            postfix = ""
            if logger.root.level <= logging.DEBUG:
                postfix = f" (opc alarm {number})"  # pragma: no cover

            message = f"{text}{postfix}"
            if level >= logging.WARNING:
                logger.warning(message)
            elif level == logging.INFO:
                logger.info(message)
            elif level == logging.DEBUG:
                logger.debug(message)
            return
        except ValueError:
            # not any of Alarms node IDs
            pass

        identifier = node.nodeid.Identifier

        # Second guess: the safety status node
        if identifier == str(constants._Safety.STATUS):
            safety_status = constants.SafetyStatus(val)
            logger.info(f"Safety: {safety_status.name}")
            return

        # Third guess: the status node of one of the earthing sticks
        for stick in earthing_stick._EarthingStick._STICKS:
            if identifier in earthing_stick._EarthingStick("", stick)._CMD_STATUS:
                switch_status = earthing_stick.SwitchStatus(val)
                logger.info(f"Earthing Stick {stick}: {switch_status.name}")
                return
[docs]
@configdataclass
class BaseCubeConfiguration:
    """
    Configuration dataclass for the BaseCube devices.
    """

    #: Namespace of the OPC variables, typically this is 3 (coming from Siemens)
    namespace_index: int = 3

    #: Delay in seconds handed to the status poller; it is also added to the
    #: time that is sent to the Cube during start-up
    polling_delay_sec: Number = 5.0

    #: Interval in seconds handed to the status poller
    polling_interval_sec: Number = 1.0

    #: Maximal time in seconds to wait for a change of the safety status
    timeout_status_change: Number = 6

    #: Sleep time in seconds between two checks of the safety status while
    #: waiting for a status change
    timeout_interval: Number = 0.1

    #: Noise levels handed as `input_noise` to the measurement channels 1 to 4
    noise_level_measurement_channel_1: Number = 100
    noise_level_measurement_channel_2: Number = 100
    noise_level_measurement_channel_3: Number = 100
    noise_level_measurement_channel_4: Number = 100

    def clean_values(self):
        """
        Check the configuration values.

        :raises ValueError: if the namespace index, the polling delay or the
            timeout for the status change is negative, or if the polling interval
            or the timeout interval is not positive
        """
        if self.namespace_index < 0:
            raise ValueError(
                "Index of the OPC variables namespace needs to be a positive integer."
            )
        if self.polling_interval_sec <= 0:
            raise ValueError("Polling interval needs to be positive.")
        if self.polling_delay_sec < 0:
            raise ValueError("Polling delay needs to be not negative.")
        if self.timeout_interval <= 0:
            raise ValueError("Timeout interval for status change needs to be positive.")
        if self.timeout_status_change < 0:
            raise ValueError("Timeout for status change needs to be not negative.")

        # The four noise levels are validated the same way, in channel order.
        # NOTE(review): `validate_number` presumably raises on non-numeric
        # values - confirm in `hvl_ccb.utils.validation`.
        for channel in range(1, 5):
            validate_number(
                f"Noise Level Measurement Channel {channel}",
                getattr(self, f"noise_level_measurement_channel_{channel}"),
                logger=logger,
            )
[docs]
@configdataclass
class BaseCubeOpcUaCommunicationConfig(OpcUaCommunicationConfig):
    """
    Communication protocol configuration for OPC UA, specifications for the BaseCube
    devices.
    """

    # NOTE(review): the default handler below is instantiated once, when this
    # class is defined. Configurations relying on the default presumably share
    # this single handler (and its `alarm_status`) - confirm this is intended.
    #: Subscription handler for data change events
    sub_handler: OpcUaSubHandler = _BaseCubeSubscriptionHandler()

    #: Name of the OPC UA endpoint, defaults to the endpoint of the BaseCube
    endpoint_name: _CubeOpcEndpoint = _CubeOpcEndpoint.BASE_CUBE  # type: ignore
[docs]
class BaseCubeOpcUaCommunication(OpcUaCommunication):
    """
    Communication protocol specification for BaseCube devices.
    """

    @staticmethod
    def config_cls():
        """
        Get the configuration class of this communication protocol.

        :return: the `BaseCubeOpcUaCommunicationConfig` class
        """
        return BaseCubeOpcUaCommunicationConfig
[docs]
class BaseCube(SingleCommDevice):
    """
    Base class for Cube variants.
    """

    # Limits of the year range that `datetime_to_opc` validates against; the OPC
    # time format carries only the last two digits of the year
    OPC_MIN_YEAR = 1990
    OPC_MAX_YEAR = 2089
def __init__(self, com, dev_config=None):
    """
    Constructor for BaseCube. Besides the initialisation of the parent class, it
    creates the status poller, the buffers of the status and message board and
    the helper objects `earthing_stick_1` ... `earthing_stick_6`,
    `support_1` ... `support_6` and `measurement_ch_1` ... `measurement_ch_4`.

    :param com: the communication protocol or its configuration
    :param dev_config: the device configuration
    """
    super().__init__(com, dev_config)

    config = self.config

    # the poller calls `_spoll_handler`, which alternates the written value
    self._status_poller = Poller(
        self._spoll_handler,
        polling_delay_sec=config.polling_delay_sec,
        polling_interval_sec=config.polling_interval_sec,
    )
    self._toggle = cycle([False, True])

    # both boards have as many lines as `constants.MessageBoard` has members
    number_of_lines = len(constants.MessageBoard)
    self._message_len = number_of_lines
    self._status_board = [""] * number_of_lines
    self._message_board = deque([""] * number_of_lines, maxlen=number_of_lines)

    # create earthing sticks
    for number in range(1, 7):
        setattr(
            self,
            f"earthing_stick_{number}",
            earthing_stick._EarthingStick(self, number),
        )

    # create support ports, each port has two inputs and outputs
    for number in range(1, 7):
        setattr(self, f"support_{number}", _SupportPort(self, number))

    # create measurement channels, each with its configured noise level
    for number in range(1, 5):
        noise_level = getattr(config, f"noise_level_measurement_channel_{number}")
        setattr(
            self,
            f"measurement_ch_{number}",
            _MeasurementChannel(self, number, input_noise=noise_level),
        )
[docs]
@staticmethod
def default_com_cls():
    """
    Get the default communication protocol class of the BaseCube.

    :return: the `BaseCubeOpcUaCommunication` class
    """
    return BaseCubeOpcUaCommunication
[docs]
@staticmethod
def config_cls():
    """
    Get the device configuration class of the BaseCube.

    :return: the `BaseCubeConfiguration` class
    """
    return BaseCubeConfiguration
[docs]
def start(self) -> None:
    """
    Start the device: start the parent device, register all monitored OPC nodes
    (CEE16 socket, T13 sockets, support ports, safety, errors, alarms and
    earthing stick status), enable the remote control and send the current time
    to the Cube.
    """
    logger.info("Starting Cube")
    super().start()

    logger.debug("Add monitoring nodes")
    com = self.com
    namespace = self.config.namespace_index

    com.init_monitored_nodes(constants._CEE16, namespace)

    # add T13 sockets
    for socket in _T13Socket._SOCKETS:
        com.init_monitored_nodes(_T13Socket(socket)._CMD, namespace)

    # add every combination of direction, port and contact of the support ports
    for io, port, contact in product(
        _SupportPort._IOS, _SupportPort._PORTS, _SupportPort._CONTACTS
    ):
        com.init_monitored_nodes(_SupportPort("", port)._cmd(io, contact), namespace)

    com.init_monitored_nodes(
        map(str, constants._Safety),  # type: ignore
        namespace,
    )

    for error_node in (
        constants._Errors.MESSAGE,
        constants._Errors.WARNING,
        constants._Errors.STOP,
    ):
        com.init_monitored_nodes(str(error_node), namespace)

    com.init_monitored_nodes(map(str, alarms._Alarms), namespace)

    for stick in earthing_stick._EarthingStick._STICKS:
        com.init_monitored_nodes(
            earthing_stick._EarthingStick("", stick)._CMD_STATUS,
            namespace,
        )

    self._set_remote_control(True)
    logger.info("Finished starting")

    self._set_current_time()
    logger.debug("Sent system time to Cube.")
[docs]
def stop(self) -> None:
    """
    Stop the Cube device. Deactivates the remote control and stops the parent
    device. The parent device is stopped even if the deactivation of the remote
    control fails.

    :raises CubeStopError: when the cube is not in the correct status
        to stop the operation
    """
    current = self._status
    allowed = constants.STOP_SAFETY_STATUSES
    if current not in allowed:
        allowed_names = " or ".join(s.name for s in allowed)
        msg = (
            f"Cube needs to be in status {allowed_names} "
            f"to close the connection to the device, but is in status {current.name}"
        )
        logger.error(msg)
        raise CubeStopError(msg)

    try:
        self._set_remote_control(False)
    finally:
        super().stop()
    logger.info("Stopping Cube")
def _spoll_handler(self) -> None:
    """
    Handler of the status poller: writes the next value of the `False`/`True`
    cycle to the `LIVE` node of the Cube.
    """
    live_node = constants._OpcControl.LIVE
    live_value = next(self._toggle)
    self.write(live_node, live_value)
[docs]
def read(self, node_id: str):
    """
    Read a variable through the OPC UA communication protocol, using the
    configured namespace index. The access and its result are logged at debug
    level.

    :param node_id: the id of the node to read.
    :return: the value of the variable
    """
    logger.debug(f"Read from node ID {node_id} ...")
    value = self.com.read(str(node_id), self.config.namespace_index)
    logger.debug(f"Read from node ID {node_id}: {value}")
    return value
[docs]
def write(self, node_id, value) -> None:
    """
    Write a variable through the OPC UA communication protocol, using the
    configured namespace index. The access is logged at debug level.

    :param node_id: the id of the node to write
    :param value: the value to write to the variable
    """
    logger.debug(f"Write to node ID {node_id}: {value}")
    namespace = self.config.namespace_index
    self.com.write(str(node_id), namespace, value)
[docs]
@classmethod
def datetime_to_opc(cls, time_dt: datetime) -> list[int]:
    """
    Convert a python datetime into the opc format, a list of 8 integers, as
    defined in the following link:
    https://support.industry.siemens.com/cs/mdm/109798671?c=133950752267&lc=de-WW

    Each list entry corresponds to one byte:
    [yy, MM, dd, hh, mm, ss, milliseconds, weekday]
    Milliseconds and weekday are always set to 0, as this precision /
    information is not needed.

    The numbers are converted in a special way: the decimal digits of each
    number are read as if they were the digits of a hex-number (e.g. 59 is
    read as 0x59, which gives 89). This is tested with the used PLC in the
    BaseCube.

    yy: 0 to 99 (0 -> 2000, 89 -> 2089, 90 -> 1990, 99 -> 1999)
    MM: 1 to 12
    dd: 1 to 31
    hh: 0 to 23
    mm: 0 to 59
    ss: 0 to 59

    :param time_dt: time to be converted
    :return: time in opc list format
    """
    validate_number("year", time_dt.year, (cls.OPC_MIN_YEAR, cls.OPC_MAX_YEAR), int)

    # only the digits after the first two of the year are transmitted
    short_year = int(str(time_dt.year)[2:])
    components = (
        short_year,
        time_dt.month,
        time_dt.day,
        time_dt.hour,
        time_dt.minute,
        time_dt.second,
        0,  # milliseconds, not used
        0,  # weekday, not used
    )
    # read the decimal digits of every component as hexadecimal digits
    return [int(str(component), 16) for component in components]
def _set_current_time(self) -> None:
    """
    Send current UTC time of host computer to Cube. As the time is only
    synchronized during the startup and after the polling started, the polling delay
    is added to the current time
    """
    # `timezone` is needed by this method only, hence the local import
    from datetime import timezone

    # `datetime.utcnow()` is deprecated since Python 3.12; the timezone-aware
    # equivalent provides the same date and time components in UTC
    now_utc = datetime.now(timezone.utc)
    time_opc = self.datetime_to_opc(
        now_utc + timedelta(seconds=self.config.polling_delay_sec)
    )
    self.write(constants._OpcControl.TIME, time_opc)
def _set_remote_control(self, state: bool) -> None:
    """
    Enable or disable remote control for the Cube. This will effectively
    display a message on the touchscreen HMI.

    Enabling writes the `ACTIVE` node and starts the status poller; disabling
    writes the `ACTIVE` node and stops the status poller.

    :param state: desired remote control state
    :raises TypeError: when state is not of type bool
    :raises CubeRemoteControlError: when the remote control
        cannot be (de-)activated
    """
    validate_bool("state", state, logger)

    if not state:
        # Read the status once only, so that the status reported in the error
        # message is the very status that failed the check.
        status = self._status
        if status not in constants.STOP_SAFETY_STATUSES:
            msg = (
                "Cube needs to be in status 'GREEN_NOT_READY' or 'GREEN_READY' "
                f"to turn off the remote control, but is in '{status.name}'"
            )
            logger.error(msg)
            raise CubeRemoteControlError(msg)

    can_write = False
    try:
        self.write(constants._OpcControl.ACTIVE, state)
        can_write = True
    finally:
        # The poller is handled in `finally`, so it is also stopped when the
        # write for disabling the remote control fails.
        if state:
            if not can_write:
                msg = "Remote control cannot be enabled"  # pragma: no cover
                logger.error(msg)  # pragma: no cover
                raise CubeRemoteControlError(msg)  # pragma: no cover
            else:
                # NOTE(review): the return value is presumably `True` if the
                # poller was not yet running - confirm in `Poller`
                was_not_polling = self._status_poller.start_polling()
                if not was_not_polling:
                    msg = "Remote control already enabled"  # pragma: no cover
                    logger.error(msg)  # pragma: no cover
                    raise CubeRemoteControlError(msg)  # pragma: no cover
        else:
            was_polling = self._status_poller.stop_polling()
            self._status_poller.wait_for_polling_result()
            if not was_polling:
                msg = "Remote control already disabled"  # pragma: no cover
                logger.error(msg)  # pragma: no cover
                raise CubeRemoteControlError(msg)  # pragma: no cover
# creates the T13 power sockets
# NOTE(review): these are class-level `_T13Socket` objects, presumably
# descriptors giving access to the sockets 1 to 3 - confirm in
# `constants._T13Socket`
t13_socket_1 = _T13Socket(socket=1)
t13_socket_2 = _T13Socket(socket=2)
t13_socket_3 = _T13Socket(socket=3)
@property
def cee16_socket(self):
    """
    Read the on-state of the IEC CEE16 three-phase power socket. The state is
    logged at info level.

    :return: the on-state of the CEE16 power socket
    """
    is_on = bool(self.read(constants._CEE16))
    logger.info(f"CEE 16 A Power Socket is {'ON' if is_on else 'OFF'}")
    return is_on

@cee16_socket.setter
def cee16_socket(self, value):
    """
    Switch the IEC CEE16 three-phase power socket on or off. The new state is
    logged at info level.

    :param value: desired on-state of the power socket
    :raises TypeError: if state is not of type bool
    """
    validate_bool("state", value, logger)
    self.write(constants._CEE16, value)
    logger.info(f"CEE 16 A Power Socket is switched {'ON' if value else 'OFF'}")
@property
def _status(self) -> constants.SafetyStatus:
    """
    Read the safety circuit status of the Cube.
    For internal use only, the status is not logged at info level.

    :return: the safety status of the Cube's state machine.
    """
    raw_status = self.read(constants._Safety.STATUS)
    return constants.SafetyStatus(raw_status)
@property
def status(self) -> constants.SafetyStatus:
    """
    Read the safety circuit status of the Cube and log it at info level.
    This property is meant for the user.

    :return: the safety status of the Cube's state machine.
    """
    current = self._status
    logger.info(f"Safety Status is {current.name}")
    return current
def _switch_safety_status(
    self,
    switch: constants._SafetyStatusTransition,
    raise_state: bool = True,
):
    """
    Internal method to switch the safety status of the cube.
    Checks are performed if the switch can be executed. After the command is
    written, the status is polled every `timeout_interval` until it matches the
    desired status or until `timeout_status_change` has elapsed.

    :param switch: indicates the transition
    :param raise_state: True will raise the state, False will lower the state
    :raise CubeStatusChangeError: if the status change was not successful or
        the status cannot be changed because the cube is in the wrong safety status
        for the queried operation
    """
    # `monotonic` is needed by this method only, hence the local import. A
    # monotonic clock keeps the timeout correct when the system time changes.
    from time import monotonic

    transition = constants._SafetyStatusTransition(switch)
    # raising goes from source to target, lowering from target to source
    if raise_state:
        required, desired = transition.source, transition.target
    else:
        required, desired = transition.target, transition.source

    actual_status = self._status
    if actual_status is not required:
        msg = (
            f"Cube needs to be in status '{required.name}' in order to switch "
            f"to '{desired.name}', but is in '{actual_status.name}'"
        )
        logger.error(msg)
        raise CubeStatusChangeError(msg)

    logger.info(f"Status of Cube will be changed to {desired.name}")
    self.write(switch.command, raise_state)

    # wait for the status change, but not longer than the configured timeout
    deadline = monotonic() + self.config.timeout_status_change
    while self._status is not desired and monotonic() < deadline:
        sleep(self.config.timeout_interval)

    # final read decides about success, also if the loop ended by the timeout
    actual_status = self._status
    if actual_status is not desired:
        msg = (
            f"Tried to change status to {desired.name}, "
            f"but Cube status is {actual_status.name}"
        )
        logger.error(msg)
        raise CubeStatusChangeError(msg)
    logger.info(f"Status successfully changed to {desired.name}.")
@property
def ready(self) -> Optional[bool]:
    """
    Indicates if 'ready' is activated. 'ready' means locked safety circuit,
    red lamps, but high voltage still off.

    :return: `True` if ready is activated (RED_READY),
        `False` if ready is deactivated (GREEN_READY),
        `None` otherwise
    """
    current = self.status
    if current == constants.SafetyStatus.RED_READY:
        return True
    if current == constants.SafetyStatus.GREEN_READY:
        return False
    return None

@ready.setter
def ready(self, state: bool) -> None:
    """
    Set ready state. Ready means locked safety circuit, red lamps, but high
    voltage still off.

    :param state: set ready state
    :raises CubeStatusChangeError: if `state=True` and cube is not in GREEN_READY or
        if `state=False` and cube is not in RED_READY
    """
    validate_bool("state", state, logger)
    transition = cast(
        constants._SafetyStatusTransition,
        constants._SafetyStatusTransition.SWITCH_TO_READY,
    )
    self._switch_safety_status(switch=transition, raise_state=state)
@property
def operate(self) -> Optional[bool]:
    """
    Indicates if 'operate' is activated. 'operate' means locked safety circuit,
    red lamps, high voltage on and locked safety switches.

    :return: `True` if operate is activated (RED_OPERATE),
        `False` if operate is deactivated (RED_READY),
        `None` otherwise
    """
    current = self.status
    if current == constants.SafetyStatus.RED_OPERATE:
        return True
    if current == constants.SafetyStatus.RED_READY:
        return False
    return None

@operate.setter
def operate(self, state: bool) -> None:
    """
    Set operate state. This will turn on the high
    voltage and close the safety switches.

    :param state: set operate state
    :raises CubeStatusChangeError: if `state=True` and cube is not in RED_READY or
        if `state=False` and cube is not in RED_OPERATE
    """
    validate_bool("state", state, logger)
    transition = cast(
        constants._SafetyStatusTransition,
        constants._SafetyStatusTransition.SWITCH_TO_OPERATE,
    )
    self._switch_safety_status(switch=transition, raise_state=state)
@property
def breakdown_detection_active(self) -> bool:
    """
    Read the state of the breakdown detection functionality and log it at info
    level.

    :return: the value read for the activation state; a true value means the
        breakdown detection is enabled
    """
    activated = self.read(constants._BreakdownDetection.ACTIVATED)
    negation = "" if activated else "NOT "
    logger.info(f"Breakdown Detection Unit is {negation}activated")
    return activated
@property
def breakdown_detection_triggered(self) -> bool:
    """
    Read whether the breakdown detection unit has been triggered and log it at
    info level.

    :return: the value read for the trigger status; a true value means the
        breakdown detection unit is triggered
    """
    triggered = self.read(constants._BreakdownDetection.TRIGGERED)
    negation = "" if triggered else "NOT "
    logger.info(f"Breakdown Detection Unit is {negation}triggered")
    return triggered
[docs]
def breakdown_detection_reset(self) -> None:
    """
    Reset the breakdown detection circuitry so that it is ready
    to detect breakdowns again. The reset node is set to `True` and, 0.1 s
    later, back to `False`.
    """
    reset_node = constants._BreakdownDetection.RESET
    self.write(reset_node, True)
    sleep(0.1)
    self.write(reset_node, False)
    logger.info("The Breakdown Detection Unit is reset")
[docs]
def quit_error(self) -> None:
    """
    Quits errors that are active on the Cube. The quit node is set to `True`
    and, 0.1 s later, back to `False`.
    """
    logger.info("Quit Errors of Cube")
    quit_node = constants._Errors.QUIT
    self.write(quit_node, True)
    sleep(0.1)
    self.write(quit_node, False)
# create doors
# NOTE(review): class-level `_Door` objects for the doors 1 to 3, presumably
# descriptors returning the door status - confirm in `constants._Door`
door_1_status = _Door(1, "door status")
door_2_status = _Door(2, "door status")
door_3_status = _Door(3, "door status")
# create earthing rods
# NOTE(review): class-level `_EarthingRod` objects for the earthing rods 1 to 3,
# presumably descriptors returning the rod status - confirm in
# `constants._EarthingRod`
earthing_rod_1_status = _EarthingRod(1, "earthing rod status")
earthing_rod_2_status = _EarthingRod(2, "earthing rod status")
earthing_rod_3_status = _EarthingRod(3, "earthing rod status")
[docs]
def set_status_board(
    self,
    msgs: list[str],
    pos: Optional[list[int]] = None,
    clear_board: bool = True,
    display_board: bool = True,
) -> None:
    """
    Sets and displays a status board. The messages and the position of the message
    can be defined.

    :param msgs: list of strings
    :param pos: list of line indices, each from 0 up to the number of lines of
        the board minus one; if not given (or empty) the messages are written to
        the first lines of the board. Messages and positions are paired in
        order, surplus entries of the longer list are ignored.
    :param clear_board: clear unspecified lines if `True` (default), keep otherwise
    :param display_board: display new status board if `True` (default)
    :raises ValueError: if there are too many messages or the positions indices are
        invalid.
    """
    board_len = self._message_len

    # validate inputs before the board is touched
    if len(msgs) > board_len:
        raise ValueError(
            f"Too many message: {len(msgs)} given, "
            f"max. {board_len} allowed."
        )
    # index 0 is the first line of the board and therefore a valid position
    if pos and not all(0 <= p < board_len for p in pos):
        raise ValueError(f"Messages positions out of 0...{board_len - 1} range")

    if clear_board:
        self._status_board = [""] * board_len

    # update status board
    if not pos:
        pos = list(range(len(msgs)))
    for num, msg in zip(pos, msgs):
        self._status_board[num] = msg

    if display_board:
        self.display_status_board()
[docs]
def display_status_board(self) -> None:
    """
    Write the lines of the status board to the Cube, so that they are displayed.
    """
    board = self._status_board
    self._display_messages(board)
    logger.info("Cube HMI is now displaying the Status Board")
[docs]
def set_message_board(self, msgs: list[str], display_board: bool = True) -> None:
    """
    Put messages on top of the message board. Every message is prefixed with the
    current local time (`HH:MM:SS`). The board only keeps as many of the newest
    messages as it has lines.

    :param msgs: list of strings
    :param display_board: display the newest messages if `True` (default)
    :raises ValueError: if there are too many messages
    """
    # validate inputs
    n_msgs = len(msgs)
    if n_msgs > self._message_len:
        raise ValueError(
            f"Too many message: {n_msgs} given, "
            f"max. {self._message_len} allowed."
        )

    timestamp = datetime.now().strftime("%H:%M:%S")

    # pushing the messages from last to first to the front of the board keeps
    # them in the given order
    for msg in reversed(msgs):
        self._message_board.appendleft(f"{timestamp}: {msg}")

    if display_board:
        self.display_message_board()
[docs]
def display_message_board(self) -> None:
    """
    Write the lines of the message board (newest messages first) to the Cube, so
    that they are displayed.
    """
    board = self._message_board
    self._display_messages(board)
    logger.info("Cube HMI is now displaying the Message Board")
def _display_messages(self, messages: Sequence[str]) -> None:
    """
    Write the given messages line by line to the message board of the Cube.

    :param messages: sequence of messages to display, the first message is
        written to line 1
    """
    # Note: cannot zip(constants.MessageBoard, messages) as enum instances are
    # sorted by name, hence after `line_1` comes `line_10`, not `line_2`;
    # the lines are therefore looked up by their number, counting from 1
    for line_number, text in enumerate(messages, start=1):
        self.write(constants.MessageBoard.line(line_number), text)
[docs]
def active_alarms(self, human_readable: bool = True) -> list[Union[int, str]]:
    """
    Collect all active alarms / messages from the alarm overview kept by the
    subscription handler.

    :param human_readable: `True` for human readable message,
        `False` for corresponding integer
    :return: list with active alarms
    """
    validate_bool("human_readable", human_readable, logger=logger)

    overview = self._com.config.sub_handler.alarm_status
    found: list[Union[int, str]] = []
    for number in alarms._Alarms.range():
        if not getattr(overview, f"alarm_{number}"):
            continue
        if human_readable:
            found.append(alarms._AlarmText.get_coming_message(number))
        else:
            found.append(number)
    return found