Source code for glorpen.config.config

# from glorpen.config.fields import path_validation_error
import abc
import dataclasses
import itertools
import textwrap
import types
import typing

_NoneType = types.NoneType if hasattr(types, "NoneType") else type(None)


class ConfigType(abc.ABC):
    def __init__(self, config):
        super(ConfigType, self).__init__()
        self.config = config

    @abc.abstractmethod
    def as_model(self, data: typing.Any, type, args: typing.Tuple, metadata: dict, path: str):
        pass


ValueErrorItems = typing.Union[dict, typing.Sequence]


class ConfigValueError(ValueError):
    def __init__(self, error):
        super(ConfigValueError, self).__init__(f"Found validation errors:\n{error}")


class DictValueError(ValueError):
    def __init__(self, items: ValueErrorItems):
        # msg = "Invalid fields.\n" + self._format_row(items)
        msg = self._format_row(items)
        super(DictValueError, self).__init__(msg)

    def _format_row(self, items: ValueErrorItems):
        return textwrap.indent("\n".join(self._format_items(items)), "")

    def _format_items(self, items: ValueErrorItems):
        if hasattr(items, "keys"):
            key_max_len = max(len(str(i)) for i in items.keys())
            item_sets = items.items()
            key_suffix = ": "
        else:
            key_max_len = 1
            item_sets = [("-", v) for v in items]
            key_suffix = " "

        msg_offset = key_max_len + len(key_suffix)

        for k, e in item_sets:
            f_key = str(k).rjust(key_max_len)
            f_msg = textwrap.indent(str(e), " " * msg_offset)[msg_offset:]
            yield f"{f_key}{key_suffix}{f_msg}"


[docs]class Config: """Config validator and normalizer.""" _registered_types: typing.List[ConfigType] def __init__(self): super(Config, self).__init__() self._registered_types = [] @classmethod def _handle_optional_values(cls, type, default_factory): if (type is _NoneType) or (typing.get_origin(type) is typing.Union and _NoneType in typing.get_args(type)): return None if default_factory: return default_factory() raise ValueError("No value provided") def as_model(self, data: typing.Any, type, metadata=None, default_factory=None, path=""): if data is None: return self._handle_optional_values(type, default_factory) if dataclasses.is_dataclass(type): return self._from_dataclass(data, type, path=path) else: origin = typing.get_origin(type) if origin is None: return self._from_type(data=data, type=type, args=(), metadata=metadata, path=path) else: return self._from_type(data=data, type=origin, args=typing.get_args(type), metadata=metadata, path=path) def to_model(self, data, cls): try: return self.as_model(data, cls) except ValueError as e: raise ConfigValueError(e) from None @classmethod def _get_default_factory(cls, field: dataclasses.Field): if field.default is not dataclasses.MISSING: return lambda: field.default elif field.default_factory is not dataclasses.MISSING: return field.default_factory else: return None def _from_dataclass(self, data: typing.Dict, cls, path: str): kwargs = {} errors = {} for field in dataclasses.fields(cls): try: kwargs[field.name] = self.as_model(data.get(field.name), field.type, metadata=field.metadata, default_factory=self._get_default_factory(field), path=f"{path}.{field.name}") except ValueError as e: errors[field.name] = e if errors: raise DictValueError(errors) return cls(**kwargs) def _from_type(self, data: typing.Any, type, args: typing.Tuple, metadata: dict, path: str): for reg_type in self._registered_types: value = reg_type.as_model(data=data, type=type, args=args, metadata=metadata, path=path) if value is not None: return value raise ValueError(f"Could not convert to {type}") def register(self, type: ConfigType): self._registered_types.append(type(self))
class UnionType(ConfigType): def as_model(self, data: typing.Any, type, args: typing.Tuple, metadata: dict, path: str): if type is typing.Union: return self._try_each_type(data, args, metadata=metadata, path=path) def _try_each_type(self, data, types, path: str, metadata=None): errors = [] for tp in types: try: return self.config.as_model(data, tp, metadata=metadata, path=path) except ValueError as e: errors.append(e) raise DictValueError(errors) class SimpleTypes(ConfigType): @classmethod def _try_convert(cls, data, conv): try: return conv(data) except Exception as e: raise ValueError(e) def as_model(self, data: typing.Any, type, args: typing.Tuple, metadata: dict, path: str): if type in (int, str, bool, float): return self._try_convert(data, type) class SequenceTypes(ConfigType): def as_model(self, data: typing.Any, type, args: typing.Tuple, metadata: dict, path: str): if type is tuple: errors = {} ret = [] for index, (tp, value) in enumerate(itertools.zip_longest(args, data)): try: ret.append(self.config.as_model(value, tp, path=f"{path}.{index}")) except Exception as e: errors[index] = e if errors: raise DictValueError(errors) return tuple(ret) def default(): c = Config() c.register(UnionType) c.register(SimpleTypes) c.register(SequenceTypes) return c