Source code for pydevmgr_core.base.download

from .node import NodesReader, BaseNode
from .base import  _BaseObject


import time
from collections import  OrderedDict

from typing import Any, Dict, Iterable, Union, Optional, Callable


class DataView:
    def __init__(self, 
            data: Dict[BaseNode,Any], 
            prefix: Optional[Union[str, _BaseObject]] = None
          ) -> None:
        self._data = data
        if prefix is None:
            prefix = ""
        if not isinstance(prefix, str):
            prefix = prefix.key 
        
        if not prefix:
            key_lookup = {n.key:n for n in data if hasattr(n, "key") }
        else:                    
            key_lookup = {}
            pref = prefix+"."
            lp = len(pref)
            for n in data:
                if hasattr(n, "key") and n.key.startswith(pref):
                    key_lookup[n.key[lp:]] = n
        
        self._key_lookup = key_lookup    
    
    def __repr__(self):
        return repr({k:self._data[n] for k,n in self._key_lookup.items() })
    
    def __str__(self):
        return str({k:self._data[n] for k,n in self._key_lookup.items() })
        
    def __getitem__(self, item):
        return self._data[self._key_lookup[item]]
    
    def __setitem__(self, item, value):
        self._data[self._key_lookup[item]] = value
    
    def __delitem__(self, item):
        del self._data[self._key_lookup[item]]
    
    def __getattr__(self, attr):
        return self._data[self._key_lookup[attr]]
    
    def __has__(self, item):
        return item in self._key_lookup
    
    def update(self, __d__={}, **kwargs) -> None:
        for k,v in dict(__d__, **kwargs).iteritems():
            self._data[self._key_lookup[k]] = v
    
    def pop(self, item) -> Any:
        """ Pop an item from the root data ! """
        return self._data.pop(self._key_lookup[item])    
    
    def popitem(self, item) -> Any:
        """ Pop an item from the root data ! """
        return self._data.popitem(self._key_lookup[item])  
    
    def keys(self) -> Iterable:
        """ D.keys() ->iterable on D's root keys with matching prefix
        
        Shall be avoided to use in a :class:`Prefixed` object
        """
        return self._key_lookup.__iter__()
    
    def items(self) -> Iterable:
        for k,n in self._key_lookup.items():
            yield k, self._data[n]
    
    def values(self) -> Iterable:
        for k,n in self._key_lookup.items():
            yield self._data[n]    
    
    def clear(self) -> None:
        """ D.clear() -> None.  Remove all items from D root with matching prefix
        
        Shall be avoided to use in a :class:`Prefixed` object
        """        
        pref = self._prefix+"."
        for k, n in list(self._key_lookup.items()):            
            self._data.pop(n)


def _setitem(d,k,v):
    d[k] = v
def _dummy_callback(data):
    pass
def _dummy_trigger():
    return True

class BaseDataLink:
    """ place holder for an instance check """    
    pass


