# Copyright (c) ETH Zurich, SIS ID and HVL D-ITET
#
"""
Device class for a SST Luminox Oxygen sensor. This device can measure the oxygen
concentration between 0 % and 25 %.
Furthermore, it measures the barometric pressure and internal temperature.
The device supports two operating modes: in streaming mode the device measures all
parameters every second, in polling mode the device measures only after a query.
Technical specification and documentation for the device can be found a the
manufacturer's page:
https://www.sstsensing.com/product/luminox-optical-oxygen-sensors-2/
"""
import logging
import re
from enum import Enum
from time import sleep
from typing import Optional, Union, cast
from hvl_ccb.comm.serial import (
SerialCommunication,
SerialCommunicationBytesize,
SerialCommunicationConfig,
SerialCommunicationParity,
SerialCommunicationStopbits,
)
from hvl_ccb.configuration import configdataclass
from hvl_ccb.dev.base import DeviceError, SingleCommDevice
from hvl_ccb.utils.enum import ValueEnum
from hvl_ccb.utils.typing import Number
logger = logging.getLogger(__name__)
[docs]
class LuminoxError(DeviceError):
"""
General Error for Luminox Device.
"""
pass
[docs]
class LuminoxOutputModeError(LuminoxError):
"""
Wrong output mode for requested data
"""
pass
[docs]
class LuminoxOutputMode(Enum):
"""
output mode.
"""
streaming = 0
polling = 1
[docs]
class LuminoxMeasurementTypeError(LuminoxError):
"""
Wrong measurement type for requested data
"""
pass
LuminoxMeasurementTypeValue = Union[float, int, str]
"""A typing hint for all possible LuminoxMeasurementType values as read in either
streaming mode or in a polling mode with `LuminoxMeasurementType.all_measurements`.
Beware: has to be manually kept in sync with `LuminoxMeasurementType` instances
`cast_type` attribute values.
"""
LuminoxMeasurementTypeDict = dict[
Union[str, "LuminoxMeasurementType"], LuminoxMeasurementTypeValue
]
"""A typing hint for a dictionary holding LuminoxMeasurementType values. Keys are
allowed as strings because `LuminoxMeasurementType` is of a `StrEnumBase` type.
"""
[docs]
class LuminoxMeasurementType(ValueEnum, init="value cast_type value_re"): # type:ignore
"""
Measurement types for `LuminoxOutputMode.polling`.
The `all_measurements` type will read values for the actual measurement types
as given in `LuminoxOutputMode.all_measurements_types()`; it parses multiple
single values using regexp's for other measurement types, therefore, no regexp is
defined for this measurement type.
"""
partial_pressure_o2 = "O", float, r"[0-9]{4}.[0-9]"
percent_o2 = "%", float, r"[0-9]{3}.[0-9]{2}"
temperature_sensor = "T", float, r"[+-][0-9]{2}.[0-9]"
barometric_pressure = "P", int, r"[0-9]{4}"
sensor_status = "e", int, r"[0-9]{4}"
date_of_manufacture = "# 0", str, r"[0-9]{5} [0-9]{5}"
serial_number = "# 1", str, r"[0-9]{5} [0-9]{5}"
software_revision = "# 2", str, r"[0-9]{5}"
all_measurements = "A", str, None
[docs]
@classmethod
def all_measurements_types(cls) -> tuple["LuminoxMeasurementType", ...]:
"""
A tuple of `LuminoxMeasurementType` enum instances which are actual
measurements, i.e. not date of manufacture or software revision.
"""
return cast(
tuple["LuminoxMeasurementType", ...],
(
cls.partial_pressure_o2,
cls.temperature_sensor,
cls.barometric_pressure,
cls.percent_o2,
cls.sensor_status,
),
)
@property
def command(self) -> str:
return self.value.split(" ")[0]
[docs]
def parse_read_measurement_value(
self, read_txt: str
) -> Union[LuminoxMeasurementTypeDict, LuminoxMeasurementTypeValue]:
if self is LuminoxMeasurementType.all_measurements:
return {
measurement: measurement._parse_single_measurement_value(read_txt)
for measurement in LuminoxMeasurementType.all_measurements_types()
}
return self._parse_single_measurement_value(read_txt)
def _parse_single_measurement_value(
self, read_txt: str
) -> LuminoxMeasurementTypeValue:
parsed_data: list[str] = re.findall(f"{self.command} {self.value_re}", read_txt)
if len(parsed_data) != 1:
self._parse_error(parsed_data)
parsed_measurement: str = parsed_data[0]
try:
parsed_value = self.cast_type(
# don't check for empty match - we know already that there is one
re.search(self.value_re, parsed_measurement).group() # type: ignore
)
except ValueError as e:
logger.error(str(e), exc_info=e)
self._parse_error(parsed_data)
return parsed_value
def _parse_error(self, parsed_data: list[str]) -> None:
err_msg = (
f"Expected measurement value for {self.name.replace('_', ' ')} of type "
f'{self.cast_type}; instead tyring to parse: "{parsed_data}"'
)
logger.error(err_msg)
raise LuminoxMeasurementTypeError(err_msg)
[docs]
@configdataclass
class LuminoxSerialCommunicationConfig(SerialCommunicationConfig):
#: Baudrate for SST Luminox is 9600 baud
baudrate: int = 9600
#: SST Luminox does not use parity
parity: Union[str, SerialCommunicationParity] = SerialCommunicationParity.NONE
#: SST Luminox does use one stop bit
stopbits: Union[int, SerialCommunicationStopbits] = SerialCommunicationStopbits.ONE
#: One byte is eight bits long
bytesize: Union[int, SerialCommunicationBytesize] = (
SerialCommunicationBytesize.EIGHTBITS
)
#: The terminator is CR LF
terminator: bytes = b"\r\n"
#: use 3 seconds timeout as default
timeout: Number = 3
[docs]
class LuminoxSerialCommunication(SerialCommunication):
"""
Specific communication protocol implementation for the SST Luminox oxygen sensor.
Already predefines device-specific protocol parameters in config.
"""
[docs]
@staticmethod
def config_cls():
return LuminoxSerialCommunicationConfig
[docs]
@configdataclass
class LuminoxConfig:
"""
Configuration for the SST Luminox oxygen sensor.
"""
# wait between set and validation of output mode
wait_sec_post_activate: Number = 0.5
wait_sec_trials_activate: Number = 0.1
nr_trials_activate: int = 5
[docs]
def clean_values(self):
if self.wait_sec_post_activate <= 0:
raise ValueError(
"Wait time (sec) post output mode activation must be a positive number."
)
if self.wait_sec_trials_activate <= 0:
raise ValueError(
"Re-try wait time (sec) for mode activation must be a positive number."
)
if self.nr_trials_activate <= 0:
raise ValueError(
"Trials for mode activation must be a positive integer >=1)."
)
[docs]
class Luminox(SingleCommDevice):
"""
Luminox oxygen sensor device class.
"""
def __init__(self, com, dev_config=None) -> None:
# Call superclass constructor
super().__init__(com, dev_config)
self.output: Optional[LuminoxOutputMode] = None
[docs]
@staticmethod
def config_cls():
return LuminoxConfig
[docs]
@staticmethod
def default_com_cls():
return LuminoxSerialCommunication
[docs]
def start(self) -> None:
"""
Start this device. Opens the communication protocol.
"""
logger.info(f"Starting device {self}")
super().start()
[docs]
def stop(self) -> None:
"""
Stop the device. Closes also the communication protocol.
"""
logger.info(f"Stopping device {self}")
super().stop()
def _write(self, value: str) -> None:
"""
Write given `value` string to `self.com`.
:param value: String value to send.
:raises SerialCommunicationIOError: when communication port is not opened
"""
self.com.write_text(value)
def _read(self) -> str:
"""
Read a string value from `self.com`.
:return: Read text from the serial port, without the trailing terminator,
as defined in the communcation protocol configuration.
:raises SerialCommunicationIOError: when communication port is not opened
"""
return self.com.read_text().rstrip(self.com.config.terminator_str())
[docs]
def activate_output(self, mode: LuminoxOutputMode) -> None:
"""
activate the selected output mode of the Luminox Sensor.
:param mode: polling or streaming
"""
with self.com.access_lock:
self._write(f"M {mode.value}")
# needs a little bit of time ot activate
sleep(self.config.wait_sec_post_activate)
for trial in range(self.config.nr_trials_activate + 1):
msg = self._read()
if (
not msg == f"M 0{mode.value}"
and trial == self.config.nr_trials_activate
):
err_msg = (
"Stream mode activation was not possible "
f"after {self.config.nr_trials_activate} trials {self}"
)
logger.error(err_msg)
raise LuminoxOutputModeError(err_msg)
if msg == f"M 0{mode.value}":
msg = (
"Stream mode activation possible "
f"in trial {trial} out of {self.config.nr_trials_activate}"
)
logger.info(msg)
break
sleep(self.config.wait_sec_trials_activate)
self.output = mode
logger.info(f"{mode.name} mode activated {self}")
[docs]
def read_streaming(self) -> LuminoxMeasurementTypeDict:
"""
Read values of Luminox in the streaming mode. Convert the single string
into separate values.
:return: dictionary with `LuminoxMeasurementType.all_measurements_types()` keys
and accordingly type-parsed values.
:raises LuminoxOutputModeError: when streaming mode is not activated
:raises LuminoxMeasurementTypeError: when any of expected measurement values is
not read
"""
if not self.output == LuminoxOutputMode.streaming:
err_msg = f"Streaming mode not activated {self}"
logger.error(err_msg)
raise LuminoxOutputModeError(err_msg)
read_txt = self._read()
return cast(
LuminoxMeasurementTypeDict,
cast(
LuminoxMeasurementType, LuminoxMeasurementType.all_measurements
).parse_read_measurement_value(read_txt),
)
[docs]
def query_polling(
self,
measurement: Union[str, LuminoxMeasurementType],
) -> Union[LuminoxMeasurementTypeDict, LuminoxMeasurementTypeValue]:
"""
Query a value or values of Luminox measurements in the polling mode,
according to a given measurement type.
:param measurement: type of measurement
:return: value of requested measurement
:raises ValueError: when a wrong key for LuminoxMeasurementType is provided
:raises LuminoxOutputModeError: when polling mode is not activated
:raises LuminoxMeasurementTypeError: when expected measurement value is not read
"""
if not isinstance(measurement, LuminoxMeasurementType):
try:
measurement = cast(
LuminoxMeasurementType,
LuminoxMeasurementType[measurement], # type: ignore
)
except KeyError as e:
logger.error(str(e), exc_info=e)
measurement = cast(
LuminoxMeasurementType,
LuminoxMeasurementType(measurement),
)
if not self.output == LuminoxOutputMode.polling:
err_msg = f"Polling mode not activated {self}"
logger.error(err_msg)
raise LuminoxOutputModeError(err_msg)
with self.com.access_lock:
self._write(str(measurement))
read_txt = self._read()
read_value = measurement.parse_read_measurement_value(read_txt)
return read_value