Source code for hvl_ccb.dev.mbw973.mbw973

#  Copyright (c) ETH Zurich, SIS ID and HVL D-ITET
#
"""
Device class for controlling a MBW 973 SF6 Analyzer over a serial connection.

The MBW 973 is a gas analyzer designed for gas insulated switchgear and measures
humidity, SF6 purity and SO2 contamination in one go.
Manufacturer homepage: https://www.mbw.ch/products/sf6-gas-analysis/973-sf6-analyzer/
"""

import logging
from typing import Union

from hvl_ccb.comm.serial import (
    SerialCommunication,
    SerialCommunicationBytesize,
    SerialCommunicationConfig,
    SerialCommunicationParity,
    SerialCommunicationStopbits,
)
from hvl_ccb.configuration import configdataclass
from hvl_ccb.dev.base import DeviceError, SingleCommDevice
from hvl_ccb.utils.poller import Poller
from hvl_ccb.utils.typing import Number

logger = logging.getLogger(__name__)


[docs] class MBW973Error(DeviceError): """ General error with the MBW973 dew point mirror device. """ pass
[docs] class MBW973ControlRunningError(MBW973Error): """ Error indicating there is still a measurement running, and a new one cannot be started. """ pass
[docs] class MBW973PumpRunningError(MBW973Error): """ Error indicating the pump of the dew point mirror is still recovering gas, unable to start a new measurement. """ pass
[docs] @configdataclass class MBW973SerialCommunicationConfig(SerialCommunicationConfig): #: Baudrate for MBW973 is 9600 baud baudrate: int = 9600 #: MBW973 does not use parity parity: Union[str, SerialCommunicationParity] = SerialCommunicationParity.NONE #: MBW973 does use one stop bit stopbits: Union[int, SerialCommunicationStopbits] = SerialCommunicationStopbits.ONE #: One byte is eight bits long bytesize: Union[int, SerialCommunicationBytesize] = ( SerialCommunicationBytesize.EIGHTBITS ) #: The terminator is only CR terminator: bytes = b"\r" #: use 3 seconds timeout as default timeout: Number = 3
[docs] class MBW973SerialCommunication(SerialCommunication): """ Specific communication protocol implementation for the MBW973 dew point mirror. Already predefines device-specific protocol parameters in config. """
[docs] @staticmethod def config_cls(): return MBW973SerialCommunicationConfig
[docs] @configdataclass class MBW973Config: """ Device configuration dataclass for MBW973. """ #: Polling period for `is_done` status queries [in seconds]. polling_interval: Number = 2
[docs] def clean_values(self): if self.polling_interval <= 0: raise ValueError("Polling interval needs to be positive.")
[docs] class MBW973(SingleCommDevice): """ MBW 973 dew point mirror device class. """ def __init__(self, com, dev_config=None): # Call superclass constructor super().__init__(com, dev_config) # polling status self.status_poller = Poller( self.is_done, polling_delay_sec=self.config.polling_interval, polling_interval_sec=self.config.polling_interval, ) # is done with dew point = True, new measurement sample required and # not ready yet = False self.is_done_with_measurements = True # dict telling what measurement options are selected self.measurement_options = { "dewpoint": True, "SF6_Vol": False, } self.last_measurement_values = {}
[docs] @staticmethod def default_com_cls(): return MBW973SerialCommunication
[docs] @staticmethod def config_cls(): return MBW973Config
[docs] def start(self) -> None: """ Start this device. Opens the communication protocol and retrieves the set measurement options from the device. :raises SerialCommunicationIOError: when communication port cannot be opened. """ logger.info("Starting device " + str(self)) super().start() # check test options self.write("HumidityTest?") self.measurement_options["dewpoint"] = bool(self.read_int()) self.write("SF6PurityTest?") self.measurement_options["SF6_Vol"] = bool(self.read_int())
[docs] def stop(self) -> None: """ Stop the device. Closes also the communication protocol. """ logger.info("Stopping device " + str(self)) super().stop()
[docs] def write(self, value) -> None: """ Send `value` to `self.com`. :param value: Value to send, converted to `str`. :raises SerialCommunicationIOError: when communication port is not opened """ self.com.write_text(str(value))
[docs] def read(self, cast_type: type = str): """ Read value from `self.com` and cast to `cast_type`. Raises `ValueError` if read text (`str`) is not convertible to `cast_type`, e.g. to `float` or to `int`. :return: Read value of `cast_type` type. """ return cast_type(self.com.read_text())
[docs] def read_float(self) -> float: """ Convenience wrapper for `self.read()`, with typing hint for return value. :return: Read `float` value. """ return self.read(float)
[docs] def read_int(self) -> int: """ Convenience wrapper for `self.read()`, with typing hint for return value. :return: Read `int` value. """ return self.read(int)
[docs] def is_done(self) -> bool: """ Poll status of the dew point mirror and return True, if all measurements are done. :return: True, if all measurements are done; False otherwise. :raises SerialCommunicationIOError: when communication port is not opened """ # assume everything is done done = True if self.measurement_options["dewpoint"]: # ask if done with DP self.write("DoneWithDP?") done = done and bool(self.read_int()) if self.measurement_options["SF6_Vol"]: # ask if done with SF6 volume measurement self.write("SF6VolHold?") done = done and bool(self.read_int()) self.is_done_with_measurements = done if self.is_done_with_measurements: self.status_poller.stop_polling() self.read_measurements() return self.is_done_with_measurements
[docs] def start_control(self) -> None: """ Start dew point control to acquire a new value set. :raises SerialCommunicationIOError: when communication port is not opened """ # send control? self.write("control?") if self.read_float(): raise MBW973ControlRunningError # send Pump.on? to check, whether gas is still being pumped back self.write("Pump.on?") if self.read_float(): raise MBW973PumpRunningError # start control of device self.write("control=1") logger.info("Starting dew point control") self.is_done_with_measurements = False self.status_poller.start_polling()
[docs] def read_measurements(self) -> dict[str, float]: """ Read out measurement values and return them as a dictionary. :return: Dictionary with values. :raises SerialCommunicationIOError: when communication port is not opened """ self.write("Fp?") frostpoint = self.read_float() self.write("Fp1?") frostpoint_ambient = self.read_float() self.write("Px?") pressure = self.read_float() self.write("PPMv?") ppmv = self.read_float() self.write("PPMw?") ppmw = self.read_float() self.write("SF6Vol?") sf6_vol = self.read_float() values = { "frostpoint": frostpoint, "frostpoint_ambient": frostpoint_ambient, "pressure": pressure, "ppmv": ppmv, "ppmw": ppmw, "sf6_vol": sf6_vol, } logger.info("Read out values") logger.info(values) self.last_measurement_values = values return values
[docs] def set_measuring_options( self, humidity: bool = True, sf6_purity: bool = False ) -> None: """ Send measuring options to the dew point mirror. :param humidity: Perform humidity test or not? :param sf6_purity: Perform SF6 purity test or not? :raises SerialCommunicationIOError: when communication port is not opened """ self.write(f"HumidityTest={1 if humidity else 0}") self.write(f"SF6PurityTest={1 if sf6_purity else 0}") self.measurement_options["dewpoint"] = humidity self.measurement_options["SF6_Vol"] = sf6_purity