class DownloaderConnection:
    """ Hold a connection to a :class:`Downloader` 
    
    Most likely created by :meth:`Downloader.new_connection` 
    
    Args:
       downloader (:class:`Downloader`) :  parent Downloader instance
       token (Any): Connection token 
    """
    def __init__(self, downloader, token):
        self._downloader = downloader 
        self._token = token 
    
    def _check_connection(self):
        if not self._token:
            raise RuntimeError("DownloaderConnection has been disconnected from its Downloader")
    
    @property
    def data(self) -> dict:
        """ downloader data """
        return self._downloader.data 
           
    def disconnect(self) -> None:
        """ disconnect connection from the downloader 
        
        All nodes related to this connection (and not used by other connection) are removed from the
        the downloader queue. 
        Also all callback associated with this connection will be removed from the downloader 
        
        Note that the disconnected nodes will stay inside the downloader data but will not be updated
        """
        self._downloader.disconnect(self._token)
        self._token = None
        
    def add_node(self, *nodes) -> None:
        """ Register nodes to be downloader associated to this connection 
        
        Args:
            *nodes :  nodes to be added to the download queue
        """ 
        self._check_connection() 
        self._downloader.add_node(self._token, *nodes)
    
    def add_nodes(self, nodes) -> None:
        """ Register nodes to be downloader associated to this connection 
        
        Args:
            nodes :  nodes to be added to the download queue. 
                     If a dictionary of node/value pairs, they are added to the downloader data. 
        """
        self._check_connection() 
        self._downloader.add_nodes(self._token, nodes)
    
    def remove_node(self, *nodes) -> None:
        """ remove  any nodes to the downloader associated to this connection 
        
        Note that the node will stay inside the downloader data but will not be updated 
        
        Args:
            *nodes :  nodes to be removed from the download queue
        """ 
        self._check_connection() 
        self._downloader.remove_node(self._token, *nodes)
    
    def add_datalink(self, *datalinks) -> None:
        """ Register datalinks to the downloader associated to this connection 
        
        Args:
            *datalinks :  :class:`DataLink` to be added to the download queue on the associated downloader
        """
        self._check_connection() 
        self._downloader.add_datalink(self._token, *datalinks)        
    
    def remove_datalink(self, *datalinks) -> None:
        """ Remove any given datalinks to the downloader associated to this connection 
        
        Args:
            *datalinks :  :class:`DataLink` to be removed 
        """
        self._check_connection() 
        self._downloader.remove_datalink(self._token, *datalinks)        
    
    def add_callback(self, *callbacks) -> None:
        """ Register callbacks to be executed after each download of the associated downloader 
        
        Args:            
            *callbacks :  callbacks to be added to the queue of callbacks on the associated downloader       
        """
        self._check_connection() 
        self._downloader.add_callback(self._token, *callbacks)
    
    def remove_callback(self, *callbacks) -> None:
        """ Remove any of given callbacks of the associated downloader 
        
        Args:            
            *callbacks :  callbacks to be remove 
        """
        self._check_connection() 
        self._downloader.remove_callback(self._token, *callbacks)
    
    def add_failure_callback(self, *callbacks) -> None:
        """ Register callbacks to be executed after each download of the associated downloader 
        
        Args:            
            *callbacks :  failure callbacks to be added to the queue of callbacks on the associated downloader       
        """
        self._check_connection() 
        self._downloader.add_failure_callback(self._token, *callbacks)
        
    def remove_failure_callback(self, *callbacks) -> None:
        """ Remove any given callbacks of the associated downloader 
        
        Args:            
            *callbacks :  failure callbacks to be removed 
        """
        self._check_connection() 
        self._downloader.remove_failure_callback(self._token, *callbacks)

class StopDownloader(StopIteration):
    pass

