from contextlib import contextmanager
from enum import Enum
from functools import partial
import inspect
import io
import logging
import os
import re
from typing import (
cast,
Any,
Callable,
Dict,
Generic,
List,
Mapping,
TextIO,
Type,
TypeVar,
Union,
)
import attr
from schematics import Model, types
import schematics.exceptions
import toml
from toml.encoder import _dump_str
_LOG = logging.getLogger(__name__)
_METADATA_KEY = 'csbot_config'
[docs]class Config(Model):
"""Base class for configuration schemas.
Use :func:`option`, :func:`option_list` and :func:`option_map` to create fields in the schema.
Schemas are also valid option types, so deeper structures can be defined.
>>> class MyConfig(Config):
... delay = option(float, default=0.5, help="Number of seconds to wait")
... notify = option_list(str, help="Users to notify")
"""
def __repr__(self):
return f"{self.__class__.__name__}({', '.join(f'{a.name}={repr(a.value)}' for a in self.atoms())})"
#: Raised when configuration fails to validate
ConfigError = schematics.exceptions.DataError
_example_mode = False
[docs]@contextmanager
def example_mode():
"""For the duration of this context manager, try to use example values before default values."""
global _example_mode
old = _example_mode
_example_mode = True
yield
_example_mode = old
[docs]class WordList(types.ListType):
"""A list of strings that also accepts a space-separated string instead."""
def __init__(self, min_size=None, max_size=None, **kwargs):
super().__init__(types.StringType, min_size, max_size, **kwargs)
[docs] def convert(self, value, context=None):
if isinstance(value, str):
value = value.split()
return super().convert(value, context)
_T = TypeVar("_T")
# Mapping of Python types to Schematics field types
_TYPE_MAP = {
str: types.StringType,
int: types.IntType,
float: types.FloatType,
bool: types.BooleanType,
WordList: WordList,
}
# Basic option types available to the developer
_B = TypeVar("_B", Config, str, int, float, bool, WordList)
# Type of default value for an option
_DefaultValue = Union[None, _T]
# Type of callable to create a default value for an option
_DefaultCall = Callable[[], _DefaultValue[_T]]
# Type of a "default" or "example" argument
_DefaultArg = Union[_DefaultValue[_T], _DefaultCall[_T]]
[docs]def is_config(obj: Any) -> bool:
"""Is *obj* a configuration class or instance?"""
if inspect.isclass(obj):
return issubclass(obj, Config)
else:
return isinstance(obj, Config)
[docs]def is_allowable_type(cls: Type) -> bool:
"""Is *cls* allowed as a configuration option type?"""
return cls in _TYPE_MAP or is_config(cls)
[docs]def structure(data: Mapping[str, Any], cls: Type[Config]) -> Config:
"""Create an instance of *cls* from plain Python structure *data*."""
o = cls(data)
o.validate()
return o
[docs]def unstructure(obj: Config) -> Mapping[str, Any]:
"""Get plain Python structured data from *obj*."""
return obj.to_native()
[docs]def loads(s: str, cls: Type[Config]) -> Config:
"""Create an instance of *cls* from the TOML in *s*."""
return structure(toml.loads(s), cls)
[docs]def dumps(obj: Config) -> str:
"""Get TOML string representation of *obj*."""
return toml.dumps(unstructure(obj))
[docs]def load(f: TextIO, cls: Type[Config]) -> Config:
"""Create an instance of *cls* from the TOML in *f*."""
return structure(toml.load(f), cls)
[docs]def dump(obj: Config, f: TextIO):
"""Write TOML representation of *obj* to *f*."""
return toml.dump(unstructure(obj), f)
class _Default(Generic[_T]):
"""A callable to get a default or example value.
Both *default* and *example* can be either a value or a callable that returns a value.
Returns *default* (or the result of calling it, if callable) when called.
When called "normally", returns the value of the first environment variable in *env* that exists, or returns
*default* if no environment variable is used.
When called inside ``with example_mode()``, returns *example* if non-None, otherwise returns *default* (but without
environment variable behaviour). This allows configuration to define required fields without default values that can
still generate a useful example (see :class:`TomlExampleGenerator`) without otherwise supplying data.
"""
def __init__(self, default: _DefaultArg = None, example: _DefaultArg = None, env: List[str] = None):
self._default: _DefaultCall = default if callable(default) else lambda: default
self._example: _DefaultCall = example if callable(example) else lambda: example
self._env: List[str] = env or []
def __call__(self) -> Union[str, _DefaultValue[_T]]:
global _example_mode
if _example_mode:
return self._get_example()
else:
return self._get_default()
def _get_default(self, use_env: bool = True) -> Union[str, _DefaultValue[_T]]:
if use_env:
for var in self._env:
if var in os.environ:
return os.environ[var]
return self._default()
def _get_example(self) -> Union[str, _DefaultValue[_T]]:
example = self._example()
if example is None:
example = self._get_default(use_env=False)
return example
class _OptionKind(Enum):
SIMPLE = "simple"
STRUCTURE = "structure"
SIMPLE_LIST = "simple_list"
SIMPLE_MAP = "simple_map"
STRUCTURE_LIST = "structure_list"
STRUCTURE_MAP = "structure_map"
@property
def is_simple(self):
return self in {self.SIMPLE, self.SIMPLE_LIST, self.SIMPLE_MAP}
@attr.s(frozen=True)
class _OptionMetadata(Generic[_B]):
type: Type[_B] = attr.ib()
kind: _OptionKind = attr.ib(validator=attr.validators.in_(_OptionKind))
help: str = attr.ib(default="", validator=attr.validators.instance_of(str))
[docs]def option(cls: Type[_B], *,
required: bool = None,
default: _DefaultArg[_B] = None,
example: _DefaultArg[_B] = None,
env: Union[str, List[str]] = None,
help: str):
"""Create a configuration option that contains a value of type *cls*.
:param cls: Option type (see :func:`is_allowable_type`)
:param required: A non-None value is required? (default: False if default is None, otherwise True)
:param default: Default value if no value is supplied (default: None)
:param example: Default value when generating example configuration (default: None)
:param env: Environment variables to try if no value is supplied, before using default (default: [])
:param help: Description of option, included when generating example configuration
"""
if not is_allowable_type(cls):
raise TypeError(f"cls must be subclass of Config or one of {_TYPE_MAP.keys()}")
if required is None:
required = default is not None
if isinstance(env, str):
env = [env]
if is_config(cls):
field = partial(types.ModelType, cls)
else:
field = _TYPE_MAP[cls]
field_kwargs = {
"required": required,
"default": _Default(default, example, env),
"metadata": {
_METADATA_KEY: _OptionMetadata(
type=cls,
kind=_OptionKind.STRUCTURE if is_config(cls) else _OptionKind.SIMPLE,
help=help,
),
},
}
return field(**field_kwargs)
[docs]def option_list(cls: Type[_B], *,
default: _DefaultArg[List[_B]] = None,
example: _DefaultArg[List[_B]] = None,
help: str):
"""Create a configuration option that contains a list of *cls* values.
:param cls: Option type (see :func:`is_allowable_type`)
:param default: Default value if no value is supplied (default: empty list)
:param example: Default value when generating example configuration (default: empty list)
:param help: Description of option, included when generating example configuration
"""
if not is_allowable_type(cls):
raise TypeError(f"cls must be subclass of Config or one of {_TYPE_MAP.keys()}")
if default is None:
default = list
if is_config(cls):
inner_field = types.ModelType(cls, required=True)
else:
inner_field = _TYPE_MAP[cls](required=True)
field_kwargs = {
"required": True, # Disallow None as a value, empty list is fine
"default": _Default(default, example),
"metadata": {
_METADATA_KEY: _OptionMetadata(
type=cls,
kind=_OptionKind.STRUCTURE_LIST if is_config(cls) else _OptionKind.SIMPLE_LIST,
help=help,
),
},
}
return types.ListType(inner_field, **field_kwargs)
[docs]def option_map(cls: Type[_B], *,
default: _DefaultArg[Dict[str, _B]] = None,
example: _DefaultArg[Dict[str, _B]] = None,
help: str):
"""Create a configuration option that contains a mapping of string keys to *cls* values.
:param cls: Option type (see :func:`is_allowable_type`)
:param default: Default value if no value is supplied (default: empty list)
:param example: Default value when generating example configuration (default: empty list)
:param help: Description of option, included when generating example configuration
"""
if not is_allowable_type(cls):
raise TypeError(f"cls must be subclass of Config or one of {_TYPE_MAP.keys()}")
if default is None:
default = dict
if is_config(cls):
inner_field = types.ModelType(cls, required=True)
else:
inner_field = _TYPE_MAP[cls](required=True)
field_kwargs = {
"required": True, # Disallow None as a value, empty dict is fine
"default": _Default(default, example),
"metadata": {
_METADATA_KEY: _OptionMetadata(
type=cls,
kind=_OptionKind.STRUCTURE_MAP if is_config(cls) else _OptionKind.SIMPLE_MAP,
help=help,
),
},
}
return types.DictType(inner_field, **field_kwargs)
[docs]def make_example(cls: Type[Config]) -> Config:
"""Create an instance of *cls* without supplying data, using "example" or "default" values for each option."""
with example_mode():
o = cls()
o.validate()
return o
[docs]class TomlExampleGenerator:
_BARE_KEY_REGEX = re.compile(r"^[A-Za-z0-9_-]+$")
_LIST_LINE_LENGTH_THRESHOLD = 120
def __init__(self, *, commented=False):
self._stream = None
self._commented = commented
self._encoder = toml.TomlEncoder()
self._at_start = True
@contextmanager
def _use_stream(self, new):
"""Make all :meth:`_write` and :meth:`_writeline` calls go to *new*."""
old = self._stream
self._stream = new
yield
self._stream = old
@contextmanager
def _set_commented(self, new=True):
"""Make sure all non-empty lines start with ``#``."""
old = self._commented
self._commented = new
yield
self._commented = old
def _write(self, s, raw=False):
"""Write *s* to the current stream; if *raw* is True, don't apply comment filter."""
if not raw and self._commented:
lines = s.split("\n")
modified = [f"# {line}" if line and not line.startswith("#") else line
for line in lines]
s = "\n".join(modified)
self._stream.write(s)
self._at_start = False
def _writeline(self, s, raw=False):
"""Write *s* to the current stream as a new line; if *raw* is True, don't apply comment filter."""
if not raw and self._commented and s and not s.startswith("#"):
s = f"# {s}"
s = f"{s}\n"
self._write(s, raw=True)
[docs] def generate(self, obj: Union[Config, Type[Config]], stream: TextIO, prefix: List[str] = None):
"""Generate an example from *obj* and write it to *stream*."""
if inspect.isclass(obj):
obj_ = make_example(obj)
else:
obj_ = cast(Config, obj)
assert is_config(obj)
if prefix is None:
prefix = []
with self._use_stream(stream):
self._generate_structure(obj_, prefix)
def _generate_option(self,
example: Any,
field: types.BaseType,
absolute_path: List[str],
relative_path: List[str]):
"""
Generate "## <help>" (if present)
Generate option example:
_generate_simple
_generate_simple_list
_generate_simple_map
_generate_structure
_generate_structure_list
_generate_structure_map
"""
metadata = self._get_metadata(field)
if metadata.help:
self._writeline(f"## {metadata.help}")
if metadata.kind is _OptionKind.SIMPLE:
self._generate_simple(example, relative_path)
elif metadata.kind is _OptionKind.SIMPLE_LIST:
self._generate_simple_list(example, relative_path)
elif metadata.kind is _OptionKind.SIMPLE_MAP:
self._generate_simple_map(example, relative_path)
elif metadata.kind is _OptionKind.STRUCTURE:
self._generate_structure(example, absolute_path)
elif metadata.kind is _OptionKind.STRUCTURE_LIST:
self._generate_structure_list(example, absolute_path)
elif metadata.kind is _OptionKind.STRUCTURE_MAP:
self._generate_structure_map(example, absolute_path)
def _generate_simple(self, example: Any, relative_path: List[str]):
"""
Generate <relative_path> = toml(<example>)
"""
key = self._make_key(relative_path)
if example is None:
self._writeline(f"# {key} =")
else:
self._writeline(f"{key} = {self._encoder.dump_value(example)}")
def _generate_simple_list(self, example: List[Any], relative_path: List[str]):
"""
Generate <relative_path> = toml(<example>)
"""
key = self._make_key(relative_path)
items = [str(self._encoder.dump_value(x)) for x in example]
output = f"{key} = [{', '.join(items)}]\n"
if len(items) > 0 and len(output) > self._LIST_LINE_LENGTH_THRESHOLD:
output = ",\n ".join(items) + ","
output = f"{key} = [\n {output}\n]\n"
self._write(output)
def _generate_simple_map(self, example: Dict[str, Any], relative_path: List[str]):
"""
Generate <relative_path>.<key> = toml(<example[key]>) for each key in <example>
"""
if len(example) == 0:
key = self._make_key(relative_path + ["_key_"])
self._writeline(f"# {key} = _value_")
else:
for k, v in example.items():
self._generate_simple(v, relative_path + [k])
def _generate_structure(self, example: Config, absolute_path: List[str], is_list_item: bool = False):
"""
Generate section heading:
Nothing if top-levell
[<absolute_path>] if option or map item
[[<absolute_path>]] if list item
Generate all "simple" options
Generate all "structure" options
"""
if absolute_path:
key = self._make_key(absolute_path)
if example is None:
self._write("# ")
if is_list_item:
self._writeline(f"[[{key}]]")
else:
self._writeline(f"[{key}]")
if example is None:
return
deferred = []
for atom in example.atoms():
metadata = self._get_metadata(atom.field)
if not metadata.kind.is_simple:
# Write sections after simple values
deferred.append(atom)
continue
self._generate_option(atom.value,
atom.field,
absolute_path + [atom.name],
[atom.name])
for atom in deferred:
self._write("\n")
self._generate_option(atom.value,
atom.field,
absolute_path + [atom.name],
[atom.name])
def _generate_structure_list(self, example: List[Any], absolute_path: List[str]):
"""
For each item in <example>:
Generate structure with [[<absolute_path>]] heading
"""
if len(example) == 0:
key = self._make_key(absolute_path)
self._writeline(f"# [[{key}]]")
else:
for item in example:
self._generate_structure(item, absolute_path, is_list_item=True)
def _generate_structure_map(self, example: Dict[str, Any], absolute_path: List[str]):
"""
For each item in <example>:
Generate structure from <example[key]> with [<absolute_path>.<key>] heading
"""
if len(example) == 0:
key = self._make_key(absolute_path)
self._writeline(f"# [{key}._key_]")
else:
for name, value in example.items():
self._generate_structure(value, absolute_path + [name])
@classmethod
def _get_metadata(cls, field: types.BaseType) -> _OptionMetadata:
return field.metadata[_METADATA_KEY]
@classmethod
def _make_key(cls, path):
return ".".join([_ if cls._BARE_KEY_REGEX.match(_) else _dump_str(_)
for _ in path])
[docs]def generate_toml_example(obj: Union[Config, Type[Config]], commented: bool = False) -> str:
"""Generate an example configuration from *obj* as a TOML string."""
stream = io.StringIO()
generator = TomlExampleGenerator(commented=commented)
generator.generate(obj, stream)
return stream.getvalue()