Source code for hvl_ccb.dev.fluke884x.base

#  Copyright (c) ETH Zurich, SIS ID and HVL D-ITET
#
"""
Python module for the Fluke8845a Multimeter.
The communication to the device is through Telnet.
8845A/8846A Programmers Manual is available in the following link.
All page numbers mentioned in this script refer to this manual.
https://download.flukecal.com/pub/literature/8845A___pmeng0300.pdf
"""

import logging
from typing import Optional, Union, cast

from hvl_ccb.comm import SyncCommunicationProtocol
from hvl_ccb.comm.telnet import TelnetCommunication, TelnetCommunicationConfig
from hvl_ccb.configuration import configdataclass
from hvl_ccb.dev import SingleCommDevice
from hvl_ccb.utils.validation import validate_number

from ...utils.typing import Number
from .constants import (
    Fluke8845aCheckError,
    Fluke8845aError,
    Fluke8845aUnknownCommandError,
    MeasurementFunction,
    TriggerSource,
    _ApertureDescriptor,
    _FilterDescriptor,
    _RangeDescriptor,
)

logger = logging.getLogger(__name__)


[docs] @configdataclass class Fluke8845aTelnetCommunicationConfig(TelnetCommunicationConfig): #: Port at which Fluke 8845a is listening port: int = 3490 #: The terminator is CR terminator: bytes = b"\r"
[docs] class Fluke8845aTelnetCommunication(TelnetCommunication, SyncCommunicationProtocol):
[docs] @staticmethod def config_cls(): return Fluke8845aTelnetCommunicationConfig
[docs] def query( self, command: str, n_attempts_max: Optional[int] = None, attempt_interval_sec: Optional[Number] = None, ) -> str: """ Send a command to the interface and handle the status message. Eventually raises an error. :param command: Command to send :param n_attempts_max: Amount of attempts how often a non-empty text is tried to be read as answer :param attempt_interval_sec: time between the reading attempts :raises Fluke8845aError: if the connection is broken :return: Answer from the interface """ with self.access_lock: logger.debug(f"Fluke8845aCommunication, send: '{command}'") answer: Optional[str] = super().query( command, n_attempts_max=n_attempts_max, attempt_interval_sec=attempt_interval_sec, ) # string or None logger.debug(f"Fluke8845aCommunication, receive: '{answer}'") if answer is None: msg = ( f"Fluke8845aCommunication did get no answer on command: '{command}'" ) logger.error(msg) raise Fluke8845aError(msg) return answer
[docs] @configdataclass class Fluke8845aConfig: """ Config for Fluke8845a name: the name of the device """ name: str = "Fluke 1"
[docs] class Fluke8845a(SingleCommDevice): """ Device class to control Fluke8845a """ DISPLAY_MAX_LENGTH = 12 def __init__(self, com, dev_config=None) -> None: # Call superclass constructor super().__init__(com, dev_config) logger.debug(f"{self} {self.config.name} initialised.") def __str__(self) -> str: return "Fluke8845a"
[docs] @staticmethod def default_com_cls() -> type[Fluke8845aTelnetCommunication]: return Fluke8845aTelnetCommunication
[docs] @staticmethod def config_cls() -> type[Fluke8845aConfig]: return Fluke8845aConfig
[docs] def start(self) -> None: """ Start this device as recommended by the manual """ logger.info(f"Starting device: {self} {self.config.name}") # try opening the port super().start() self.activate_remote_mode() self.reset() self.clear_error_queue() logger.info(f"Device {self} {self.config.name} started successfully")
[docs] def stop(self) -> None: """ Stop this device. Disables access and closes the communication protocol. """ logger.info(f"Stopping device: {self} {self.config.name}") super().stop() logger.info(f"Device {self} {self.config.name} stopped successfully")
[docs] def activate_remote_mode(self) -> None: """ Page 66 Places the Meter in the remote mode for RS-232 or Ethernet remote control. All front-panel keys, except the local key, are disabled. """ logger.debug("Enable remote mode") self.com.write("SYST:REM")
[docs] def reset(self) -> None: """ Page 60 resets the meter to its power-up configuration """ logger.debug("Resets the meter to power-up configuration") self.com.write("*RST")
[docs] def clear_error_queue(self) -> None: """ Page 62 Sets all bits to zero in the Meter’s status byte register and all event registers. Also clears the error queue """ logger.debug("Clear the error queue") self.com.write("*CLS")
@property def identification(self) -> str: """ Page 60 Queries `"*IDN?"` and returns the identification string of the connected device. :return: the identification string of the connected device e.g. `"FLUKE, 8845A, 2540017, 08/02/10-11:53"` """ value = self.com.query("*IDN?") logger.info(f"The identification string of the Fluke 8845a: {value}") return value
[docs] def initiate_trigger(self) -> None: """ Set trigger system to wait-for-trigger """ logger.debug("Set trigger system to wait-for-trigger") self.com.write("INIT")
[docs] def trigger(self) -> None: """ Causes the meter to trigger a measurement when paused """ logger.debug("Causes the meter to trigger a measurement when paused") self.com.write("*TRG")
[docs] def fetch(self) -> float: """ Page 36 Transfer stored readings to output buffer """ logger.debug("Transfer stored readings to output buffer") return float(self.com.query("FETC?"))
[docs] def measure(self) -> float: """ Page 42 Taking measurement Once the Meter has been configured for a measurement, the INITiate command causes the Meter to take a measurement when the trigger condition have been met. To process readings from the Meter's internal memory to the output buffer, send the Meter a FETCh? command. """ self.initiate_trigger() self.trigger() measurement_unit = self.measurement_function._range().unit() measure_value = self.fetch() logger.info(f"measured value {measure_value} {measurement_unit}") return measure_value
@property def measurement_function(self) -> MeasurementFunction: """ input_function getter, query what the input function is :raises Fluke8845aUnknownCommandError: if the input function is unknown """ # When query "FUNC?", return example '"CURR"' # use strip to remove the quotation mark measurement_function = self.com.query("FUNC?").strip('"') try: return MeasurementFunction(measurement_function) except ValueError: msg = ( f"Function '{measurement_function}' " "not yet implemented or not a valid function." ) logger.error(msg) raise Fluke8845aUnknownCommandError(msg) @measurement_function.setter def measurement_function(self, input_function: Union[str, MeasurementFunction]): """ input_funtion setter, set the input function :param input_function: string or MeasurementFunction Enum, for example: "CURR", "PER", or MeasurementFunction.CURRENT_AC... :raises Fluke8845aUnknownCommandError: if the input function is unknown :raises Fluke8845aCheckError: if setting failed """ try: input_function = MeasurementFunction(input_function) # type:ignore except ValueError: msg = ( "Fluke function not yet implemented; " f"Possible functions are {list(MeasurementFunction)}" # type:ignore ) logger.error(msg) raise Fluke8845aUnknownCommandError(msg) self.com.write(f"CONF:{input_function}") function = self.measurement_function if function == input_function: logger.info(f"Input function is successfully set to '{input_function}'") else: msg = ( "Input function setting failed: " f"should be '{input_function}' but is now '{function}'" ) logger.error(msg) raise Fluke8845aCheckError(msg) @property def trigger_source(self) -> TriggerSource: """ input_trigger_source getter, query what the input trigger source is :raise Fluke8845aUnknownCommandError: if the input trigger source is unknown """ input_trigger_source_checked = self.com.query("TRIG:SOUR?") try: return TriggerSource(input_trigger_source_checked) except ValueError: msg = f"Fluke trigger source not valid: '{input_trigger_source_checked}'" logger.error(msg) raise Fluke8845aUnknownCommandError(msg) @trigger_source.setter def trigger_source(self, input_trigger_source: Union[str, TriggerSource]): """ Page 57 input_trigger_source setter, set the input trigger source :param input_trigger_source: string or TriggerSource Enum, :raises Fluke8845aUnknownCommandError: if the input trigger source is unknown :raises Fluke8845aCheckError: if setting failed """ try: input_trigger_source = TriggerSource(input_trigger_source) # type:ignore except ValueError: msg = ( "Unknown trigger source; " f"Possible functions are {list(TriggerSource)}" # type:ignore ) logger.error(msg) raise Fluke8845aUnknownCommandError(msg) self.com.write(f"TRIG:SOUR {input_trigger_source}") trigger_source = self.trigger_source if trigger_source == input_trigger_source: logger.info( f"input trigger source is successfully set to '{input_trigger_source}'" ) else: msg = ( "input trigger source setting failed: " f"should be '{input_trigger_source}' but is now '{trigger_source}'" ) logger.error(msg) raise Fluke8845aCheckError(msg) @property def trigger_delay(self) -> int: """ input_trigger_delay getter, query what the input trigger delay is in second answer format from Fluke: string, '+1.00000000E+00', so convert to float and then to int :return: input trigger delay in second """ trigger_delay = int(float(self.com.query("TRIG:DEL?"))) logging.info(f"Trigger delay is set to {trigger_delay}s") return trigger_delay @trigger_delay.setter def trigger_delay(self, input_trigger_delay: int): """ Page 57 input_trigger_delay setter, sets the delay between receiving a trigger and the beginning of measurement cycle input_trigger_delay should be between 0 and 3600 seconds :param input_trigger_delay: int, input trigger delay in second :raises Fluke8845aCheckError: if setting failed """ validate_number( "input trigger delay", input_trigger_delay, (0, 3600), int, logger=logger ) self.com.write(f"TRIG:DEL {input_trigger_delay}") trigger_delay = self.trigger_delay if trigger_delay == input_trigger_delay: logger.info( f"input trigger delay is successfully set to {input_trigger_delay}s" ) else: msg = ( "input trigger delay setting failed: " f"should be {input_trigger_delay}s but is now {trigger_delay}s" ) logger.error(msg) raise Fluke8845aCheckError(msg) @property def display_enable(self) -> bool: """ Page 59 get if the display is enabled or not fluke answer string "1" for ON and "0" for off bool(int("1")) = 1 and bool(int("0")) = 0 :return: bool enabled = True, else False """ display_enable = bool(int(self.com.query("DISP?"))) status = "ON" if display_enable else "OFF" logging.info(f"Display is {status}") return display_enable @display_enable.setter def display_enable(self, display_enable: bool): """ Page 59 Enables or disables the Meter's display. :param display_enable: bool, enable display or not :raises Fluke8845aCheckError: if setting failed """ status = "ON" if display_enable else "OFF" self.com.write(f"DISP {status}") if bool(int(self.com.query("DISP?"))) == display_enable: logger.info(f"Display is successfully switched {status}") else: msg = f"Display could not be switched {status}" logger.error(msg) raise Fluke8845aCheckError(msg)
[docs] def clear_display_message(self) -> None: """ Page 59 Clears the displayed message on the Meter's display. """ self.com.write("DISP:TEXT:CLE") logger.info("Clear message from display")
@property def display_message(self) -> str: """ Page 59 Retrieves the text sent to the Meter's display. """ display_message = self.com.query("DISP:TEXT?") logging.info(f"Display message is {display_message}") return display_message @display_message.setter def display_message(self, display_message: str): """ Page 59 Displays a message on the Meter's display. The Meter must be remote before executing this command. Display string is up to 12 characters. Additional characters are truncated. Quotation mark is needed when sending displayed string :param display_message: message as string to display up to 12 characters :raises Fluke8845aCheckError: if setting failed """ if len(display_message) > self.DISPLAY_MAX_LENGTH: display_message = display_message[: self.DISPLAY_MAX_LENGTH] logger.warning( f"Desired message has {len(display_message)} characters; " f"only the first {self.DISPLAY_MAX_LENGTH} is displayed; " "additional characters are truncated." f"The displayed text is '{display_message}'" ) # fluke display example: DISP:TEXT "hello", quotation mark is needed self.com.write(f'DISP:TEXT "{display_message}"') if self.com.query("DISP:TEXT?") == f'"{display_message}"': logger.info(f'The text "{display_message}" is displayed successfully') else: msg = "The text displayed failed" logger.error(msg) raise Fluke8845aCheckError(msg) dc_voltage_range = _RangeDescriptor( cast(MeasurementFunction, MeasurementFunction.VOLTAGE_DC) ) ac_voltage_range = _RangeDescriptor( cast(MeasurementFunction, MeasurementFunction.VOLTAGE_AC) ) dc_current_range = _RangeDescriptor( cast(MeasurementFunction, MeasurementFunction.CURRENT_DC) ) ac_current_range = _RangeDescriptor( cast(MeasurementFunction, MeasurementFunction.CURRENT_AC) ) two_wire_resistance_range = _RangeDescriptor( cast(MeasurementFunction, MeasurementFunction.TWO_WIRE_RESISTANCE) ) four_wire_resistance_range = _RangeDescriptor( cast(MeasurementFunction, MeasurementFunction.FOUR_WIRE_RESISTANCE) ) voltage_filter = _FilterDescriptor( cast(MeasurementFunction, MeasurementFunction.VOLTAGE_AC) ) current_filter = _FilterDescriptor( cast(MeasurementFunction, MeasurementFunction.CURRENT_AC) ) frequency_aperture = _ApertureDescriptor( cast(MeasurementFunction, MeasurementFunction.FREQUENCY) ) period_aperture = _ApertureDescriptor( cast(MeasurementFunction, MeasurementFunction.PERIOD) )