from typing import Any, Callable, Dict, Iterable, Optional, Union
from .node import NodesReader, DictReadCollector, BaseNode
import time
class Waiter:
    """Object used to wait until some conditions are met.

    The only public method of this object is :meth:`Waiter.wait`.

    Args:
        node_or_func_list (iterable): an iterable of nodes and/or callables, or a
            single node or callable. ``None`` items are ignored.
            By default :meth:`Waiter.wait` waits until every node value and every
            callable result is true.
        logic (str, optional): how the individual checks are combined

            "all_true" : stop waiting when all checks return True (default)
            "all_false" : stop waiting when all checks return False
            "any_true" : stop waiting when one of the checks returns True
            "any_false" : stop waiting when one of the checks returns False

        period (float, optional): sleep time in seconds between two checks,
            default is 0.1 second.
        timeout (float, optional): timeout in seconds, default is 60.
            If the time elapsed since the call of :meth:`Waiter.wait` (``lag``
            included) exceeds ``timeout`` a RuntimeError is raised.
        stop_signal (callable, optional): a callable returning True to stop the wait.
            A stop signal executes ``stop_function`` and then raises a RuntimeError.
            The example below is equivalent to setting a timeout. However the timeout
            argument is kept for compatibility reasons

            ::

                from pydevmgr_core.signals import Timeout
                from pydevmgr_core import wait
                wait( [... some stuff...], stop_signal=Timeout(60.0) )

        stop_function (callable, optional): a callable executed when ``stop_signal``
            returns True, just before the RuntimeError is raised.
        data (None, dict, optional): if given, input node values are taken from the
            data dictionary which is expected to be updated in some other place.
        lag (float): time in seconds slept before starting the wait process.
            This can be useful to make sure that an action has started on the
            server before checking the nodes.

    Raises:
        ValueError: if ``logic`` is not one of the supported names.

    Example:

    ::

        >>> wait_moving = Waiter([motor.stat.is_moving])
        >>> wait_moving.wait()

    wait returns True and can be used as a trigger for a :class:`Download` object :

    ::

        >>> data = {}
        >>> pos_update = Downloader(data, [motor.stat.pos_actual], triger=wait_moving.wait)
        >>> t = Thread(target=pos_update.runner(0.5))
        >>> t.start()

    In the example above the thread will update the data dictionary
    every 0.5 seconds with the motor position only when the motor is moving.

    Attributes:
        period (float): conditions are checked every period seconds
        timeout (float): timeout in seconds, a RuntimeError is raised if conditions
            are still not met after timeout.
    """

    def __init__(
        self,
        node_or_func_list: "Union[Iterable, BaseNode, Callable]",
        logic: str = "all_true",
        period: float = 0.1,
        timeout: float = 60,
        stop_signal: Callable = lambda: False,
        stop_function: Callable = lambda: None,
        data: Optional[Dict] = None,
        lag: float = 0.0,
    ) -> None:
        # A single node or callable is accepted in place of an iterable.
        if not hasattr(node_or_func_list, "__iter__"):
            node_or_func_list = [node_or_func_list]

        # Nodes are read through a reader, anything else is called directly.
        nodes = []
        functions = []
        for item in node_or_func_list:
            if item is None:
                continue
            if isinstance(item, BaseNode):
                nodes.append(item)
            else:
                functions.append(item)
        self._functions = functions

        try:
            self.check_func, self.check_nodes = _logic_loockup[logic]
        except KeyError:
            raise ValueError("undefined logic %r must be one of %s"%(logic, ",".join(_logic_loockup.keys()))) from None
        # "any" logics succeed as soon as one check passes, so callable and node
        # results must be OR-ed; "all" logics need both groups to pass (AND).
        self._any_logic = logic in ("any_true", "any_false")

        if data is None:
            reader = NodesReader(nodes)
        else:
            reader = DictReadCollector(data)
            # NOTE(review): indentation was lost in the reviewed source; nodes are
            # registered here only because NodesReader already receives them in
            # its constructor - confirm against the upstream file.
            for node in nodes:
                reader.add(node)

        self.nodes = nodes
        self.reader = reader
        self.period = period
        self.timeout = timeout
        self.stop_signal = stop_signal
        self.stop_function = stop_function
        self.lag = lag

    def wait(self) -> bool:
        """Block until the configured conditions are met.

        Sleeps ``lag`` seconds first (if positive), then evaluates the checks every
        ``period`` seconds.

        Returns:
            bool: always True, once the conditions are met.

        Raises:
            RuntimeError: if ``stop_signal()`` returns True (``stop_function`` is
                called first), or if more than ``timeout`` seconds have elapsed
                since the call.
        """
        check_nodes = self.check_nodes
        check_func = self.check_func
        reader = self.reader
        functions = self._functions
        nodes = self.nodes
        any_logic = self._any_logic
        timeout = self.timeout
        period = self.period
        stop_signal = self.stop_signal

        # Monotonic clock: the timeout must not be affected by system clock changes.
        start = time.monotonic()
        if self.lag > 0.0:
            time.sleep(self.lag)

        values = {}

        def nodes_pass():
            # Refresh the node values, then apply the logic to them.
            reader.read(values)
            return check_nodes([values[n] for n in nodes])

        def check():
            if any_logic:
                # One satisfied check is enough, whether it is a callable or a node.
                if check_func(functions):
                    return True
                return bool(nodes) and nodes_pass()
            # "all" logic: evaluate the callables first so node values are only
            # read when the callables already pass.
            if not check_func(functions):
                return False
            return not nodes or nodes_pass()

        while not check():
            if stop_signal():
                self.stop_function()
                raise RuntimeError(f"Wait interupted by stop_signal: {stop_signal}")
            if (time.monotonic() - start) > timeout:
                raise RuntimeError('wait timeout')
            time.sleep(period)
        return True
