"""
sync API of asyncua
"""
from __future__ import annotations
import asyncio
from datetime import datetime
import functools
import sys
from cryptography import x509
from pathlib import Path
from threading import Thread, Condition
import logging
from typing import Any, Callable, Dict, Iterable, List, Sequence, Set, Tuple, Type, Union, Optional, overload
if sys.version_info >= (3, 8):
from typing import Literal
else:
from typing_extensions import Literal
from asyncua import ua
from asyncua import client
from asyncua import server
from asyncua import common
from asyncua.common import node, subscription, shortcuts, xmlexporter, type_dictionary_builder
from asyncua.common.events import Event
_logger = logging.getLogger(__name__)
class ThreadLoopNotRunning(Exception):
pass
class ThreadLoop(Thread):
def __init__(self, timeout: Optional[float] = 120) -> None:
Thread.__init__(self)
self.loop = None
self._cond = Condition()
self.timeout = timeout
def start(self):
with self._cond:
Thread.start(self)
self._cond.wait()
def run(self):
self.loop = asyncio.new_event_loop()
_logger.debug("Threadloop: %s", self.loop)
self.loop.call_soon_threadsafe(self._notify_start)
self.loop.run_forever()
def _notify_start(self):
with self._cond:
self._cond.notify_all()
def stop(self):
self.loop.call_soon_threadsafe(self.loop.stop)
self.join()
self.loop.close()
def post(self, coro):
if not self.loop or not self.loop.is_running() or not self.is_alive():
raise ThreadLoopNotRunning(f"could not post {coro} since asyncio loop in thread has not been started or has been stopped")
futur = asyncio.run_coroutine_threadsafe(coro, loop=self.loop)
return futur.result(self.timeout)
def __enter__(self):
self.start()
return self
def __exit__(self, exc_t, exc_v, trace):
self.stop()
def _to_async(args, kwargs):
args = list(args) # FIXME: might be very inefficient...
for idx, arg in enumerate(args):
if isinstance(arg, (SyncNode, Client, Server)):
args[idx] = arg.aio_obj
elif isinstance(arg, (list, tuple)):
args[idx] = _to_async(arg, {})[0]
for k, v in kwargs.items():
if isinstance(v, SyncNode):
kwargs[k] = v.aio_obj
return args, kwargs
def _to_sync(tloop, result):
if isinstance(result, node.Node):
return SyncNode(tloop, result)
if isinstance(result, (list, tuple)):
return [_to_sync(tloop, item) for item in result]
if isinstance(result, server.event_generator.EventGenerator):
return EventGenerator(tloop, result)
if isinstance(result, subscription.Subscription):
return Subscription(tloop, result)
if isinstance(result, server.Server):
return Server(tloop, result)
return result
def syncmethod(func):
"""
decorator for sync methods
"""
def wrapper(self, *args, **kwargs):
args, kwargs = _to_async(args, kwargs)
aio_func = getattr(self.aio_obj, func.__name__)
result = self.tloop.post(aio_func(*args, **kwargs))
return _to_sync(self.tloop, result)
return wrapper
def sync_wrapper(aio_func):
def wrapper(*args, **kwargs):
if not args:
raise RuntimeError("first argument of function must a ThreadLoop object")
if isinstance(args[0], ThreadLoop):
tloop = args[0]
args = list(args)[1:]
elif hasattr(args[0], "tloop"):
tloop = args[0].tloop
else:
raise RuntimeError("first argument of function must a ThreadLoop object")
args, kwargs = _to_async(args, kwargs)
result = tloop.post(aio_func(*args, **kwargs))
return _to_sync(tloop, result)
return wrapper
def syncfunc(aio_func):
"""
decorator for sync function
"""
def decorator(func, *args, **kwargs):
return sync_wrapper(aio_func)
return decorator
def sync_uaclient_method(aio_func):
"""
Usage:
```python
from asyncua.client.ua_client import UaClient
from asyncua.sync import Client
with Client('otp.tcp://localhost') as client:
read_attributes = sync_uaclient_method(UaClient.read_attributes)(client)
results = read_attributes(...)
...
```
"""
def sync_method(client: 'Client'):
uaclient = client.aio_obj.uaclient
return functools.partial(sync_wrapper(aio_func), client.tloop, uaclient)
return sync_method
def sync_async_client_method(aio_func):
"""
Usage:
```python
from asyncua.client import Client as AsyncClient
from asyncua.sync import Client
with Client('otp.tcp://localhost') as client:
read_attributes = sync_async_client_method(AsyncClient.read_attributes)(client)
results = read_attributes(...)
...
```
"""
def sync_method(client: 'Client'):
return functools.partial(sync_wrapper(aio_func), client.tloop, client)
return sync_method
@syncfunc(aio_func=common.methods.call_method_full)
def call_method_full(parent, methodid, *args):
pass
@syncfunc(aio_func=common.ua_utils.data_type_to_variant_type)
def data_type_to_variant_type(dtype_node):
pass
@syncfunc(aio_func=common.copy_node_util.copy_node)
def copy_node(parent, node, nodeid=None, recursive=True):
pass
@syncfunc(aio_func=common.instantiate_util.instantiate)
def instantiate(parent, node_type, nodeid=None, bname=None, dname=None, idx=0, instantiate_optional=True):
pass
class _SubHandler:
def __init__(self, tloop, sync_handler):
self.tloop = tloop
self.sync_handler = sync_handler
def datachange_notification(self, node, val, data):
self.sync_handler.datachange_notification(SyncNode(self.tloop, node), val, data)
def event_notification(self, event):
self.sync_handler.event_notification(event)
def status_change_notification(self, status: ua.StatusChangeNotification):
self.sync_handler.status_change_notification(status)
class Client:
"""
Sync Client, see doc for async Client
the sync client has one extra parameter: sync_wrapper_timeout.
if no ThreadLoop is provided this timeout is used to define how long the sync wrapper
waits for an async call to return. defualt is 120s and hopefully should fit most applications
"""
def __init__(self, url: str, timeout: int = 4, tloop=None, sync_wrapper_timeout: Optional[float] = 120,) -> None:
self.tloop = tloop
self.close_tloop = False
if not self.tloop:
self.tloop = ThreadLoop(sync_wrapper_timeout)
self.tloop.start()
self.close_tloop = True
self.aio_obj = client.Client(url, timeout)
self.nodes = Shortcuts(self.tloop, self.aio_obj.uaclient)
def __str__(self):
return "Sync" + self.aio_obj.__str__()
__repr__ = __str__
@property
def application_uri(self):
return self.aio_obj.application_uri
@application_uri.setter
def application_uri(self, value):
self.aio_obj.application_uri = value
@syncmethod
def connect(self) -> None:
pass
def disconnect(self) -> None:
try:
self.tloop.post(self.aio_obj.disconnect())
finally:
if self.close_tloop:
self.tloop.stop()
@syncmethod
def connect_sessionless(self) -> None:
pass
def disconnect_sessionless(self) -> None:
try:
self.tloop.post(self.aio_obj.disconnect_sessionless())
finally:
if self.close_tloop:
self.tloop.stop()
@syncmethod
def connect_socket(self) -> None:
pass
def disconnect_socket(self) -> None:
try:
self.aio_obj.disconnect_socket()
finally:
if self.close_tloop:
self.tloop.stop()
def set_user(self, username: str) -> None:
self.aio_obj.set_user(username)
def set_password(self, pwd: str) -> None:
self.aio_obj.set_password(pwd)
def set_locale(self, locale: Sequence[str]) -> None:
self.aio_obj.set_locale(locale)
@syncmethod
def load_private_key(self, path: str, password: Optional[Union[str, bytes]] = None, extension: Optional[str] = None) -> None:
pass
@syncmethod
def load_client_certificate(self, path: str, extension: Optional[str] = None) -> None:
pass
@syncmethod
def load_type_definitions(self, nodes=None):
pass
@syncmethod
def load_data_type_definitions(self, node: Optional[SyncNode] = None, overwrite_existing: bool = False) -> Dict[str, Type]: # type: ignore[empty-body]
pass
@syncmethod
def get_namespace_array(self) -> List[str]: # type: ignore[empty-body]
pass
@syncmethod
def set_security(self) -> None:
pass
@syncmethod
def set_security_string(self, string: str) -> None:
pass
@syncmethod
def load_enums(self) -> Dict[str, Type]: # type: ignore[empty-body]
pass
def create_subscription(self, period: Union[ua.CreateSubscriptionParameters, float], handler: subscription.SubHandler, publishing: bool = True) -> "Subscription":
coro = self.aio_obj.create_subscription(period, _SubHandler(self.tloop, handler), publishing)
aio_sub = self.tloop.post(coro)
return Subscription(self.tloop, aio_sub)
def get_subscription_revised_params(self, params: ua.CreateSubscriptionParameters, results: ua.CreateSubscriptionResult) -> Optional[ua.ModifySubscriptionParameters]: # type: ignore
return self.aio_obj.get_subscription_revised_params(params, results)
@syncmethod
def delete_subscriptions(self, subscription_ids: Iterable[int]) -> List[ua.StatusCode]: # type: ignore[empty-body]
pass
@syncmethod
def get_namespace_index(self, uri: str) -> int: # type: ignore[empty-body]
pass
def get_node(self, nodeid: Union[SyncNode, ua.NodeId, str, int]) -> SyncNode:
aio_nodeid = nodeid.aio_obj if isinstance(nodeid, SyncNode) else nodeid
return SyncNode(self.tloop, self.aio_obj.get_node(aio_nodeid))
def get_root_node(self) -> SyncNode:
return SyncNode(self.tloop, self.aio_obj.get_root_node())
def get_objects_node(self) -> SyncNode:
return SyncNode(self.tloop, self.aio_obj.get_objects_node())
def get_server_node(self) -> SyncNode:
return SyncNode(self.tloop, self.aio_obj.get_server_node())
@syncmethod
def connect_and_get_server_endpoints(self) -> List[ua.EndpointDescription]: # type: ignore[empty-body]
pass
@syncmethod
def connect_and_find_servers(self) -> List[ua.ApplicationDescription]: # type: ignore[empty-body]
pass
@syncmethod
def connect_and_find_servers_on_network(self) -> List[ua.FindServersOnNetworkResult]: # type: ignore[empty-body]
pass
[docs]
@syncmethod
def send_hello(self) -> None:
pass
@syncmethod
def open_secure_channel(self, renew=False) -> None:
pass
@syncmethod
def close_secure_channel(self) -> None:
pass
@syncmethod
def get_endpoints(self) -> List[ua.EndpointDescription]: # type: ignore[empty-body]
pass
@syncmethod
def register_server(
self,
server: "Server",
discovery_configuration: Optional[ua.DiscoveryConfiguration] = None,
) -> None:
pass
@syncmethod
def unregister_server(
self,
server: "Server",
discovery_configuration: Optional[ua.DiscoveryConfiguration] = None,
) -> None:
pass
@syncmethod
def find_servers(self, uris: Optional[Iterable[str]] = None) -> List[ua.ApplicationDescription]: # type: ignore[empty-body]
pass
@syncmethod
def find_servers_on_network(self) -> List[ua.FindServersOnNetworkResult]: # type: ignore[empty-body]
pass
@syncmethod
def create_session(self) -> ua.CreateSessionResult: # type: ignore[empty-body]
pass
@syncmethod
def check_connection(self) -> None:
pass
def server_policy_id(self, token_type: ua.UserTokenType, default: str) -> str:
return self.aio_obj.server_policy_id(token_type, default)
def server_policy_uri(self, token_type: ua.UserTokenType) -> str:
return self.aio_obj.server_policy_uri(token_type)
@syncmethod
def activate_session( # type: ignore[empty-body]
self,
username: Optional[str] = None,
password: Optional[str] = None,
certificate: Optional[x509.Certificate] = None,
) -> ua.ActivateSessionResult:
pass
@syncmethod
def close_session(self) -> None:
pass
def get_keepalive_count(self, period: float) -> int:
return self.aio_obj.get_keepalive_count(period)
@syncmethod
def delete_nodes(self, nodes: Iterable[SyncNode], recursive=False) -> Tuple[List[SyncNode], List[ua.StatusCode]]: # type: ignore[empty-body]
pass
@syncmethod
def import_xml(self, path=None, xmlstring=None, strict_mode=True) -> List[ua.NodeId]: # type: ignore[empty-body]
pass
@syncmethod
def export_xml(self, nodes, path, export_values: bool = False) -> None:
pass
@syncmethod
def register_namespace(self, uri: str) -> int: # type: ignore[empty-body]
pass
@syncmethod
def register_nodes(self, nodes: Iterable[SyncNode]) -> List[SyncNode]: # type: ignore[empty-body]
pass
@syncmethod
def unregister_nodes(self, nodes: Iterable[SyncNode]): # type: ignore[empty-body]
pass
@syncmethod
def read_attributes(self, nodes: Iterable[SyncNode], attr: ua.AttributeIds = ua.AttributeIds.Value) -> List[ua.DataValue]: # type: ignore[empty-body]
pass
@syncmethod
def read_values(self, nodes: Iterable[SyncNode]) -> List[Any]: # type: ignore[empty-body]
pass
@syncmethod
def write_values(self, nodes: Iterable[SyncNode], values: Iterable[Any], raise_on_partial_error: bool = True) -> List[ua.StatusCode]: # type: ignore[empty-body]
pass
@syncmethod
def browse_nodes(self, nodes: Iterable[SyncNode]) -> List[Tuple[SyncNode, ua.BrowseResult]]: # type: ignore[empty-body]
pass
@syncmethod
def translate_browsepaths( # type: ignore[empty-body]
self,
starting_node: ua.NodeId,
relative_paths: Iterable[Union[ua.RelativePath, str]]
) -> List[ua.BrowsePathResult]:
pass
def __enter__(self):
try:
self.connect()
except Exception as ex:
self.disconnect()
raise ex
return self
def __exit__(self, exc_type, exc_value, traceback):
self.disconnect()
class Shortcuts:
root: SyncNode
objects: SyncNode
server: SyncNode
base_object_type: SyncNode
base_data_type: SyncNode
base_event_type: SyncNode
base_variable_type: SyncNode
folder_type: SyncNode
enum_data_type: SyncNode
option_set_type: SyncNode
types: SyncNode
data_types: SyncNode
event_types: SyncNode
reference_types: SyncNode
variable_types: SyncNode
object_types: SyncNode
namespace_array: SyncNode
namespaces: SyncNode
opc_binary: SyncNode
base_structure_type: SyncNode
base_union_type: SyncNode
server_state: SyncNode
service_level: SyncNode
HasComponent: SyncNode
HasProperty: SyncNode
Organizes: SyncNode
HasEncoding: SyncNode
def __init__(self, tloop, aio_server):
self.tloop = tloop
self.aio_obj = shortcuts.Shortcuts(aio_server)
for k, v in self.aio_obj.__dict__.items():
setattr(self, k, SyncNode(self.tloop, v))
class Server:
"""
Sync Server, see doc for async Server
the sync server has one extra parameter: sync_wrapper_timeout.
if no ThreadLoop is provided this timeout is used to define how long the sync wrapper
waits for an async call to return. defualt is 120s and hopefully should fit most applications
"""
def __init__(self, shelf_file: Optional[Path] = None, tloop=None, sync_wrapper_timeout: Optional[float] = 120,):
self.tloop = tloop
self.close_tloop = False
if not self.tloop:
self.tloop = ThreadLoop(timeout=sync_wrapper_timeout)
self.tloop.start()
self.close_tloop = True
self.aio_obj = server.Server()
self.tloop.post(self.aio_obj.init(shelf_file))
self.nodes = Shortcuts(self.tloop, self.aio_obj.iserver.isession)
def __str__(self):
return "Sync" + self.aio_obj.__str__()
__repr__ = __str__
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.stop()
@syncmethod
def load_certificate(self, path: str, format: str = None):
pass
@syncmethod
def load_private_key(self, path, password=None, format=None):
pass
def set_endpoint(self, url):
return self.aio_obj.set_endpoint(url)
def set_server_name(self, name):
return self.aio_obj.set_server_name(name)
def set_security_policy(self, security_policy, permission_ruleset=None):
return self.aio_obj.set_security_policy(security_policy, permission_ruleset)
def set_security_IDs(self, policy_ids):
return self.aio_obj.set_security_IDs(policy_ids)
def disable_clock(self, val: bool = True):
return self.aio_obj.disable_clock(val)
@syncmethod
def register_namespace(self, url):
pass
@syncmethod
def get_namespace_array(self):
pass
@syncmethod
def start(self):
pass
def stop(self):
self.tloop.post(self.aio_obj.stop())
if self.close_tloop:
self.tloop.stop()
def link_method(self, node, callback):
return self.aio_obj.link_method(node, callback)
@syncmethod
def get_event_generator(self, etype=None, emitting_node=ua.ObjectIds.Server):
pass
def get_node(self, nodeid):
return SyncNode(self.tloop, self.aio_obj.get_node(nodeid))
@syncmethod
def import_xml(self, path=None, xmlstring=None, strict_mode=True):
pass
@syncmethod
def get_namespace_index(self, url):
pass
@syncmethod
def load_enums(self):
pass
@syncmethod
def load_type_definitions(self):
pass
@syncmethod
def load_data_type_definitions(self, node=None):
pass
@syncmethod
def write_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
pass
def set_attribute_value_callback(
self,
nodeid: ua.NodeId,
callback: Callable[[ua.NodeId, ua.AttributeIds], ua.DataValue],
attr=ua.AttributeIds.Value
) -> None:
self.aio_obj.set_attribute_value_callback(nodeid, callback, attr)
def create_subscription(self, period, handler):
coro = self.aio_obj.create_subscription(period, _SubHandler(self.tloop, handler))
aio_sub = self.tloop.post(coro)
return Subscription(self.tloop, aio_sub)
class EventGenerator:
def __init__(self, tloop, aio_evgen):
self.aio_obj = aio_evgen
self.tloop = tloop
@property
def event(self):
return self.aio_obj.event
def trigger(self, time=None, message=None):
return self.tloop.post(self.aio_obj.trigger(time, message))
def new_node(sync_node, nodeid):
"""
given a sync node, create a new SyncNode with the given nodeid
"""
return SyncNode(sync_node.tloop, node.Node(sync_node.aio_obj.session, nodeid))
class SyncNode:
def __init__(self, tloop: ThreadLoop, aio_node: node.Node):
self.aio_obj = aio_node
self.tloop = tloop
def __eq__(self, other):
return other is not None and self.aio_obj == other.aio_obj
def __ne__(self, other):
return not self.__eq__(other)
def __str__(self):
return self.aio_obj.__str__()
def __repr__(self):
return "Sync" + self.aio_obj.__repr__()
def __hash__(self):
return self.aio_obj.__hash__()
def __get_nodeid(self):
return self.aio_obj.nodeid
def __set_nodeid(self, value):
self.aio_obj.nodeid = value
nodeid: ua.NodeId = property(__get_nodeid, __set_nodeid)
@syncmethod
def read_type_definition(self) -> Optional[ua.NodeId]: # type: ignore[empty-body]
pass
@syncmethod
def get_parent(self) -> Optional[SyncNode]: # type: ignore[empty-body]
pass
@syncmethod
def read_node_class(self) -> ua.NodeClass: # type: ignore[empty-body]
pass
@syncmethod
def read_attribute( # type: ignore[empty-body]
self,
attr: ua.AttributeIds,
indexrange: Optional[str] = None,
raise_on_bad_status: bool = True,
) -> ua.DataValue:
pass
@syncmethod
def write_attribute(
self,
attributeid: ua.AttributeIds,
datavalue: ua.DataValue,
indexrange: Optional[str] = None,
) -> None:
pass
@syncmethod
def read_browse_name(self) -> ua.QualifiedName: # type: ignore[empty-body]
pass
@syncmethod
def read_display_name(self) -> ua.LocalizedText: # type: ignore[empty-body]
pass
@syncmethod
def read_data_type(self) -> ua.NodeId: # type: ignore[empty-body]
pass
@syncmethod
def read_array_dimensions(self) -> int: # type: ignore[empty-body]
pass
@syncmethod
def read_value_rank(self) -> int: # type: ignore[empty-body]
pass
@syncmethod
def delete(self, delete_references: bool = True, recursive: bool = False) -> List[SyncNode]: # type: ignore[empty-body]
pass
@syncmethod
def get_children( # type: ignore[empty-body]
self,
refs: int = ua.ObjectIds.HierarchicalReferences,
nodeclassmask: ua.NodeClass = ua.NodeClass.Unspecified,
) -> List[SyncNode]:
pass
@syncmethod
def get_properties(self) -> List[SyncNode]: # type: ignore[empty-body]
pass
@syncmethod
def get_children_descriptions( # type: ignore[empty-body]
self,
refs: int = ua.ObjectIds.HierarchicalReferences,
nodeclassmask: ua.NodeClass = ua.NodeClass.Unspecified,
includesubtypes: bool = True,
result_mask: ua.BrowseResultMask = ua.BrowseResultMask.All,
) -> List[ua.ReferenceDescription]:
pass
@syncmethod
def get_user_access_level(self) -> Set[ua.AccessLevel]: # type: ignore[empty-body]
pass
@overload
def get_child(
self,
path: Union[ua.QualifiedName, str, Iterable[Union[ua.QualifiedName, str]]],
return_all: Literal[False] = False,
) -> SyncNode:
...
@overload
def get_child(
self,
path: Union[ua.QualifiedName, str, Iterable[Union[ua.QualifiedName, str]]],
return_all: Literal[True] = True,
) -> List[SyncNode]:
...
@syncmethod
def get_child( # type: ignore[empty-body]
self,
path: Union[ua.QualifiedName, str, Iterable[Union[ua.QualifiedName, str]]],
return_all: bool = False,
) -> Union[SyncNode, List[SyncNode]]:
pass
@syncmethod
def get_children_by_path( # type: ignore[empty-body]
self,
paths: Iterable[Union[ua.QualifiedName, str, Iterable[Union[ua.QualifiedName, str]]]],
raise_on_partial_error: bool = True
) -> List[List[Optional[SyncNode]]]:
pass
@syncmethod
def read_raw_history( # type: ignore[empty-body]
self,
starttime: Optional[datetime] = None,
endtime: Optional[datetime] = None,
numvalues: int = 0,
return_bounds: bool = True,
) -> List[ua.DataValue]:
pass
@syncmethod
def history_read( # type: ignore[empty-body]
self,
details: ua.ReadRawModifiedDetails,
continuation_point: Optional[bytes] = None,
) -> ua.HistoryReadResult:
pass
@syncmethod
def read_event_history( # type: ignore[empty-body]
self,
starttime: datetime = None,
endtime: datetime = None,
numvalues: int = 0,
evtypes: Union[SyncNode, ua.NodeId, str, int, Iterable[Union[SyncNode, ua.NodeId, str, int]]] = ua.ObjectIds.BaseEventType,
) -> List[Event]:
pass
@syncmethod
def history_read_events(self, details: Iterable[ua.ReadEventDetails]) -> ua.HistoryReadResult: # type: ignore[empty-body]
pass
@syncmethod
def set_modelling_rule(self, mandatory: bool) -> None:
pass
@syncmethod
def add_variable( # type: ignore[empty-body]
self,
nodeid: Union[ua.NodeId, str],
bname: Union[ua.QualifiedName, str],
val: Any,
varianttype: Optional[ua.VariantType] = None,
datatype: Optional[Union[ua.NodeId, int]] = None,
) -> SyncNode:
pass
@syncmethod
def add_property( # type: ignore[empty-body]
self,
nodeid: Union[ua.NodeId, str],
bname: Union[ua.QualifiedName, str],
val: Any,
varianttype: Optional[ua.VariantType] = None,
datatype: Optional[Union[ua.NodeId, int]] = None,
) -> SyncNode:
pass
@syncmethod
def add_object( # type: ignore[empty-body]
self, nodeid: Union[ua.NodeId, str],
bname: Union[ua.QualifiedName, str],
objecttype: Optional[int] = None,
instantiate_optional: bool = True,
) -> SyncNode:
pass
@syncmethod
def add_object_type(self, nodeid: Union[ua.NodeId, str], bname: Union[ua.QualifiedName, str]) -> SyncNode: # type: ignore[empty-body]
pass
@syncmethod
def add_variable_type( # type: ignore[empty-body]
self, nodeid: Union[ua.NodeId, str], bname: Union[ua.QualifiedName, str], datatype: Union[ua.NodeId, int]
) -> SyncNode:
pass
@syncmethod
def add_folder(self, nodeid: Union[ua.NodeId, str], bname: Union[ua.QualifiedName, str]) -> SyncNode: # type: ignore[empty-body]
pass
@syncmethod
def add_method(self, *args) -> SyncNode: # type: ignore[empty-body]
pass
@syncmethod
def add_data_type( # type: ignore[empty-body]
self, nodeid: Union[ua.NodeId, str], bname: Union[ua.QualifiedName, str], description: Optional[str] = None
) -> SyncNode:
pass
@syncmethod
def set_writable(self, writable: bool = True) -> None:
pass
@syncmethod
def write_value(self, value: Any, varianttype: Optional[ua.VariantType] = None) -> None:
pass
set_value = write_value # legacy
@syncmethod
def write_params(self, params: ua.WriteParameters) -> List[ua.StatusCode]: # type: ignore[empty-body]
pass
@syncmethod
def read_params(self, params: ua.ReadParameters) -> List[ua.DataValue]: # type: ignore[empty-body]
pass
@syncmethod
def read_value(self) -> Any:
pass
get_value = read_value # legacy
@syncmethod
def read_data_value(self, raise_on_bad_status: bool = True) -> ua.DataValue: # type: ignore[empty-body]
pass
get_data_value = read_data_value # legacy
@syncmethod
def read_data_type_as_variant_type(self) -> ua.VariantType: # type: ignore[empty-body]
pass
get_data_type_as_variant_type = read_data_type_as_variant_type # legacy
@syncmethod
def call_method(self, methodid: Union[ua.NodeId, ua.QualifiedName, str], *args) -> Any: # type: ignore[empty-body]
pass
@syncmethod
def get_references( # type: ignore[empty-body]
self,
refs: int = ua.ObjectIds.References,
direction: ua.BrowseDirection = ua.BrowseDirection.Both,
nodeclassmask: ua.NodeClass = ua.NodeClass.Unspecified,
includesubtypes: bool = True,
result_mask: ua.BrowseResultMask = ua.BrowseResultMask.All,
) -> List[ua.ReferenceDescription]:
pass
@syncmethod
def add_reference(
self, target: Union[SyncNode, ua.NodeId, str, int], reftype: int, forward: bool = True, bidirectional: bool = True
) -> None:
pass
@syncmethod
def read_description(self) -> ua.LocalizedText: # type: ignore[empty-body]
pass
@syncmethod
def get_variables(self) -> List[SyncNode]: # type: ignore[empty-body]
pass
@overload
def get_path(self, max_length: int = 20, as_string: Literal[False] = False) -> List[SyncNode]:
...
@overload
def get_path(self, max_length: int = 20, as_string: Literal[True] = True) -> List["str"]:
...
@syncmethod
def get_path(self, max_length: int = 20, as_string: bool = False) -> Union[List[SyncNode], List[str]]: # type: ignore[empty-body]
pass
@syncmethod
def read_attributes(self, attrs: Iterable[ua.AttributeIds]) -> List[ua.DataValue]: # type: ignore[empty-body]
pass
@syncmethod
def add_reference_type( # type: ignore[empty-body]
self,
nodeid: Union[ua.NodeId, str],
bname: Union[ua.QualifiedName, str],
symmetric: bool = True,
inversename: Optional[str] = None,
) -> SyncNode:
pass
@syncmethod
def delete_reference( # type: ignore[empty-body]
self,
target: Union[SyncNode, ua.NodeId, str, int],
reftype: int,
forward: bool = True,
bidirectional: bool = True,
) -> None:
pass
@syncmethod
def get_access_level(self) -> Set[ua.AccessLevel]: # type: ignore[empty-body]
pass
@syncmethod
def get_description_refs(self) -> List[SyncNode]: # type: ignore[empty-body]
pass
@syncmethod
def get_encoding_refs(self) -> List[SyncNode]: # type: ignore[empty-body]
pass
@syncmethod
def get_methods(self) -> List[SyncNode]: # type: ignore[empty-body]
pass
@syncmethod
def get_referenced_nodes( # type: ignore[empty-body]
self,
refs: int = ua.ObjectIds.References,
direction: ua.BrowseDirection = ua.BrowseDirection.Both,
nodeclassmask: ua.NodeClass = ua.NodeClass.Unspecified,
includesubtypes: bool = True,
) -> List[SyncNode]:
pass
@syncmethod
def read_data_type_definition(self) -> ua.DataTypeDefinition: # type: ignore[empty-body]
pass
@syncmethod
def read_event_notifier(self) -> Set[ua.EventNotifier]: # type: ignore[empty-body]
pass
@syncmethod
def register(self) -> None:
pass
@syncmethod
def set_attr_bit(self, attr: ua.AttributeIds, bit: int) -> None:
pass
@syncmethod
def set_event_notifier(self, values) -> None:
pass
@syncmethod
def unregister(self) -> None:
pass
@syncmethod
def unset_attr_bit(self, attr: ua.AttributeIds, bit: int) -> None:
pass
@syncmethod
def write_array_dimensions(self, value: int) -> None:
pass
@syncmethod
def write_data_type_definition(self, sdef: ua.DataTypeDefinition) -> None:
pass
@syncmethod
def write_value_rank(self, value: int) -> None:
pass
class Subscription:
def __init__(self, tloop, sub):
self.tloop = tloop
self.aio_obj = sub
@syncmethod
def subscribe_data_change(self, nodes,
attr=ua.AttributeIds.Value,
queuesize=0,
monitoring=ua.MonitoringMode.Reporting,
sampling_interval=0.0):
pass
@syncmethod
def subscribe_events(
self,
sourcenode=ua.ObjectIds.Server,
evtypes=ua.ObjectIds.BaseEventType,
evfilter=None,
queuesize=0,
):
pass
def _make_monitored_item_request(self, node: SyncNode, attr, mfilter, queuesize, monitoring=ua.MonitoringMode.Reporting,) -> ua.MonitoredItemCreateRequest:
return self.aio_obj._make_monitored_item_request(node, attr, mfilter, queuesize, monitoring)
@syncmethod
def unsubscribe(self, handle):
pass
@syncmethod
def create_monitored_items(self, monitored_items):
pass
@syncmethod
def delete(self):
pass
class XmlExporter:
def __init__(self, sync_server):
self.tloop = sync_server.tloop
self.aio_obj = xmlexporter.XmlExporter(sync_server.aio_obj)
@syncmethod
def build_etree(self, node_list, uris=None):
pass
@syncmethod
def write_xml(self, xmlpath: Path, pretty=True):
pass
class DataTypeDictionaryBuilder:
def __init__(self, server, idx, ns_urn, dict_name, dict_node_id=None):
self.tloop = server.tloop
self.aio_obj = type_dictionary_builder.DataTypeDictionaryBuilder(server.aio_obj, idx, ns_urn, dict_name, dict_node_id)
self.init()
@property
def dict_id(self):
return self.aio_obj.dict_id
@syncmethod
def init(self):
pass
@syncmethod
def create_data_type(self, type_name, nodeid=None, init=True):
pass
@syncmethod
def set_dict_byte_string(self):
pass
def new_struct_field(
name: str,
dtype: Union[ua.NodeId, SyncNode, ua.VariantType],
array: bool = False,
optional: bool = False,
description: str = "",
) -> ua.StructureField:
if isinstance(dtype, SyncNode):
dtype = dtype.aio_obj
return common.structures104.new_struct_field(name, dtype, array, optional, description)
@syncfunc(aio_func=common.structures104.new_enum)
def new_enum( # type: ignore[empty-body]
server: Union["Server", "Client"],
idx: Union[int, ua.NodeId],
name: Union[int, ua.QualifiedName],
values: List[str],
optional: bool = False
) -> SyncNode:
pass
@syncfunc(aio_func=common.structures104.new_struct)
def new_struct( # type: ignore[empty-body]
server: Union["Server", "Client"],
idx: Union[int, ua.NodeId],
name: Union[int, ua.QualifiedName],
fields: List[ua.StructureField],
) -> Tuple[SyncNode, List[SyncNode]]:
pass