# Copyright (c) ETH Zurich, SIS ID and HVL D-ITET
#
"""
Device class for Pfeiffer TPG controllers.
The Pfeiffer TPG control units are used to control Pfeiffer Compact Gauges.
Models: TPG 251 A, TPG 252 A, TPG 256 A, TPG 261, TPG 262, TPG 361, TPG 362 and TPG 366.
Manufacturer homepage:
https://www.pfeiffer-vacuum.com/en/products/measurement-analysis/
measurement/activeline/controllers/
"""
import logging
from enum import Enum, IntEnum
from typing import Union, cast
from hvl_ccb.comm.serial import (
SerialCommunication,
SerialCommunicationBytesize,
SerialCommunicationConfig,
SerialCommunicationParity,
SerialCommunicationStopbits,
)
from hvl_ccb.configuration import configdataclass
from hvl_ccb.dev.base import DeviceError, SingleCommDevice
from hvl_ccb.utils.enum import NameEnum
from hvl_ccb.utils.typing import Number
logger = logging.getLogger(__name__)
[docs]
class PfeifferTPGError(DeviceError):
    """
    Error raised for failures related to the Pfeiffer TPG controller, e.g. a
    command that is not acknowledged or a query that yields no answer.
    """
[docs]
@configdataclass
class PfeifferTPGSerialCommunicationConfig(SerialCommunicationConfig):
    """
    Serial port configuration with defaults preset for Pfeiffer TPG controllers.
    """

    #: Pfeiffer TPG controllers communicate at 9600 baud
    baudrate: int = 9600

    #: No parity bit is used by Pfeiffer TPG controllers
    parity: Union[str, SerialCommunicationParity] = SerialCommunicationParity.NONE

    #: A single stop bit is used by Pfeiffer TPG controllers
    stopbits: Union[int, SerialCommunicationStopbits] = SerialCommunicationStopbits.ONE

    #: Each byte consists of eight bits
    bytesize: Union[int, SerialCommunicationBytesize] = (
        SerialCommunicationBytesize.EIGHTBITS
    )

    #: Messages are terminated with <CR><LF>
    terminator: bytes = b"\r\n"

    #: Default timeout of 3 seconds
    timeout: Number = 3
[docs]
class PfeifferTPGSerialCommunication(SerialCommunication):
    """
    Specific communication protocol implementation for Pfeiffer TPG controllers.
    Already predefines device-specific protocol parameters in config.

    Every message written to the controller must be answered with the
    acknowledgement character (ASCII 6). To read the data belonging to a
    message, the enquiry character (ASCII 5) is written after the
    acknowledgement and the following line is read.
    """

    # First character the controller has to answer with after each message.
    _ACK = chr(6)
    # Character written to request the data of the previously sent message.
    _ENQ = chr(5)

    def __init__(self, configuration):
        super().__init__(configuration)

    @staticmethod
    def config_cls():
        return PfeifferTPGSerialCommunicationConfig

    def _write_and_check_ack(self, cmd: str) -> None:
        """
        Write a message and verify that the controller acknowledges it.

        The caller is expected to hold ``self.access_lock`` so that the write
        and the following read are not interleaved with other traffic.

        :param cmd: message to send to the device
        :raises PfeifferTPGError: if the answer is empty or does not start with
            the acknowledgement character 'chr(6)'.
        """
        self.write_text(cmd)
        answer = self.read_text()
        if answer.startswith(self._ACK):
            return

        message = f"Pfeiffer TPG not acknowledging command {cmd}"
        logger.error(message)
        if answer:
            # keep the unexpected answer for debugging purposes
            logger.debug(f"Pfeiffer TPG: {answer}")
        raise PfeifferTPGError(message)

    def send_command(self, cmd: str) -> None:
        """
        Send a command to the device and check for acknowledgement.

        :param cmd: command to send to the device
        :raises SerialCommunicationIOError: when communication port is not opened
        :raises PfeifferTPGError: if the answer from the device differs from the
            expected acknowledgement character 'chr(6)'.
        """
        with self.access_lock:
            self._write_and_check_ack(cmd)

    def query(self, cmd: str) -> str:
        """
        Send a query, then read and returns the first line from the com port.

        :param cmd: query message to send to the device
        :return: first line read on the com, stripped of surrounding whitespace
        :raises SerialCommunicationIOError: when communication port is not opened
        :raises PfeifferTPGError: if the device does not acknowledge the command or if
            the answer from the device is empty
        """
        with self.access_lock:
            self._write_and_check_ack(cmd)

            # request the data belonging to the acknowledged query
            self.write_text(self._ENQ)
            answer = self.read_text().strip()
            if not answer:
                message = f"Pfeiffer TPG not answering to command {cmd}"
                logger.error(message)
                raise PfeifferTPGError(message)
            return answer
