Source code for pydevmgr_elt.base.eltdevice

from . import io
from .tools import    get_enum_txt
from .config import eltconfig

from .eltinterface import EltInterface
from .eltnode import EltNode
from .eltrpc import EltRpc
from .eltstat import StatInterface

try:
    from pydantic.v1 import BaseModel,  AnyUrl,  validator, Field, root_validator
except ModuleNotFoundError:
    from pydantic import BaseModel,  AnyUrl,  validator, Field, root_validator
from pydevmgr_core import (upload, NodeVar, open_device, record_class, GenInterface)
from pydevmgr_core.nodes import Local
from pydevmgr_ua import UaDevice
from typing import Optional



class CtrlConfig(BaseModel):
    # nothing by default to declare here 
    class Config: # BaseModel configuration of pydantic 
        # ignore/allow extra stuff for auto setup
        extra = 'allow'
        validate_assignment = True 
        
    def cfgdict(self):
        return self.dict()
        
class EltDeviceConfig(UaDevice.Config):
    CtrlConfig = CtrlConfig
    Interface = EltInterface.Config
    Node = EltNode.Config
    Rpc = EltRpc.Config

    type: str = "Elt"
    address     : AnyUrl         = Field( default_factory = lambda : eltconfig.default_address)    
    fits_prefix : str            = ""    
    ignored     : bool           = False  
    ctrl_config : CtrlConfig     = CtrlConfig()
    mapfile: Optional[str] = ""
    

    Stat = StatInterface.Config # comes with some default   
    Cfg  = EltInterface.Config
    Rpcs = EltInterface.Config
    

    stat : GenInterface = Stat() # type is a generic interface so the Base EltDevice can almost function with any interfaces
    cfg  : GenInterface = Cfg() 
    Rpcs : GenInterface = Rpcs()
    
    class Config: # BaseModel configuration of pydantic 
        # ignore/allow extra stuff for auto setup
        extra = 'allow'
    

    @root_validator(pre=True)
    def _convert_map_file(cls, values):
        """ when a map file is set it is loaded to replace the defaults suffix of the associated node """
        
        # -------------------------------------------------- 
        # Warning This is realy important to delete the mapfile from the dictionary 
        # otherwhise at each attribute assignment of the model the mapfile will be evaluated ! 
        mapfile = values.pop('mapfile', None)
        # --------------------------------------------------
        
        if mapfile:

            dtype = values.get('type', cls.__fields__['type'].default)

    
            map_d = io.load_map(mapfile)
            try:
                map = map_d[dtype]
            except KeyError:
                raise ValueError("The associated map file does not contain type %r"%dtype)
            for k_i, d_i in map.items():
                if k_i == "rpc":
                    # rpcs = EltInterface.Config.parse_obj({ k:EltRpc.Config(suffix=s ) for k, s in d_i.items()  })
                    rpcs = { k:{'suffix':s}   for k, s in d_i.items() }
                    values['rpcs'] = rpcs
                else:
                    # interface =  EltInterface.Config.parse_obj( { k:EltNode.Config(suffix=s ) for k, s in d_i.items()} )
                    interface = { k:{'suffix':s}   for k, s in d_i.items() }

                    values[k_i] = interface

        return values


    @validator('address', pre=True)
    def _map_host(cls, url):
        """ replace the address on-the-fly if any defined in host_mapping dictionary """
        return eltconfig.host_mapping.get(url, url)
    
    @classmethod 
    def from_cfgdict(cls, config_dict):
        return cls.parse_obj(config_dict) 
    
    def cfgdict(self, exclude=set()):
        if self.version == 'pydevmgr':
            return super().cfgdict()
        
        all_exclude = {*{'version', 'address',  'device_map',  'interface_map', 'node_map', 'rpc_map','ctrl_config'}, *exclude}
        d = super().cfgdict(exclude=all_exclude)
        if 'address' not in exclude:
            d['address'] = str(self.address)
        if 'ctrl_config' not in exclude:
            d['ctrl_config'] = self.ctrl_config.cfgdict()                
        return d