[docs]class Downloader: """ object dedicated to download nodes, feed data and run some callback An application can request nodes to be downloaded and callback to be executed after each success download or each failures. Args: nodes_or_datalink (iterable of node or :class:`DataLink`): - An initial, always downloaded, list of nodes - Or a :class:`DataLink` object data (dict, optional): A dictionary to store the data. If not given, one is created and accessible through the .data attribute. This data is made of node/value pairs, the .get_data_view gives however a dictionary like object with string/value pairs. Each time a new node is added to the downloader it will be added to the data as ``data[node] = None``. None will be replaced after the next download. callback (callable, optional): one single function with signature f(), if given always called after successful download. trigger (callable, optional): a function taking no argument and should return True or False If given the "download" method download nodes only if f() return True. Can be used if the download object is running in a thread for instance. Example: A dumy exemple, replace the print_pos by a GUI interface for instance: :: def print_pos(m, data): "An application" print("Position :", data[m.stat.pos_actual], data[m.stat.pos_error] ) >>> tins = open_manager('tins/tins.yml') >>> tins.connect() >>> downloader = Downloader() >>> token = downloader.new_token() >>> downloader.add_node(token, tins.motor1.stat.pos_actual, tins.motor1.stat.pos_error ) >>> downloader.add_callback(token, lambda : print_pos(tins.motor1, downloader.data)) >>> downloader.download() Position : 3.45 0.003 >>> downloader.data { <UaNode key='motor1.pos_error'>: 0.003, <UaNode key='motor1.pos_actual'>: 3.45 } >>> downloader.disconnect(token) # disconnect the print_pos function and remove # the pos_actual, pos_error node from the list of nodes # to download (except if an other connection use it) Same result can be obtained with this exemple: :: def print_pos(data): "An application" print("Position :", data['pos_actual'], data['pos_error'] ) >>> nodes_data = {tins.motor1.stat.pos_actual: -9.99 , tins.motor1.stat.pos_error: -9.99} >>> m1_data = DataView(nodes_data, tins.motor1.stat) >>> downloader = Downloader(nodes_data, callback=lambda: print_pos(m1_data)) >>> downloader.download() Position : 3.45 0.003 >>> m1_data {'pos_error': 0.003, 'pos_actual': 3.45} """ _did_failed_flag = False Connection = DownloaderConnection StopDownloader = StopDownloader def __init__(self, nodes_or_datalink: Union[Iterable, BaseDataLink] = None, data: Optional[Dict] = None, callback: Optional[Callable] = None, trigger: Optional[Callable] = None ) -> None: if data is None: data = {} self._data = data if nodes_or_datalink is None: nodes = set() datalinks = set() elif isinstance(nodes_or_datalink, BaseDataLink): nodes = set() datalinks = set([nodes_or_datalink]) elif isinstance( nodes_or_datalink, dict): nodes = set() datalinks = set() for k,v in nodes_or_datalink.items(): if isinstance( v, BaseDataLink ): datalinks.add(v) else: nodes.add(v) else: nodes = set() datalinks = set() for v in nodes_or_datalink: if isinstance( v, BaseDataLink ): datalinks.add(v) else: nodes.add(v) #nodes = set() if nodes is None else set(nodes) if callback is None: callbacks = set() else: callbacks = set([callback]) failure_callbacks = set() if callback is None: callback = _dummy_callback if trigger is None: trigger = _dummy_trigger self._trigger = trigger # Ellipsis is here to define general nodes,datalinks,callbacks, ... independent to connection self._dict_nodes = OrderedDict([(Ellipsis,nodes)]) self._dict_datalinks = OrderedDict([(Ellipsis,datalinks)]) self._dict_callbacks = OrderedDict([(Ellipsis,callbacks)]) self._dict_failure_callbacks = OrderedDict([(Ellipsis,failure_callbacks)]) self.trigger = trigger self._next_token = 1 self._rebuild_nodes() self._rebuild_callbacks() self._rebuild_failure_callbacks() def __has__(self, node): return node in self._nodes @property def data(self): return self._data def _rebuild_nodes(self): nodes = set() for nds in self._dict_nodes.values(): nodes.update(nds) for n in nds: self._data.setdefault(n,None) for dls in self._dict_datalinks.values(): for dl in dls: nodes.update(dl.rnodes) for n in dl.rnodes: self._data.setdefault(n,None) self._nodes = nodes self._to_read = NodesReader(nodes) def _rebuild_callbacks(self): callbacks = set() for clbc in self._dict_callbacks.values(): callbacks.update(clbc) self._callbacks = callbacks def _rebuild_failure_callbacks(self): callbacks = set() for clbc in self._dict_failure_callbacks.values(): callbacks.update(clbc) self._failure_callbacks = callbacks
[docs] def new_token(self) -> tuple: """ add a new app connection token Return: A token, the token and type itself is not relevant, it is just a unique ID to be used in add_node, add_callback, add_failure_callback, and disconnect methods .. note:: new_connection() method return object containing a pair of token and downloader and all methods necessary to add_nodes, add_callbacks, etc ... """ token = id(self), self._next_token self._dict_nodes[token] = set() self._dict_datalinks[token] = set() self._dict_callbacks[token] = set() self._dict_failure_callbacks[token] = set() self._next_token += 1 # self._rebuild_nodes() # self._rebuild_callbacks() # self._rebuild_failure_callbacks() return token
[docs] def new_connection(self): """ Return a :class:`DownloaderConnection` object The :class:`DownloaderConnection` object contain a token and the downloader in order to have a standalone object to handle the add/remove of queue nodes and callbacks """ return DownloaderConnection(self, self.new_token())
[docs] def disconnect(self, token: tuple) -> None: """ Disconnect the iddentified connection All the nodes used by the connection (and not by other connected app) will be removed from the download queue of nodes. Also all callback associated with this connection will be removed from the downloader Note that the discnnected nodes will stay inside the downloader data but will not be updated Args: token : a Token returned by :func:`Downloader.new_token` """ if token is Ellipsis: raise ValueError('please provide a real token') try: self._dict_nodes.pop(token) self._dict_datalinks.pop(token) self._dict_callbacks.pop(token) self._dict_failure_callbacks.pop(token) except KeyError: pass self._rebuild_nodes() self._rebuild_callbacks() self._rebuild_failure_callbacks()
[docs] def add_node(self, token: tuple, *nodes) -> None: """ Register node to be downloaded for an iddentified app Args: token: a Token returned by :func:`Downloader.new_token` ``add_node(...,node1, node2)`` can also be used, in this case nodes will be added to the main pool of nodes and cannot be removed from the downloader *nodes : nodes to be added to the download queue, associated to the app """ self._dict_nodes[token].update(nodes) self._rebuild_nodes()
[docs] def add_nodes(self, token: tuple, nodes: Union[dict,Iterable]) -> None: """ Register nodes to be downloaded for an iddentified app Args: token: a Token returned by :func:`Downloader.new_token` ``add_node(...,node1, node2)`` can also be used, in this case nodes will be added to the main pool of nodes and cannot be removed from the downloader nodes (Iterable, dict): nodes to be added to the download queue, associated to the app If a dictionary of node/value pairs, they are added to the downloader data. """ if isinstance(nodes, dict): for node,val in nodes.items(): self._data[node] = val self._dict_nodes[token].update(nodes) self._rebuild_nodes()
[docs] def remove_node(self, token: tuple, *nodes) -> None: """ Remove node from the download queue if the node is not in the queueu nothing is done or raised Note that the node will stay inside the downloader data but will not be updated Args: token: a Token returned by :func:`Downloader.new_token` *nodes : nodes to be removed """ for node in nodes: try: self._dict_nodes[token].remove(node) except KeyError: pass self._rebuild_nodes()
[docs] def add_callback(self, token: tuple, *callbacks) -> None: """ Register callbacks to be executed after each download The callback must have the signature f(), no arguments. Args: token: a Token returned by :func:`Downloader.new_connection` *callbacks : callbacks to be added to the queue of callbacks, associated to the app """ self._dict_callbacks[token].update(callbacks) self._rebuild_callbacks()
[docs] def remove_callback(self, token: tuple, *callbacks) -> None: """ Remove callbacks If the callback is not in the queueu nothing is done or raised Args: token: a Token returned by :func:`Downloader.new_token` *callbacks : callbacks to be removed """ for c in callbacks: try: self._dict_callbacks[token].remove(c) except KeyError: pass self._rebuild_callbacks()
[docs] def add_failure_callback(self, token: tuple, *callbacks) -> None: """ Add one or several callbacks to be executed when a download failed When ever occur a failure (Exception during download) ``f(e)`` is called with ``e`` the exception. If a download is successfull after a failure ``f(None)`` is called one time only. Args: token: a Token returned by :func:`Downloader.new_token` *callbacks: callbacks to be added to the queue of failure callbacks, associated to the app """ self._dict_failure_callbacks[token].update(callbacks) self._rebuild_failure_callbacks()
[docs] def remove_failure_callback(self, token: tuple, *callbacks) -> None: """ remove one or several failure callbacks If the callback is not in the queue nothing is done or raised Args: token: a Token returned by :func:`Downloader.new_token` *callbacks : callbacks to be removed """ for c in callbacks: try: self._dict_failure_callbacks[token].remove(c) except KeyError: pass self._rebuild_failure_callbacks()
[docs] def run(self, period: float =1.0, stop_signal: Callable =lambda : False, sleepfunc: Callable =time.sleep ) -> None: """ run the download indefinitely or when stop_signal return True Args: period (float, optional): period between downloads in second stop_signal (callable, optional): a function returning True to stop the loop or False to continue """ try: while not stop_signal(): s_time = time.time() self.download() sleepfunc( max( period-(time.time()-s_time), 0)) except StopDownloader: # any downloader call back can send a StopDownloader to stop the runner return
[docs] def runner(self, period: float =1.0, stop_signal: Callable =lambda : False, sleepfunc: Callable =time.sleep ) -> Callable: """ Create a function to run the download in a loop Usefull to define a Thread for instance Args: period (float, optional): period between downloads in second stop_signal (callable, optional): a function returning True to stop the loop or False to continue Example: >>> downloader = Downloader([mgr.motor1.substate, mgr.motor1.pos_actual]) >>> t = Thread( target = downloader.runner(period=0.1) ) >>> t.start() """ def run_func(): self.run(period=period, sleepfunc=sleepfunc, stop_signal=stop_signal) return run_func
[docs] def download(self) -> None: """ Execute a download Each nodes on the queue are fetched and the .data dictionary is updated from new values. If the Downloader has a trigger method and the trigger return false, nothing is done """ if not self.trigger(): return try: self._to_read.read(self._data) except Exception as e: if self._failure_callbacks: self._did_failed_flag = True for func in self._failure_callbacks: func(e) else: raise e else: # Populate the data links for dls in self._dict_datalinks.values(): for dl in dls: dl._download_from(self._data) if self._did_failed_flag: self._did_failed_flag = False for func in self._failure_callbacks: func(None) for func in self._callbacks: func()
[docs] def reset(self) -> None: """ All nodes of the downloader with a reset method will be reseted """ reset(self._dict_nodes)
[docs] def get_data_view(self, prefix: str ='') -> DataView: """ Return a view of the data in a dictionary where keys are string keys extracted from nodes If prefix is given the return object will be limited to items with key matching the prefix. Note: the data view reflect any change made on the rootdata except when new nodes (mathing the prefix) are added. So all necessary nodes shall be added to the downloader before requesting a DataView. Args: prefix (str, optional): limit the data viewer to a given prefix. prefix can also be an object with the key attribute like a :class:`BaseDevice`, :class:`BaseNode` etc ... Example: :: > downloader = Downloader([mgr.motor1.substate, mgr.motor1.pos_actual, mgr.motor2.substate]) > downloader.download() > m1_data = downloader.get_data_view(mgr.motor1.key) > m1_data['pos_actual'] 3.9898 :: > m1_data = downloader.get_data_view(mgr.motor1) # is equivalent to > m1_data = DataView(downloader.data, mgr.motor1) """ return DataView(self._data, prefix)
[docs] def clean_data(self) -> None: """ Remove to the .data dictionary all keys/value pairs corresponding to nodes not in the downloader queue Returns: n (int): The number of node/value pair removed """ d = self.data n = 0 for n in list(d): # list(d) in order to avoid deletion on the iterator if not n in self._nodes: d.pop(n, None) n+=1 return n
def reset(nodes: Iterable): """ Execute the reset() method of a list of nodes """ for n in nodes: n.reset()
[docs]def download(nodes, data: Optional[Dict] = None) -> Union[list,None]: """ read node values from remote servers in one call per server Args: nodes (iterable): Iterable of nodes, like [mgr.motor1.stat.pos_actual, mgr.motor2.stat.pos_actual] data (dict, optional): This is mostlikely a dictionary, must define a __setitem__ method If given the function return None and update data in place. If data is None the function return a list of values Returns: None, or list : download(nodes) -> return list of values download(nodes, data) -> return None and update the input data dictionary Example: :: data = {n:n.get() for n in nodes} Is equivalent, but **much slower** than :: data = {} download(nodes) The latest is more efficient because only one call (per server) is done. data dictionary is optional, if not given values are returned in a list: :: pos, error = download([mgr.motor1.stat.pos_actual, mgr.motor1.stat.pos_error]) """ if data is None: data = {} nodes = tuple(nodes) # in case this is a generator NodesReader(nodes).read(data) return [data[n] for n in nodes] else: NodesReader(nodes).read(data) return None