Source code for research_client.datavalidator.schemas

"""Prototypes to conveniently define data classes with built-in validation."""
from __future__ import annotations
from typing import Union, Optional, Any, Callable
from collections import Counter
from copy import copy, deepcopy
import re
from . import types as dvtypes
from .validation import ValidationResult, Validator
from .exceptions import DataValidationError


# Anything accepted as the description of a single field in a schema.
DataFieldDescT = Union[
    "DataField",        # Fully instantiated DataField
    dict[str, Any],     # For shorthand constructor notations
]

# The field mapping held by a DataGroup: field name -> field description.
DataGroupDataT = dict[str, DataFieldDescT]

# Anything accepted as the description of a group of fields in a schema.
DataGroupDescT = Union[
    "DataGroup",  # Fully instantiated DataGroup
    DataGroupDataT  # Shorthand constructor notation
]

# A complete schema declaration: top-level name -> group or field description.
SchemaDescT = dict[str, Union[DataGroupDescT, DataFieldDescT]]

# The data store of a schema: top-level fields map to their value, groups map
# to a nested dict of field name -> value.
SchemaDataT = dict[str, Union[dict[str, Any], Any]]

# Parameter specification of a field class; each entry is
# (parameter name, parameter type, description) optionally followed by a default.
FieldParamSpecT = list[Union[tuple[str, Any, str], tuple[str, Any, str, Any]]]


class DataField:
    """Common ancestor of every field declaration in a data schema.

    Holds the name, data type, human-readable type description and the
    `required` flag of a field. To instantiate, use VField, CField, or
    shorthand notation in a DataSchema declaration.
    """

    name: str
    type_: Any
    typedesc: str
    required: bool

    # Each entry: (parameter name, parameter type, description[, default]).
    _fieldparams: FieldParamSpecT = [
        ("name", str, "The name of the DataField"),
        ("type_", Any, "The data type for the DataField"),
        ("typedesc", str, "A user-intelligible description of the data type"),
        ("required", bool, "Whether the field is required", True),
    ]

    def __init__(self, name: str, type_: Any, typedesc: str, required: bool = True):
        """Create a field description.

        Raises:
            TypeError: if `name` contains `/`, which is reserved as the
                separator between group and field names in schema keys.
        """
        if "/" in name:
            raise TypeError(
                "DataField and DataGroup names may not include the character `/`."
            )
        self.name, self.type_ = name, type_
        self.typedesc, self.required = typedesc, required

    @classmethod
    def fieldparams(cls) -> FieldParamSpecT:
        """Return an independent (deep) copy of this class's parameter list."""
        return deepcopy(cls._fieldparams)

    def fieldspecs(self) -> dict[str, Any]:
        """Return the field class plus the current value of every parameter.

        The result maps `"fieldtype"` to the instance's class and each
        parameter name listed in `_fieldparams` to the attribute of that name.
        """
        return {
            "fieldtype": self.__class__,
            **{spec[0]: getattr(self, spec[0]) for spec in self._fieldparams},
        }

    def __str__(self) -> str:
        """Return a (possibly shortened) string representation of the field."""
        classname = self.__class__.__name__
        head = f"{classname}({self.name!r}, {self.type_}, "
        tail = f"{self.typedesc!r}, required={self.required})"
        return head + tail

    def __repr__(self) -> str:
        """Return the same text as `__str__`."""
        return self.__str__()
class CField(DataField):
    """Defines a manually validated data field for DataSchema classes.

    In addition to the base DataField parameters a CField stores `vmethod`,
    the callable used to validate values, and `forcecast`, an optional
    per-field override of the forced-casting setting (None means "no
    override").
    """

    vmethod: Union[str, Callable[[Any, Any], Any]]
    forcecast: Optional[bool]

    # Each entry: (parameter name, parameter type, description[, default]).
    _fieldparams: FieldParamSpecT = [
        ("name", str, "The name of the DataField"),
        ("type_", Any, "The data type for the DataField"),
        ("typedesc", str, "A user-intelligible description of the data type"),
        (
            "vmethod",
            Callable[[Any, Any], Any],
            (
                "A callable accepting a two arguments: the first is the type_ of the "
                "DataField and the second the value. The callable should raise either "
                "a TypeError or a DataValidationError if the value is invalid, and "
                "must return a (possibly processed) version of the value it was passed "
                "which fits the type_ it was passed."
            ),
        ),
        (
            "forcecast",
            Optional[bool],
            "Whether to force casting of data during validation",
            None,
        ),
        ("required", bool, "Whether the field is required", True),
    ]

    def __init__(
        self,
        name: str,
        type_: Any,
        typedesc: str,
        vmethod: Callable[[Any, Any], Any],
        forcecast: Optional[bool] = None,
        required: bool = True,
    ):
        """Instantiates a new CField.

        `type_` is annotated as `Any` (not `str`) to agree with the base class
        and with `_fieldparams`: it is the data type itself, not its name.

        Raises:
            TypeError: if `name` contains `/` (raised by DataField.__init__).
        """
        super().__init__(name, type_, typedesc, required)
        self.vmethod = vmethod
        self.forcecast = forcecast

    def __repr__(self) -> str:
        """Returns a Pythonesque string representation of the CField."""
        label = self.__class__.__name__
        # Keyword order mirrors the constructor signature.
        names = ("name", "type_", "typedesc", "vmethod", "forcecast", "required")
        paramspec = ", ".join(f"{key}={getattr(self, key)!r}" for key in names)
        return f"{label}({paramspec})"