[docs]
@configdataclass
class PfeifferTPGConfig:
    """
    Device configuration dataclass for Pfeiffer TPG controllers.

    Its single setting, ``model``, selects the lookup table that is used to
    translate full scale range values.
    """

    class Model(NameEnum, init="full_scale_ranges"):  # type:ignore
        # Each member carries a lookup table that maps a full scale range in
        # mbar to the code exchanged with the controller for that range.
        TPG25xA = {
            1: 0,
            10: 1,
            100: 2,
            1000: 3,
            2000: 4,
            5000: 5,
            10000: 6,
            50000: 7,
            0.1: 8,
        }
        TPGx6x = {
            0.01: 0,
            0.1: 1,
            1: 2,
            10: 3,
            100: 4,
            1000: 5,
            2000: 6,
            5000: 7,
            10000: 8,
            50000: 9,
        }

        def __init__(self, *args, **kwargs) -> None:
            super().__init__(*args, **kwargs)
            # Inverse lookup table: controller code -> full scale range in mbar.
            # NOTE(review): annotated as int values, although the tables above
            # also contain float ranges (0.01, 0.1).
            self.full_scale_ranges_reversed: dict[int, int] = {
                code: pressure for pressure, code in self.full_scale_ranges.items()
            }

        def is_valid_scale_range_reversed_str(self, v: str) -> bool:
            """
            Tell whether a string is a valid key of the reversed scale range
            table of this model.

            :param v: Reversed scale range string.
            :return: `True` if valid, `False` otherwise (also when the string
                cannot be converted to an integer).
            :raises TypeError: if `v` is not a string
            """
            # Reject non-strings explicitly: `int()` would happily accept e.g.
            # a `float` and make the check succeed.
            if not isinstance(v, str):
                raise TypeError(f"Expected `str`, got `{type(v)}` instead.")
            try:
                code = int(v)
            except ValueError as e:
                logger.error(str(e), exc_info=e)
                return False
            return code in self.full_scale_ranges_reversed

    # Model of the TPG; it determines which lookup table is used for the full
    # scale range.
    model: Union[str, Model] = Model.TPG25xA  # type: ignore

    def clean_values(self):
        """
        Convert ``model`` to a :class:`Model` member if it was given in another
        form (e.g. as a string).
        """
        if isinstance(self.model, self.Model):
            return
        self.force_value("model", self.Model(self.model))
[docs]
class PfeifferTPG(SingleCommDevice):
"""
Pfeiffer TPG control unit device class
"""
# Sensor identifiers grouped by type code. The first label of each group is
# the canonical member name, the remaining labels are aliases of that member.
SensorTypes = Enum(  # type: ignore
    "SensorTypes",
    [
        (label, code)
        for code, labels in (
            (1, ("TPR/PCR Pirani Gauge", "TPR", "TPR/PCR")),
            (2, ("IKR Cold Cathode Gauge", "IKR", "IKR9", "IKR11")),
            (3, ("PKR Full range CC", "PKR")),
            (4, ("APR/CMR Linear Gauge", "CMR", "APR/CMR", "CMR/APR")),
            (5, ("Pirani / High Pressure Gauge", "IMR")),
            (6, ("Fullrange BA Gauge", "PBR")),
            (7, ("None", "no Sensor", "noSen", "noSENSOR")),
        )
        for label in labels
    ],
)
[docs]
class SensorStatus(IntEnum):
    """
    Status codes that precede a pressure value in the controller's answer to a
    pressure reading request.
    """

    # measurement data is valid
    Ok = 0
    # measurement is outside of the measuring range
    Underrange = 1
    Overrange = 2
    # sensor related problems
    Sensor_error = 3
    Sensor_off = 4
    No_sensor = 5
    Identification_error = 6
