hvl_ccb.comm package¶
Submodules¶
hvl_ccb.comm.base module¶
Module with base classes for communication protocols.
-
class
hvl_ccb.comm.base.
AsyncCommunicationProtocol
(config)[source]¶ Bases:
hvl_ccb.comm.base.CommunicationProtocol
Abstract base class for asynchronous communication protocols
-
static
config_cls
() → Type[hvl_ccb.comm.base.AsyncCommunicationProtocolConfig][source]¶ Return the default configdataclass class.
- Returns
a reference to the default configdataclass class
-
read
() → str[source]¶ Read a single line of text as str from the communication.
- Returns
text as str including the terminator, which can also be empty “”
-
read_all
(n_attempts_max: Optional[int] = None, attempt_interval_sec: Optional[Union[int, float]] = None) → Optional[str][source]¶ Read all lines of text from the connection till nothing is left to read.
- Parameters
n_attempts_max – Amount of attempts how often a non-empty text is tried to be read
attempt_interval_sec – time between the reading attempts
- Returns
A multi-line str including the terminator internally
-
abstract
read_bytes
() → bytes[source]¶ Read a single line as bytes from the communication.
This method uses self.access_lock to ensure thread-safety.
- Returns
a single line as bytes containing the terminator, which can also be empty b””
-
read_nonempty
(n_attempts_max: Optional[int] = None, attempt_interval_sec: Optional[Union[int, float]] = None) → Optional[str][source]¶ Try to read a non-empty single line of text as str from the communication. If the host does not reply or reply with white space only, it will return None.
- Returns
a non-empty text as a str or None in case of an empty string
- Parameters
n_attempts_max – Amount of attempts how often a non-empty text is tried to be read
attempt_interval_sec – time between the reading attempts
-
read_text
() → str[source]¶ Read one line of text from the serial port. The input buffer may hold additional data afterwards, since only one line is read.
NOTE: backward-compatibility proxy for read method; to be removed in v1.0
- Returns
String read from the serial port; ‘’ if there was nothing to read.
- Raises
SerialCommunicationIOError – when communication port is not opened
-
read_text_nonempty
(n_attempts_max: Optional[int] = None, attempt_interval_sec: Optional[Union[int, float]] = None) → Optional[str][source]¶ Reads from the serial port, until a non-empty line is found, or the number of attempts is exceeded.
NOTE: backward-compatibility proxy for read method; to be removed in v1.0
Attention: in contrast to read_text, the returned answer will be stripped of a whitespace newline terminator at the end, if such terminator is set in the initial configuration (default).
- Parameters
n_attempts_max – maximum number of read attempts
attempt_interval_sec – time between the reading attempts
- Returns
String read from the serial port; ‘’ if number of attempts is exceeded or serial port is not opened.
-
write
(text: str)[source]¶ Write text as str to the communication.
- Parameters
text – test as a str to be written
-
abstract
write_bytes
(data: bytes) → int[source]¶ Write data as bytes to the communication.
This method uses self.access_lock to ensure thread-safety.
- Parameters
data – data as bytes-string to be written
- Returns
number of bytes written
-
write_text
(text: str)[source]¶ Write text to the serial port. The text is encoded and terminated by the configured terminator.
NOTE: backward-compatibility proxy for read method; to be removed in v1.0
- Parameters
text – Text to send to the port.
- Raises
SerialCommunicationIOError – when communication port is not opened
-
static
-
class
hvl_ccb.comm.base.
AsyncCommunicationProtocolConfig
(terminator: bytes = b'\r\n', encoding: str = 'utf-8', encoding_error_handling: str = 'replace', wait_sec_read_text_nonempty: Union[int, float] = 0.5, default_n_attempts_read_text_nonempty: int = 10)[source]¶ Bases:
object
Base configuration data class for asynchronous communication protocols
-
default_n_attempts_read_text_nonempty
: int = 10¶ default number of attempts to read a non-empty text
-
encoding
: str = 'utf-8'¶ Standard encoding of the connection. Typically this is
utf-8
, but can also belatin-1
or something from here: https://docs.python.org/3/library/codecs.html#standard-encodings
-
encoding_error_handling
: str = 'replace'¶ Encoding error handling scheme as defined here: https://docs.python.org/3/library/codecs.html#error-handlers By default replacing invalid characters with “uFFFD” REPLACEMENT CHARACTER on decoding and with “?” on decoding.
-
force_value
(fieldname, value)¶ Forces a value to a dataclass field despite the class being frozen.
NOTE: you can define post_force_value method with same signature as this method to do extra processing after value has been forced on fieldname.
- Parameters
fieldname – name of the field
value – value to assign
-
is_configdataclass
= True¶
-
classmethod
keys
() → Sequence[str]¶ Returns a list of all configdataclass fields key-names.
- Returns
a list of strings containing all keys.
-
classmethod
optional_defaults
() → Dict[str, object]¶ Returns a list of all configdataclass fields, that have a default value assigned and may be optionally specified on instantiation.
- Returns
a list of strings containing all optional keys.
-
classmethod
required_keys
() → Sequence[str]¶ Returns a list of all configdataclass fields, that have no default value assigned and need to be specified on instantiation.
- Returns
a list of strings containing all required keys.
-
terminator
: bytes = b'\r\n'¶ The terminator character. Typically this is
b'\r\n'
orb'\n'
, but can also beb'\r'
or other combinations. This defines the end of a single line.
-
wait_sec_read_text_nonempty
: Union[int, float] = 0.5¶ time to wait between attempts of reading a non-empty text
-
-
class
hvl_ccb.comm.base.
CommunicationProtocol
(config)[source]¶ Bases:
hvl_ccb.configuration.ConfigurationMixin
,abc.ABC
Communication protocol abstract base class.
Specifies the methods to implement for communication protocol, as well as implements some default settings and checks.
-
access_lock
¶ Access lock to use with context manager when accessing the communication protocol (thread safety)
-
-
class
hvl_ccb.comm.base.
NullCommunicationProtocol
(config)[source]¶ Bases:
hvl_ccb.comm.base.CommunicationProtocol
Communication protocol that does nothing.
-
static
config_cls
() → Type[hvl_ccb.configuration.EmptyConfig][source]¶ Empty configuration
- Returns
EmptyConfig
-
static
-
class
hvl_ccb.comm.base.
SyncCommunicationProtocol
(config)[source]¶ Bases:
hvl_ccb.comm.base.AsyncCommunicationProtocol
,abc.ABC
Abstract base class for synchronous communication protocols with query()
-
static
config_cls
() → Type[hvl_ccb.comm.base.SyncCommunicationProtocolConfig][source]¶ Return the default configdataclass class.
- Returns
a reference to the default configdataclass class
-
static
hvl_ccb.comm.labjack_ljm module¶
Communication protocol for LabJack using the LJM Library. Originally developed and tested for LabJack T7-PRO.
Makes use of the LabJack LJM Library Python wrapper. This wrapper needs an installation of the LJM Library for Windows, Mac OS X or Linux. Go to: https://labjack.com/support/software/installers/ljm and https://labjack.com/support/software/examples/ljm/python
-
class
hvl_ccb.comm.labjack_ljm.
LJMCommunication
(configuration)[source]¶ Bases:
hvl_ccb.comm.base.CommunicationProtocol
Communication protocol implementing the LabJack LJM Library Python wrapper.
-
static
config_cls
()[source]¶ Return the default configdataclass class.
- Returns
a reference to the default configdataclass class
-
property
is_open
¶ Flag indicating if the communication port is open.
- Returns
True if the port is open, otherwise False
-
read_name
(*names: str, return_num_type: Type[numbers.Real] = <class 'float'>) → Union[numbers.Real, Sequence[numbers.Real]][source]¶ Read one or more input numeric values by name.
- Parameters
names – one or more names to read out from the LabJack
return_num_type – optional numeric type specification for return values; by default float.
- Returns
answer of the LabJack, either single number or multiple numbers in a sequence, respectively, when one or multiple names to read were given
- Raises
TypeError – if read value of type not compatible with return_num_type
-
static
-
class
hvl_ccb.comm.labjack_ljm.
LJMCommunicationConfig
(device_type: Union[str, hvl_ccb._dev.labjack.DeviceType] = 'ANY', connection_type: Union[str, hvl_ccb.comm.labjack_ljm.LJMCommunicationConfig.ConnectionType] = 'ANY', identifier: str = 'ANY')[source]¶ Bases:
object
Configuration dataclass for
LJMCommunication
.-
class
ConnectionType
(value=<object object>, names=None, module=None, type=None, start=1, boundary=None)[source]¶ Bases:
hvl_ccb.utils.enum.AutoNumberNameEnum
LabJack connection type.
-
ANY
= 1¶
-
ETHERNET
= 4¶
-
TCP
= 3¶
-
USB
= 2¶
-
WIFI
= 5¶
-
-
class
DeviceType
(value=<object object>, names=None, module=None, type=None, start=1, boundary=None)¶ Bases:
hvl_ccb.utils.enum.AutoNumberNameEnum
LabJack device types.
Can be also looked up by ambigious Product ID (p_id) or by instance name:
`python LabJackDeviceType(4) is LabJackDeviceType('T4') `
-
ANY
= 1¶
-
T4
= 2¶
-
T7
= 3¶
-
T7_PRO
= 4¶
-
classmethod
get_by_p_id
(p_id: int) → Union[hvl_ccb._dev.labjack.DeviceType, List[hvl_ccb._dev.labjack.DeviceType]]¶ Get LabJack device type instance via LabJack product ID.
Note: Product ID is not unambiguous for LabJack devices.
- Parameters
p_id – Product ID of a LabJack device
- Returns
Instance or list of instances of LabJackDeviceType
- Raises
ValueError – when Product ID is unknown
-
-
connection_type
: Union[str, hvl_ccb.comm.labjack_ljm.LJMCommunicationConfig.ConnectionType] = 'ANY'¶ Can be either string or of enum
ConnectionType
.
-
device_type
: Union[str, hvl_ccb._dev.labjack.DeviceType] = 'ANY'¶ Can be either string ‘ANY’, ‘T7_PRO’, ‘T7’, ‘T4’, or of enum
DeviceType
.
-
force_value
(fieldname, value)¶ Forces a value to a dataclass field despite the class being frozen.
NOTE: you can define post_force_value method with same signature as this method to do extra processing after value has been forced on fieldname.
- Parameters
fieldname – name of the field
value – value to assign
-
identifier
: str = 'ANY'¶ The identifier specifies information for the connection to be used. This can be an IP address, serial number, or device name. See the LabJack docs ( https://labjack.com/support/software/api/ljm/function-reference/ljmopens/identifier-parameter) for more information.
-
is_configdataclass
= True¶
-
classmethod
keys
() → Sequence[str]¶ Returns a list of all configdataclass fields key-names.
- Returns
a list of strings containing all keys.
-
classmethod
optional_defaults
() → Dict[str, object]¶ Returns a list of all configdataclass fields, that have a default value assigned and may be optionally specified on instantiation.
- Returns
a list of strings containing all optional keys.
-
classmethod
required_keys
() → Sequence[str]¶ Returns a list of all configdataclass fields, that have no default value assigned and need to be specified on instantiation.
- Returns
a list of strings containing all required keys.
-
class
hvl_ccb.comm.modbus_tcp module¶
Communication protocol for modbus TCP ports. Makes use of the pymodbus library.
-
class
hvl_ccb.comm.modbus_tcp.
ModbusTcpCommunication
(configuration)[source]¶ Bases:
hvl_ccb.comm.base.CommunicationProtocol
Implements the Communication Protocol for modbus TCP.
-
static
config_cls
()[source]¶ Return the default configdataclass class.
- Returns
a reference to the default configdataclass class
-
open
() → None[source]¶ Open the Modbus TCP connection.
- Raises
ModbusTcpConnectionFailedException – if the connection fails.
-
read_holding_registers
(address: int, count: int) → List[int][source]¶ Read specified number of register starting with given address and return the values from each register.
- Parameters
address – address of the first register
count – count of registers to read
- Returns
list of int values
-
static
-
class
hvl_ccb.comm.modbus_tcp.
ModbusTcpCommunicationConfig
(host: str, unit: int, port: int = 502)[source]¶ Bases:
object
Configuration dataclass for
ModbusTcpCommunication
.-
force_value
(fieldname, value)¶ Forces a value to a dataclass field despite the class being frozen.
NOTE: you can define post_force_value method with same signature as this method to do extra processing after value has been forced on fieldname.
- Parameters
fieldname – name of the field
value – value to assign
-
host
: str¶ Host is the IP address of the connected device.
-
is_configdataclass
= True¶
-
classmethod
keys
() → Sequence[str]¶ Returns a list of all configdataclass fields key-names.
- Returns
a list of strings containing all keys.
-
classmethod
optional_defaults
() → Dict[str, object]¶ Returns a list of all configdataclass fields, that have a default value assigned and may be optionally specified on instantiation.
- Returns
a list of strings containing all optional keys.
-
port
: int = 502¶ TCP port
-
classmethod
required_keys
() → Sequence[str]¶ Returns a list of all configdataclass fields, that have no default value assigned and need to be specified on instantiation.
- Returns
a list of strings containing all required keys.
-
unit
: int¶ Unit number to be used when connecting with Modbus/TCP. Typically this is used when connecting to a relay having Modbus/RTU-connected devices.
-
hvl_ccb.comm.opc module¶
Communication protocol implementing an OPC UA connection. This protocol is used to interface with the “Supercube” PLC from Siemens.
-
class
hvl_ccb.comm.opc.
OpcUaCommunication
(config)[source]¶ Bases:
hvl_ccb.comm.base.CommunicationProtocol
Communication protocol implementing an OPC UA connection. Makes use of the package python-opcua.
-
static
config_cls
()[source]¶ Return the default configdataclass class.
- Returns
a reference to the default configdataclass class
-
init_monitored_nodes
(node_id: Union[object, Iterable], ns_index: int) → None[source]¶ Initialize monitored nodes.
- Parameters
node_id – one or more strings of node IDs; node IDs are always casted via str() method here, hence do not have to be strictly string objects.
ns_index – the namespace index the nodes belong to.
- Raises
OpcUaCommunicationIOError – when protocol was not opened or can’t communicate with a OPC UA server
-
property
is_open
¶ Flag indicating if the communication port is open.
- Returns
True if the port is open, otherwise False
-
open
() → None[source]¶ Open the communication to the OPC UA server.
- Raises
OpcUaCommunicationIOError – when communication port cannot be opened.
-
read
(node_id, ns_index)[source]¶ Read a value from a node with id and namespace index.
- Parameters
node_id – the ID of the node to read the value from
ns_index – the namespace index of the node
- Returns
the value of the node object.
- Raises
OpcUaCommunicationIOError – when protocol was not opened or can’t communicate with a OPC UA server
-
write
(node_id, ns_index, value) → None[source]¶ Write a value to a node with name
name
.- Parameters
node_id – the id of the node to write the value to.
ns_index – the namespace index of the node.
value – the value to write.
- Raises
OpcUaCommunicationIOError – when protocol was not opened or can’t communicate with a OPC UA server
-
static
-
class
hvl_ccb.comm.opc.
OpcUaCommunicationConfig
(host: str, endpoint_name: str, port: int = 4840, sub_handler: hvl_ccb.comm.opc.OpcUaSubHandler = <hvl_ccb.comm.opc.OpcUaSubHandler object>, update_period: int = 500, wait_timeout_retry_sec: Union[int, float] = 1, max_timeout_retry_nr: int = 5)[source]¶ Bases:
object
Configuration dataclass for OPC UA Communciation.
-
endpoint_name
: str¶ Endpoint of the OPC server, this is a path like ‘OPCUA/SimulationServer’
-
force_value
(fieldname, value)¶ Forces a value to a dataclass field despite the class being frozen.
NOTE: you can define post_force_value method with same signature as this method to do extra processing after value has been forced on fieldname.
- Parameters
fieldname – name of the field
value – value to assign
-
host
: str¶ Hostname or IP-Address of the OPC UA server.
-
is_configdataclass
= True¶
-
classmethod
keys
() → Sequence[str]¶ Returns a list of all configdataclass fields key-names.
- Returns
a list of strings containing all keys.
-
max_timeout_retry_nr
: int = 5¶ Maximal number of call re-tries on underlying OPC UA client timeout error
-
classmethod
optional_defaults
() → Dict[str, object]¶ Returns a list of all configdataclass fields, that have a default value assigned and may be optionally specified on instantiation.
- Returns
a list of strings containing all optional keys.
-
port
: int = 4840¶ Port of the OPC UA server to connect to.
-
classmethod
required_keys
() → Sequence[str]¶ Returns a list of all configdataclass fields, that have no default value assigned and need to be specified on instantiation.
- Returns
a list of strings containing all required keys.
-
sub_handler
: hvl_ccb.comm.opc.OpcUaSubHandler = <hvl_ccb.comm.opc.OpcUaSubHandler object>¶ object to use for handling subscriptions.
-
update_period
: int = 500¶ Update period for generating datachange events in OPC UA [milli seconds]
-
wait_timeout_retry_sec
: Union[int, float] = 1¶ Wait time between re-trying calls on underlying OPC UA client timeout error
-
-
exception
hvl_ccb.comm.opc.
OpcUaCommunicationIOError
[source]¶ Bases:
OSError
OPC-UA communication I/O error.
-
exception
hvl_ccb.comm.opc.
OpcUaCommunicationTimeoutError
[source]¶ Bases:
hvl_ccb.comm.opc.OpcUaCommunicationIOError
OPC-UA communication timeout error.
-
class
hvl_ccb.comm.opc.
OpcUaSubHandler
[source]¶ Bases:
object
Base class for subscription handling of OPC events and data change events. Override methods from this class to add own handling capabilities.
To receive events from server for a subscription data_change and event methods are called directly from receiving thread. Do not do expensive, slow or network operation there. Create another thread if you need to do such a thing.
hvl_ccb.comm.serial module¶
Communication protocol for serial ports. Makes use of the pySerial library.
-
class
hvl_ccb.comm.serial.
SerialCommunication
(configuration)[source]¶ Bases:
hvl_ccb.comm.base.AsyncCommunicationProtocol
Implements the Communication Protocol for serial ports.
-
static
config_cls
()[source]¶ Return the default configdataclass class.
- Returns
a reference to the default configdataclass class
-
property
is_open
¶ Flag indicating if the serial port is open.
- Returns
True if the serial port is open, otherwise False
-
open
()[source]¶ Open the serial connection.
- Raises
SerialCommunicationIOError – when communication port cannot be opened.
-
read_bytes
() → bytes[source]¶ Read the bytes from the serial port till the terminator is found. The input buffer may hold additional lines afterwards.
This method uses self.access_lock to ensure thread-safety.
- Returns
Bytes read from the serial port; b’’ if there was nothing to read.
- Raises
SerialCommunicationIOError – when communication port is not opened
-
read_single_bytes
(size: int = 1) → bytes[source]¶ Read the specified number of bytes from the serial port. The input buffer may hold additional data afterwards.
- Returns
Bytes read from the serial port; b’’ if there was nothing to read.
-
write_bytes
(data: bytes) → int[source]¶ Write bytes to the serial port.
This method uses self.access_lock to ensure thread-safety.
- Parameters
data – data to write to the serial port
- Returns
number of bytes written
- Raises
SerialCommunicationIOError – when communication port is not opened
-
static
-
class
hvl_ccb.comm.serial.
SerialCommunicationBytesize
(value=<object object>, names=None, module=None, type=None, start=1, boundary=None)[source]¶ Bases:
hvl_ccb.utils.enum.ValueEnum
Serial communication bytesize.
-
EIGHTBITS
= 8¶
-
FIVEBITS
= 5¶
-
SEVENBITS
= 7¶
-
SIXBITS
= 6¶
-
-
class
hvl_ccb.comm.serial.
SerialCommunicationConfig
(terminator: bytes = b'\r\n', encoding: str = 'utf-8', encoding_error_handling: str = 'replace', wait_sec_read_text_nonempty: Union[int, float] = 0.5, default_n_attempts_read_text_nonempty: int = 10, port: Optional[str] = None, baudrate: int = 9600, parity: Union[str, hvl_ccb.comm.serial.SerialCommunicationParity] = <SerialCommunicationParity.NONE: 'N'>, stopbits: Union[int, float, hvl_ccb.comm.serial.SerialCommunicationStopbits] = <SerialCommunicationStopbits.ONE: 1>, bytesize: Union[int, hvl_ccb.comm.serial.SerialCommunicationBytesize] = <SerialCommunicationBytesize.EIGHTBITS: 8>, timeout: Union[int, float] = 2)[source]¶ Bases:
hvl_ccb.comm.base.AsyncCommunicationProtocolConfig
Configuration dataclass for
SerialCommunication
.-
Bytesize
¶
-
Parity
¶
-
Stopbits
¶
-
baudrate
: int = 9600¶ Baudrate of the serial port
-
bytesize
: Union[int, hvl_ccb.comm.serial.SerialCommunicationBytesize] = 8¶ Size of a byte, 5 to 8
-
create_serial_port
() → serial.serialposix.Serial[source]¶ Create a serial port instance according to specification in this configuration
- Returns
Closed serial port instance
-
force_value
(fieldname, value)¶ Forces a value to a dataclass field despite the class being frozen.
NOTE: you can define post_force_value method with same signature as this method to do extra processing after value has been forced on fieldname.
- Parameters
fieldname – name of the field
value – value to assign
-
classmethod
keys
() → Sequence[str]¶ Returns a list of all configdataclass fields key-names.
- Returns
a list of strings containing all keys.
-
classmethod
optional_defaults
() → Dict[str, object]¶ Returns a list of all configdataclass fields, that have a default value assigned and may be optionally specified on instantiation.
- Returns
a list of strings containing all optional keys.
-
parity
: Union[str, hvl_ccb.comm.serial.SerialCommunicationParity] = 'N'¶ Parity to be used for the connection.
-
port
: Optional[str] = None¶ Port is a string referring to a COM-port (e.g.
'COM3'
) or a URL. The full list of capabilities is found on the pyserial documentation.
-
classmethod
required_keys
() → Sequence[str]¶ Returns a list of all configdataclass fields, that have no default value assigned and need to be specified on instantiation.
- Returns
a list of strings containing all required keys.
-
stopbits
: Union[int, float, hvl_ccb.comm.serial.SerialCommunicationStopbits] = 1¶ Stopbits setting, can be 1, 1.5 or 2.
-
timeout
: Union[int, float] = 2¶ Timeout in seconds for the serial port
-
-
exception
hvl_ccb.comm.serial.
SerialCommunicationIOError
[source]¶ Bases:
OSError
Serial communication related I/O errors.
-
class
hvl_ccb.comm.serial.
SerialCommunicationParity
(value=<object object>, names=None, module=None, type=None, start=1, boundary=None)[source]¶ Bases:
hvl_ccb.utils.enum.ValueEnum
Serial communication parity.
-
EVEN
= 'E'¶
-
MARK
= 'M'¶
-
NAMES
= {'E': 'Even', 'M': 'Mark', 'N': 'None', 'O': 'Odd', 'S': 'Space'}¶
-
NONE
= 'N'¶
-
ODD
= 'O'¶
-
SPACE
= 'S'¶
-
hvl_ccb.comm.telnet module¶
Communication protocol for telnet. Makes use of the telnetlib library.
-
class
hvl_ccb.comm.telnet.
TelnetCommunication
(configuration)[source]¶ Bases:
hvl_ccb.comm.base.AsyncCommunicationProtocol
Implements the Communication Protocol for telnet.
-
static
config_cls
()[source]¶ Return the default configdataclass class.
- Returns
a reference to the default configdataclass class
-
property
is_open
¶ Is the connection open?
- Returns
True for an open connection
-
read_bytes
() → bytes[source]¶ Read data as bytes from the telnet connection.
- Returns
data from telnet connection
- Raises
TelnetError – when connection is not open, raises an Error during the communication
-
write_bytes
(data: bytes)[source]¶ Write the data as bytes to the telnet connection.
- Parameters
data – Data to be sent.
- Raises
TelnetError – when connection is not open, raises an Error during the communication
-
static
-
class
hvl_ccb.comm.telnet.
TelnetCommunicationConfig
(terminator: bytes = b'\r\n', encoding: str = 'utf-8', encoding_error_handling: str = 'replace', wait_sec_read_text_nonempty: Union[int, float] = 0.5, default_n_attempts_read_text_nonempty: int = 10, host: Optional[str] = None, port: int = 0, timeout: Union[int, float] = 0.2)[source]¶ Bases:
hvl_ccb.comm.base.AsyncCommunicationProtocolConfig
Configuration dataclass for
TelnetCommunication
.-
create_telnet
() → Optional[telnetlib.Telnet][source]¶ Create a telnet client :return: Opened Telnet object or None if connection is not possible
-
force_value
(fieldname, value)¶ Forces a value to a dataclass field despite the class being frozen.
NOTE: you can define post_force_value method with same signature as this method to do extra processing after value has been forced on fieldname.
- Parameters
fieldname – name of the field
value – value to assign
-
host
: Optional[str] = None¶ Host to connect to can be
localhost
or
-
classmethod
keys
() → Sequence[str]¶ Returns a list of all configdataclass fields key-names.
- Returns
a list of strings containing all keys.
-
classmethod
optional_defaults
() → Dict[str, object]¶ Returns a list of all configdataclass fields, that have a default value assigned and may be optionally specified on instantiation.
- Returns
a list of strings containing all optional keys.
-
port
: int = 0¶ Port at which the host is listening
-
classmethod
required_keys
() → Sequence[str]¶ Returns a list of all configdataclass fields, that have no default value assigned and need to be specified on instantiation.
- Returns
a list of strings containing all required keys.
-
timeout
: Union[int, float] = 0.2¶ Timeout for reading a line
-
hvl_ccb.comm.visa module¶
Communication protocol for VISA. Makes use of the pyvisa library. The backend can be NI-Visa or pyvisa-py.
Information on how to install a VISA backend can be found here: https://pyvisa.readthedocs.io/en/master/getting_nivisa.html
So far only TCPIP SOCKET and TCPIP INSTR interfaces are supported.
-
class
hvl_ccb.comm.visa.
VisaCommunication
(configuration)[source]¶ Bases:
hvl_ccb.comm.base.CommunicationProtocol
Implements the Communication Protocol for VISA / SCPI.
-
MULTI_COMMANDS_MAX
= 5¶ The maximum of commands that can be sent in one round is 5 according to the VISA standard.
-
MULTI_COMMANDS_SEPARATOR
= ';'¶ The character to separate two commands is ; according to the VISA standard.
-
WAIT_AFTER_WRITE
= 0.08¶ Small pause in seconds to wait after write operations, allowing devices to really do what we tell them before continuing with further tasks.
-
static
config_cls
() → Type[hvl_ccb.comm.visa.VisaCommunicationConfig][source]¶ Return the default configdataclass class.
- Returns
a reference to the default configdataclass class
-
query
(*commands: str) → Union[str, Tuple[str, …]][source]¶ A combination of write(message) and read.
- Parameters
commands – list of commands
- Returns
list of values
- Raises
VisaCommunicationError – when connection was not started, or when trying to issue too many commands at once.
-
spoll
() → int[source]¶ Execute serial poll on the device. Reads the status byte register STB. This is a fast function that can be executed periodically in a polling fashion.
- Returns
integer representation of the status byte
- Raises
VisaCommunicationError – when connection was not started
-
write
(*commands: str) → None[source]¶ Write commands. No answer is read or expected.
- Parameters
commands – one or more commands to send
- Raises
VisaCommunicationError – when connection was not started
-
-
class
hvl_ccb.comm.visa.
VisaCommunicationConfig
(host: str, interface_type: Union[str, hvl_ccb.comm.visa.VisaCommunicationConfig.InterfaceType], board: int = 0, port: int = 5025, timeout: int = 5000, chunk_size: int = 204800, open_timeout: int = 1000, write_termination: str = '\n', read_termination: str = '\n', visa_backend: str = '')[source]¶ Bases:
object
VisaCommunication configuration dataclass.
-
class
InterfaceType
(value=<object object>, names=None, module=None, type=None, start=1, boundary=None)[source]¶ Bases:
hvl_ccb.utils.enum.AutoNumberNameEnum
Supported VISA Interface types.
-
TCPIP_INSTR
= 2¶ VXI-11 protocol
-
TCPIP_SOCKET
= 1¶ VISA-RAW protocol
-
-
property
address
¶ Address string depending on the VISA protocol’s configuration.
- Returns
address string corresponding to current configuration
-
board
: int = 0¶ Board number is typically 0 and comes from old bus systems.
-
chunk_size
: int = 204800¶ Chunk size is the allocated memory for read operations. The standard is 20kB, and is increased per default here to 200kB. It is specified in bytes.
-
force_value
(fieldname, value)¶ Forces a value to a dataclass field despite the class being frozen.
NOTE: you can define post_force_value method with same signature as this method to do extra processing after value has been forced on fieldname.
- Parameters
fieldname – name of the field
value – value to assign
-
host
: str¶ IP address of the VISA device. DNS names are currently unsupported.
-
interface_type
: Union[str, hvl_ccb.comm.visa.VisaCommunicationConfig.InterfaceType]¶ Interface type of the VISA connection, being one of
InterfaceType
.
-
is_configdataclass
= True¶
-
classmethod
keys
() → Sequence[str]¶ Returns a list of all configdataclass fields key-names.
- Returns
a list of strings containing all keys.
-
open_timeout
: int = 1000¶ Timeout for opening the connection, in milli seconds.
-
classmethod
optional_defaults
() → Dict[str, object]¶ Returns a list of all configdataclass fields, that have a default value assigned and may be optionally specified on instantiation.
- Returns
a list of strings containing all optional keys.
-
port
: int = 5025¶ TCP port, standard is 5025.
-
read_termination
: str = '\n'¶ Read termination character.
-
classmethod
required_keys
() → Sequence[str]¶ Returns a list of all configdataclass fields, that have no default value assigned and need to be specified on instantiation.
- Returns
a list of strings containing all required keys.
-
timeout
: int = 5000¶ Timeout for commands in milli seconds.
-
visa_backend
: str = ''¶ Specifies the path to the library to be used with PyVISA as a backend. Defaults to None, which is NI-VISA (if installed), or pyvisa-py (if NI-VISA is not found). To force the use of pyvisa-py, specify ‘@py’ here.
-
write_termination
: str = '\n'¶ Write termination character.
-
class
Module contents¶
Communication protocols subpackage.