def wait(
    node_or_func_list: Union[Iterable, BaseNode, Callable],
    logic: str = "all_true",
    period: float = 0.1,
    timeout: float = 60,
    stop_signal: Callable = lambda: False,
    stop_function: Callable = lambda: None,
    lag: float = 0.0,
    data: Optional[Dict[BaseNode, Any]] = None
) -> None:
    """Wait until a list of nodes and/or functions satisfy a condition.

    This is a shortcut building a :class:`Waiter` with the given arguments and
    calling its ``wait`` method once.

    Args:
        node_or_func_list (iterable): an iterable of nodes or callables, or a single node.
            wait will wait until all node values and callable results are True (by default)
        logic (str, optional):

            "all_true" : stop waiting when all check functions return True (default)
            "all_false" : stop waiting when all check functions return False
            "any_true" : stop waiting when one of the check functions returns True
            "any_false" : stop waiting when one of the check functions returns False

        period (float, optional): default is 0.1 second,
            the sleep time in seconds in between checks
        timeout (float, optional): timeout in seconds, a RuntimeError is raised if
            conditions are still not met after timeout
        lag (float, optional): add a flat time lag (in seconds) before checking nodes.
            This can be used to make sure that the last operation has been digested
            by the server. Below the lag is used to make sure that the motor is
            moving when ``wait`` starts

            ::

                >>> mgr.motor1.move_rel(1.0, 0.25)
                >>> wait( [mgr.motor1.stat.is_standstill], lag=0.1 )

        stop_signal (callable, optional): a callable object returning True to stop
            the wait. A stop signal will raise a RuntimeError.
            The example below is equivalent to setting a timeout. However the timeout
            argument is kept for compatibility reasons

            ::

                from pydevmgr_core.signals import Timeout
                from pydevmgr_core import wait
                wait( [... some stuff...], stop_signal=Timeout(60.0) )

        stop_function (callable, optional): executed when stop_signal returns True
        data (None, dict, optional): if given, input node values are taken from the
            data dictionary which is expected to be updated in some other place.

    Example:

        Wait until a motor is initialised and another custom function returns True

        ::

            > def camera_ready():
            >     # <- do stuff ->
            >     return True  # if camera is ready
            > wait( [motor.is_initialised, camera_ready] )

        Or something like

        ::

            > is_arrived = lambda : abs(motor.stat.pos_actual.get()-3.4)<0.01
            > wait( [is_arrived, camera_ready])
    """
    waiter = Waiter(
        node_or_func_list,
        logic=logic,
        period=period,
        timeout=timeout,
        stop_signal=stop_signal,
        stop_function=stop_function,
        data=data,
        lag=lag,
    )
    waiter.wait()
def _all_true(functions):
    """Return True if every callable in ``functions`` returns a true value.

    Callables are invoked in order and evaluation stops at the first one
    returning a false value. An empty iterable yields True.
    """
    return all(func() for func in functions)
def _all_false(functions):
    """Return True if every callable in ``functions`` returns a false value.

    Callables are invoked in order and evaluation stops at the first one
    returning a true value. An empty iterable yields True.
    """
    return not any(func() for func in functions)
def _any_true(functions):
    """Return True if at least one callable in ``functions`` returns a true value.

    Callables are invoked in order and evaluation stops at the first one
    returning a true value. An empty iterable yields False.
    """
    return any(func() for func in functions)
def _any_false(functions):
    """Return True if at least one callable in ``functions`` returns a false value.

    Callables are invoked in order and evaluation stops at the first one
    returning a false value. An empty iterable yields False.
    """
    return not all(func() for func in functions)
""" Used in wait to define the logic applied """
# Maps each ``logic`` name to a pair ``(check_func, check_nodes)``:
#   - check_func is called with the list of callables and invokes each of them
#   - check_nodes is called with the list of values already read from the nodes
_logic_loockup = {
    "all_true": (_all_true, all),
    "all_false": (_all_false, lambda values: not any(values)),
    "any_true": (_any_true, any),
    "any_false": (_any_false, lambda values: not all(values)),
}