# Copyright (c) ETH Zurich, SIS ID and HVL D-ITET
#
"""
Module with base classes for devices.
"""
import logging
from abc import ABC, abstractmethod
from typing import Union
from hvl_ccb.comm import CommunicationProtocol
from hvl_ccb.configuration import ConfigurationMixin, configdataclass
from hvl_ccb.error import CCBError
logger = logging.getLogger(__name__)
[docs]
class DeviceError(CCBError):
pass
[docs]
class DeviceExistingError(DeviceError):
"""
Error to indicate that a device with that name already exists.
"""
pass
[docs]
class DeviceFailuresError(DeviceError):
"""
Error to indicate that one or several devices failed.
"""
def __init__(self, failures: dict[str, Exception], *args):
super().__init__(failures)
self.failures: dict[str, Exception] = failures
"""A dictionary of named devices failures (exceptions).
"""
[docs]
@configdataclass
class EmptyConfig:
"""
Empty configuration dataclass that is the default configuration for a Device.
"""
pass
[docs]
class Device(ConfigurationMixin, ABC):
"""
Base class for devices. Implement this class for a concrete device,
such as measurement equipment or voltage sources.
Specifies the methods to implement for a device.
"""
def __init__(self, dev_config=None):
"""
Constructor for Device.
"""
super().__init__(dev_config)
[docs]
@abstractmethod
def start(self) -> None:
"""
Start or restart this Device. To be implemented in the subclass.
"""
[docs]
@abstractmethod
def stop(self) -> None:
"""
Stop this Device. To be implemented in the subclass.
"""
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
[docs]
@staticmethod
def config_cls():
return EmptyConfig
[docs]
class DeviceSequenceMixin(ABC):
"""
Mixin that can be used on a device or other classes to provide facilities for
handling multiple devices in a sequence.
"""
def __init__(self, devices: dict[str, Device]):
"""
Constructor for the DeviceSequenceMixin.
:param devices: is a dictionary of devices to be added to this sequence.
"""
super().__init__()
self._devices: dict[str, Device] = {}
for name, device in devices.items():
self.add_device(name, device)
self.devices_failed_start: dict[str, Device] = dict()
"""Dictionary of named device instances from the sequence for which the most
recent `start()` attempt failed.
Empty if `stop()` was called last; cf. `devices_failed_stop`."""
self.devices_failed_stop: dict[str, Device] = dict()
"""Dictionary of named device instances from the sequence for which the most
recent `stop()` attempt failed.
Empty if `start()` was called last; cf. `devices_failed_start`."""
def __getattribute__(self, item: str) -> Union[Device, object]:
"""
Gets Device from the sequence or object's attribute.
:param item: Item name to get
:return: Device or object's attribute.
"""
if item != "_devices" and item in self._devices:
return self.get_device(item)
return super().__getattribute__(item)
def __eq__(self, other):
return (
isinstance(other, DeviceSequenceMixin) and self._devices == other._devices
)
[docs]
def get_devices(self) -> list[tuple[str, Device]]:
"""
Get list of name, device pairs according to current sequence.
:return: A list of tuples with name and device each.
"""
return list(self._devices.items())
[docs]
def get_device(self, name: str) -> Device:
"""
Get a device by name.
:param name: is the name of the device.
:return: the device object from this sequence.
"""
return self._devices.get(name) # type: ignore
[docs]
def add_device(self, name: str, device: Device) -> None:
"""
Add a new device to the device sequence.
:param name: is the name of the device.
:param device: is the instantiated Device object.
:raise DeviceExistingError:
"""
if name in self._devices:
raise DeviceExistingError
# disallow over-shadowing via ".DEVICE_NAME" lookup
if hasattr(self, name):
raise ValueError(
"This sequence already has an attribute called"
f" {name}. Use different name for this device."
)
self._devices[name] = device
[docs]
def remove_device(self, name: str) -> Device:
"""
Remove a device from this sequence and return the device object.
:param name: is the name of the device.
:return: device object or `None` if such device was not in the sequence.
:raises ValueError: when device with given name was not found
"""
if name not in self._devices:
raise ValueError(f'No device named "{name}" in this sequence.')
if name in self.devices_failed_start:
self.devices_failed_start.pop(name)
elif name in self.devices_failed_stop:
self.devices_failed_stop.pop(name)
return self._devices.pop(name)
[docs]
def start(self) -> None:
"""
Start all devices in this sequence in their added order.
:raises DeviceFailuresError: if one or several devices failed to start
"""
# reset the failure dicts
failures = dict()
self.devices_failed_start = dict()
self.devices_failed_stop = dict()
for name, device in self._devices.items():
try:
device.start()
except Exception as e:
logger.error(f"Could not start {name}: ", exc_info=e)
failures[name] = e
self.devices_failed_start[name] = device
if failures:
raise DeviceFailuresError(failures)
[docs]
def stop(self) -> None:
"""
Stop all devices in this sequence in their reverse order.
:raises DeviceFailuresError: if one or several devices failed to stop
"""
# reset the failure dicts
failures: dict[str, Exception] = dict()
self.devices_failed_start = dict()
self.devices_failed_stop = dict()
for name, device in self._devices.items():
try:
device.stop()
except Exception as e:
logger.error(f"Could not start {name}: ", exc_info=e)
failures[name] = e
self.devices_failed_stop[name] = device
if failures:
raise DeviceFailuresError(failures)
[docs]
class SingleCommDevice(Device, ABC):
"""
Base class for devices with a single communication protocol.
"""
# Omitting typing hint `com: CommunicationProtocol` on purpose
# to enable PyCharm autocompletion for subtypes.
def __init__(self, com, dev_config=None) -> None:
"""
Constructor for Device. Links the communication protocol and provides a
configuration for the device.
:param com: Communication protocol to be used with
this device. Can be of type: - CommunicationProtocol instance, - dictionary
with keys and values to be used as configuration together with the
default communication protocol, or - @configdataclass to be used together
with the default communication protocol.
:param dev_config: configuration of the device. Can be:
- None: empty configuration is used, or the specified config_cls()
- @configdataclass decorated class
- Dictionary, which is then used to instantiate the specified config_cls()
"""
super().__init__(dev_config)
if isinstance(com, CommunicationProtocol):
self._com = com
else:
self._com = self.default_com_cls()(com)
[docs]
@staticmethod
@abstractmethod
def default_com_cls() -> type[CommunicationProtocol]:
"""
Get the class for the default communication protocol used with this device.
:return: the type of the standard communication protocol for this device
"""
@property
def com(self):
"""
Get the communication protocol of this device.
:return: an instance of CommunicationProtocol subtype
"""
return self._com
[docs]
def start(self) -> None:
"""
Open the associated communication protocol.
"""
self.com.open()
[docs]
def stop(self) -> None:
"""
Close the associated communication protocol.
"""
self.com.close()