hvl_ccb.comm package

Submodules

hvl_ccb.comm.base module

Module with base classes for communication protocols.

class hvl_ccb.comm.base.AsyncCommunicationProtocol(config)[source]

Bases: hvl_ccb.comm.base.CommunicationProtocol

Abstract base class for asynchronous communication protocols

static config_cls()Type[hvl_ccb.comm.base.AsyncCommunicationProtocolConfig][source]

Return the default configdataclass class.

Returns

a reference to the default configdataclass class

read()str[source]

Read a single line of text as str from the communication.

Returns

text as str including the terminator, which can also be empty “”

read_all(n_attempts_max: Optional[int] = None, attempt_interval_sec: Optional[Union[int, float]] = None)Optional[str][source]

Read all lines of text from the connection till nothing is left to read.

Parameters
  • n_attempts_max – maximum number of attempts to read a non-empty text

  • attempt_interval_sec – time between the reading attempts

Returns

A multi-line str including the terminator internally

abstract read_bytes()bytes[source]

Read a single line as bytes from the communication.

This method uses self.access_lock to ensure thread-safety.

Returns

a single line as bytes containing the terminator, which can also be empty b""

read_nonempty(n_attempts_max: Optional[int] = None, attempt_interval_sec: Optional[Union[int, float]] = None)Optional[str][source]

Try to read a single non-empty line of text as str from the communication. If the host does not reply or replies with whitespace only, None is returned.

Returns

a non-empty text as a str or None in case of an empty string

Parameters
  • n_attempts_max – maximum number of attempts to read a non-empty text

  • attempt_interval_sec – time between the reading attempts
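The retry behaviour described for read_nonempty can be sketched roughly as follows. This is a stand-alone illustration, not the library's implementation: the helper name read_nonempty_sketch and the read_line callable are hypothetical, and the defaults simply mirror the documented configuration values (10 attempts, 0.5 s).

```python
import time
from typing import Callable, Optional


def read_nonempty_sketch(
    read_line: Callable[[], str],
    n_attempts_max: int = 10,
    attempt_interval_sec: float = 0.5,
) -> Optional[str]:
    """Poll read_line until it yields non-whitespace text or attempts run out."""
    for attempt in range(n_attempts_max):
        text = read_line().strip()
        if text:
            return text
        # Wait before the next attempt, but not after the last one.
        if attempt < n_attempts_max - 1:
            time.sleep(attempt_interval_sec)
    return None


# Hypothetical device that only answers on the third read.
replies = iter(["", "\r\n", "OK\r\n"])
result = read_nonempty_sketch(lambda: next(replies), attempt_interval_sec=0.0)
```

Here result holds the stripped reply; a device that only ever sends whitespace would yield None once the attempts are used up.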

read_text()str[source]

Read one line of text from the serial port. The input buffer may hold additional data afterwards, since only one line is read.

NOTE: backward-compatibility proxy for read method; to be removed in v1.0

Returns

String read from the serial port; ‘’ if there was nothing to read.

Raises

SerialCommunicationIOError – when communication port is not opened

read_text_nonempty(n_attempts_max: Optional[int] = None, attempt_interval_sec: Optional[Union[int, float]] = None)Optional[str][source]

Reads from the serial port, until a non-empty line is found, or the number of attempts is exceeded.

NOTE: backward-compatibility proxy for read_nonempty method; to be removed in v1.0

Attention: in contrast to read_text, the returned answer will be stripped of a whitespace newline terminator at the end, if such terminator is set in the initial configuration (default).

Parameters
  • n_attempts_max – maximum number of read attempts

  • attempt_interval_sec – time between the reading attempts

Returns

String read from the serial port; ‘’ if number of attempts is exceeded or serial port is not opened.

write(text: str)[source]

Write text as str to the communication.

Parameters

text – text as a str to be written

abstract write_bytes(data: bytes)int[source]

Write data as bytes to the communication.

This method uses self.access_lock to ensure thread-safety.

Parameters

data – data as bytes-string to be written

Returns

number of bytes written

write_text(text: str)[source]

Write text to the serial port. The text is encoded and terminated by the configured terminator.

NOTE: backward-compatibility proxy for write method; to be removed in v1.0

Parameters

text – Text to send to the port.

Raises

SerialCommunicationIOError – when communication port is not opened

class hvl_ccb.comm.base.AsyncCommunicationProtocolConfig(terminator: bytes = b'\r\n', encoding: str = 'utf-8', encoding_error_handling: str = 'replace', wait_sec_read_text_nonempty: Union[int, float] = 0.5, default_n_attempts_read_text_nonempty: int = 10)[source]

Bases: object

Base configuration data class for asynchronous communication protocols

clean_values()[source]
default_n_attempts_read_text_nonempty: int = 10

default number of attempts to read a non-empty text

encoding: str = 'utf-8'

Standard encoding of the connection. Typically this is utf-8, but can also be latin-1 or something from here: https://docs.python.org/3/library/codecs.html#standard-encodings

encoding_error_handling: str = 'replace'

Encoding error handling scheme as defined here: https://docs.python.org/3/library/codecs.html#error-handlers By default, invalid characters are replaced with U+FFFD REPLACEMENT CHARACTER on decoding and with “?” on encoding.

force_value(fieldname, value)

Forces a value to a dataclass field despite the class being frozen.

NOTE: you can define post_force_value method with same signature as this method to do extra processing after value has been forced on fieldname.

Parameters
  • fieldname – name of the field

  • value – value to assign
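As a rough illustration of forcing a value onto a frozen dataclass, the sketch below uses object.__setattr__, a common Python idiom for this. Whether the package does exactly this internally is an assumption; DemoConfig is a hypothetical class, not part of hvl_ccb.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class DemoConfig:
    """Hypothetical frozen config, standing in for a configdataclass."""

    port: int = 502

    def force_value(self, fieldname: str, value: object) -> None:
        # Bypass the frozen-dataclass __setattr__ guard.
        object.__setattr__(self, fieldname, value)
        # Optional hook, called only if the class defines it.
        hook = getattr(self, "post_force_value", None)
        if callable(hook):
            hook(fieldname, value)


config = DemoConfig()
try:
    config.port = 1502  # regular assignment is rejected
    assignment_rejected = False
except FrozenInstanceError:
    assignment_rejected = True

config.force_value("port", 1502)
```

After the last line, config.port holds the forced value even though normal assignment was refused.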

is_configdataclass = True
classmethod keys()Sequence[str]

Returns a list of all configdataclass fields key-names.

Returns

a list of strings containing all keys.

classmethod optional_defaults()Dict[str, object]

Returns a dictionary of all configdataclass fields that have a default value assigned and may be optionally specified on instantiation.

Returns

a dictionary mapping the optional keys to their default values.

classmethod required_keys()Sequence[str]

Returns a list of all configdataclass fields, that have no default value assigned and need to be specified on instantiation.

Returns

a list of strings containing all required keys.
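The three introspection helpers above can be approximated with the standard dataclasses module. The sketch below is only an illustration under that assumption; DemoModbusConfig and the free functions are hypothetical and do not reproduce the package's code.

```python
from dataclasses import MISSING, dataclass, fields
from typing import Dict, List


@dataclass
class DemoModbusConfig:
    """Hypothetical config with two required fields and one optional field."""

    host: str
    unit: int
    port: int = 502


def keys(cls) -> List[str]:
    """All field names, in declaration order."""
    return [f.name for f in fields(cls)]


def required_keys(cls) -> List[str]:
    """Fields without a default value or default factory."""
    return [
        f.name
        for f in fields(cls)
        if f.default is MISSING and f.default_factory is MISSING
    ]


def optional_defaults(cls) -> Dict[str, object]:
    """Fields with a plain default value, mapped to that value."""
    return {f.name: f.default for f in fields(cls) if f.default is not MISSING}


all_keys = keys(DemoModbusConfig)
required = required_keys(DemoModbusConfig)
optional = optional_defaults(DemoModbusConfig)
```

For this demo class, host and unit are required, while port is optional with its default of 502.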

terminator: bytes = b'\r\n'

The terminator character. Typically this is b'\r\n' or b'\n', but can also be b'\r' or other combinations. This defines the end of a single line.
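How a terminator delimits a single line in a receive buffer can be sketched as below. The helper split_first_line and the sample commands are hypothetical; the real protocols read from a device rather than from an in-memory byte string.

```python
from typing import Tuple


def split_first_line(buffer: bytes, terminator: bytes = b"\r\n") -> Tuple[bytes, bytes]:
    """Return (first line including terminator, remaining buffer)."""
    index = buffer.find(terminator)
    if index < 0:
        # No complete line yet: return nothing and keep everything buffered.
        return b"", buffer
    end = index + len(terminator)
    return buffer[:end], buffer[end:]


line, rest = split_first_line(b"VOLT 1.5\r\nCURR 0.2\r\n")
text = line.decode("utf-8", errors="replace")
```

Only the first line is consumed; the second one stays in rest, which matches the remark that the input buffer may hold additional lines after a read.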

wait_sec_read_text_nonempty: Union[int, float] = 0.5

time to wait between attempts of reading a non-empty text

class hvl_ccb.comm.base.CommunicationProtocol(config)[source]

Bases: hvl_ccb.configuration.ConfigurationMixin, abc.ABC

Communication protocol abstract base class.

Specifies the methods to implement for communication protocol, as well as implements some default settings and checks.

access_lock

Access lock to use with context manager when accessing the communication protocol (thread safety)
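A minimal sketch of using such a lock as a context manager is given below. DemoProtocol is hypothetical, and the choice of threading.RLock is an assumption; the point is only that a write/read pair stays together when several threads share one protocol instance.

```python
import threading


class DemoProtocol:
    """Hypothetical protocol; the RLock type is an assumption."""

    def __init__(self) -> None:
        self.access_lock = threading.RLock()
        self.log = []

    def transaction(self, name: str) -> None:
        # Hold the lock for the whole write/read pair so that no other
        # thread can interleave its own traffic.
        with self.access_lock:
            self.log.append((name, "write"))
            self.log.append((name, "read"))


protocol = DemoProtocol()


def worker(name: str) -> None:
    for _ in range(100):
        protocol.transaction(name)


threads = [threading.Thread(target=worker, args=(f"t{i}",)) for i in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# Every even log entry is a write that is directly followed by its own read.
pairs_intact = all(
    protocol.log[i] == (protocol.log[i + 1][0], "write")
    and protocol.log[i + 1][1] == "read"
    for i in range(0, len(protocol.log), 2)
)
```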

abstract close()[source]

Close the communication protocol

abstract open()[source]

Open communication protocol

class hvl_ccb.comm.base.NullCommunicationProtocol(config)[source]

Bases: hvl_ccb.comm.base.CommunicationProtocol

Communication protocol that does nothing.

close()None[source]

Void close function.

static config_cls()Type[hvl_ccb.configuration.EmptyConfig][source]

Empty configuration

Returns

EmptyConfig

open()None[source]

Void open function.

class hvl_ccb.comm.base.SyncCommunicationProtocol(config)[source]

Bases: hvl_ccb.comm.base.AsyncCommunicationProtocol, abc.ABC

Abstract base class for synchronous communication protocols with query()

static config_cls()Type[hvl_ccb.comm.base.SyncCommunicationProtocolConfig][source]

Return the default configdataclass class.

Returns

a reference to the default configdataclass class

query(command: str)Optional[str][source]

Send a command to the interface and handle the status message. May raise an exception.

Parameters

command – Command to send

Returns

Answer from the interface, which can be None instead of an empty reply

class hvl_ccb.comm.base.SyncCommunicationProtocolConfig(terminator: bytes = b'\r\n', encoding: str = 'utf-8', encoding_error_handling: str = 'replace', wait_sec_read_text_nonempty: Union[int, float] = 0.5, default_n_attempts_read_text_nonempty: int = 10)[source]

Bases: hvl_ccb.comm.base.AsyncCommunicationProtocolConfig

hvl_ccb.comm.labjack_ljm module

Communication protocol for LabJack using the LJM Library. Originally developed and tested for LabJack T7-PRO.

Makes use of the LabJack LJM Library Python wrapper. This wrapper needs an installation of the LJM Library for Windows, Mac OS X or Linux. Go to: https://labjack.com/support/software/installers/ljm and https://labjack.com/support/software/examples/ljm/python

class hvl_ccb.comm.labjack_ljm.LJMCommunication(configuration)[source]

Bases: hvl_ccb.comm.base.CommunicationProtocol

Communication protocol implementing the LabJack LJM Library Python wrapper.

close()None[source]

Close the communication port.

static config_cls()[source]

Return the default configdataclass class.

Returns

a reference to the default configdataclass class

property is_open

Flag indicating if the communication port is open.

Returns

True if the port is open, otherwise False

open()None[source]

Open the communication port.

read_name(*names: str, return_num_type: Type[numbers.Real] = <class 'float'>)Union[numbers.Real, Sequence[numbers.Real]][source]

Read one or more input numeric values by name.

Parameters
  • names – one or more names to read out from the LabJack

  • return_num_type – optional numeric type specification for return values; by default float.

Returns

answer of the LabJack, either single number or multiple numbers in a sequence, respectively, when one or multiple names to read were given

Raises

TypeError – if read value of type not compatible with return_num_type

write_name(name: str, value: numbers.Real)None[source]

Write one value to a named output.

Parameters
  • name – string with the name of the LabJack IO

  • value – is the value to write to the named IO port

write_names(name_value_dict: Dict[str, numbers.Real])None[source]

Write more than one value at once to named outputs.

Parameters

name_value_dict – is a dictionary with string names of LabJack IO as keys and corresponding numeric values

class hvl_ccb.comm.labjack_ljm.LJMCommunicationConfig(device_type: Union[str, hvl_ccb._dev.labjack.DeviceType] = 'ANY', connection_type: Union[str, hvl_ccb.comm.labjack_ljm.LJMCommunicationConfig.ConnectionType] = 'ANY', identifier: str = 'ANY')[source]

Bases: object

Configuration dataclass for LJMCommunication.

class ConnectionType(value=<object object>, names=None, module=None, type=None, start=1, boundary=None)[source]

Bases: hvl_ccb.utils.enum.AutoNumberNameEnum

LabJack connection type.

ANY = 1
ETHERNET = 4
TCP = 3
USB = 2
WIFI = 5
class DeviceType(value=<object object>, names=None, module=None, type=None, start=1, boundary=None)

Bases: hvl_ccb.utils.enum.AutoNumberNameEnum

LabJack device types.

Can also be looked up by ambiguous Product ID (p_id) or by instance name: LabJackDeviceType(4) is LabJackDeviceType('T4')

ANY = 1
T4 = 2
T7 = 3
T7_PRO = 4
classmethod get_by_p_id(p_id: int)Union[hvl_ccb._dev.labjack.DeviceType, List[hvl_ccb._dev.labjack.DeviceType]]

Get LabJack device type instance via LabJack product ID.

Note: Product ID is not unambiguous for LabJack devices.

Parameters

p_id – Product ID of a LabJack device

Returns

Instance or list of instances of LabJackDeviceType

Raises

ValueError – when Product ID is unknown
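The lookup by product ID can be pictured with a small enum sketch. DemoDeviceType is hypothetical, and the product IDs used here (4 for T4, 7 shared by T7 and T7_PRO) are illustrative assumptions chosen to show why a lookup may return more than one member.

```python
from enum import Enum
from typing import List, Optional, Union


class DemoDeviceType(Enum):
    """Hypothetical stand-in; values are (number, assumed product ID)."""

    ANY = (1, None)
    T4 = (2, 4)
    T7 = (3, 7)
    T7_PRO = (4, 7)

    def __init__(self, number: int, p_id: Optional[int]) -> None:
        self.number = number
        self.p_id = p_id

    @classmethod
    def get_by_p_id(cls, p_id: int) -> Union["DemoDeviceType", List["DemoDeviceType"]]:
        matches = [member for member in cls if member.p_id == p_id]
        if not matches:
            raise ValueError(f"Unknown product ID: {p_id}")
        # A unique match is returned directly, an ambiguous one as a list.
        return matches[0] if len(matches) == 1 else matches


unique = DemoDeviceType.get_by_p_id(4)
ambiguous = DemoDeviceType.get_by_p_id(7)
```

Callers therefore need to handle both a single member and a list when the product ID is shared.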

clean_values()None[source]

Performs value checks on device_type and connection_type.

connection_type: Union[str, hvl_ccb.comm.labjack_ljm.LJMCommunicationConfig.ConnectionType] = 'ANY'

Can be either string or of enum ConnectionType.

device_type: Union[str, hvl_ccb._dev.labjack.DeviceType] = 'ANY'

Can be either string ‘ANY’, ‘T7_PRO’, ‘T7’, ‘T4’, or of enum DeviceType.

force_value(fieldname, value)

Forces a value to a dataclass field despite the class being frozen.

NOTE: you can define post_force_value method with same signature as this method to do extra processing after value has been forced on fieldname.

Parameters
  • fieldname – name of the field

  • value – value to assign

identifier: str = 'ANY'

The identifier specifies information for the connection to be used. This can be an IP address, serial number, or device name. See the LabJack docs ( https://labjack.com/support/software/api/ljm/function-reference/ljmopens/identifier-parameter) for more information.

is_configdataclass = True
classmethod keys()Sequence[str]

Returns a list of all configdataclass fields key-names.

Returns

a list of strings containing all keys.

classmethod optional_defaults()Dict[str, object]

Returns a dictionary of all configdataclass fields that have a default value assigned and may be optionally specified on instantiation.

Returns

a dictionary mapping the optional keys to their default values.

classmethod required_keys()Sequence[str]

Returns a list of all configdataclass fields, that have no default value assigned and need to be specified on instantiation.

Returns

a list of strings containing all required keys.

exception hvl_ccb.comm.labjack_ljm.LJMCommunicationError[source]

Bases: Exception

Errors coming from LJMCommunication.

hvl_ccb.comm.modbus_tcp module

Communication protocol for modbus TCP ports. Makes use of the pymodbus library.

class hvl_ccb.comm.modbus_tcp.ModbusTcpCommunication(configuration)[source]

Bases: hvl_ccb.comm.base.CommunicationProtocol

Implements the Communication Protocol for modbus TCP.

close()[source]

Close the Modbus TCP connection.

static config_cls()[source]

Return the default configdataclass class.

Returns

a reference to the default configdataclass class

open()None[source]

Open the Modbus TCP connection.

Raises

ModbusTcpConnectionFailedException – if the connection fails.

read_holding_registers(address: int, count: int)List[int][source]

Read the specified number of registers, starting at the given address, and return the values from each register.

Parameters
  • address – address of the first register

  • count – count of registers to read

Returns

list of int values

read_input_registers(address: int, count: int)List[int][source]

Read the specified number of registers, starting at the given address, and return the values from each register in a list.

Parameters
  • address – address of the first register

  • count – count of registers to read

Returns

list of int values

write_registers(address: int, values: Union[List[int], int])[source]

Write values from the specified address forward.

Parameters
  • address – address of the first register

  • values – a single value or a list with all values to write
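The addressing used by the register methods above can be illustrated with an in-memory stand-in. DemoRegisterBank is hypothetical and involves no network traffic; it only shows how a start address plus a count or value list maps onto consecutive 16-bit registers, and how a single int can be treated as a one-element list.

```python
from typing import Dict, List, Union


class DemoRegisterBank:
    """Hypothetical in-memory stand-in for a device's holding registers."""

    def __init__(self) -> None:
        self._registers: Dict[int, int] = {}

    def write_registers(self, address: int, values: Union[List[int], int]) -> None:
        # Accept a single value as shorthand for a one-element list.
        if isinstance(values, int):
            values = [values]
        for offset, value in enumerate(values):
            if not 0 <= value <= 0xFFFF:
                raise ValueError("Modbus registers hold 16-bit unsigned values")
            self._registers[address + offset] = value

    def read_holding_registers(self, address: int, count: int) -> List[int]:
        # Unwritten registers read back as 0 in this sketch.
        return [self._registers.get(address + i, 0) for i in range(count)]


bank = DemoRegisterBank()
bank.write_registers(100, [10, 20, 30])
bank.write_registers(103, 40)
values = bank.read_holding_registers(100, 5)
```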

class hvl_ccb.comm.modbus_tcp.ModbusTcpCommunicationConfig(host: str, unit: int, port: int = 502)[source]

Bases: object

Configuration dataclass for ModbusTcpCommunication.

clean_values()[source]
force_value(fieldname, value)

Forces a value to a dataclass field despite the class being frozen.

NOTE: you can define post_force_value method with same signature as this method to do extra processing after value has been forced on fieldname.

Parameters
  • fieldname – name of the field

  • value – value to assign

host: str

Host is the IP address of the connected device.

is_configdataclass = True
classmethod keys()Sequence[str]

Returns a list of all configdataclass fields key-names.

Returns

a list of strings containing all keys.

classmethod optional_defaults()Dict[str, object]

Returns a dictionary of all configdataclass fields that have a default value assigned and may be optionally specified on instantiation.

Returns

a dictionary mapping the optional keys to their default values.

port: int = 502

TCP port

classmethod required_keys()Sequence[str]

Returns a list of all configdataclass fields, that have no default value assigned and need to be specified on instantiation.

Returns

a list of strings containing all required keys.

unit: int

Unit number to be used when connecting with Modbus/TCP. Typically this is used when connecting to a relay having Modbus/RTU-connected devices.

exception hvl_ccb.comm.modbus_tcp.ModbusTcpConnectionFailedException(string='')[source]

Bases: pymodbus.exceptions.ConnectionException

Exception raised when the connection failed.

hvl_ccb.comm.opc module

Communication protocol implementing an OPC UA connection. This protocol is used to interface with the “Supercube” PLC from Siemens.

class hvl_ccb.comm.opc.OpcUaCommunication(config)[source]

Bases: hvl_ccb.comm.base.CommunicationProtocol

Communication protocol implementing an OPC UA connection. Makes use of the package python-opcua.

close()None[source]

Close the connection to the OPC UA server.

static config_cls()[source]

Return the default configdataclass class.

Returns

a reference to the default configdataclass class

init_monitored_nodes(node_id: Union[object, Iterable], ns_index: int)None[source]

Initialize monitored nodes.

Parameters
  • node_id – one or more strings of node IDs; node IDs are always cast via the str() method here, hence they do not have to be strictly string objects.

  • ns_index – the namespace index the nodes belong to.

Raises

OpcUaCommunicationIOError – when protocol was not opened or can’t communicate with an OPC UA server

property is_open

Flag indicating if the communication port is open.

Returns

True if the port is open, otherwise False

open()None[source]

Open the communication to the OPC UA server.

Raises

OpcUaCommunicationIOError – when communication port cannot be opened.

read(node_id, ns_index)[source]

Read a value from a node with id and namespace index.

Parameters
  • node_id – the ID of the node to read the value from

  • ns_index – the namespace index of the node

Returns

the value of the node object.

Raises

OpcUaCommunicationIOError – when protocol was not opened or can’t communicate with an OPC UA server

write(node_id, ns_index, value)None[source]

Write a value to the node with the given ID and namespace index.

Parameters
  • node_id – the id of the node to write the value to.

  • ns_index – the namespace index of the node.

  • value – the value to write.

Raises

OpcUaCommunicationIOError – when protocol was not opened or can’t communicate with an OPC UA server

class hvl_ccb.comm.opc.OpcUaCommunicationConfig(host: str, endpoint_name: str, port: int = 4840, sub_handler: hvl_ccb.comm.opc.OpcUaSubHandler = <hvl_ccb.comm.opc.OpcUaSubHandler object>, update_period: int = 500, wait_timeout_retry_sec: Union[int, float] = 1, max_timeout_retry_nr: int = 5)[source]

Bases: object

Configuration dataclass for OPC UA Communication.

clean_values()[source]
endpoint_name: str

Endpoint of the OPC server, this is a path like ‘OPCUA/SimulationServer’

force_value(fieldname, value)

Forces a value to a dataclass field despite the class being frozen.

NOTE: you can define post_force_value method with same signature as this method to do extra processing after value has been forced on fieldname.

Parameters
  • fieldname – name of the field

  • value – value to assign

host: str

Hostname or IP-Address of the OPC UA server.

is_configdataclass = True
classmethod keys()Sequence[str]

Returns a list of all configdataclass fields key-names.

Returns

a list of strings containing all keys.

max_timeout_retry_nr: int = 5

Maximal number of call re-tries on underlying OPC UA client timeout error

classmethod optional_defaults()Dict[str, object]

Returns a dictionary of all configdataclass fields that have a default value assigned and may be optionally specified on instantiation.

Returns

a dictionary mapping the optional keys to their default values.

port: int = 4840

Port of the OPC UA server to connect to.

classmethod required_keys()Sequence[str]

Returns a list of all configdataclass fields, that have no default value assigned and need to be specified on instantiation.

Returns

a list of strings containing all required keys.

sub_handler: hvl_ccb.comm.opc.OpcUaSubHandler = <hvl_ccb.comm.opc.OpcUaSubHandler object>

object to use for handling subscriptions.

update_period: int = 500

Update period for generating datachange events in OPC UA [milliseconds]

wait_timeout_retry_sec: Union[int, float] = 1

Wait time between re-trying calls on underlying OPC UA client timeout error
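The interplay of max_timeout_retry_nr and wait_timeout_retry_sec might look like the sketch below. It assumes that "re-tries" means additional calls after the first attempt; DemoTimeoutError, call_with_retries, and flaky_read are hypothetical names that are not part of the package.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class DemoTimeoutError(OSError):
    """Hypothetical stand-in for a client timeout error."""


def call_with_retries(
    call: Callable[[], T],
    max_timeout_retry_nr: int = 5,
    wait_timeout_retry_sec: float = 1,
) -> T:
    """Run call; on timeout, wait and retry up to max_timeout_retry_nr times."""
    for retry in range(max_timeout_retry_nr + 1):
        try:
            return call()
        except DemoTimeoutError:
            if retry == max_timeout_retry_nr:
                raise  # retries exhausted: propagate the last timeout
            time.sleep(wait_timeout_retry_sec)
    raise ValueError("max_timeout_retry_nr must not be negative")


attempts = []


def flaky_read() -> int:
    """Hypothetical call that times out twice before succeeding."""
    attempts.append(1)
    if len(attempts) < 3:
        raise DemoTimeoutError("timed out")
    return 42


value = call_with_retries(flaky_read, wait_timeout_retry_sec=0.0)
```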

exception hvl_ccb.comm.opc.OpcUaCommunicationIOError[source]

Bases: OSError

OPC-UA communication I/O error.

exception hvl_ccb.comm.opc.OpcUaCommunicationTimeoutError[source]

Bases: hvl_ccb.comm.opc.OpcUaCommunicationIOError

OPC-UA communication timeout error.

class hvl_ccb.comm.opc.OpcUaSubHandler[source]

Bases: object

Base class for subscription handling of OPC events and data change events. Override methods from this class to add own handling capabilities.

To receive events from the server for a subscription, the data_change and event methods are called directly from the receiving thread. Do not perform expensive, slow, or network operations there; create another thread if you need to do such work.

datachange_notification(node, val, data)[source]
event_notification(event)[source]
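One way to keep the notification callbacks cheap is to enqueue incoming data and process it on a separate worker thread. The sketch below is self-contained: DemoSubHandler is a hypothetical stand-in for the handler base class, and the node ID string is made up.

```python
import queue
import threading


class DemoSubHandler:
    """Hypothetical stand-in for the subscription handler base class."""

    def datachange_notification(self, node, val, data):
        pass

    def event_notification(self, event):
        pass


class QueueingSubHandler(DemoSubHandler):
    """Hand notifications to a worker thread instead of processing inline."""

    def __init__(self) -> None:
        self.pending: queue.Queue = queue.Queue()
        self.processed = []
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def datachange_notification(self, node, val, data):
        # Called from the receiving thread: only enqueue, never block.
        self.pending.put((node, val))

    def _run(self) -> None:
        while True:
            item = self.pending.get()
            if item is None:  # sentinel: stop the worker
                break
            # Slow processing (logging, file I/O, ...) would go here.
            self.processed.append(item)

    def stop(self) -> None:
        self.pending.put(None)
        self._worker.join()


handler = QueueingSubHandler()
handler.datachange_notification("ns=3;s=Demo.Voltage", 12.5, None)
handler.stop()
```

The callback returns immediately after the put call, so the receiving thread is never held up by the processing step.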

hvl_ccb.comm.serial module

Communication protocol for serial ports. Makes use of the pySerial library.

class hvl_ccb.comm.serial.SerialCommunication(configuration)[source]

Bases: hvl_ccb.comm.base.AsyncCommunicationProtocol

Implements the Communication Protocol for serial ports.

close()[source]

Close the serial connection.

static config_cls()[source]

Return the default configdataclass class.

Returns

a reference to the default configdataclass class

property is_open

Flag indicating if the serial port is open.

Returns

True if the serial port is open, otherwise False

open()[source]

Open the serial connection.

Raises

SerialCommunicationIOError – when communication port cannot be opened.

read_bytes()bytes[source]

Read the bytes from the serial port till the terminator is found. The input buffer may hold additional lines afterwards.

This method uses self.access_lock to ensure thread-safety.

Returns

Bytes read from the serial port; b'' if there was nothing to read.

Raises

SerialCommunicationIOError – when communication port is not opened

read_single_bytes(size: int = 1)bytes[source]

Read the specified number of bytes from the serial port. The input buffer may hold additional data afterwards.

Returns

Bytes read from the serial port; b'' if there was nothing to read.

write_bytes(data: bytes)int[source]

Write bytes to the serial port.

This method uses self.access_lock to ensure thread-safety.

Parameters

data – data to write to the serial port

Returns

number of bytes written

Raises

SerialCommunicationIOError – when communication port is not opened

class hvl_ccb.comm.serial.SerialCommunicationBytesize(value=<object object>, names=None, module=None, type=None, start=1, boundary=None)[source]

Bases: hvl_ccb.utils.enum.ValueEnum

Serial communication bytesize.

EIGHTBITS = 8
FIVEBITS = 5
SEVENBITS = 7
SIXBITS = 6
class hvl_ccb.comm.serial.SerialCommunicationConfig(terminator: bytes = b'\r\n', encoding: str = 'utf-8', encoding_error_handling: str = 'replace', wait_sec_read_text_nonempty: Union[int, float] = 0.5, default_n_attempts_read_text_nonempty: int = 10, port: Optional[str] = None, baudrate: int = 9600, parity: Union[str, hvl_ccb.comm.serial.SerialCommunicationParity] = <SerialCommunicationParity.NONE: 'N'>, stopbits: Union[int, float, hvl_ccb.comm.serial.SerialCommunicationStopbits] = <SerialCommunicationStopbits.ONE: 1>, bytesize: Union[int, hvl_ccb.comm.serial.SerialCommunicationBytesize] = <SerialCommunicationBytesize.EIGHTBITS: 8>, timeout: Union[int, float] = 2)[source]

Bases: hvl_ccb.comm.base.AsyncCommunicationProtocolConfig

Configuration dataclass for SerialCommunication.

Bytesize

alias of hvl_ccb.comm.serial.SerialCommunicationBytesize

Parity

alias of hvl_ccb.comm.serial.SerialCommunicationParity

Stopbits

alias of hvl_ccb.comm.serial.SerialCommunicationStopbits

baudrate: int = 9600

Baudrate of the serial port

bytesize: Union[int, hvl_ccb.comm.serial.SerialCommunicationBytesize] = 8

Size of a byte, 5 to 8

clean_values()[source]
create_serial_port()serial.serialposix.Serial[source]

Create a serial port instance according to specification in this configuration

Returns

Closed serial port instance

force_value(fieldname, value)

Forces a value to a dataclass field despite the class being frozen.

NOTE: you can define post_force_value method with same signature as this method to do extra processing after value has been forced on fieldname.

Parameters
  • fieldname – name of the field

  • value – value to assign

classmethod keys()Sequence[str]

Returns a list of all configdataclass fields key-names.

Returns

a list of strings containing all keys.

classmethod optional_defaults()Dict[str, object]

Returns a dictionary of all configdataclass fields that have a default value assigned and may be optionally specified on instantiation.

Returns

a dictionary mapping the optional keys to their default values.

parity: Union[str, hvl_ccb.comm.serial.SerialCommunicationParity] = 'N'

Parity to be used for the connection.

port: Optional[str] = None

Port is a string referring to a COM-port (e.g. 'COM3') or a URL. The full list of capabilities is found in the pySerial documentation.

classmethod required_keys()Sequence[str]

Returns a list of all configdataclass fields, that have no default value assigned and need to be specified on instantiation.

Returns

a list of strings containing all required keys.

stopbits: Union[int, float, hvl_ccb.comm.serial.SerialCommunicationStopbits] = 1

Stopbits setting, can be 1, 1.5 or 2.

terminator_str()str[source]
timeout: Union[int, float] = 2

Timeout in seconds for the serial port

exception hvl_ccb.comm.serial.SerialCommunicationIOError[source]

Bases: OSError

Serial communication related I/O errors.

class hvl_ccb.comm.serial.SerialCommunicationParity(value=<object object>, names=None, module=None, type=None, start=1, boundary=None)[source]

Bases: hvl_ccb.utils.enum.ValueEnum

Serial communication parity.

EVEN = 'E'
MARK = 'M'
NAMES = {'E': 'Even', 'M': 'Mark', 'N': 'None', 'O': 'Odd', 'S': 'Space'}
NONE = 'N'
ODD = 'O'
SPACE = 'S'
class hvl_ccb.comm.serial.SerialCommunicationStopbits(value=<object object>, names=None, module=None, type=None, start=1, boundary=None)[source]

Bases: hvl_ccb.utils.enum.ValueEnum

Serial communication stopbits.

ONE = 1
ONE_POINT_FIVE = 1.5
TWO = 2

hvl_ccb.comm.telnet module

Communication protocol for telnet. Makes use of the telnetlib library.

class hvl_ccb.comm.telnet.TelnetCommunication(configuration)[source]

Bases: hvl_ccb.comm.base.AsyncCommunicationProtocol

Implements the Communication Protocol for telnet.

close()[source]

Close the telnet connection unless it is already closed.

static config_cls()[source]

Return the default configdataclass class.

Returns

a reference to the default configdataclass class

property is_open

Is the connection open?

Returns

True for an open connection

open()[source]

Open the telnet connection unless it is already open.

read_bytes()bytes[source]

Read data as bytes from the telnet connection.

Returns

data from telnet connection

Raises

TelnetError – when the connection is not open or an error occurs during the communication

write_bytes(data: bytes)[source]

Write the data as bytes to the telnet connection.

Parameters

data – Data to be sent.

Raises

TelnetError – when the connection is not open or an error occurs during the communication

class hvl_ccb.comm.telnet.TelnetCommunicationConfig(terminator: bytes = b'\r\n', encoding: str = 'utf-8', encoding_error_handling: str = 'replace', wait_sec_read_text_nonempty: Union[int, float] = 0.5, default_n_attempts_read_text_nonempty: int = 10, host: Optional[str] = None, port: int = 0, timeout: Union[int, float] = 0.2)[source]

Bases: hvl_ccb.comm.base.AsyncCommunicationProtocolConfig

Configuration dataclass for TelnetCommunication.

clean_values()[source]
create_telnet()Optional[telnetlib.Telnet][source]

Create a telnet client.

Returns

Opened Telnet object or None if connection is not possible

force_value(fieldname, value)

Forces a value to a dataclass field despite the class being frozen.

NOTE: you can define post_force_value method with same signature as this method to do extra processing after value has been forced on fieldname.

Parameters
  • fieldname – name of the field

  • value – value to assign

host: Optional[str] = None

Host to connect to can be localhost or

classmethod keys()Sequence[str]

Returns a list of all configdataclass fields key-names.

Returns

a list of strings containing all keys.

classmethod optional_defaults()Dict[str, object]

Returns a dictionary of all configdataclass fields that have a default value assigned and may be optionally specified on instantiation.

Returns

a dictionary mapping the optional keys to their default values.

port: int = 0

Port at which the host is listening

classmethod required_keys()Sequence[str]

Returns a list of all configdataclass fields, that have no default value assigned and need to be specified on instantiation.

Returns

a list of strings containing all required keys.

timeout: Union[int, float] = 0.2

Timeout for reading a line

exception hvl_ccb.comm.telnet.TelnetError[source]

Bases: Exception

Telnet communication related errors.

hvl_ccb.comm.visa module

Communication protocol for VISA. Makes use of the pyvisa library. The backend can be NI-Visa or pyvisa-py.

Information on how to install a VISA backend can be found here: https://pyvisa.readthedocs.io/en/master/getting_nivisa.html

So far only TCPIP SOCKET and TCPIP INSTR interfaces are supported.

class hvl_ccb.comm.visa.VisaCommunication(configuration)[source]

Bases: hvl_ccb.comm.base.CommunicationProtocol

Implements the Communication Protocol for VISA / SCPI.

MULTI_COMMANDS_MAX = 5

The maximum number of commands that can be sent in one round is 5 according to the VISA standard.

MULTI_COMMANDS_SEPARATOR = ';'

The character to separate two commands is ; according to the VISA standard.

WAIT_AFTER_WRITE = 0.08

Small pause in seconds to wait after write operations, allowing devices to really do what we tell them before continuing with further tasks.

close()None[source]

Close the VISA connection and invalidate the handle.

static config_cls()Type[hvl_ccb.comm.visa.VisaCommunicationConfig][source]

Return the default configdataclass class.

Returns

a reference to the default configdataclass class

open()None[source]

Open the VISA connection and create the resource.

query(*commands: str)Union[str, Tuple[str, ]][source]

A combination of write(message) and read.

Parameters

commands – list of commands

Returns

list of values

Raises

VisaCommunicationError – when connection was not started, or when trying to issue too many commands at once.
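How several commands could be combined into one message, given MULTI_COMMANDS_MAX and MULTI_COMMANDS_SEPARATOR, is sketched below. The helper names are hypothetical, the sketch raises a plain ValueError rather than VisaCommunicationError, and splitting the answer on the same separator is an assumption about the device's reply format.

```python
from typing import Tuple, Union

MULTI_COMMANDS_MAX = 5
MULTI_COMMANDS_SEPARATOR = ";"


def join_commands(*commands: str) -> str:
    """Combine commands into one message, enforcing the documented limit."""
    if len(commands) > MULTI_COMMANDS_MAX:
        raise ValueError(
            f"Too many commands at once ({len(commands)} > {MULTI_COMMANDS_MAX})"
        )
    return MULTI_COMMANDS_SEPARATOR.join(commands)


def split_answer(answer: str, n_commands: int) -> Union[str, Tuple[str, ...]]:
    """Return one str for a single command, else a tuple with one entry each."""
    if n_commands == 1:
        return answer
    return tuple(answer.split(MULTI_COMMANDS_SEPARATOR))


message = join_commands("*IDN?", ":MEAS:VOLT?")
answers = split_answer("ACME,1;1.5", 2)
```

Passing six commands to join_commands raises the error instead of sending an oversized message.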

spoll()int[source]

Execute serial poll on the device. Reads the status byte register STB. This is a fast function that can be executed periodically in a polling fashion.

Returns

integer representation of the status byte

Raises

VisaCommunicationError – when connection was not started

write(*commands: str)None[source]

Write commands. No answer is read or expected.

Parameters

commands – one or more commands to send

Raises

VisaCommunicationError – when connection was not started

class hvl_ccb.comm.visa.VisaCommunicationConfig(host: str, interface_type: Union[str, hvl_ccb.comm.visa.VisaCommunicationConfig.InterfaceType], board: int = 0, port: int = 5025, timeout: int = 5000, chunk_size: int = 204800, open_timeout: int = 1000, write_termination: str = '\n', read_termination: str = '\n', visa_backend: str = '')[source]

Bases: object

VisaCommunication configuration dataclass.

class InterfaceType(value=<object object>, names=None, module=None, type=None, start=1, boundary=None)[source]

Bases: hvl_ccb.utils.enum.AutoNumberNameEnum

Supported VISA Interface types.

TCPIP_INSTR = 2

VXI-11 protocol

TCPIP_SOCKET = 1

VISA-RAW protocol

address(host: str, port: Optional[int] = None, board: Optional[int] = None)str[source]

Address string specific to the VISA interface type.

Parameters
  • host – host IP address

  • port – optional TCP port

  • board – optional board number

Returns

address string
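For orientation, VISA resource names for the two supported interface types generally follow the patterns TCPIP[board]::host::port::SOCKET and TCPIP[board]::host::INSTR. The helper functions below are hypothetical and only build strings of that general shape; the exact strings produced by the package may differ.

```python
def socket_address(host: str, port: int = 5025, board: int = 0) -> str:
    """Resource name for a raw TCP/IP socket connection."""
    return f"TCPIP{board}::{host}::{port}::SOCKET"


def instr_address(host: str, board: int = 0) -> str:
    """Resource name for a VXI-11 instrument connection."""
    return f"TCPIP{board}::{host}::INSTR"


raw = socket_address("192.168.1.10")
vxi = instr_address("192.168.1.10")
```

The port is part of the address only for the socket variant, which is consistent with port being optional in the signature above.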

property address

Address string depending on the VISA protocol’s configuration.

Returns

address string corresponding to current configuration

board: int = 0

Board number is typically 0 and comes from old bus systems.

chunk_size: int = 204800

Chunk size is the allocated memory for read operations. The standard is 20kB, and it is increased by default here to 200kB. It is specified in bytes.

clean_values()[source]
force_value(fieldname, value)

Forces a value to a dataclass field despite the class being frozen.

NOTE: you can define post_force_value method with same signature as this method to do extra processing after value has been forced on fieldname.

Parameters
  • fieldname – name of the field

  • value – value to assign

host: str

IP address of the VISA device. DNS names are currently unsupported.

interface_type: Union[str, hvl_ccb.comm.visa.VisaCommunicationConfig.InterfaceType]

Interface type of the VISA connection, being one of InterfaceType.

is_configdataclass = True
classmethod keys()Sequence[str]

Returns a list of all configdataclass fields key-names.

Returns

a list of strings containing all keys.

open_timeout: int = 1000

Timeout for opening the connection, in milliseconds.

classmethod optional_defaults()Dict[str, object]

Returns a dictionary of all configdataclass fields that have a default value assigned and may be optionally specified on instantiation.

Returns

a dictionary mapping the optional keys to their default values.

port: int = 5025

TCP port, standard is 5025.

read_termination: str = '\n'

Read termination character.

classmethod required_keys()Sequence[str]

Returns a list of all configdataclass fields, that have no default value assigned and need to be specified on instantiation.

Returns

a list of strings containing all required keys.

timeout: int = 5000

Timeout for commands in milliseconds.

visa_backend: str = ''

Specifies the path to the library to be used with PyVISA as a backend. Defaults to an empty string, which selects NI-VISA (if installed) or pyvisa-py (if NI-VISA is not found). To force the use of pyvisa-py, specify ‘@py’ here.

write_termination: str = '\n'

Write termination character.

exception hvl_ccb.comm.visa.VisaCommunicationError[source]

Bases: Exception

Base class for VisaCommunication errors.

Module contents

Communication protocols subpackage.