def __init__(self, com, dev_config=None) -> None:
    """
    Create the device; the sensor list stays empty until the sensors have been
    identified.

    :param com: communication protocol (or its configuration), handed over to
        the base class
    :param dev_config: device configuration, handed over to the base class
    """
    super().__init__(com, dev_config)

    # names of the sensors connected to the TPG, one entry per channel
    self.sensors: list[str] = []
def __repr__(self):
    """Describe the device by the number and the names of its sensors."""
    return f"Pfeiffer TPG with {self.number_of_sensors} sensors: {self.sensors}"
@property
def number_of_sensors(self):
    """
    Number of entries in the list of identified sensors.
    """
    return len(self.sensors)
@property
def unit(self):
    """
    Pressure unit of the readings: always mbar, independent of the unit shown
    on the display.
    """
    return "mbar"
[docs]
@staticmethod
def default_com_cls():
    """
    :return: the communication protocol class used by default for this device
    """
    return PfeifferTPGSerialCommunication
[docs]
@staticmethod
def config_cls():
    """
    :return: the configuration dataclass of this device
    """
    return PfeifferTPGConfig
[docs]
def start(self) -> None:
    """
    Start the device: run the base class start-up and then identify the
    connected sensors.

    :raises SerialCommunicationIOError: when communication port cannot be opened
    """
    logger.info("Starting Pfeiffer TPG")
    super().start()

    # Identifying the sensors also reveals the number of channels of the TPG.
    self.identify_sensors()
[docs]
def stop(self) -> None:
    """
    Stop the device: log the shutdown and delegate to the base class, which is
    expected to close the communication protocol as well.
    """
    logger.info(f"Stopping device {self}")
    super().stop()
[docs]
def identify_sensors(self) -> None:
    """
    Query the sensor identification (TID) of all channels and store the
    resulting sensor names in ``self.sensors``.

    Identifiers that are not known to ``SensorTypes`` are logged and stored as
    "Unknown".

    :raises SerialCommunicationIOError: when communication port is not opened
    :raises PfeifferTPGError: if command fails
    """
    try:
        answer = self.com.query("TID")
    except PfeifferTPGError as e:
        logger.error("Pressure sensor identification failed.", exc_info=e)
        raise

    # The answer holds one comma-separated identifier per channel.
    identified: list[str] = []
    for identifier in answer.split(","):
        try:
            sensor_type = self.SensorTypes[identifier]
        except KeyError as e:
            logger.error(str(e), exc_info=e)
            identified.append("Unknown")
        else:
            # aliases resolve to the canonical member name
            identified.append(sensor_type.name)
    self.sensors = identified

    logger.info(f"Identified {self}")
