# Source code for pydevmgr_core.base.datamodel

try:
    from pydantic.v1 import BaseModel, ValidationError, Field
except ModuleNotFoundError:
    from pydantic import BaseModel, ValidationError, Field
try:
    from pydantic.v1.fields import ModelField
except ModuleNotFoundError:
    from pydantic.fields import ModelField
from .download import download, BaseDataLink, reset
from .upload import upload
from .node import BaseNode
from .model_var import NodeVar, NodeVar_R, NodeVar_W, NodeVar_RW, StaticVar
from .base import BaseData, path as to_path 

from typing import  Any, Iterable, Dict, List, Type

class C:
    """Names of the field keyword arguments used to locate a value or a node.

    These keys are looked up in ``field.field_info.extra`` by the extraction
    helpers of this module.
    """

    # look the target up as an attribute of the parent object
    ATTR = "attr"
    # look the target up with ``obj[item]``
    ITEM = "item"
    # the target is given as a node (or something resolving to one)
    NODE = "node"
    # the target is reached by following a chain of attributes
    PATH = "path"

class MatchError(ValueError):
    """Raised when a model field cannot be matched to its target object.

    It is a ``ValueError`` subclass, so handlers written for ``ValueError``
    also catch it.
    """

def _extract_static(obj, name, field):
    """Fetch from ``obj`` the static value matching a data model field.

    The lookup rule is read from ``field.field_info.extra``:

    - ``attr`` given: the value is ``getattr(obj, attr)``; a falsy ``attr``
      (e.g. an empty string) selects ``obj`` itself.
    - ``item`` given: the value is ``obj[item]``.
    - neither given: the value is ``getattr(obj, name)``.

    Args:
        obj: object the value is read from.
        name (str): field name, used as attribute name when neither ``attr``
            nor ``item`` is set on the field.
        field: object exposing ``field_info.extra`` as a mapping
            (presumably a pydantic ``ModelField`` -- confirm against callers).

    Returns:
        The value found on ``obj``.

    Raises:
        ValueError: if both ``attr`` and ``item`` are set on the field.
        MatchError: if the attribute or item cannot be found on ``obj``.
    """
    extra = field.field_info.extra

    if C.ATTR in extra:
        if extra.get(C.ITEM, None) is not None:
            raise ValueError(f'{C.ATTR!r} and {C.ITEM!r} cannot be both set, choose one.')

        attribute = extra[C.ATTR]
        if not attribute:
            # a falsy attribute name means "use the object itself"
            return obj
        try:
            return getattr(obj, attribute)
        except AttributeError as err:
            raise MatchError(
                f'{attribute!r} is not an attribute of {obj.__class__.__name__!r}'
            ) from err

    if C.ITEM in extra:
        item = extra[C.ITEM]
        try:
            return obj[item]
        except (KeyError, TypeError) as err:
            # TypeError covers objects that are not subscriptable (or reject
            # the key type); same error handling as the node item lookup.
            raise MatchError(
                f'{item!r} is not an item of {obj.__class__.__name__!r}'
            ) from err

    try:
        return getattr(obj, name)
    except AttributeError as err:
        raise MatchError(
            f'{name!r} is not an attribute of {obj.__class__.__name__!r}'
        ) from err
        
