Source code for libestg3b.matcher

import datetime
from decimal import Decimal
from inspect import signature
from typing import Dict, Iterable, Iterator, Optional, Set  # noqa: W0611

import dateutil
from holidays import CountryHoliday


[docs]class Matcher(): """ Defines a bonus rule, e.g. "if someone works between 8pm and 6am, give them 25% more" """ def __init__(self, slug, description: str, impl, *, multiply: Optional[Decimal] = None, add: Optional[Decimal] = None, tests=[]) -> None: self._slug = slug self._description = description self._impl = impl self._multiply = multiply self._add = add self._tests = tests if multiply is not None and add is None: assert isinstance(multiply, Decimal) assert multiply > 0 elif add is not None and multiply is None: assert isinstance(add, Decimal) assert add > 0 else: assert False, "provide either multiply or add, but not both." assert len(slug) > 0 assert len(description) > 0 assert 1 <= len(self._impl_parameters) <= 3 @property def _impl_parameters(self): return signature(self._impl).parameters @property def _bonus(self): return self._multiply if self._multiply else self._add def _parse_test_time(self, tt): default = datetime.datetime(2018, 1, 10) return dateutil.parser.parse(tt, default=default) if tt else default
[docs] def examples(self): for t in self._tests: start, minute = t[0].split('~') minute = self._parse_test_time(minute) start = self._parse_test_time(start) if minute.time() < start.time(): minute = minute + dateutil.relativedelta.relativedelta(days=1) yield [ minute, start, t[1], ]
[docs] def match(self, minute: datetime.datetime, start: datetime.datetime, holidays: CountryHoliday) -> bool: narg = len(self._impl_parameters) if narg == 1: r = self._impl(minute) elif narg == 2: r = self._impl(minute, start) elif narg == 3: r = self._impl(minute, start, holidays) assert isinstance(r, bool) return r
def __repr__(self): return f'<Matcher: {self._slug} {self._description}>' def __hash__(self): return hash(self._slug) def __eq__(self, other): return self._slug == other._slug def __gt__(self, other): return self._bonus > other._bonus def __lt__(self, other): return self._bonus < other._bonus
[docs]class DayMatcher(Matcher): """ match, if the given minute is within the given day. Keyword arguments are passed onto Matcher. """ def __init__(self, slug, month: int, day: int, **kwargs) -> None: super().__init__( slug, f'{month:02d}-{day:02d}', lambda m: m.month == month and m.day == day, **kwargs, )
[docs]class DayTimeMatcher(Matcher): """ match, if the given minute is within the given day after the given hour. Keyword arguments are passed onto Matcher. """ def __init__(self, slug, month: int, day: int, hour: int, **kwargs) -> None: super().__init__( slug, f'{month:02d}-{day:02d} {hour:02d}:00+', lambda m: m.month == month and m.day == day and m.hour >= hour, **kwargs, )
[docs]class MatcherGroup(): """ A collection of similar :class:`Matcher` instances. When the group is evaluated, only the highest matching machter is returned. :param description: a short, human-readable text, explaining why the given matchers are grouped together. :param matchers: the initial set of matchers. """ def __init__(self, description: str, matchers: Iterable[Matcher]) -> None: self._description = description self._matchers = {} # type: Dict[str, Matcher] for m in matchers: self.append(m)
[docs] def append(self, matcher: Matcher) -> None: """ :param matcher: matcher to add; it must not yet exist in the group. """ if matcher._slug in self._matchers: raise Exception(f'Slug {matcher._slug} is already in this group') if not isinstance(matcher, Matcher): raise Exception('Matchers must be derived from libestg3b.Matcher') self._matchers[matcher._slug] = matcher
[docs] def match(self, minute: datetime.datetime, start: datetime.datetime, holidays: CountryHoliday) -> Optional[Matcher]: """ Evaluate this group. The given shift is tested using each of the stored matchers. The matcher with the highest bonus is the returned. If not a single one matches, ``None`` is returned. This method is to be used by :class:`libestg3b.EstG3b` only, but you can use it to implement more complex scenarios yourself. :param minute: minute to evaluate (see :class:`libestgb3.EstG3b`) :param start: the first minute in this shift (see :class:`libestgb3.EstG3b`) """ try: return max(filter(lambda matcher: matcher.match(minute, start, holidays), self)) except ValueError: # no match found return None
[docs] def extend(self, matchers: Iterable[Matcher], replace: bool = False) -> None: """ Add the given matchers to this group. :param matchers: :param replace: if one of the given matcher duplicates an existing one, overwrite it instead of raising an exception. """ for m in matchers: if replace: self._matchers.pop(m._slug, None) self.append(m)
def __contains__(self, item) -> bool: if isinstance(item, Matcher): return item._slug in self._matchers else: return item in self._matchers def __iter__(self) -> Iterator[Matcher]: return self._matchers.values().__iter__()
[docs]class Match(): def __init__(self, start: datetime.datetime, end: datetime.datetime, matchers: Set[Matcher]) -> None: self.start = start self.end = end self.matchers = matchers def __eq__(self, other): if not isinstance(other, Match): return False if all(getattr(self, attr) == getattr(other, attr) for attr in ('start', 'end', 'matchers')): return True return False