class VField(DataField):
    """Defines an auto-validated data field for DataSchema classes.

    In addition to the base DataField parameters a VField stores the
    validation `constraint`, optional per-field overrides `forcecast` and
    `ignorecase` (None means "no override"), and regular expression `flags`.
    """

    constraint: Any
    forcecast: Optional[bool]
    ignorecase: Optional[bool]
    flags: Union[re.RegexFlag, int]

    # Each entry: (parameter name, parameter type, description[, default]).
    _fieldparams: FieldParamSpecT = [
        ("name", str, "The name of the DataField"),
        ("type_", Any, "The data type for the DataField"),
        ("typedesc", str, "A user-intelligible description of the data type"),
        ("constraint", Any, "A DataValidator constraint approprite for type_"),
        (
            "forcecast",
            Optional[bool],
            "Whether to force casting of data during validation",
            None,
        ),
        (
            "ignorecase",
            Optional[bool],
            "Whether to ignore case for string-type data validation",
            None,
        ),
        (
            "flags",
            Union[re.RegexFlag, int],
            "Flags to pass to the regular expression engine if type_ is `str`.",
            0,
        ),
        ("required", bool, "Whether the field is required", True),
    ]

    def __init__(
        self,
        name: str,
        type_: Any,
        typedesc: str,
        constraint: Any,
        forcecast: Optional[bool] = None,
        ignorecase: Optional[bool] = None,
        flags: Union[re.RegexFlag, int] = 0,
        required: bool = True,
    ):
        """Instantiates a new VField.

        `type_` is annotated as `Any` (not `str`) to agree with the base class
        and with `_fieldparams`: it is the data type itself, not its name.

        Raises:
            TypeError: if `name` contains `/` (raised by DataField.__init__).
        """
        super().__init__(name, type_, typedesc, required)
        self.constraint: Any = constraint
        self.forcecast: Optional[bool] = forcecast
        self.ignorecase: Optional[bool] = ignorecase
        self.flags: Union[re.RegexFlag, int] = flags

    def __repr__(self) -> str:
        """Returns a Pythonesque string representation of the VField."""
        label = self.__class__.__name__
        # Keyword order mirrors the constructor signature.
        names = (
            "name",
            "type_",
            "typedesc",
            "constraint",
            "forcecast",
            "ignorecase",
            "flags",
            "required",
        )
        paramspec = ", ".join(f"{key}={getattr(self, key)!r}" for key in names)
        return f"{label}({paramspec})"
class DataFieldList(DataField):
    """Marks a DataField whose data is a list holding any number of values.

    Adds no behaviour of its own; it exists so that list-valued fields can be
    recognised with `isinstance`.
    """
class CFieldList(CField, DataFieldList):
    """A list-valued field whose elements are described like a CField.

    Combines CField and DataFieldList without adding anything of its own.
    """
class VFieldList(VField, DataFieldList):
    """A list-valued field whose elements are described like a VField.

    Combines VField and DataFieldList without adding anything of its own.
    """
class DataGroup:
    """A named collection of field descriptions for DataSchema classes."""

    def __init__(self, name: str, fields: DataGroupDataT):
        """Create a group called `name` holding the mapping `fields`.

        Raises:
            TypeError: if `name` contains `/`, which is reserved as the
                separator between group and field names in schema keys.
        """
        if "/" in name:
            raise TypeError(
                "DataField and DataGroup names may not include the character `/`."
            )
        self.name: str = name
        self.fields: DataGroupDataT = fields

    def __str__(self) -> str:
        """Return a short description listing at most three field names."""
        names = list(self.fields)
        # Anything beyond the first three names is abbreviated to an ellipsis.
        shown = names[:3] + ["..."] if len(names) > 3 else names
        return "{}(name={}, fields=({}))".format(
            self.__class__.__name__, self.name, ", ".join(shown)
        )

    def __repr__(self) -> str:
        """Return a Pythonesque description including every field's repr."""
        parts = [f"{key}={field!r}" for key, field in self.fields.items()]
        body = ", ".join(parts)
        return f"{self.__class__.__name__}(name={self.name}, {{{body}}})"

    def getfield(self, key: str) -> Union[dict[str, Any], DataField]:
        """Return the field description stored under `key`.

        Raises:
            KeyError: if the group has no field named `key`.
        """
        if key in self.fields:
            return self.fields[key]
        raise KeyError(f"Key `{key}` not found in DataGroup `{self.name}`.")