def _extract_node(obj, name, field):
    """Resolve on ``obj`` the node matching a NodeVar field of a data model.

    Called when a NodeVar is detected in a data model. The lookup rule is read
    from ``field.field_info.extra``; the keywords are mutually exclusive and
    are checked in this order:

    - ``node``: after conversion by ``to_path``, either a
      ``BaseNode.Property`` (instantiated on ``obj`` with ``.new(obj)``), an
      attribute name, an iterable of attribute names followed from ``obj``,
      or an already built ``BaseNode``.
    - ``attr``: ``getattr(obj, attr)``; a falsy ``attr`` selects ``obj``.
    - ``path``: after conversion by ``to_path``, a dotted string or an
      iterable of attribute names followed from ``obj``; a falsy path selects
      ``obj``. The result must be a ``BaseNode``.
    - ``item``: ``obj[item]``, which must be a ``BaseNode``.
    - none of them: ``getattr(obj, name)``, which must be a ``BaseNode``.

    Args:
        obj: parent object the node is searched on.
        name (str): field name, used as attribute name when no keyword is set.
        field: object exposing ``field_info.extra`` as a mapping
            (presumably a pydantic ``ModelField`` -- confirm against callers).

    Returns:
        The resolved node.

    Raises:
        MatchError: if incompatible keywords are combined, if the lookup
            fails, or if the result is required to be a ``BaseNode`` and
            is not one.
    """
    extra = field.field_info.extra

    if C.NODE in extra:
        node = to_path(extra[C.NODE])
        if extra.get(C.ATTR, None) is not None:
            raise MatchError(f'node={C.NODE!r} and attr={C.ATTR!r} cannot be both set, choose one.')

        if extra.get(C.PATH, None) is not None:
            raise MatchError(f'node={C.NODE!r} and path={C.PATH!r} cannot be both set, choose one.')

        if extra.get(C.ITEM, None) is not None:
            raise MatchError(f'node={C.NODE!r} and item={C.ITEM!r} cannot be both set, choose one.')

        if isinstance(node, BaseNode.Property):
            # new() returns a pair; only its second element is the node
            _, node = node.new(obj)
        elif isinstance(node, str):
            attr = node
            try:
                node = getattr(obj, attr)
            except AttributeError as err:
                raise MatchError(f'{attr!r} is not a node in {obj.__class__.__name__!r}') from err
        elif hasattr(node, "__iter__"):
            path = tuple(node)
            try:
                # The whole walk is guarded so that a missing intermediate
                # attribute is reported as MatchError, like for ``path=``.
                cobj = obj
                for a in path[:-1]:
                    cobj = getattr(cobj, a)
                node = getattr(cobj, path[-1])
            except AttributeError as err:
                raise MatchError(
                    f'{path[-1]!r} is not a node in {obj.__class__.__name__!r} with path {path}'
                ) from err
        elif not isinstance(node, BaseNode):
            raise MatchError(f'node set in the field is not a node')

    elif C.ATTR in extra:
        if extra.get(C.ITEM, None) is not None:
            raise MatchError(f'attr={C.ATTR!r} and item={C.ITEM!r} cannot be both set, choose one.')
        if extra.get(C.PATH, None) is not None:
            raise MatchError(f'attr={C.ATTR!r} and path={C.PATH!r} cannot be both set, choose one.')

        attr = extra[C.ATTR]
        if attr:
            try:
                node = getattr(obj, attr)
            except AttributeError as err:
                raise MatchError(f'{attr!r} is not a node in {obj.__class__.__name__!r}') from err
        else:
            # a falsy attribute name means "the object itself is the node"
            node = obj

    elif C.PATH in extra:
        if extra.get(C.ITEM, None) is not None:
            raise MatchError(f'path={C.PATH!r} and item={C.ITEM!r} cannot be both set, choose one.')

        path = to_path(extra[C.PATH])
        if path:
            if isinstance(path, str):
                path = path.split('.')
            elif not hasattr(path, "__iter__"):
                raise MatchError(f"expecting string or iterable for path parameter got {path!r}")

            path = tuple(path)
            try:
                cobj = obj
                for a in path[:-1]:
                    cobj = getattr(cobj, a)
                node = getattr(cobj, path[-1])
            except AttributeError as err:
                raise MatchError(
                    f'{path!r} is not a valid in {obj.__class__.__name__!r} with path {path}'
                ) from err
        else:
            # a falsy path means "the object itself is the node"
            node = obj

        if not isinstance(node, BaseNode):
            # Report the path: no ``attr`` variable exists in this branch, so
            # formatting one would raise UnboundLocalError instead of MatchError.
            raise MatchError(f'node path {path!r} is not a node in {obj.__class__.__name__!r}')

    elif C.ITEM in extra:
        item = extra[C.ITEM]
        try:
            node = obj[item]
        except (KeyError, TypeError) as err:
            raise MatchError(f'{item!r} is not an item in {obj.__class__.__name__!r}') from err

        if not isinstance(node, BaseNode):
            raise MatchError(f'node item {item!r} is not a node in {obj.__class__.__name__!r}')

    else:
        attr = name
        try:
            node = getattr(obj, attr)
        except AttributeError as err:
            raise MatchError(f'{attr!r} is not a node in {obj.__class__.__name__!r}') from err
        if not isinstance(node, BaseNode):
            raise MatchError(f'node attribute {attr!r} is not a node in {obj.__class__.__name__!r}')

    return node
    


def model_subset(
       class_name: str, 
       Model: Type[BaseModel], 
       fields: List[str], 
       BaseClasses: tuple =(BaseModel,)
    ) -> Type[BaseModel]:
    """ From a pydantic model class create a new model class with a subset of fields 
    
    For each selected field the default (or default factory), the description,
    the title, the numeric / length / items constraints and the extra keyword
    arguments of the original field are copied into a new ``Field``.
    
    Args:
        class_name (str): name of the created model class 
        Model (BaseModel): model class the fields are taken from 
        fields (List[str]): list of Model field names to keep 
        BaseClasses (optional, tuple): base classes of the new class, default is (BaseModel,)
    
    Returns:
        A new class named ``class_name`` deriving from ``BaseClasses``.
    
    Raises:
        KeyError: if a name of ``fields`` is not a field of ``Model``.
    """
    # Local import: Optional is not part of the module level typing imports.
    from typing import Optional

    annotations = {}
    namespace = {}
    for field_name in fields:
        field = Model.__fields__[field_name]
        fi = field.field_info
        kwargs = dict(
            # keep factory-built defaults; fi.default is left undefined by
            # pydantic when a factory is used, so the two never conflict
            default_factory=fi.default_factory,
            description=fi.description,
            ge=fi.ge,
            gt=fi.gt,
            le=fi.le,
            lt=fi.lt,
            max_items=fi.max_items,
            max_length=fi.max_length,
            min_items=fi.min_items,
            min_length=fi.min_length,
            multiple_of=fi.multiple_of,
            title=fi.title,
        )
        kwargs.update(fi.extra)

        # outer_type_ keeps container wrappers (e.g. List[int]) that type_
        # strips down to the inner item type.
        annotation = field.outer_type_
        if field.allow_none:
            # pydantic removes the Optional wrapper from outer_type_ and
            # records it in allow_none; put it back.
            annotation = Optional[annotation]

        annotations[field.name] = annotation
        namespace[field.name] = Field(fi.default, **kwargs)

    # The annotations must be part of the class namespace: the pydantic
    # metaclass reads them while the class is being created, assigning
    # __annotations__ afterwards has no effect on the model fields.
    namespace['__annotations__'] = annotations
    return type(class_name, BaseClasses, namespace)