#!/usr/bin/env python3
from abc import ABC, abstractmethod
+import enum
import fnmatch
import logging
import re
from typing import Any, Callable, List, Optional, Set
+# This module is commonly used by others in here and should avoid
+# taking any unnecessary dependencies back on them.
logger = logging.getLogger(__name__)
-ACL_ORDER_ALLOW_DENY = 1
-ACL_ORDER_DENY_ALLOW = 2
+class Order(enum.Enum):
+ """A helper to express the order of evaluation for allows/denies
+ in an Access Control List.
+ """
+ UNDEFINED = 0
+ ALLOW_DENY = 1
+ DENY_ALLOW = 2
class SimpleACL(ABC):
def __init__(
self,
*,
- order_to_check_allow_deny: int,
+ order_to_check_allow_deny: Order,
default_answer: bool
):
if order_to_check_allow_deny not in (
- ACL_ORDER_ALLOW_DENY, ACL_ORDER_DENY_ALLOW
+ Order.ALLOW_DENY, Order.DENY_ALLOW
):
raise Exception(
- 'order_to_check_allow_deny must be ACL_ORDER_ALLOW_DENY or ' +
- 'ACL_ORDER_DENY_ALLOW')
+ 'order_to_check_allow_deny must be Order.ALLOW_DENY or ' +
+ 'Order.DENY_ALLOW')
self.order_to_check_allow_deny = order_to_check_allow_deny
self.default_answer = default_answer
def __call__(self, x: Any) -> bool:
"""Returns True if x is allowed, False otherwise."""
logger.debug(f'SimpleACL checking {x}')
- if self.order_to_check_allow_deny == ACL_ORDER_ALLOW_DENY:
+ if self.order_to_check_allow_deny == Order.ALLOW_DENY:
logger.debug('Checking allowed first...')
if self.check_allowed(x):
logger.debug(f'{x} was allowed explicitly.')
if self.check_denied(x):
logger.debug(f'{x} was denied explicitly.')
return False
- elif self.order_to_check_allow_deny == ACL_ORDER_DENY_ALLOW:
+ elif self.order_to_check_allow_deny == Order.DENY_ALLOW:
logger.debug('Checking denied first...')
if self.check_denied(x):
logger.debug(f'{x} was denied explicitly.')
*,
allow_set: Optional[Set[Any]] = None,
deny_set: Optional[Set[Any]] = None,
- order_to_check_allow_deny: int,
+ order_to_check_allow_deny: Order,
default_answer: bool) -> None:
super().__init__(
order_to_check_allow_deny=order_to_check_allow_deny,
return x in self.deny_set
+class AllowListACL(SetBasedACL):
+ """Convenience subclass for a list that only allows known items.
+ i.e. a 'whitelist'
+ """
+ def __init__(self,
+ *,
+ allow_set: Optional[Set[Any]]) -> None:
+ super().__init__(
+ allow_set = allow_set,
+ order_to_check_allow_deny = Order.ALLOW_DENY,
+ default_answer = False)
+
+
+class DenyListACL(SetBasedACL):
+ """Convenience subclass for a list that only disallows known items.
+ i.e. a 'blacklist'
+ """
+ def __init__(self,
+ *,
+ deny_set: Optional[Set[Any]]) -> None:
+ super().__init__(
+ deny_set = deny_set,
+ order_to_check_allow_deny = Order.ALLOW_DENY,
+ default_answer = True)
+
+
class PredicateListBasedACL(SimpleACL):
"""An ACL that allows or denies by applying predicates."""
def __init__(self,
*,
allow_predicate_list: List[Callable[[Any], bool]] = None,
deny_predicate_list: List[Callable[[Any], bool]] = None,
- order_to_check_allow_deny: int,
+ order_to_check_allow_deny: Order,
default_answer: bool) -> None:
super().__init__(
order_to_check_allow_deny=order_to_check_allow_deny,
*,
allowed_patterns: Optional[List[str]] = None,
denied_patterns: Optional[List[str]] = None,
- order_to_check_allow_deny: int,
+ order_to_check_allow_deny: Order,
default_answer: bool) -> None:
allow_predicates = []
if allowed_patterns is not None:
*,
allowed_regexs: Optional[List[re.Pattern]] = None,
denied_regexs: Optional[List[re.Pattern]] = None,
- order_to_check_allow_deny: int,
+ order_to_check_allow_deny: Order,
default_answer: bool) -> None:
allow_predicates = None
if allowed_regexs is not None:
)
-class AnyCompoundACL(object):
+class AnyCompoundACL(SimpleACL):
"""An ACL that allows if any of its subacls allow."""
- def __init__(self, subacls: List[SimpleACL]):
- assert subacls is not None
+ def __init__(self,
+ *,
+ subacls: Optional[List[SimpleACL]] = None,
+ order_to_check_allow_deny: Order,
+ default_answer: bool) -> None:
+ super().__init__(
+ order_to_check_allow_deny = order_to_check_allow_deny,
+ default_answer = default_answer
+ )
self.subacls = subacls
- def __call__(self, x: Any):
+ def check_allowed(self, x: Any) -> bool:
+ if self.subacls is None:
+ return False
return any(acl(x) for acl in self.subacls)
+ def check_denied(self, x: Any) -> bool:
+ if self.subacls is None:
+ return False
+ return any(not acl(x) for acl in self.subacls)
+
-class AllCompoundACL(object):
+class AllCompoundACL(SimpleACL):
"""An ACL that allows if all of its subacls allow."""
- def __init__(self, subacls: List[SimpleACL]):
- assert subacls is not None
+ def __init__(self,
+ *,
+ subacls: Optional[List[SimpleACL]] = None,
+ order_to_check_allow_deny: Order,
+ default_answer: bool) -> None:
+ super().__init__(
+ order_to_check_allow_deny = order_to_check_allow_deny,
+ default_answer = default_answer
+ )
self.subacls = subacls
- def __call__(self, x: Any):
+ def check_allowed(self, x: Any) -> bool:
+ if self.subacls is None:
+ return False
return all(acl(x) for acl in self.subacls)
+
+ def check_denied(self, x: Any) -> bool:
+ if self.subacls is None:
+ return False
+ return any(not acl(x) for acl in self.subacls)
import logging
import os
+# This module is commonly used by others in here and should avoid
+# taking any unnecessary dependencies back on them.
+
logger = logging.getLogger(__name__)
import time
import traceback
+# This module is commonly used by others in here and should avoid
+# taking any unnecessary dependencies back on them.
import argparse_utils
import config
import sys
from typing import Any, Dict, List
+# This module is commonly used by others in here and should avoid
+# taking any unnecessary dependencies back on them.
+
# Note: at this point in time, logging hasn't been configured and
# anything we log will come out the root logger.
"""Universal constants."""
+# This module is commonly used by others in here and should avoid
+# taking any unnecessary dependencies back on them.
+
# Date/time based constants
SECONDS_PER_MINUTE = 60
SECONDS_PER_HOUR = 60 * SECONDS_PER_MINUTE
import acl
import bootstrap
-from decorator_utils import decorate_matching_methods_with
+from datetime_utils import (
+ TimeUnit, n_timeunits_from_base, datetime_to_date, date_to_datetime
+)
from dateparse.dateparse_utilsLexer import dateparse_utilsLexer # type: ignore
from dateparse.dateparse_utilsListener import dateparse_utilsListener # type: ignore
from dateparse.dateparse_utilsParser import dateparse_utilsParser # type: ignore
+from decorator_utils import decorate_matching_methods_with
logger = logging.getLogger(__name__)
'exit*',
],
denied_patterns=None,
- order_to_check_allow_deny=acl.ACL_ORDER_DENY_ALLOW,
+ order_to_check_allow_deny=acl.Order.DENY_ALLOW,
default_answer=False
)
)
idea of "now" so that the code can be more easily unittested.
Leave as None for real use cases.
"""
- from datetime_utils import TimeUnit
self.month_name_to_number = {
'jan': 1,
'feb': 2,
def _reset(self):
"""Reset at init and between parses."""
- from datetime_utils import datetime_to_date
if self.override_now_for_test_purposes is None:
self.now_datetime = datetime.datetime.now()
self.today = datetime.date.today()
name = name.replace('washi', 'presi')
return name
- def _figure_out_date_unit(self, orig: str) -> int:
+ def _figure_out_date_unit(self, orig: str) -> TimeUnit:
"""Figure out what unit a date expression piece is talking about."""
- from datetime_utils import TimeUnit
if 'month' in orig:
return TimeUnit.MONTHS
txt = orig.lower()[:3]
def exitDateExpr(self, ctx: dateparse_utilsParser.DateExprContext) -> None:
"""When we leave the date expression, populate self.date."""
- from datetime_utils import (
- n_timeunits_from_base, datetime_to_date, date_to_datetime
- )
if 'special' in self.context:
self.date = self._parse_special_date(self.context['special'])
else:
def exitTimeExpr(self, ctx: dateparse_utilsParser.TimeExprContext) -> None:
# Simple time?
- from datetime_utils import TimeUnit
self.time = datetime.time(
self.context['hour'],
self.context['minute'],
def exitDeltaTimeFraction(
self, ctx: dateparse_utilsParser.DeltaTimeFractionContext
) -> None:
- from datetime_utils import TimeUnit
try:
txt = ctx.getText().lower()[:4]
if txt == 'quar':
def exitNFoosFromTodayAgoExpr(
self, ctx: dateparse_utilsParser.NFoosFromTodayAgoExprContext
) -> None:
- from datetime_utils import n_timeunits_from_base
d = self.now_datetime
try:
count = self._get_int(ctx.unsignedInt().getText())
def exitDeltaRelativeToTodayExpr(
self, ctx: dateparse_utilsParser.DeltaRelativeToTodayExprContext
) -> None:
- from datetime_utils import n_timeunits_from_base
d = self.now_datetime
try:
mod = ctx.thisNextLast()
pass
def main() -> None:
parser = DateParser()
for line in sys.stdin:
if __name__ == "__main__":
- main = bootstrap.initialize(main)
main()
import enum
import logging
import re
-from typing import NewType, Tuple
+from typing import Any, NewType, Tuple
import holidays # type: ignore
import pytz
MONTHS = 13
YEARS = 14
+ @classmethod
+ def is_valid(cls, value: Any):
+ if type(value) is int:
+ print("int")
+ return value in cls._value2member_map_
+ elif type(value) is TimeUnit:
+ print("TimeUnit")
+ return value.value in cls._value2member_map_
+ elif type(value) is str:
+ print("str")
+ return value in cls._member_names_
+ else:
+ print(type(value))
+ return False
+
def n_timeunits_from_base(
count: int,
unit: TimeUnit,
base: datetime.datetime
) -> datetime.datetime:
+ assert TimeUnit.is_valid(unit)
if count == 0:
return base
from typing import Callable, Optional
import warnings
+# This module is commonly used by others in here and should avoid
+# taking any unnecessary dependencies back on them.
import exceptions
from abc import ABC, abstractmethod
from typing import Any, Generic, TypeVar
+# This module is commonly used by others in here and should avoid
+# taking any unnecessary dependencies back on them.
+
T = TypeVar('T')
#!/usr/bin/env python3
+# This module is commonly used by others in here and should avoid
+# taking any unnecessary dependencies back on them.
+
class PreconditionException(AssertionError):
pass
import itertools
import logging
+# This module is commonly used by others in here and should avoid
+# taking any unnecessary dependencies back on them.
+
logger = logging.getLogger(__name__)
generators = {}
import pytz
import sys
+# This module is commonly used by others in here and should avoid
+# taking any unnecessary dependencies back on them.
import argparse_utils
import config
import time
from typing import Callable, List, TypeVar
+# This module is commonly used by others in here and should avoid
+# taking any unnecessary dependencies back on them.
from deferred_operand import DeferredOperand
import id_generator
even = acl.SetBasedACL(
allow_set = set([2, 4, 6, 8, 10]),
deny_set = set([1, 3, 5, 7, 9]),
- order_to_check_allow_deny = acl.ACL_ORDER_ALLOW_DENY,
+ order_to_check_allow_deny = acl.Order.ALLOW_DENY,
default_answer = False
)
self.assertTrue(even(2))
def test_wildcard_based_acl(self):
a_or_b = acl.StringWildcardBasedACL(
allowed_patterns = ['a*', 'b*'],
- order_to_check_allow_deny = acl.ACL_ORDER_ALLOW_DENY,
+ order_to_check_allow_deny = acl.Order.ALLOW_DENY,
default_answer = False
)
self.assertTrue(a_or_b('aardvark'))
- self.assertTrue(a_or_b('bubblegum'))
- self.assertFalse(a_or_b('charlie'))
+ self.assertTrue(a_or_b('baboon'))
+ self.assertFalse(a_or_b('cheetah'))
def test_re_based_acl(self):
weird = acl.StringREBasedACL(
re.compile('^a.*a$'),
re.compile('^b.*b$')
],
- order_to_check_allow_deny = acl.ACL_ORDER_ALLOW_DENY,
+ order_to_check_allow_deny = acl.Order.DENY_ALLOW,
default_answer = True
)
self.assertTrue(weird('aardvark'))
self.assertFalse(weird('anaconda'))
- self.assertFalse(weird('beelzebub'))
+ self.assertFalse(weird('blackneb'))
+ self.assertTrue(weird('crow'))
+
+ def test_compound_acls_disjunction(self):
+ a_b_c = acl.StringWildcardBasedACL(
+ allowed_patterns = ['a*', 'b*', 'c*'],
+ order_to_check_allow_deny = acl.Order.ALLOW_DENY,
+ default_answer = False
+ )
+ c_d_e = acl.StringWildcardBasedACL(
+ allowed_patterns = ['c*', 'd*', 'e*'],
+ order_to_check_allow_deny = acl.Order.ALLOW_DENY,
+ default_answer = False
+ )
+ disjunction = acl.AnyCompoundACL(
+ subacls = [a_b_c, c_d_e],
+ order_to_check_allow_deny = acl.Order.ALLOW_DENY,
+ default_answer = False,
+ )
+ self.assertTrue(disjunction('aardvark'))
+ self.assertTrue(disjunction('caribou'))
+ self.assertTrue(disjunction('eagle'))
+ self.assertFalse(disjunction('newt'))
+
+ def test_compound_acls_conjunction(self):
+ a_b_c = acl.StringWildcardBasedACL(
+ allowed_patterns = ['a*', 'b*', 'c*'],
+ order_to_check_allow_deny = acl.Order.ALLOW_DENY,
+ default_answer = False
+ )
+ c_d_e = acl.StringWildcardBasedACL(
+ allowed_patterns = ['c*', 'd*', 'e*'],
+ order_to_check_allow_deny = acl.Order.ALLOW_DENY,
+ default_answer = False
+ )
+ conjunction = acl.AllCompoundACL(
+ subacls = [a_b_c, c_d_e],
+ order_to_check_allow_deny = acl.Order.ALLOW_DENY,
+ default_answer = False,
+ )
+ self.assertFalse(conjunction('aardvark'))
+ self.assertTrue(conjunction('caribou'))
+ self.assertTrue(conjunction('condor'))
+ self.assertFalse(conjunction('eagle'))
+ self.assertFalse(conjunction('newt'))
if __name__ == '__main__':
import threading
from typing import Callable, Optional, Tuple
+# This module is commonly used by others in here and should avoid
+# taking any unnecessary dependencies back on them.
+
logger = logging.getLogger(__name__)