[docs]
def measure(self, channel: int) -> tuple[str, float]:
    """
    Get the status and measurement of one sensor

    :param channel: int channel on which the sensor is connected, with
        1 <= channel <= number_of_sensors
    :return: tuple of the sensor status name (see `SensorStatus`) and the
        value reported by the controller as float; the value is returned for
        every status, not only for `Ok`
    :raises ValueError: if `channel` is not a valid channel number
    :raises SerialCommunicationIOError: when communication port is not opened
    :raises PfeifferTPGError: if command fails or if the answer of the
        controller cannot be interpreted
    """
    if not 1 <= channel <= self.number_of_sensors:
        message = (
            f"{channel} is not a valid channel number, it should be between "
            f"1 and {self.number_of_sensors}"
        )
        logger.error(message)
        raise ValueError(message)

    try:
        answer = self.com.query(f"PR{channel}")
    except PfeifferTPGError as e:
        logger.error(f"Reading sensor {channel} failed.", exc_info=e)
        raise

    # The answer is expected to be "<status code>,<value>". A wrong number of
    # fields, a non-numeric field or an unknown status code all surface as
    # ValueError and are reported as a device error.
    try:
        status, measurement = answer.split(",")
        s = self.SensorStatus(int(status))
        value = float(measurement)
    except ValueError as e:
        message = (
            f"Pfeiffer TPG returned an unexpected answer to PR{channel}: {answer!r}"
        )
        logger.error(message, exc_info=e)
        raise PfeifferTPGError(message) from e

    if s == self.SensorStatus.Ok:
        logger.info(
            f"Channel {channel} successful reading of pressure: {measurement} mbar."
        )
    else:
        logger.info(
            f"Channel {channel} no reading of pressure, sensor status is "
            f"{s.name}."
        )
    return s.name, value
[docs]
def measure_all(self) -> list[tuple[str, float]]:
    """
    Get the status and measurement of all sensors (this command is
    not available on all models)

    :return: list with one tuple per identified sensor, holding the sensor
        status name (see `SensorStatus`) and the reported value as float
    :raises SerialCommunicationIOError: when communication port is not opened
    :raises PfeifferTPGError: if command fails or if the answer of the
        controller is too short or cannot be interpreted
    """
    try:
        answer = self.com.query("PRX")
    except PfeifferTPGError as e:
        logger.error(
            "Getting pressure reading from all sensors failed "
            "(this command is not available on all TPG models).",
            exc_info=e,
        )
        raise

    # The answer is expected to be "<status>,<value>" repeated per channel.
    ans = answer.split(",")
    n_fields = 2 * self.number_of_sensors
    if len(ans) < n_fields:
        message = (
            f"Pfeiffer TPG returned {len(ans)} fields to PRX, expected at least "
            f"{n_fields}: {answer!r}"
        )
        logger.error(message)
        raise PfeifferTPGError(message)

    try:
        # only the fields of the identified sensors are evaluated
        ret = [
            (self.SensorStatus(int(status)).name, float(measurement))
            for status, measurement in zip(ans[0:n_fields:2], ans[1:n_fields:2])
        ]
    except ValueError as e:
        message = f"Pfeiffer TPG returned an unexpected answer to PRX: {answer!r}"
        logger.error(message, exc_info=e)
        raise PfeifferTPGError(message) from e

    logger.info(f"Reading all sensors with result: {ret}.")
    return ret
def _set_full_scale(self, fsr: list[Number], unitless: bool) -> None:
    """
    Set the full scale range of the attached sensors. See lookup table between
    command and corresponding pressure in the device user manual.

    :param fsr: list of full scale range values, like `[0, 1, 3, 3, 2, 0]` for
        `unitless = True` scale or `[0.01, 1000]` otherwise (mbar units scale)
    :param unitless: flag to indicate scale of range values; if `False` then mbar
        units scale
    :raises ValueError: if `fsr` does not hold exactly one value per sensor or
        holds values that are not valid for the configured model
    :raises SerialCommunicationIOError: when communication port is not opened
    :raises PfeifferTPGError: if command fails
    """
    if len(fsr) != self.number_of_sensors:
        raise ValueError(
            f"Argument fsr should be of length {self.number_of_sensors}. "
            f"Received length {len(fsr)}."
        )

    # The keys of the selected table are the accepted input values.
    possible_values_map = (
        self.config.model.full_scale_ranges_reversed
        if unitless
        else self.config.model.full_scale_ranges
    )
    wrong_values = [v for v in fsr if v not in possible_values_map]
    if wrong_values:
        raise ValueError(
            f"Argument fsr contains invalid values: {wrong_values}. Accepted "
            f"values are {list(possible_values_map)}"
            f"{'' if unitless else ' mbar'}."
        )

    # The controller expects the unitless codes; mbar values are translated.
    str_fsr = ",".join(str(f if unitless else possible_values_map[f]) for f in fsr)
    try:
        self.com.send_command(f"FSR,{str_fsr}")
    except PfeifferTPGError as e:
        logger.error("Setting sensors full scale failed.", exc_info=e)
        raise
    logger.info(
        f"Set sensors full scale to {fsr}"
        f"{' (unitless)' if unitless else ' mbar'} respectively."
    )