[docs]def open_elt_device(cfgfile, key=None, path=0, prefix=""): """ open a device """ return open_device(cfgfile, key=key, path=path, prefix=prefix)
class RpcInterface(EltInterface): pass class CfgInterface(EltInterface): pass @record_class class EltDevice(UaDevice): Config = EltDeviceConfig class Data(UaDevice.Data): Stat = StatInterface.Data Cfg = CfgInterface.Data stat: Stat = Stat() cfg: Cfg = Cfg() is_ignored: NodeVar[bool] = False Node = EltNode Rpc = EltRpc Interface = EltInterface Stat = StatInterface Cfg = CfgInterface Rpcs = RpcInterface # copy the STATE SUBSTATE here STATE = Stat.STATE SUBSTATE = Stat.SUBSTATE ERROR = Stat.ERROR # stat = StatInterface.prop('stat') # cfg = CfgInterface.prop('cfg') # rpcs = Rpcs.prop('rpcs') @property def rpc(self): # alias for compatibility reason return self.rpcs is_ignored = Local.prop(default=False) _devices = None # some device can have child devices (e.g. ADC) def __init__(self, *args, fits_key: str = "", **kwargs): super().__init__(*args, **kwargs) self.fits_key = fits_key or self._config.fits_prefix self.is_ignored.set( self.config.ignored ) ## These RPC should be on all devices def init(self) -> EltNode: """ init the device Raises: RpcError: if OPC-UA Rpc method returns an error code Returns: is_ready: the :class:`NodeAlias` .stat.is_ready to check if the init is finished Example: :: wait( device.init() ) """ self.rpc.rpcInit.rcall() return self.stat.is_ready def reset(self) -> EltNode: """ reset the device Raises: RpcError: if OPC-UA Rpc method returns an error code Returns: is_not_ready: the :class:`NodeAlias` .stat.is_not_ready to check if the reset was done :: wait( device.reset() ) """ self.rpc.rpcReset.rcall() return self.stat.is_not_ready def enable(self) -> EltNode: """ enable the device Raises: RpcError: if OPC-UA Rpc method returns an error code Returns is_operational: the :class:`NodeAlias` .stat.is_operational to check if device was enabled :: wait( device.enable() ) """ self.rpc.rpcEnable.rcall() return self.stat.is_operational def disable(self) -> EltNode: """ disable the device Raises: RpcError: if OPC-UA Rpc method returns an error code Returns: is_not_operational: the :class:`NodeAlias` .stat.is_not_operational to check if device was disabled :: wait( device.disable() ) """ self.rpc.rpcDisable.rcall() return self.stat.is_not_operational def get_error_txt(self, errcode: int) -> str: """ Get a text description of the given error code number """ return get_enum_txt(self.ERROR, errcode, f"Unknown Error ({errcode})") def get_rpc_error_txt(self, rpc_errcode: int) -> str: """ Get a text description of the given rpc error code number """ return get_enum_txt( self.RpcInterface.RPC_ERROR, rpc_errcode, f"Unknown error ({rpc_errcode})" ) def get_configuration(self, exclude_unset=True, **kwargs): """ return a node/value pair dictionary ready to be uploaded The node/value dictionary represent the device configuration. This is directly use by :func:`Device.configure` method. Args: exclude_unset (optional, bool): Default is True. If True value that was left unset in the config will not be included in the configuration \**kwargs : name/value pairs pointing to self.cfg.<name> node This allow to change configuration on the fly without changing the config file. Exemples :: >>> upload( {**motor1.get_configuration(), **motor2.get_configuration()} ) """ # get values from the ctrl_config Config Model # do not include the default values, if they were unset, the PLC will use the default ones values = self.config.ctrl_config.dict(exclude_none=True, exclude_unset=exclude_unset) cfg_dict = {getattr(self.cfg,k):v for k,v in values.items()} cfg_dict[self.is_ignored] = self.config.ignored cfg_dict.update({ getattr(self.cfg,k):v for k,v in kwargs.items()}) return cfg_dict def configure(self, exclude_unset=True, **kwargs): """ Configure the whole device in the PLC according to what is defined in the config dictionary Quick changes on configuration value can be done by keywords where each key must point to a self.cfg.<name> node. Note that the configuration (as written in file) is always used first before being overwritten by \**kwargs. In other words kwargs are not changing the default configuration in self.config.ctrl_config Args: exclude_unset (optional, bool): Default is True. If True value that was left unset in the config will not be included in the configuration \**kwargs : name/value pairs pointing to cfg.name node This allow to quickly change configuration on the fly without changing the config file. what it does is just: :: >>> upload( self.get_condifuration() ) """ # by default just copy the "ctrl_config" into cfg. This may not work for # all devices and should be customized upload(self.get_configuration(exclude_unset=exclude_unset, **kwargs))