Source code for hvl_ccb.comm.visa

#  Copyright (c) ETH Zurich, SIS ID and HVL D-ITET
#
"""
Communication protocol for VISA. Makes use of the pyvisa library.
The backend can be NI-Visa or pyvisa-py.

Information on how to install a VISA backend can be found here:
https://pyvisa.readthedocs.io/en/master/getting_nivisa.html

So far only TCPIP SOCKET and TCPIP INSTR interfaces are supported.
"""

import logging
from ipaddress import IPv4Address, IPv6Address
from time import sleep
from typing import Optional, Union, cast

import pyvisa as visa
from pyvisa.resources import MessageBasedResource
from pyvisa_py.protocols.rpc import RPCError  # type: ignore

from hvl_ccb.comm import CommunicationError, CommunicationProtocol
from hvl_ccb.configuration import configdataclass
from hvl_ccb.utils.enum import AutoNumberNameEnum
from hvl_ccb.utils.validation import validate_and_resolve_host, validate_tcp_port

logger = logging.getLogger(__name__)


[docs] class VisaCommunicationError(IOError, CommunicationError): """ Base class for VisaCommunication errors. """ pass
[docs] @configdataclass class VisaCommunicationConfig: """ `VisaCommunication` configuration dataclass. """
[docs] class InterfaceType(AutoNumberNameEnum, init="value"): # type:ignore """ Supported VISA Interface types. """ #: VISA-RAW protocol TCPIP_SOCKET = ((),) #: VXI-11 protocol TCPIP_INSTR = ((),)
[docs] def address( self, host: str, port: Optional[int] = None, board: Optional[int] = None ) -> str: """ Address string specific to the VISA interface type. :param host: host IP address :param port: optional TCP port :param board: optional board number :return: address string """ if self.name == VisaCommunicationConfig.InterfaceType.TCPIP_SOCKET: return f"TCPIP{board}::{host}::{port}::SOCKET" elif self.name == VisaCommunicationConfig.InterfaceType.TCPIP_INSTR: return f"TCPIP::{host}::INSTR" else: assert False, self.name + " is unknown InterfaceType."
# IP address of the VISA device. host: Union[str, IPv4Address, IPv6Address] #: Interface type of the VISA connection, being one of :class:`InterfaceType`. interface_type: Union[str, InterfaceType] #: Board number is typically 0 and comes from old bus systems. board: int = 0 #: TCP port, standard is 5025. port: int = 5025 #: Timeout for commands in milli seconds. timeout: int = 5000 #: Chunk size is the allocated memory for read operations. The standard is 20kB, #: and is increased per default here to 200kB. It is specified in bytes. chunk_size: int = 204800 #: Timeout for opening the connection, in milli seconds. open_timeout: int = 1000 #: Write termination character. write_termination: str = "\n" #: Read termination character. read_termination: str = "\n" visa_backend: str = "" """ Specifies the path to the library to be used with PyVISA as a backend. Defaults to None, which is NI-VISA (if installed), or pyvisa-py (if NI-VISA is not found). To force the use of pyvisa-py, specify '@py' here. """
[docs] def clean_values(self): # in principle, host is allowed to be IP or FQDN. However, we only allow IP: self.force_value("host", validate_and_resolve_host(self.host, logger)) validate_tcp_port(self.port, logger) if not isinstance(self.interface_type, self.InterfaceType): self.force_value("interface_type", self.InterfaceType(self.interface_type)) if self.board < 0: raise ValueError("Board number has to be >= 0.") if self.timeout < 0: raise ValueError("Timeout has to be >= 0.") if self.open_timeout < 0: raise ValueError("Open Timeout has to be >= 0.") allowed_terminators = ("\n", "\r", "\r\n") if self.read_termination not in allowed_terminators: raise ValueError("Read terminator has to be \\n, \\r or \\r\\n.") if self.write_termination not in allowed_terminators: raise ValueError("Write terminator has to be \\n, \\r or \\r\\n.")
@property def address(self) -> str: """ Address string depending on the VISA protocol's configuration. :return: address string corresponding to current configuration """ return self.interface_type.address( # type: ignore cast(str, self.host), port=self.port, board=self.board, )
[docs] class VisaCommunication(CommunicationProtocol): """ Implements the Communication Protocol for VISA / SCPI. """ #: The maximum of commands that can be sent in one round is 5 according to the #: VISA standard. MULTI_COMMANDS_MAX = 5 #: The character to separate two commands is ; according to the VISA standard. MULTI_COMMANDS_SEPARATOR = ";" #: Small pause in seconds to wait after write operations, allowing devices to #: really do what we tell them before continuing with further tasks. WAIT_AFTER_WRITE = 0.08 # seconds to wait after a write is sent def __init__(self, configuration) -> None: """ Constructor for VisaCommunication. """ super().__init__(configuration) # create a new resource manager if self.config.visa_backend == "": self._resource_manager = visa.ResourceManager() else: self._resource_manager = visa.ResourceManager(self.config.visa_backend) self._instrument: Optional[MessageBasedResource] = None
[docs] @staticmethod def config_cls() -> type[VisaCommunicationConfig]: return VisaCommunicationConfig
[docs] def open(self) -> None: """ Open the VISA connection and create the resource. """ logger.info("Open the VISA connection.") with self.access_lock: try: self._instrument = cast( MessageBasedResource, self._resource_manager.open_resource( self.config.address, open_timeout=self.config.open_timeout, ), ) self._instrument.chunk_size = self.config.chunk_size self._instrument.timeout = self.config.timeout self._instrument.write_termination = self.config.write_termination self._instrument.read_termination = self.config.read_termination except visa.VisaIOError as e: logger.error(str(e), exc_info=e) if e.error_code != 0: raise VisaCommunicationError from e except ( RPCError, ConnectionRefusedError, BrokenPipeError, ) as e: # if pyvisa-py is used as backend, this RPCError can come. As it is # difficult to import (hyphen in package name), we "convert" it here to # a VisaCommunicationError. Apparently on the Linux runners, # a ConnectionRefusedError is raised on fail, rather than an RPCError. # On macOS the BrokenPipeError error is raised (from # pyvisa-py/protocols/rpc.py:320), with puzzling log message from # visa.py: "187 WARNING Could not close VISA connection, was not # started." logger.error(str(e), exc_info=e) raise VisaCommunicationError from e if self._instrument is not None: try: # enable keep-alive of the connection. Seems not to work always, but # using the status poller a keepalive of the connection is also # satisfied. Unsupported on RTO 1022 devices. self._instrument.set_visa_attribute( visa.constants.ResourceAttribute.tcpip_keepalive, visa.constants.VI_TRUE, ) except visa.VisaIOError as e: if e.abbreviation == "VI_ERROR_NSUP_ATTR": logger.warning(str(e), exc_info=e) else: logger.error(str(e), exc_info=e) if e.error_code != 0: raise VisaCommunicationError from e
[docs] def close(self) -> None: """ Close the VISA connection and invalidates the handle. """ if self._instrument is None: logger.warning("Could not close VISA connection, was not started.") return try: with self.access_lock: self._instrument.close() except visa.InvalidSession as e: logger.warning( "Could not close VISA connection, session invalid.", exc_info=e )
[docs] def write(self, *commands: str) -> None: """ Write commands. No answer is read or expected. :param commands: one or more commands to send :raises VisaCommunicationError: when connection was not started """ if self._instrument is None: msg = "Could not write to VISA connection, was not started." logger.error(msg) raise VisaCommunicationError(msg) with self.access_lock: self._instrument.write(self._generate_cmd_string(commands)) # sleep small amount of time to not overload device sleep(self.WAIT_AFTER_WRITE)
[docs] def query(self, *commands: str) -> Union[str, tuple[str, ...]]: """ A combination of write(message) and read. :param commands: list of commands :return: list of values :raises VisaCommunicationError: when connection was not started, or when trying to issue too many commands at once. """ if self._instrument is None: msg = "Could not query VISA connection, was not started." logger.error(msg) raise VisaCommunicationError(msg) cmd_string = self._generate_cmd_string(commands) with self.access_lock: return_string = self._instrument.query(cmd_string) if len(commands) == 1: return return_string return tuple(return_string.split(self.MULTI_COMMANDS_SEPARATOR))
@classmethod def _generate_cmd_string(cls, command_list: tuple[str, ...]) -> str: """ Generate the command string out of a tuple of strings. :param command_list: is the tuple containing multiple commands :return: the command string that can be sent via the protocol """ if len(command_list) <= cls.MULTI_COMMANDS_MAX: return cls.MULTI_COMMANDS_SEPARATOR.join(command_list) raise VisaCommunicationError( f"Too many commands at once ({len(command_list)}). Max allowed: " f"{cls.MULTI_COMMANDS_MAX}." )
[docs] def spoll(self) -> int: """ Execute serial poll on the device. Reads the status byte register STB. This is a fast function that can be executed periodically in a polling fashion. :return: integer representation of the status byte :raises VisaCommunicationError: when connection was not started """ if self._instrument is None: msg = "Could not query VISA connection, was not started." logger.error(msg) raise VisaCommunicationError(msg) interface_type = self.config.interface_type if interface_type == VisaCommunicationConfig.InterfaceType.TCPIP_INSTR: with self.access_lock: stb = self._instrument.read_stb() return stb if interface_type == VisaCommunicationConfig.InterfaceType.TCPIP_SOCKET: return int(self.query("*STB?")) # type: ignore assert False, "Forgot to cover interface_type case?"