def _get_full_scale(self, unitless: bool) -> list[Number]:
    """
    Query the full scale range of the attached sensors. See lookup table between
    command and corresponding pressure in the device user manual.

    :param unitless: flag to indicate scale of range values; if `False` then mbar
        units scale
    :return: list of full scale range values, like `[0, 1, 3, 3, 2, 0]` for
        `unitless = True` scale or `[0.01, 1000]` otherwise (mbar units scale)
    :raises SerialCommunicationIOError: when communication port is not opened
    :raises PfeifferTPGError: if command fails or the controller returns values
        that are not valid for the configured model
    """
    try:
        answer = self.com.query("FSR")
    except PfeifferTPGError as e:
        logger.error("Query full scale range of all sensors failed.", exc_info=e)
        raise

    # The controller answers with one comma-separated unitless code per sensor.
    answer_values = answer.split(",")
    wrong_values = []
    for v in answer_values:
        if not self.config.model.is_valid_scale_range_reversed_str(v):
            wrong_values.append(v)
    if wrong_values:
        raise PfeifferTPGError(
            "The controller returned the full unitless scale range values: "
            f"{answer}. The values {wrong_values} are invalid. Accepted values are "
            f"{list(self.config.model.full_scale_ranges_reversed.keys())}."
        )

    fsr = []
    for v in answer_values:
        code = int(v)
        if unitless:
            fsr.append(code)
        else:
            # translate the code into the corresponding range in mbar
            fsr.append(self.config.model.full_scale_ranges_reversed[code])

    logger.info(
        f"Obtained full scale range of all sensors as {fsr}"
        f"{'' if unitless else ' mbar'}."
    )
    return fsr
[docs]
def set_full_scale_unitless(self, fsr: list[int]) -> None:
    """
    Set the full scale range of the attached sensors using the unitless codes
    of the controller. See lookup table between command and corresponding
    pressure in the device user manual.

    :param fsr: list of full scale range values, like `[0, 1, 3, 3, 2, 0]`
    :raises SerialCommunicationIOError: when communication port is not opened
    :raises PfeifferTPGError: if command fails
    """
    unitless_fsr = cast(list[Number], fsr)
    self._set_full_scale(unitless_fsr, True)
[docs]
def get_full_scale_unitless(self) -> list[int]:
    """
    Get the full scale range of the attached sensors as the unitless codes of
    the controller. See lookup table between command and corresponding
    pressure in the device user manual.

    :return: list of full scale range values, like `[0, 1, 3, 3, 2, 0]`
    :raises SerialCommunicationIOError: when communication port is not opened
    :raises PfeifferTPGError: if command fails
    """
    unitless_fsr = self._get_full_scale(True)
    return cast(list[int], unitless_fsr)
[docs]
def set_full_scale_mbar(self, fsr: list[Number]) -> None:
    """
    Set the full scale range of the attached sensors, given in mbar.

    :param fsr: full scale range values in mbar, for example `[0.01, 1000]`
    :raises SerialCommunicationIOError: when communication port is not opened
    :raises PfeifferTPGError: if command fails
    """
    self._set_full_scale(fsr, unitless=False)
[docs]
def get_full_scale_mbar(self) -> list[Number]:
    """
    Get the full scale range of the attached sensors in mbar.

    :return: full scale range values in mbar, like `[0.01, 1, 0.1, 1000, 50000, 10]`
    :raises SerialCommunicationIOError: when communication port is not opened
    :raises PfeifferTPGError: if command fails
    """
    return self._get_full_scale(unitless=False)