class DataSchema:
    """Abstract base class to define auto-validating data classes.

    A subclass declares a class attribute ``__schema`` that maps names to
    DataField/DataGroup instances or to their shorthand dict notation. When
    the subclass is instantiated the schema is normalised into DataField and
    DataGroup objects, a flat key index is built (fields inside a group are
    addressed as ``"group/field"``), and ``set<name>``, ``get<name>`` and
    ``del<name>`` methods are attached to the class for every top-level field
    and every group.

    @TODO:
        - __getattr__(self, name) - to retrieve data
        - __setattr__(self, name, value) - to set data (with validation)
        - __delattr__(self, name) - to remove/clear a datapoint
        - Move some __new__ stuff to __init_subclass__(cls)?
        - JSON import/export
    """

    forcecast: bool
    ignorecase: bool
    __schema: SchemaDescT
    __schematized: bool
    __data: SchemaDataT
    __keys: list[str]

    def __new__(cls, *args: Any, **kwargs: Any):  # noqa: C901
        """Constructs a new DataSchema instance.

        Copies the subclass's own name-mangled ``__schema`` onto the attribute
        names used by this base class, prepares the class once (normalise the
        schema, index the keys, attach accessor methods) and gives the new
        instance an empty data store.

        Raises:
            AttributeError: if the class does not define ``__schema``.
        """
        # Inside the subclass body `__schema` is mangled with the subclass
        # name, so it has to be looked up under that name explicitly.
        privateref = f"_{cls.__name__}__schema"
        if not hasattr(cls, privateref):
            raise AttributeError(
                f"Instance of {cls.__name__} must define a class attribute __schema."
            )
        cls.__schema: SchemaDescT = getattr(cls, privateref)
        privateref = f"_{cls.__name__}__schematized"
        if hasattr(cls, privateref):
            cls.__schematized = getattr(cls, privateref)
        # The flag is stored under its mangled name, so it must be looked up
        # under that name (a literal "__schematized" never matches). Only the
        # class's own namespace is consulted so that a subclass of an already
        # prepared schema class is still prepared with its own schema.
        if not vars(cls).get("_DataSchema__schematized", False):
            for key in cls.__schema:
                cls.__schema[key] = cls.__schematize(cls.__schema[key], key)
            cls.__keys = cls.__index()
            cls.__functionalize()
            cls.__schematized = True
        self = super().__new__(cls)
        privateref = f"_{cls.__name__}__data"
        if not hasattr(self, privateref):
            setattr(self, privateref, self.__materialize())
        self.__data = getattr(self, privateref)
        return self

    def __init__(self, forcecast: bool = True, ignorecase: bool = True):
        """Initialises a new DataSchema object.

        Args:
            forcecast: Instance-wide default used when a field's own
                `forcecast` is None.
            ignorecase: Instance-wide default used when a field's own
                `ignorecase` is None.
        """
        self.forcecast = forcecast
        self.ignorecase = ignorecase

    @classmethod
    def __schematize(  # noqa: C901
        cls, schema: Union[DataGroupDescT, DataFieldDescT], schemaname: str
    ) -> Union[DataGroup, DataField]:
        """Turns one schema entry into a DataField or DataGroup instance.

        Instantiated fields are returned unchanged; instantiated groups get
        their shorthand members instantiated in place. A shorthand dict whose
        values are all dicts/groups/fields becomes a DataGroup; otherwise its
        keys decide which field class is built, and a truthy ``"multiple"``
        entry selects the corresponding list class.

        Raises:
            RecursionError: if a group would contain another group.
            TypeError: if the kind of field cannot be determined.
        """
        # Is it an already instantiated DataField?
        if isinstance(schema, DataField):
            # Cannot have any subfields, so just return it..
            return schema
        # Is it an already instantiated DataGroup?
        if isinstance(schema, DataGroup):
            # Make sure each of the DataGroup's fields is instantiated
            for key in schema.fields:
                if isinstance(schema.fields[key], DataField):
                    pass
                elif isinstance(schema.fields[key], dict):
                    tmp = cls.__schematize(schema.fields[key], key)
                    if isinstance(tmp, DataGroup):
                        raise RecursionError(
                            "Not allowed to recurse in DataSchema definition."
                        )
                    schema.fields[key] = tmp
                elif isinstance(schema.fields[key], DataGroup):
                    raise RecursionError(
                        "Not allowed to recurse in DataSchema definition."
                    )
            return schema
        # Check if shorthand is a data group
        grouptype = (dict, DataGroup, DataField)  # type: ignore
        if all(isinstance(value, grouptype) for value in schema.values()):
            # We have a group of items
            fields: DataGroupDataT = {}
            for key in schema:
                tmp = cls.__schematize(schema[key], key)
                if isinstance(tmp, DataGroup):
                    raise RecursionError(
                        "Not allowed to recurse in DataSchema definition."
                    )
                fields[key] = tmp
            return DataGroup(schemaname, fields)
        keys = schema.keys()
        if {"type_", "typedesc", "constraint"} <= keys:
            # It should be a VField(List)
            if schema.pop("multiple", False):
                return VFieldList(schemaname, **schema)
            return VField(schemaname, **schema)
        if {"type_", "typedesc", "vmethod"} <= keys:
            # It should be a CField(List)
            if schema.pop("multiple", False):
                return CFieldList(schemaname, **schema)
            return CField(schemaname, **schema)
        if {"type_", "typedesc"} <= keys:
            # A possible vanilla DataField(List)
            if schema.pop("multiple", False):
                return DataFieldList(schemaname, **schema)
            return DataField(schemaname, **schema)
        raise TypeError(
            f"Cannot determine field type for `{schemaname}` in DataSchema definition."
        )

    @classmethod
    def __index(cls) -> list[str]:  # noqa: C901
        """Creates a flat list of index keys, separating groups and fields with "/".

        Raises:
            AttributeError: if a key occurs more than once or a group name
                coincides with a field key.
        """
        indices: list[str] = []
        groups: set[str] = set()
        for field in cls.__schema.values():
            if isinstance(field, DataGroup):
                groups.add(field.name)
                for subfield in field.fields.values():
                    if isinstance(subfield, DataField):
                        indices.append(f"{field.name}/{subfield.name}")
            elif isinstance(field, DataField):
                indices.append(field.name)
        if len(set(indices)) != len(indices) or not groups.isdisjoint(indices):
            message: list[str] = []
            duplicates: list[str] = [k for k, v in Counter(indices).items() if v > 1]
            if duplicates:
                message.append("The __schema contains the following repeated keys:")
                message.append("> " + repr(duplicates))
            overlaps = groups.intersection(indices)
            if overlaps:
                message.append(
                    "The __schema contains the following overlaps "
                    "between group and field keys:"
                )
                message.append("> " + repr(overlaps))
            if not message:
                message.append("You may have found a bug...")
            raise AttributeError("\n".join(message))
        return indices

    @staticmethod
    def __setgroupfactory(  # noqa: C901
        gname: str, fieldspecs: dict[str, dict[str, Any]]
    ) -> Callable[[DataSchema, dict[str, Any]], None]:
        """Builds the `set<gname>` method that validates and stores group data."""

        def setgroup(self: DataSchema, data: dict[str, Any]) -> None:
            filtered: dict[str, Any] = {}
            vr = Validator(forcecast=self.forcecast, ignorecase=self.ignorecase)
            missingfields: list[str] = [
                key
                for key in fieldspecs
                if key not in data and fieldspecs[key]["required"]
            ]
            if missingfields:
                raise ValueError(
                    f"Data parameter is missing required field(s): {missingfields}."
                )
            for key, value in data.items():
                if key not in fieldspecs:
                    raise KeyError(f"The group `{gname}` has no field `{key}`.")
                fieldspec = fieldspecs[key]
                if self._isna(value) and fieldspec["required"]:
                    raise ValueError(
                        f"The field `{key}` is required but the passed value is "
                        "None, an empty list, or missing."
                    )
                else:
                    fieldtype = fieldspec["fieldtype"]
                    if fieldtype == VField:
                        filtered[key] = self._autovalidate(vr, fieldspec, value).data
                    elif fieldtype == VFieldList:
                        filtered[key] = [
                            self._autovalidate(vr, fieldspec, v).data for v in value
                        ]
                    elif fieldtype == CField:
                        filtered[key] = self._customvalidate(vr, fieldspec, value).data
                    elif fieldtype == CFieldList:
                        filtered[key] = [
                            self._customvalidate(vr, fieldspec, v).data for v in value
                        ]
                    elif fieldtype == DataField:
                        filtered[key] = value
                    elif fieldtype == DataFieldList:
                        filtered[key] = list(value)
                    else:
                        raise AttributeError(
                            f"The field `{key}` has unexpected fieldtype `{fieldtype}`."
                        )
            # Nothing is stored unless the validator accepts the whole batch.
            vr.raiseif()
            for key, value in filtered.items():
                self.__data[gname][key] = value

        setgroup.__doc__ = f"Sets data for fields in the group `{gname}`."
        setgroup.__name__ = f"set{gname}"
        return setgroup

    @staticmethod
    def __delgroupfactory(
        gname: str, fieldspecs: dict[str, dict[str, Any]]
    ) -> Callable[[DataSchema], None]:
        """Builds the `del<gname>` method that resets every field of a group."""

        def delgroup(self: DataSchema) -> None:
            for fieldname, fieldspec in fieldspecs.items():
                # List fields are reset to an empty list, all others to None.
                if fieldspec["fieldtype"] in (
                    DataFieldList,
                    VFieldList,
                    CFieldList,
                ):
                    self.__data[gname][fieldname] = []
                else:
                    self.__data[gname][fieldname] = None

        delgroup.__doc__ = f"Deletes data for all fields in the group `{gname}`."
        delgroup.__name__ = f"del{gname}"
        return delgroup

    @staticmethod
    def __getgroupfactory(
        gname: str, fieldspecs: dict[str, dict[str, Any]]
    ) -> Callable[[DataSchema], dict[str, Any]]:
        """Builds the `get<gname>` method returning a shallow copy of a group."""

        def getgroup(self: DataSchema) -> dict[str, Any]:
            return copy(self.__data[gname])

        getgroup.__doc__ = f"Retrieves data for all fields in the group `{gname}`."
        getgroup.__name__ = f"get{gname}"
        return getgroup

    @staticmethod
    def __setfieldfactory(
        fieldname: str, fieldspec: dict[str, Any]
    ) -> Callable[[DataSchema, Any], None]:
        """Builds the `set<fieldname>` method for a single-valued field."""

        def setfield(self: DataSchema, value: Any) -> None:
            vr = Validator(forcecast=self.forcecast, ignorecase=self.ignorecase)
            if fieldspec["fieldtype"] == VField:
                data = self._autovalidate(vr, fieldspec, value).data
            elif fieldspec["fieldtype"] == CField:
                data = self._customvalidate(vr, fieldspec, value).data
            else:  # Must be plain DataField
                data = value
            vr.raiseif()
            self.__data[fieldname] = data

        setfield.__doc__ = f"Sets the value for the field `{fieldname}`."
        setfield.__name__ = f"set{fieldname}"
        return setfield

    @staticmethod
    def __delfieldfactory(
        fieldname: str, fieldspecs: dict[str, Any]
    ) -> Callable[[DataSchema], None]:
        """Builds the `del<fieldname>` method that resets a field to None."""

        def delfield(self: DataSchema) -> None:
            self.__data[fieldname] = None

        delfield.__doc__ = f"Deletes the value of field `{fieldname}`."
        delfield.__name__ = f"del{fieldname}"
        return delfield

    @staticmethod
    def __getfieldfactory(
        fieldname: str, fieldspecs: dict[str, Any]
    ) -> Callable[[DataSchema], Any]:
        """Builds the `get<fieldname>` method returning a shallow copy of a value."""

        def getfield(self: DataSchema) -> Any:
            return copy(self.__data[fieldname])

        getfield.__doc__ = f"Retrieves the value of field `{fieldname}`."
        getfield.__name__ = f"get{fieldname}"
        return getfield

    @staticmethod
    def __setfieldlistfactory(
        fieldname: str, fieldspec: dict[str, Any]
    ) -> Callable[[DataSchema, list[Any]], None]:
        """Builds the `set<fieldname>` method for a list-valued field."""

        def setfieldlist(self: DataSchema, values: list[Any]) -> None:
            vr = Validator(forcecast=self.forcecast, ignorecase=self.ignorecase)
            if fieldspec["fieldtype"] == VFieldList:
                data = [self._autovalidate(vr, fieldspec, v).data for v in values]
            elif fieldspec["fieldtype"] == CFieldList:
                data = [self._customvalidate(vr, fieldspec, v).data for v in values]
            else:  # Must be plain DataFieldList
                data = list(values)
            vr.raiseif()
            self.__data[fieldname] = data

        setfieldlist.__doc__ = f"Sets the values for the field list `{fieldname}`."
        setfieldlist.__name__ = f"set{fieldname}"
        return setfieldlist

    @staticmethod
    def __delfieldlistfactory(
        fieldname: str, fieldspec: dict[str, Any]
    ) -> Callable[[DataSchema], None]:
        """Builds the `del<fieldname>` method that empties a list-valued field."""

        def delfieldlist(self: DataSchema) -> None:
            self.__data[fieldname] = []

        delfieldlist.__doc__ = f"Deletes all values from field list `{fieldname}`."
        delfieldlist.__name__ = f"del{fieldname}"
        return delfieldlist

    @staticmethod
    def __getfieldlistfactory(
        fieldname: str, fieldspec: dict[str, Any]
    ) -> Callable[[DataSchema], list[Any]]:
        """Builds the `get<fieldname>` method returning a shallow copy of a list."""

        def getfieldlist(self: DataSchema) -> list[Any]:
            return copy(self.__data[fieldname])  # type: ignore

        getfieldlist.__doc__ = f"Retrieves all values of the field list `{fieldname}`."
        getfieldlist.__name__ = f"get{fieldname}"
        return getfieldlist

    @classmethod
    def __functionalize(cls) -> None:  # noqa: C901
        """Dynamically creates and attaches methods to get/set values.

        Raises:
            AttributeError: if a top-level field has an unknown field class.
        """
        groups: dict[str, list[str]] = {}
        fields: list[str] = []
        for key in cls.__keys:
            if "/" in key:
                key0, key1 = key.split("/")
                if key0 not in groups:
                    groups[key0] = []
                groups[key0].append(key1)
            else:
                fields.append(key)
        for gname in groups:
            fieldspecs: dict[str, dict[str, Any]] = cls.__getfieldspecs(gname)
            setattr(cls, f"set{gname}", DataSchema.__setgroupfactory(gname, fieldspecs))
            setattr(cls, f"del{gname}", DataSchema.__delgroupfactory(gname, fieldspecs))
            setattr(cls, f"get{gname}", DataSchema.__getgroupfactory(gname, fieldspecs))
        for field in fields:
            fieldspec = cls.__getfieldspecs(field)[field]
            if fieldspec["fieldtype"] in (VField, CField, DataField):
                setattr(
                    cls, f"set{field}", DataSchema.__setfieldfactory(field, fieldspec)
                )
                setattr(
                    cls, f"del{field}", DataSchema.__delfieldfactory(field, fieldspec)
                )
                setattr(
                    cls, f"get{field}", DataSchema.__getfieldfactory(field, fieldspec)
                )
            elif fieldspec["fieldtype"] in (VFieldList, CFieldList, DataFieldList):
                setattr(
                    cls,
                    f"set{field}",
                    DataSchema.__setfieldlistfactory(field, fieldspec),
                )
                setattr(
                    cls,
                    f"del{field}",
                    DataSchema.__delfieldlistfactory(field, fieldspec),
                )
                setattr(
                    cls,
                    f"get{field}",
                    DataSchema.__getfieldlistfactory(field, fieldspec),
                )
            else:
                raise AttributeError(
                    f"The field `{field}` has unexpected fieldtype "
                    f"`{fieldspec['fieldtype']}`."
                )

    @classmethod
    def __materialize(cls) -> SchemaDataT:  # noqa: C901
        """Creates an empty __data store pre-populated with DataGroups/DataFieldLists.

        List-valued fields start as an empty list, all other fields as None.
        """
        data: SchemaDataT = {}
        for field in cls.__schema.values():
            if isinstance(field, DataGroup):
                data[field.name] = {}
                for subfield in field.fields.values():
                    # DataFieldList must be tested first: it is a DataField too.
                    if isinstance(subfield, DataFieldList):
                        data[field.name][subfield.name] = []
                    elif isinstance(subfield, DataField):
                        data[field.name][subfield.name] = None
            elif isinstance(field, DataFieldList):
                data[field.name] = []
            elif isinstance(field, DataField):
                data[field.name] = None
        return data

    def _customvalidate(  # noqa: C901
        self, vr: Validator, fieldspec: dict[str, Any], value: Any
    ) -> ValidationResult:
        """Calls a custom validation method on value and appends result to vr.

        The outcome is recorded in `vr.results` and in either `vr.successful`
        or `vr.failed`, and is also returned.

        Raises:
            ValueError: if `fieldspec` does not describe a CField/CFieldList.
        """
        if fieldspec["fieldtype"] not in (CField, CFieldList):
            raise ValueError("Fieldtype must be CField.")
        forcecast = fieldspec["forcecast"]
        if forcecast is None:
            # No per-field override: fall back to the instance-wide setting.
            forcecast = self.forcecast
        result = value
        if forcecast:
            # Best-effort cast; a value that cannot be cast is kept as it is.
            try:
                result = fieldspec["type_"](result)
            except Exception:  # noqa: S110
                pass
        # NOTE(review): vmethod receives the raw `value`, not the cast
        # `result` computed above - confirm this is intended.
        try:
            result = fieldspec["vmethod"](fieldspec["type_"], value)
            success = True
        except (TypeError, DataValidationError):
            success = False
        vresult = ValidationResult(
            success,
            fieldspec["type_"],
            fieldspec["typedesc"],
            fieldspec["vmethod"],
            result if forcecast else value,
            value,
            forcecast,
        )
        vr.results.append(vresult)
        if success:
            vr.successful.append(vresult)
        else:
            vr.failed.append(vresult)
        return vresult

    def _autovalidate(  # noqa: C901
        self, vr: Validator, fieldspec: dict[str, Any], value: Any
    ) -> ValidationResult:
        """Calls the appropriate validation method for fieldspec and value.

        The Validator method is chosen by the field's `type_`; per-field
        `forcecast`/`ignorecase` settings of None fall back to the
        instance-wide settings.

        Raises:
            ValueError: if `fieldspec` does not describe a VField/VFieldList.
            AttributeError: if `type_` is not one of the supported types.
        """
        if fieldspec["fieldtype"] not in (VField, VFieldList):
            raise ValueError("Fieldtype must be VField.")
        forcecast = fieldspec["forcecast"]
        if forcecast is None:
            forcecast = self.forcecast
        ignorecase = fieldspec["ignorecase"]
        if ignorecase is None:
            ignorecase = self.ignorecase
        if fieldspec["type_"] == str:
            return vr.vstr(
                fieldspec["typedesc"],
                fieldspec["constraint"],
                value,
                forcecast,
                ignorecase,
                fieldspec["flags"],
            )
        if fieldspec["type_"] == int:
            return vr.vint(
                fieldspec["typedesc"], fieldspec["constraint"], value, forcecast
            )
        if fieldspec["type_"] == float:
            return vr.vfloat(
                fieldspec["typedesc"], fieldspec["constraint"], value, forcecast
            )
        if fieldspec["type_"] == bool:
            return vr.vbool(
                fieldspec["typedesc"], fieldspec["constraint"], value, forcecast
            )
        if fieldspec["type_"] == dvtypes.PolarT:
            return vr.vpolar(
                fieldspec["typedesc"],
                fieldspec["constraint"],
                value,
                forcecast,
                ignorecase,
            )
        if fieldspec["type_"] == dvtypes.EnumT:
            return vr.venum(
                fieldspec["typedesc"],
                fieldspec["constraint"],
                value,
                forcecast,
                ignorecase,
            )
        raise AttributeError(
            f"Don't know how to auto-validate field of type_ {fieldspec['type_']}. \n"
            "Permitted type_s: str, int, float, bool, PolarT, EnumT."
        )

    @classmethod
    def __getfieldspecs(cls, key: str) -> dict[str, dict[str, Any]]:  # noqa: C901
        """Get the specifications for a single DataField or fields in a DataGroup.

        Returns a mapping of field name to that field's `fieldspecs()`; for a
        single field the mapping has exactly one entry.

        Raises:
            KeyError: if `key` is neither an index key nor a schema key.
            AttributeError: if the schema holds something that is neither a
                DataField nor a DataGroup.
        """
        if key not in cls.__keys and key not in cls.__schema:
            raise KeyError(f"No field or group with key `{key}`.")
        if "/" in key:
            key0, key1 = key.split("/")
            field = cls.__schema[key0].fields[key1]  # type: ignore
        else:
            field = cls.__schema[key]
        if isinstance(field, DataField):
            return {field.name: field.fieldspecs()}  # type: ignore
        if isinstance(field, DataGroup):
            fieldspecs: dict[str, dict[str, Any]] = {}
            for subfield in field.fields.values():  # type: ignore
                if isinstance(subfield, DataField):
                    fieldspecs[subfield.name] = subfield.fieldspecs()  # type: ignore
                else:
                    raise AttributeError("The __schema appears to be inconsistent.")
            return fieldspecs
        raise AttributeError("The __schema appears to be inconsistent.")

    @staticmethod
    def _isna(value: Any) -> bool:
        """Returns True if value is either None or [], False otherwise."""
        # An empty list has to be detected by type and length: an identity
        # test against a fresh `[]` literal can never succeed.
        return value is None or (isinstance(value, list) and not value)

    def _setvalue(self, key: str, value: Any) -> None:
        """Sets the value for the data addressed by key (without validation).

        Values for list-valued fields are converted to a list first.

        Raises:
            KeyError: if `key` does not address a field (for example because
                it names a group or is unknown).
        """
        field = self._getfield(key)
        if isinstance(field, DataField):
            if isinstance(field, DataFieldList) and not isinstance(value, list):
                value = list(value)
            if "/" in key:
                key0, key1 = key.split("/")
                self.__data[key0][key1] = value
                return
            if key in self.__data:
                self.__data[key] = value  # type: ignore
                return
        raise KeyError(
            f"No field matching key `{key}` found in __schema (may be a group)."
        )

    def _getvalue(self, key: str) -> Any:
        """Gets the value from the data addressed by key.

        Raises:
            KeyError: if `key` is not present in the data store.
        """
        if "/" in key:
            key0, key1 = key.split("/")
            if key0 in self.__data and key1 in self.__data[key0]:
                return self.__data[key0][key1]
        if key in self.__data:
            return self.__data[key]
        raise KeyError(
            f"No field matching key `{key}` found in __schema (may be a group)."
        )

    def _getfield(self, key: str) -> Union[DataField, DataGroup]:
        """Gets the field or group addressed by key.

        Raises:
            KeyError: if `key` matches neither a group member nor a top-level
                schema entry.
        """
        if "/" in key:
            key0, key1 = key.split("/")
            if (
                key0 in self.__schema
                and isinstance(self.__schema[key0], DataGroup)
                and key1 in self.__schema[key0].fields  # type: ignore
            ):
                return self.__schema[key0].fields[key1]  # type: ignore (no idea why!?)
        if key in self.__schema:
            return self.__schema[key]  # type: ignore
        raise KeyError(f"No group or field matching key `{key}` found in __schema.")

    def keys(
        self, includemissing: bool = False, onlyrequired: bool = False
    ) -> list[str]:
        """Returns a list of keys for the DataSchema.

        NOTE(review): `includemissing` and `onlyrequired` are accepted but not
        used; every index key is returned regardless - confirm whether keys()
        is meant to filter like values() and items().
        """
        return self.__keys.copy()

    def values(
        self, includemissing: bool = False, onlyrequired: bool = False
    ) -> list[Any]:
        """Returns a list of values for the data in the DataSchema.

        Args:
            includemissing: Also include values that are None or an empty list.
            onlyrequired: Only include values of required fields.
        """
        values: list[Any] = []
        for key in self.__keys:
            field = self._getfield(key)
            if isinstance(field, DataField):
                value = self._getvalue(key)
                if (not self._isna(value) or includemissing) and (
                    not onlyrequired or field.required
                ):
                    values.append(value)
        return values

    def items(
        self, includemissing: bool = False, onlyrequired: bool = False
    ) -> list[tuple[str, Any]]:
        """Returns a list of key-value pairs for data in the DataSchema.

        Args:
            includemissing: Also include values that are None or an empty list.
            onlyrequired: Only include values of required fields.
        """
        items: list[tuple[str, Any]] = []
        for key in self.__keys:
            field = self._getfield(key)
            if isinstance(field, DataField):
                value = self._getvalue(key)
                if (not self._isna(value) or includemissing) and (
                    not onlyrequired or field.required
                ):
                    items.append((key, value))
        return items

    def data(
        self, includemissing: bool = False, onlyrequired: bool = False
    ) -> SchemaDataT:
        """Returns the data of the DataSchema as a schematic dictionary.

        Values are deep-copied. Every group gets an entry (possibly an empty
        dict) even if none of its fields passes the filters.

        Args:
            includemissing: Also include values that are None or an empty list.
            onlyrequired: Only include values of required fields.
        """
        data: SchemaDataT = {}
        for key in self.__keys:
            if "/" in key:
                key0, key1 = key.split("/")
                if key0 not in data:
                    data[key0] = {}
                field = self._getfield(key)
                value = self._getvalue(key)
                if (
                    isinstance(field, DataField)
                    and (not self._isna(value) or includemissing)
                    and (not onlyrequired or field.required)
                ):
                    data[key0][key1] = deepcopy(value)
            else:
                field = self._getfield(key)
                value = self._getvalue(key)
                if (
                    isinstance(field, DataField)
                    and (not self._isna(value) or includemissing)
                    and (not onlyrequired or field.required)
                ):
                    data[key] = deepcopy(value)
        return data

    def missing(self, onlyrequired: bool = True) -> list[str]:
        """Return a list of keys for missing fields.

        A field counts as missing when its value is None or an empty list.

        Args:
            onlyrequired: Only report required fields.
        """
        missing: list[str] = []
        for key in self.__keys:
            field = self._getfield(key)
            if isinstance(field, DataField):
                value = self._getvalue(key)
                if self._isna(value) and (not onlyrequired or field.required):
                    missing.append(key)
        return missing

    def iscomplete(self, onlyrequired: bool = True) -> bool:
        """Checks whether the dataset is complete, i.e. has no missing fields."""
        return not bool(self.missing(onlyrequired))