any stdlib names.
organize it into logical packages based on the code's functionality.
Note that when words would collide with a Python standard library or
reserved keyword I've used a 'z' at the end, e.g. 'collectionz'
-instead of 'collections', 'typez' instead of 'type', etc...
+instead of 'collections'.
There's some example code that uses various features of this project checked
in under `examples/ <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=tree;f=examples;h=d9744bf2b171ba7a9ff21ae1d3862b673647fff4;hb=HEAD>`_ that you can check out. See the `README <http://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=examples/README;hb=HEAD>`__ in that directory for more information
pyutils.collectionz
pyutils.compress
- pyutils.datetimez
+ pyutils.datetimes
pyutils.files
pyutils.parallelize
pyutils.search
pyutils.security
- pyutils.typez
+ pyutils.types
Submodules
----------
from typing import Optional
from pyutils import bootstrap, config, exec_utils, stopwatch
-from pyutils.datetimez import datetime_utils
+from pyutils.datetimes import datetime_utils
from pyutils.files import file_utils, lockfile
logger = logging.getLogger(__name__)
cfg = config.add_commandline_args(
- f'Python Cron Runner ({__file__})',
- 'Wrapper for cron commands with locking, timeouts, and accounting.',
+ f"Python Cron Runner ({__file__})",
+ "Wrapper for cron commands with locking, timeouts, and accounting.",
)
cfg.add_argument(
- '--lockfile',
+ "--lockfile",
default=None,
- metavar='LOCKFILE_PATH',
- help='Path to the lockfile to use to ensure that two instances of a command do not execute contemporaneously.',
+ metavar="LOCKFILE_PATH",
+ help="Path to the lockfile to use to ensure that two instances of a command do not execute contemporaneously.",
)
cfg.add_argument(
- '--lockfile_audit_record',
+ "--lockfile_audit_record",
default=None,
- metavar='LOCKFILE_AUDIT_RECORD_FILENAME',
- help='Path to a record of when the logfile was held/released and for what reason',
+ metavar="LOCKFILE_AUDIT_RECORD_FILENAME",
+ help="Path to a record of when the logfile was held/released and for what reason",
)
cfg.add_argument(
- '--timeout',
+ "--timeout",
type=str,
- metavar='TIMEOUT',
+ metavar="TIMEOUT",
default=None,
help='Maximum time for lock acquisition + command execution. Undecorated for seconds but "3m" or "1h 15m" work too.',
)
cfg.add_argument(
- '--timestamp',
+ "--timestamp",
type=str,
- metavar='TIMESTAMP_FILE',
+ metavar="TIMESTAMP_FILE",
default=None,
- help='The /timestamp/TIMESTAMP_FILE file tracking the work being done; files\' mtimes will be set to the last successful run of a command for accounting purposes.',
+ help="The /timestamp/TIMESTAMP_FILE file tracking the work being done; files' mtimes will be set to the last successful run of a command for accounting purposes.",
)
cfg.add_argument(
- '--max_frequency',
+ "--max_frequency",
type=str,
- metavar='FREQUENCY',
+ metavar="FREQUENCY",
default=None,
help='The maximum frequency with which to do this work; even if the wrapper is invoked more often than this it will not run the command. Requires --timestamp. Undecorated for seconds but "3h" or "1h 15m" work too.',
)
cfg.add_argument(
- '--command',
- nargs='*',
+ "--command",
+ nargs="*",
required=True,
type=str,
- metavar='COMMANDLINE',
- help='The commandline to run under a lock.',
+ metavar="COMMANDLINE",
+ help="The commandline to run under a lock.",
)
config.overwrite_argparse_epilog(
"""
def run_command(timeout: Optional[int], timestamp_file: Optional[str]) -> int:
"""Run cron command"""
- cmd = ' '.join(config.config['command'])
+ cmd = " ".join(config.config["command"])
logger.info('cron cmd = "%s"', cmd)
- logger.debug('shell environment:')
+ logger.debug("shell environment:")
for var in os.environ:
val = os.environ[var]
- logger.debug('%s = %s', var, val)
- logger.debug('____ (↓↓↓ output from the subprocess appears below here ↓↓↓) ____')
+ logger.debug("%s = %s", var, val)
+ logger.debug("____ (↓↓↓ output from the subprocess appears below here ↓↓↓) ____")
try:
with stopwatch.Timer() as t:
ret = exec_utils.cmd_exitcode(cmd, timeout)
logger.debug(
- f'____ (↑↑↑ subprocess finished in {t():.2f}s, exit value was {ret} ↑↑↑) ____'
+ f"____ (↑↑↑ subprocess finished in {t():.2f}s, exit value was {ret} ↑↑↑) ____"
)
if timestamp_file is not None and os.path.exists(timestamp_file):
- logger.debug('Touching %s', timestamp_file)
+ logger.debug("Touching %s", timestamp_file)
file_utils.touch_file(timestamp_file)
return ret
except Exception as e:
logger.exception(e)
- print('Cron subprocess failed, giving up.', file=sys.stderr)
- logger.warning('Cron subprocess failed, giving up')
+ print("Cron subprocess failed, giving up.", file=sys.stderr)
+ logger.warning("Cron subprocess failed, giving up")
return -1000
@bootstrap.initialize
def main() -> int:
"""Entry point"""
- if config.config['timestamp']:
+ if config.config["timestamp"]:
timestamp_file = f"/timestamps/{config.config['timestamp']}"
if not file_utils.does_file_exist(timestamp_file):
logger.error(
- '--timestamp argument\'s target file (%s) must already exist.',
+ "--timestamp argument's target file (%s) must already exist.",
timestamp_file,
)
sys.exit(-1)
else:
timestamp_file = None
- if config.config['max_frequency']:
+ if config.config["max_frequency"]:
config.error(
- 'The --max_frequency argument requires the --timestamp argument.'
+ "The --max_frequency argument requires the --timestamp argument."
)
now = datetime.datetime.now()
if timestamp_file is not None and os.path.exists(timestamp_file):
- max_frequency = config.config['max_frequency']
+ max_frequency = config.config["max_frequency"]
if max_frequency is not None:
max_delta = datetime_utils.parse_duration(max_frequency)
if max_delta > 0:
)
sys.exit(0)
- timeout = config.config['timeout']
+ timeout = config.config["timeout"]
if timeout is not None:
timeout = datetime_utils.parse_duration(timeout)
assert timeout > 0
- logger.debug('Timeout is %ss', timeout)
+ logger.debug("Timeout is %ss", timeout)
lockfile_expiration = datetime.datetime.now().timestamp() + timeout
else:
- logger.debug('Timeout not specified; no lockfile expiration.')
+ logger.debug("Timeout not specified; no lockfile expiration.")
lockfile_expiration = None
- lockfile_path = config.config['lockfile']
+ lockfile_path = config.config["lockfile"]
if lockfile_path is not None:
- logger.debug('Attempting to acquire lockfile %s...', lockfile_path)
+ logger.debug("Attempting to acquire lockfile %s...", lockfile_path)
try:
with lockfile.LockFile(
lockfile_path,
do_signal_cleanup=True,
- override_command=' '.join(config.config['command']),
+ override_command=" ".join(config.config["command"]),
expiration_timestamp=lockfile_expiration,
) as lf:
- record = config.config['lockfile_audit_record']
- cmd = ' '.join(config.config['command'])
+ record = config.config["lockfile_audit_record"]
+ cmd = " ".join(config.config["command"])
if record:
start = lf.locktime
- with open(record, 'a') as wf:
- print(f'{lockfile_path}, ACQUIRE, {start}, {cmd}', file=wf)
+ with open(record, "a") as wf:
+ print(f"{lockfile_path}, ACQUIRE, {start}, {cmd}", file=wf)
retval = run_command(timeout, timestamp_file)
if record:
end = datetime.datetime.now().timestamp()
duration = datetime_utils.describe_duration_briefly(end - start)
- with open(record, 'a') as wf:
+ with open(record, "a") as wf:
print(
- f'{lockfile_path}, RELEASE({duration}), {end}, {cmd}',
+ f"{lockfile_path}, RELEASE({duration}), {end}, {cmd}",
file=wf,
)
return retval
except lockfile.LockFileException as e:
logger.exception(e)
- msg = f'Failed to acquire {lockfile_path}, giving up.'
+ msg = f"Failed to acquire {lockfile_path}, giving up."
logger.error(msg)
print(msg, file=sys.stderr)
return 1000
else:
- logger.debug('No lockfile indicated; not locking anything.')
+ logger.debug("No lockfile indicated; not locking anything.")
return run_command(timeout, timestamp_file)
-if __name__ == '__main__':
+if __name__ == "__main__":
# Insist that our logger.whatever('messages') make their way into
# syslog with a facility=LOG_CRON, please. Yes, this is hacky.
- sys.argv.append('--logging_syslog')
- sys.argv.append('--logging_syslog_facility=CRON')
+ sys.argv.append("--logging_syslog")
+ sys.argv.append("--logging_syslog_facility=CRON")
main()
from pyutils import argparse_utils, bootstrap, config, persistent, string_utils
from pyutils.ansi import fg, reset
-from pyutils.datetimez import dateparse_utils as dateparse
+from pyutils.datetimes import dateparse_utils as dateparse
from pyutils.files import file_utils
logger = logging.getLogger(__name__)
cfg.add_argument(
"--reminder_filename",
type=argparse_utils.valid_filename,
- default='.reminder',
- metavar='FILENAME',
+ default=".reminder",
+ metavar="FILENAME",
help="Override the .reminder filepath",
)
cfg.add_argument(
- '--reminder_cache_file',
+ "--reminder_cache_file",
type=str,
default=f'{os.environ["HOME"]}/.reminder_cache',
- metavar='FILENAME',
- help='Override the .reminder cache location',
+ metavar="FILENAME",
+ help="Override the .reminder cache location",
)
cfg.add_argument(
- "-n", "--count", type=int, metavar='COUNT', help="How many events to remind about"
+ "-n", "--count", type=int, metavar="COUNT", help="How many events to remind about"
)
cfg.add_argument(
"--days_ahead",
type=int,
- metavar='#DAYS',
+ metavar="#DAYS",
help="How many days ahead to remind about",
)
cfg.add_argument(
def __init__(
self, cached_state: Optional[Dict[datetime.date, List[str]]] = None
) -> None:
- if not config.config['override_timestamp']:
+ if not config.config["override_timestamp"]:
self.today = datetime.date.today()
else:
- self.today = config.config['override_timestamp'][0].date()
+ self.today = config.config["override_timestamp"][0].date()
logger.debug(
'Overriding "now" with %s because of commandline argument.',
self.today,
self.label_by_date = cached_state
return
self.label_by_date: Dict[datetime.date, List[str]] = defaultdict(list)
- self.read_file(config.config['reminder_filename'])
+ self.read_file(config.config["reminder_filename"])
def handle_event_by_adjusting_year_to_now(
self,
month=orig_date.month,
day=orig_date.day,
)
- logger.debug('Date in %d: %s', year, dt)
+ logger.debug("Date in %d: %s", year, dt)
self.label_by_date[dt].append(label)
- logger.debug('%s => %s', dt, label)
+ logger.debug("%s => %s", dt, label)
def handle_event_with_fixed_year(
self,
orig_date: datetime.date,
orig_label: str,
) -> None:
- logger.debug('Fixed date event...')
+ logger.debug("Fixed date event...")
self.label_by_date[orig_date].append(orig_label)
- logger.debug('%s => %s', orig_date, orig_label)
+ logger.debug("%s => %s", orig_date, orig_label)
def read_file(self, filename: str) -> None:
- logger.debug('Reading %s:', filename)
+ logger.debug("Reading %s:", filename)
date_parser = dateparse.DateParser()
parsing_mode = Reminder.MODE_EVENT
with open(filename) as f:
line = re.sub(r"#.*$", "", line)
if re.match(r"^ *$", line) is not None:
continue
- logger.debug('> %s', line)
+ logger.debug("> %s", line)
try:
if "=" in line:
label, date = line.split("=")
else:
print(f"Skipping unparsable line: {line}", file=sys.stderr)
- logger.error('Skipping malformed line: %s', line)
+ logger.error("Skipping malformed line: %s", line)
continue
if label == "type":
if "event" in date:
parsing_mode = Reminder.MODE_EVENT
- logger.debug('--- EVENT MODE ---')
+ logger.debug("--- EVENT MODE ---")
elif "birthday" in date:
parsing_mode = Reminder.MODE_BIRTHDAY
- logger.debug('--- BIRTHDAY MODE ---')
+ logger.debug("--- BIRTHDAY MODE ---")
elif "anniversary" in date:
parsing_mode = Reminder.MODE_ANNIVERSARY
- logger.debug('--- ANNIVERSARY MODE ---')
+ logger.debug("--- ANNIVERSARY MODE ---")
else:
date_parser.parse(date)
orig_date = date_parser.get_date()
if orig_date is None:
print(f"Skipping unparsable date: {line}", file=sys.stderr)
- logger.error('Skipping line with unparsable date')
+ logger.error("Skipping line with unparsable date")
continue
- logger.debug('Original date: %s', orig_date)
+ logger.debug("Original date: %s", orig_date)
overt_year = date_parser.saw_overt_year
if parsing_mode in (
except Exception as e:
print(f"Skipping unparsable line: {line}", file=sys.stderr)
- logger.error('Skipping malformed line: %s', line)
+ logger.error("Skipping malformed line: %s", line)
logger.exception(e)
def remind(
@classmethod
def load(cls):
- if not config.config['override_timestamp']:
+ if not config.config["override_timestamp"]:
now = datetime.datetime.now()
else:
- now = config.config['override_timestamp'][0]
+ now = config.config["override_timestamp"][0]
logger.debug(
'Overriding "now" with %s because of commandline argument.', now
)
cache_ts = file_utils.get_file_mtime_as_datetime(
- config.config['reminder_cache_file']
+ config.config["reminder_cache_file"]
)
if cache_ts is None:
return None
and now.year == cache_ts.year
):
reminder_ts = file_utils.get_file_mtime_as_datetime(
- config.config['reminder_filename']
+ config.config["reminder_filename"]
)
# ...and the .reminder file wasn't updated since the cache write...
if reminder_ts <= cache_ts:
import pickle
- with open(config.config['reminder_cache_file'], 'rb') as rf:
+ with open(config.config["reminder_cache_file"], "rb") as rf:
reminder_data = pickle.load(rf)
return cls(reminder_data)
return None
def save(self):
import pickle
- with open(config.config['reminder_cache_file'], 'wb') as wf:
+ with open(config.config["reminder_cache_file"], "wb") as wf:
pickle.dump(
self.label_by_date,
wf,
@bootstrap.initialize
def main() -> None:
reminder = Reminder()
- count = config.config['count']
- days_ahead = config.config['days_ahead']
- reminder.remind(count, days_ahead, config.config['date'])
+ count = config.config["count"]
+ days_ahead = config.config["days_ahead"]
+ reminder.remind(count, days_ahead, config.config["date"])
return None
from pyutils.parallelize import executors
from pyutils.parallelize import parallelize as par
from pyutils.parallelize import smart_future
-from pyutils.typez import histogram
+from pyutils.types import histogram
logger = logging.getLogger(__name__)
args = config.add_commandline_args(
def __init__(self, option_strings, dest, default=None, required=False, help=None):
if default is None:
- msg = 'You must provide a default with Yes/No action'
+ msg = "You must provide a default with Yes/No action"
logger.critical(msg)
raise ValueError(msg)
if len(option_strings) != 1:
- msg = 'Only single argument is allowed with NoYes action'
+ msg = "Only single argument is allowed with NoYes action"
logger.critical(msg)
raise ValueError(msg)
opt = option_strings[0]
- if not opt.startswith('--'):
- msg = 'Yes/No arguments must be prefixed with --'
+ if not opt.startswith("--"):
+ msg = "Yes/No arguments must be prefixed with --"
logger.critical(msg)
raise ValueError(msg)
opt = opt[2:]
- opts = ['--' + opt, '--no_' + opt]
+ opts = ["--" + opt, "--no_" + opt]
super().__init__(
opts,
dest,
@overrides
def __call__(self, parser, namespace, values, option_strings=None):
- if option_strings.startswith('--no-') or option_strings.startswith('--no_'):
+ if option_strings.startswith("--no-") or option_strings.startswith("--no_"):
setattr(namespace, self.dest, False)
else:
setattr(namespace, self.dest, True)
argparse.ArgumentTypeError: 115 is an invalid percentage; expected 0 <= n <= 100.0
"""
- num = num.strip('%')
+ num = num.strip("%")
n = float(num)
if 0.0 <= n <= 100.0:
return n
date = to_date(txt)
if date is not None:
return date
- msg = f'Cannot parse argument as a date: {txt}'
+ msg = f"Cannot parse argument as a date: {txt}"
logger.error(msg)
raise argparse.ArgumentTypeError(msg)
.. note::
Because this code uses an English date-expression parsing grammar
internally, much more complex datetimes can be expressed in free form.
- See :mod:`pyutils.datetimez.dateparse_utils` for details. These
+ See :mod:`pyutils.datetimes.dateparse_utils` for details. These
are not included in here because they are hard to write valid doctests
for!
dt = to_datetime(txt)
if dt is not None:
return dt
- msg = f'Cannot parse argument as datetime: {txt}'
+ msg = f"Cannot parse argument as datetime: {txt}"
logger.error(msg)
raise argparse.ArgumentTypeError(msg)
...
argparse.ArgumentTypeError: a little while is not a valid duration.
"""
- from pyutils.datetimez.datetime_utils import parse_duration
+ from pyutils.datetimes.datetime_utils import parse_duration
try:
secs = parse_duration(txt, raise_on_error=True)
raise argparse.ArgumentTypeError(e) from e
-if __name__ == '__main__':
+if __name__ == "__main__":
import doctest
- doctest.ELLIPSIS_MARKER = '-ANYTHING-'
+ doctest.ELLIPSIS_MARKER = "-ANYTHING-"
doctest.testmod()
:meth:`pyutils.string_utils.to_date`. This means any of these are
also able to accept and recognize this larger set of date expressions.
-See the `unittest <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=tests/datetimez/dateparse_utils_test.py;h=93c7b96e4c19af217fbafcf1ed5dbde13ec599c5;hb=HEAD>`_ for more examples and the `grammar <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=src/pyutils/datetimez/dateparse_utils.g4;hb=HEAD>`_ for more details.
+See the `unittest <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=tests/datetimes/dateparse_utils_test.py;h=93c7b96e4c19af217fbafcf1ed5dbde13ec599c5;hb=HEAD>`_ for more examples and the `grammar <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=src/pyutils/datetimes/dateparse_utils.g4;hb=HEAD>`_ for more details.
"""
import datetime
import pytz
from pyutils import bootstrap, decorator_utils
-from pyutils.datetimez.dateparse_utilsLexer import dateparse_utilsLexer # type: ignore
-from pyutils.datetimez.dateparse_utilsListener import (
+from pyutils.datetimes.dateparse_utilsLexer import dateparse_utilsLexer # type: ignore
+from pyutils.datetimes.dateparse_utilsListener import (
dateparse_utilsListener,
) # type: ignore
-from pyutils.datetimez.dateparse_utilsParser import (
+from pyutils.datetimes.dateparse_utilsParser import (
dateparse_utilsParser,
) # type: ignore
-from pyutils.datetimez.datetime_utils import (
+from pyutils.datetimes.datetime_utils import (
TimeUnit,
date_to_datetime,
datetime_to_date,
import holidays # type: ignore
import pytz
-from pyutils.datetimez import constants
+from pyutils.datetimes import constants
logger = logging.getLogger(__name__)
if dt.tzinfo == tz:
return dt
raise Exception(
- f'{dt} is already timezone aware; use replace_timezone or translate_timezone '
- + 'depending on the semantics you want. See the pydocs / code.'
+ f"{dt} is already timezone aware; use replace_timezone or translate_timezone "
+ + "depending on the semantics you want. See the pydocs / code."
)
return dt.replace(tzinfo=tz)
"""
if is_timezone_aware(dt):
logger.warning(
- '%s already has a timezone; klobbering it anyway.\n Be aware that this operation changed the instant to which the object refers.',
+ "%s already has a timezone; klobbering it anyway.\n Be aware that this operation changed the instant to which the object refers.",
dt,
)
return datetime.datetime(
) -> Tuple[datetime.datetime, str]:
"""A nice way to convert a string into a datetime. Returns both the
datetime and the format string used to parse it. Also consider
- :mod:`pyutils.datetimez.dateparse_utils` for a full parser alternative.
+ :mod:`pyutils.datetimes.dateparse_utils` for a full parser alternative.
Args:
txt: the string to be converted into a datetime
1439
"""
if hour < 0 or hour > 23:
- raise ValueError(f'Bad hour: {hour}. Expected 0 <= hour <= 23')
+ raise ValueError(f"Bad hour: {hour}. Expected 0 <= hour <= 23")
if minute < 0 or minute > 59:
- raise ValueError(f'Bad minute: {minute}. Expected 0 <= minute <= 59')
+ raise ValueError(f"Bad minute: {minute}. Expected 0 <= minute <= 59")
return MinuteOfDay(hour * 60 + minute)
return int(duration)
m = re.match(
- r'(\d+ *d[ays]*)* *(\d+ *h[ours]*)* *(\d+ *m[inutes]*)* *(\d+ *[seconds]*)',
+ r"(\d+ *d[ays]*)* *(\d+ *h[ours]*)* *(\d+ *m[inutes]*)* *(\d+ *[seconds]*)",
duration,
)
if not m and raise_on_error:
- raise ValueError(f'{duration} is not a valid duration.')
+ raise ValueError(f"{duration} is not a valid duration.")
seconds = 0
- m = re.search(r'(\d+) *d[ays]*', duration)
+ m = re.search(r"(\d+) *d[ays]*", duration)
if m is not None:
seconds += int(m.group(1)) * 60 * 60 * 24
- m = re.search(r'(\d+) *h[ours]*', duration)
+ m = re.search(r"(\d+) *h[ours]*", duration)
if m is not None:
seconds += int(m.group(1)) * 60 * 60
- m = re.search(r'(\d+) *m[inutes]*', duration)
+ m = re.search(r"(\d+) *m[inutes]*", duration)
if m is not None:
seconds += int(m.group(1)) * 60
- m = re.search(r'(\d+) *s[econds]*', duration)
+ m = re.search(r"(\d+) *s[econds]*", duration)
if m is not None:
seconds += int(m.group(1))
return seconds
descr = descr + f"{int(minutes[0])} minutes"
if include_seconds:
- descr = descr + ', '
+ descr = descr + ", "
if len(descr) > 0:
- descr = descr + 'and '
+ descr = descr + "and "
s = minutes[1]
if s == 1:
- descr = descr + '1 second'
+ descr = descr + "1 second"
else:
- descr = descr + f'{s} seconds'
+ descr = descr + f"{s} seconds"
return descr
hours = divmod(days[1], constants.SECONDS_PER_HOUR)
minutes = divmod(hours[1], constants.SECONDS_PER_MINUTE)
- descr = ''
+ descr = ""
if days[0] > 0:
- descr = f'{int(days[0])}d '
+ descr = f"{int(days[0])}d "
if hours[0] > 0:
- descr = descr + f'{int(hours[0])}h '
+ descr = descr + f"{int(hours[0])}h "
if minutes[0] > 0 or (len(descr) == 0 and not include_seconds):
- descr = descr + f'{int(minutes[0])}m '
+ descr = descr + f"{int(minutes[0])}m "
if minutes[1] > 0 and include_seconds:
- descr = descr + f'{int(minutes[1])}s'
+ descr = descr + f"{int(minutes[1])}s"
return descr.strip()
return datetime.date(int(y), int(m), int(d))
-if __name__ == '__main__':
+if __name__ == "__main__":
import doctest
doctest.testmod()
def remove_newlines(x: str) -> str:
"""Trivial function to be used as a line_transformer in
:meth:`slurp_file` for no newlines in file contents"""
- return x.replace('\n', '')
+ return x.replace("\n", "")
def strip_whitespace(x: str) -> str:
def remove_hash_comments(x: str) -> str:
"""Trivial function to be used as a line_transformer in
:meth:`slurp_file` for no # comments in file contents"""
- return re.sub(r'#.*$', '', x)
+ return re.sub(r"#.*$", "", x)
def slurp_file(
for x in line_transformers:
xforms.append(x)
if not file_is_readable(filename):
- raise Exception(f'{filename} can\'t be read.')
+ raise Exception(f"{filename} can't be read.")
with open(filename) as rf:
for line in rf:
for transformation in xforms:
line = transformation(line)
- if skip_blank_lines and line == '':
+ if skip_blank_lines and line == "":
continue
ret.append(line)
return ret
>>> fix_multiple_slashes(p) == p
True
"""
- return re.sub(r'/+', '/', path)
+ return re.sub(r"/+", "/", path)
def delete(path: str) -> None:
'/home/scott/foobar'
"""
- while '.' in path:
+ while "." in path:
path = without_extension(path)
return path
def describe_file_timestamp(filename: str, extractor, *, brief=False) -> Optional[str]:
"""~Internal helper"""
- from pyutils.datetimez.datetime_utils import (
+ from pyutils.datetimes.datetime_utils import (
describe_duration,
describe_duration_briefly,
)
"""
self.filename = filename
uuid = uuid4()
- self.tempfile = f'{filename}-{uuid}.tmp'
+ self.tempfile = f"{filename}-{uuid}.tmp"
self.handle: Optional[TextIO] = None
def __enter__(self) -> TextIO:
def __exit__(self, exc_type, exc_val, exc_tb) -> Literal[False]:
if self.handle is not None:
self.handle.close()
- cmd = f'/bin/mv -f {self.tempfile} {self.filename}'
+ cmd = f"/bin/mv -f {self.tempfile} {self.filename}"
ret = os.system(cmd)
if (ret >> 8) != 0:
- raise Exception(f'{cmd} failed, exit value {ret>>8}!')
+ raise Exception(f"{cmd} failed, exit value {ret>>8}!")
return False
-if __name__ == '__main__':
+if __name__ == "__main__":
import doctest
doctest.testmod()
from typing import Literal, Optional
from pyutils import argparse_utils, config, decorator_utils
-from pyutils.datetimez import datetime_utils
+from pyutils.datetimes import datetime_utils
-cfg = config.add_commandline_args(f'Lockfile ({__file__})', 'Args related to lockfiles')
+cfg = config.add_commandline_args(f"Lockfile ({__file__})", "Args related to lockfiles")
cfg.add_argument(
- '--lockfile_held_duration_warning_threshold',
+ "--lockfile_held_duration_warning_threshold",
type=argparse_utils.valid_duration,
default=datetime.timedelta(60.0),
- metavar='DURATION',
- help='If a lock is held for longer than this threshold we log a warning',
+ metavar="DURATION",
+ help="If a lock is held for longer than this threshold we log a warning",
)
logger = logging.getLogger(__name__)
logger.debug(contents)
f.write(contents)
self.locktime = datetime.datetime.now().timestamp()
- logger.debug('Success; I own %s.', self.lockfile)
+ logger.debug("Success; I own %s.", self.lockfile)
self.is_locked = True
return True
except OSError:
pass
- logger.warning('Couldn\'t acquire %s.', self.lockfile)
+ logger.warning("Couldn't acquire %s.", self.lockfile)
return False
def acquire_with_retries(
if (
duration
>= config.config[
- 'lockfile_held_duration_warning_threshold'
+ "lockfile_held_duration_warning_threshold"
].total_seconds()
):
# Note: describe duration briefly only does 1s granularity...
str_duration = datetime_utils.describe_duration_briefly(int(duration))
- msg = f'Held {self.lockfile} for {str_duration}'
+ msg = f"Held {self.lockfile} for {str_duration}"
logger.warning(msg)
warnings.warn(msg, stacklevel=2)
self.release()
if self.override_command:
cmd = self.override_command
else:
- cmd = ' '.join(sys.argv)
+ cmd = " ".join(sys.argv)
contents = LockFileContents(
pid=os.getpid(),
commandline=cmd,
def _detect_stale_lockfile(self) -> None:
try:
- with open(self.lockfile, 'r') as rf:
+ with open(self.lockfile, "r") as rf:
lines = rf.readlines()
if len(lines) == 1:
line = lines[0]
os.kill(contents.pid, 0)
except OSError:
logger.warning(
- 'Lockfile %s\'s pid (%d) is stale; force acquiring...',
+ "Lockfile %s's pid (%d) is stale; force acquiring...",
self.lockfile,
contents.pid,
)
now = datetime.datetime.now().timestamp()
if now > contents.expiration_timestamp:
logger.warning(
- 'Lockfile %s\'s expiration time has passed; force acquiring',
+ "Lockfile %s's expiration time has passed; force acquiring",
self.lockfile,
)
self.release()
import cloudpickle # type: ignore
from overrides import overrides
-import pyutils.typez.histogram as hist
+import pyutils.types.histogram as hist
from pyutils import (
argparse_utils,
config,
from pyutils.decorator_utils import singleton
from pyutils.exec_utils import cmd_exitcode, cmd_in_background, run_silently
from pyutils.parallelize.thread_utils import background_thread
-from pyutils.typez import type_utils
+from pyutils.types import type_utils
logger = logging.getLogger(__name__)
Returns:
The datetime.date the string contained or None to indicate
an error. This parser is relatively clever; see
- :class:`datetimez.dateparse_utils` docs for details.
+ :class:`datetimes.dateparse_utils` docs for details.
- See also: :mod:`pyutils.datetimez.dateparse_utils`, :meth:`extract_date`,
+ See also: :mod:`pyutils.datetimes.dateparse_utils`, :meth:`extract_date`,
:meth:`is_valid_date`, :meth:`to_datetime`, :meth:`valid_datetime`.
>>> to_date('9/11/2001')
datetime.date(2001, 9, 11)
>>> to_date('xyzzy')
"""
- import pyutils.datetimez.dateparse_utils as du
+ import pyutils.datetimes.dateparse_utils as du
try:
d = du.DateParser() # type: ignore
Returns:
a datetime if date was found, otherwise None
- See also: :mod:`pyutils.datetimez.dateparse_utils`, :meth:`to_date`,
+ See also: :mod:`pyutils.datetimes.dateparse_utils`, :meth:`to_date`,
:meth:`is_valid_date`, :meth:`to_datetime`, :meth:`valid_datetime`.
>>> extract_date("filename.txt dec 13, 2022")
"""
import itertools
- import pyutils.datetimez.dateparse_utils as du
+ import pyutils.datetimes.dateparse_utils as du
d = du.DateParser() # type: ignore
chunks = in_str.split()
Returns:
True if the string represents a valid date that we can recognize
and False otherwise. This parser is relatively clever; see
- :class:`datetimez.dateparse_utils` docs for details.
+ :class:`datetimes.dateparse_utils` docs for details.
- See also: :mod:`pyutils.datetimez.dateparse_utils`, :meth:`to_date`,
+ See also: :mod:`pyutils.datetimes.dateparse_utils`, :meth:`to_date`,
:meth:`extract_date`, :meth:`to_datetime`, :meth:`valid_datetime`.
>>> is_valid_date('1/2/2022')
>>> is_valid_date('xyzzy')
False
"""
- import pyutils.datetimez.dateparse_utils as dp
+ import pyutils.datetimes.dateparse_utils as dp
try:
d = dp.DateParser() # type: ignore
Returns:
A python datetime parsed from in_str or None to indicate
an error. This parser is relatively clever; see
- :class:`datetimez.dateparse_utils` docs for details.
+ :class:`datetimes.dateparse_utils` docs for details.
- See also: :mod:`pyutils.datetimez.dateparse_utils`, :meth:`to_date`,
+ See also: :mod:`pyutils.datetimes.dateparse_utils`, :meth:`to_date`,
:meth:`extract_date`, :meth:`valid_datetime`.
>>> to_datetime('7/20/1969 02:56 GMT')
datetime.datetime(1969, 7, 20, 2, 56, tzinfo=<StaticTzInfo 'GMT'>)
"""
- import pyutils.datetimez.dateparse_utils as dp
+ import pyutils.datetimes.dateparse_utils as dp
try:
d = dp.DateParser() # type: ignore
Returns:
True if in_str contains a valid datetime and False otherwise.
This parser is relatively clever; see
- :class:`datetimez.dateparse_utils` docs for details.
+ :class:`datetimes.dateparse_utils` docs for details.
>>> valid_datetime('next wednesday at noon')
True
of this and decide whether it's suitable for your
application.
-See also the :class:`pyutils.typez.Money` class which uses Python
+See also the :class:`pyutils.types.Money` class which uses Python
Decimals (see: https://docs.python.org/3/library/decimal.html) to
represent monetary amounts.
"""
def __init__(
self,
- centcount: Union[int, float, str, 'CentCount'] = 0,
- currency: str = 'USD',
+ centcount: Union[int, float, str, "CentCount"] = 0,
+ currency: str = "USD",
*,
strict_mode=False,
):
def __repr__(self):
w = self.centcount // 100
p = self.centcount % 100
- s = f'{w}.{p:02d}'
+ s = f"{w}.{p:02d}"
if self.currency is not None:
- return f'{s} {self.currency}'
+ return f"{s} {self.currency}"
else:
- return f'${s}'
+ return f"${s}"
def __pos__(self):
return CentCount(centcount=self.centcount, currency=self.currency)
currency=self.currency,
)
else:
- raise TypeError('Incompatible currencies in add expression')
+ raise TypeError("Incompatible currencies in add expression")
else:
if self.strict_mode:
- raise TypeError('In strict_mode only two moneys can be added')
+ raise TypeError("In strict_mode only two moneys can be added")
else:
return self.__add__(CentCount(other, self.currency))
currency=self.currency,
)
else:
- raise TypeError('Incompatible currencies in add expression')
+ raise TypeError("Incompatible currencies in add expression")
else:
if self.strict_mode:
- raise TypeError('In strict_mode only two moneys can be added')
+ raise TypeError("In strict_mode only two moneys can be added")
else:
return self.__sub__(CentCount(other, self.currency))
application.
"""
if isinstance(other, CentCount):
- raise TypeError('can not multiply monetary quantities')
+ raise TypeError("can not multiply monetary quantities")
else:
return CentCount(
centcount=int(self.centcount * float(other)),
application.
"""
if isinstance(other, CentCount):
- raise TypeError('can not divide monetary quantities')
+ raise TypeError("can not divide monetary quantities")
else:
return CentCount(
centcount=int(float(self.centcount) / float(other)),
currency=self.currency,
)
else:
- raise TypeError('Incompatible currencies in sub expression')
+ raise TypeError("Incompatible currencies in sub expression")
else:
if self.strict_mode:
- raise TypeError('In strict_mode only two moneys can be added')
+ raise TypeError("In strict_mode only two moneys can be added")
else:
return CentCount(
centcount=int(other) - self.centcount,
if self.currency == other.currency:
return self.centcount < other.centcount
else:
- raise TypeError('can not directly compare different currencies')
+ raise TypeError("can not directly compare different currencies")
else:
if self.strict_mode:
- raise TypeError('In strict mode, only two CentCounts can be compated')
+ raise TypeError("In strict mode, only two CentCounts can be compated")
else:
return self.centcount < int(other)
if self.currency == other.currency:
return self.centcount > other.centcount
else:
- raise TypeError('can not directly compare different currencies')
+ raise TypeError("can not directly compare different currencies")
else:
if self.strict_mode:
- raise TypeError('In strict mode, only two CentCounts can be compated')
+ raise TypeError("In strict mode, only two CentCounts can be compated")
else:
return self.centcount > int(other)
centcount = None
currency = None
s = s.strip()
- chunks = s.split(' ')
+ chunks = s.split(" ")
try:
for chunk in chunks:
if CentCount.CENTCOUNT_RE.match(chunk) is not None:
if centcount is not None and currency is not None:
return (centcount, currency)
elif centcount is not None:
- return (centcount, 'USD')
+ return (centcount, "USD")
return None
@classmethod
- def parse(cls, s: str) -> 'CentCount':
+ def parse(cls, s: str) -> "CentCount":
"""Parses a string format monetary amount and returns a CentCount
if possible.
raise Exception(f'Unable to parse money string "{s}"')
-if __name__ == '__main__':
+if __name__ == "__main__":
import doctest
doctest.testmod()
aggregation with non-:class:`Money` operands (i.e. no comparison or
aggregation with literal numbers).
-See also :class:`pyutils.typez.centcount.CentCount` which represents
+See also :class:`pyutils.types.centcount.CentCount` which represents
monetary amounts as an integral number of cents.
"""
def __init__(
self,
- amount: Union[Decimal, str, float, int, 'Money'] = Decimal("0"),
- currency: str = 'USD',
+ amount: Union[Decimal, str, float, int, "Money"] = Decimal("0"),
+ currency: str = "USD",
*,
strict_mode=False,
):
digits = list(map(str, digits))
build, next = result.append, digits.pop
for i in range(2):
- build(next() if digits else '0')
- build('.')
+ build(next() if digits else "0")
+ build(".")
if not digits:
- build('0')
+ build("0")
i = 0
while digits:
build(next())
if i == 3 and digits:
i = 0
if sign:
- build('-')
+ build("-")
if self.currency:
- return ''.join(reversed(result)) + ' ' + self.currency
+ return "".join(reversed(result)) + " " + self.currency
else:
- return '$' + ''.join(reversed(result))
+ return "$" + "".join(reversed(result))
def __pos__(self):
return Money(amount=self.amount, currency=self.currency)
if self.currency == other.currency:
return Money(amount=self.amount + other.amount, currency=self.currency)
else:
- raise TypeError('Incompatible currencies in add expression')
+ raise TypeError("Incompatible currencies in add expression")
else:
if self.strict_mode:
- raise TypeError('In strict_mode only two moneys can be added')
+ raise TypeError("In strict_mode only two moneys can be added")
else:
return Money(
amount=self.amount + Decimal(float(other)),
if self.currency == other.currency:
return Money(amount=self.amount - other.amount, currency=self.currency)
else:
- raise TypeError('Incompatible currencies in add expression')
+ raise TypeError("Incompatible currencies in add expression")
else:
if self.strict_mode:
- raise TypeError('In strict_mode only two moneys can be added')
+ raise TypeError("In strict_mode only two moneys can be added")
else:
return Money(
amount=self.amount - Decimal(float(other)),
def __mul__(self, other):
if isinstance(other, Money):
- raise TypeError('can not multiply monetary quantities')
+ raise TypeError("can not multiply monetary quantities")
else:
return Money(
amount=self.amount * Decimal(float(other)),
def __truediv__(self, other):
if isinstance(other, Money):
- raise TypeError('can not divide monetary quantities')
+ raise TypeError("can not divide monetary quantities")
else:
return Money(
amount=self.amount / Decimal(float(other)),
See also :meth:`round_fractional_cents`
"""
- self.amount = self.amount.quantize(Decimal('.01'), rounding=ROUND_FLOOR)
+ self.amount = self.amount.quantize(Decimal(".01"), rounding=ROUND_FLOOR)
return self.amount
def round_fractional_cents(self):
See also :meth:`truncate_fractional_cents`
"""
- self.amount = self.amount.quantize(Decimal('.01'), rounding=ROUND_HALF_DOWN)
+ self.amount = self.amount.quantize(Decimal(".01"), rounding=ROUND_HALF_DOWN)
return self.amount
__radd__ = __add__
if self.currency == other.currency:
return Money(amount=other.amount - self.amount, currency=self.currency)
else:
- raise TypeError('Incompatible currencies in sub expression')
+ raise TypeError("Incompatible currencies in sub expression")
else:
if self.strict_mode:
- raise TypeError('In strict_mode only two moneys can be added')
+ raise TypeError("In strict_mode only two moneys can be added")
else:
return Money(
amount=Decimal(float(other)) - self.amount,
if self.currency == other.currency:
return self.amount < other.amount
else:
- raise TypeError('can not directly compare different currencies')
+ raise TypeError("can not directly compare different currencies")
else:
if self.strict_mode:
- raise TypeError('In strict mode, only two Moneys can be compated')
+ raise TypeError("In strict mode, only two Moneys can be compated")
else:
return self.amount < Decimal(float(other))
if self.currency == other.currency:
return self.amount > other.amount
else:
- raise TypeError('can not directly compare different currencies')
+ raise TypeError("can not directly compare different currencies")
else:
if self.strict_mode:
- raise TypeError('In strict mode, only two Moneys can be compated')
+ raise TypeError("In strict mode, only two Moneys can be compated")
else:
return self.amount > Decimal(float(other))
amount = None
currency = None
s = s.strip()
- chunks = s.split(' ')
+ chunks = s.split(" ")
try:
for chunk in chunks:
if Money.AMOUNT_RE.match(chunk) is not None:
if amount is not None and currency is not None:
return (amount, currency)
elif amount is not None:
- return (amount, 'USD')
+ return (amount, "USD")
return None
@classmethod
- def parse(cls, s: str) -> 'Money':
+ def parse(cls, s: str) -> "Money":
"""Parses a string an attempts to create a :class:`Money`
instance.
raise Exception(f'Unable to parse money string "{s}"')
-if __name__ == '__main__':
+if __name__ == "__main__":
import doctest
doctest.testmod()
--- /dev/null
+#!/usr/bin/env python3
+
+# © Copyright 2021-2022, Scott Gasch
+
+"""dateparse_utils unittest."""
+
+import datetime
+import random
+import re
+import unittest
+
+import pytz
+
+import pyutils.datetimes.dateparse_utils as du
+import pyutils.unittest_utils as uu
+
+parsable_expressions = [
+ ("today", datetime.datetime(2021, 7, 2)),
+ ("tomorrow", datetime.datetime(2021, 7, 3)),
+ ("yesterday", datetime.datetime(2021, 7, 1)),
+ ("21:30", datetime.datetime(2021, 7, 2, 21, 30, 0, 0)),
+ (
+ "21:30 EST",
+ datetime.datetime(2021, 7, 2, 21, 30, 0, 0, tzinfo=pytz.timezone("EST")),
+ ),
+ (
+ "21:30 -0500",
+ datetime.datetime(2021, 7, 2, 21, 30, 0, 0, tzinfo=pytz.timezone("EST")),
+ ),
+ ("12:01am", datetime.datetime(2021, 7, 2, 0, 1, 0, 0)),
+ ("12:02p", datetime.datetime(2021, 7, 2, 12, 2, 0, 0)),
+ ("0:03", datetime.datetime(2021, 7, 2, 0, 3, 0, 0)),
+ ("last wednesday", datetime.datetime(2021, 6, 30)),
+ ("this wed", datetime.datetime(2021, 7, 7)),
+ ("next wed", datetime.datetime(2021, 7, 14)),
+ ("this coming tues", datetime.datetime(2021, 7, 6)),
+ ("this past monday", datetime.datetime(2021, 6, 28)),
+ ("4 days ago", datetime.datetime(2021, 6, 28)),
+ ("4 mondays ago", datetime.datetime(2021, 6, 7)),
+ ("4 months ago", datetime.datetime(2021, 3, 2)),
+ ("3 days back", datetime.datetime(2021, 6, 29)),
+ ("13 weeks from now", datetime.datetime(2021, 10, 1)),
+ ("1 year from now", datetime.datetime(2022, 7, 2)),
+ ("4 weeks from now", datetime.datetime(2021, 7, 30)),
+ ("3 saturdays ago", datetime.datetime(2021, 6, 12)),
+ ("4 months from today", datetime.datetime(2021, 11, 2)),
+ ("4 years from yesterday", datetime.datetime(2025, 7, 1)),
+ ("4 weeks from tomorrow", datetime.datetime(2021, 7, 31)),
+ ("april 15, 2005", datetime.datetime(2005, 4, 15)),
+ ("april 14", datetime.datetime(2021, 4, 14)),
+ ("9:30am on last wednesday", datetime.datetime(2021, 6, 30, 9, 30)),
+ ("2005/apr/15", datetime.datetime(2005, 4, 15)),
+ ("2005 apr 15", datetime.datetime(2005, 4, 15)),
+ ("the 1st wednesday in may", datetime.datetime(2021, 5, 5)),
+ ("last sun of june", datetime.datetime(2021, 6, 27)),
+ ("this Easter", datetime.datetime(2021, 4, 4)),
+ ("last christmas", datetime.datetime(2020, 12, 25)),
+ ("last Xmas", datetime.datetime(2020, 12, 25)),
+ ("xmas, 1999", datetime.datetime(1999, 12, 25)),
+ ("next mlk day", datetime.datetime(2022, 1, 17)),
+ ("Halloween, 2020", datetime.datetime(2020, 10, 31)),
+ ("5 work days after independence day", datetime.datetime(2021, 7, 12)),
+ ("50 working days from last wed", datetime.datetime(2021, 9, 10)),
+ ("25 working days before columbus day", datetime.datetime(2021, 9, 3)),
+ ("today +1 week", datetime.datetime(2021, 7, 9)),
+ ("sunday -3 weeks", datetime.datetime(2021, 6, 13)),
+ ("4 weeks before xmas, 1999", datetime.datetime(1999, 11, 27)),
+ ("3 days before new years eve, 2000", datetime.datetime(2000, 12, 28)),
+ ("july 4th", datetime.datetime(2021, 7, 4)),
+ ("the ides of march", datetime.datetime(2021, 3, 15)),
+ ("the nones of april", datetime.datetime(2021, 4, 5)),
+ ("the kalends of may", datetime.datetime(2021, 5, 1)),
+ ("9/11/2001", datetime.datetime(2001, 9, 11)),
+ ("4 sundays before veterans' day", datetime.datetime(2021, 10, 17)),
+ ("xmas eve", datetime.datetime(2021, 12, 24)),
+ ("this friday at 5pm", datetime.datetime(2021, 7, 9, 17, 0, 0)),
+ ("presidents day", datetime.datetime(2021, 2, 15)),
+ ("memorial day, 1921", datetime.datetime(1921, 5, 30)),
+ ("today -4 wednesdays", datetime.datetime(2021, 6, 9)),
+ ("thanksgiving", datetime.datetime(2021, 11, 25)),
+ ("2 sun in jun", datetime.datetime(2021, 6, 13)),
+ ("easter -40 days", datetime.datetime(2021, 2, 23)),
+ ("easter +39 days", datetime.datetime(2021, 5, 13)),
+ ("2nd Sunday in May, 2022", datetime.datetime(2022, 5, 8)),
+ ("1st tuesday in nov, 2024", datetime.datetime(2024, 11, 5)),
+ (
+ "2 days before last xmas at 3:14:15.92a",
+ datetime.datetime(2020, 12, 23, 3, 14, 15, 92),
+ ),
+ (
+ "3 weeks after xmas, 1995 at midday",
+ datetime.datetime(1996, 1, 15, 12, 0, 0),
+ ),
+ (
+ "4 months before easter, 1992 at midnight",
+ datetime.datetime(1991, 12, 19),
+ ),
+ (
+ "5 months before halloween, 1995 at noon",
+ datetime.datetime(1995, 5, 31, 12),
+ ),
+ ("4 days before last wednesday", datetime.datetime(2021, 6, 26)),
+ ("44 months after today", datetime.datetime(2025, 3, 2)),
+ ("44 years before today", datetime.datetime(1977, 7, 2)),
+ ("44 weeks ago", datetime.datetime(2020, 8, 28)),
+ ("15 minutes to 3am", datetime.datetime(2021, 7, 2, 2, 45)),
+ ("quarter past 4pm", datetime.datetime(2021, 7, 2, 16, 15)),
+ ("half past 9", datetime.datetime(2021, 7, 2, 9, 30)),
+ ("4 seconds to midnight", datetime.datetime(2021, 7, 1, 23, 59, 56)),
+ (
+ "4 seconds to midnight, tomorrow",
+ datetime.datetime(2021, 7, 2, 23, 59, 56),
+ ),
+ ("2021/apr/15T21:30:44.55", datetime.datetime(2021, 4, 15, 21, 30, 44, 55)),
+ (
+ "2021/apr/15 at 21:30:44.55",
+ datetime.datetime(2021, 4, 15, 21, 30, 44, 55),
+ ),
+ (
+ "2021/4/15 at 21:30:44.55",
+ datetime.datetime(2021, 4, 15, 21, 30, 44, 55),
+ ),
+ (
+ "2021/04/15 at 21:30:44.55",
+ datetime.datetime(2021, 4, 15, 21, 30, 44, 55),
+ ),
+ (
+ "2021/04/15 at 21:30:44.55Z",
+ datetime.datetime(2021, 4, 15, 21, 30, 44, 55, tzinfo=pytz.timezone("UTC")),
+ ),
+ (
+ "2021/04/15 at 21:30:44.55EST",
+ datetime.datetime(2021, 4, 15, 21, 30, 44, 55, tzinfo=pytz.timezone("EST")),
+ ),
+ (
+ "13 days after last memorial day at 12 seconds before 4pm",
+ datetime.datetime(2020, 6, 7, 15, 59, 48),
+ ),
+ (
+ " 2 days before yesterday at 9am ",
+ datetime.datetime(2021, 6, 29, 9),
+ ),
+ ("-3 days before today", datetime.datetime(2021, 7, 5)),
+ (
+ "3 days before yesterday at midnight EST",
+ datetime.datetime(2021, 6, 28, tzinfo=pytz.timezone("EST")),
+ ),
+]
+
+
+class TestDateparseUtils(unittest.TestCase):
+ @uu.check_method_for_perf_regressions
+ def test_dateparsing(self):
+ dp = du.DateParser(override_now_for_test_purposes=datetime.datetime(2021, 7, 2))
+
+ for (txt, expected_dt) in parsable_expressions:
+ try:
+ actual_dt = dp.parse(txt)
+ self.assertIsNotNone(actual_dt)
+ self.assertEqual(
+ actual_dt,
+ expected_dt,
+ f'"{txt}", got "{actual_dt}" while expecting "{expected_dt}"',
+ )
+ except du.ParseException:
+ self.fail(f'Expected "{txt}" to parse successfully.')
+
+ def test_whitespace_handling(self):
+ dp = du.DateParser(override_now_for_test_purposes=datetime.datetime(2021, 7, 2))
+
+ for (txt, expected_dt) in parsable_expressions:
+ try:
+ txt = f" {txt} "
+ i = random.randint(2, 5)
+ replacement = " " * i
+ txt = re.sub(r"\s", replacement, txt)
+ actual_dt = dp.parse(txt)
+ self.assertIsNotNone(actual_dt)
+ self.assertEqual(
+ actual_dt,
+ expected_dt,
+ f'"{txt}", got "{actual_dt}" while expecting "{expected_dt}"',
+ )
+ except du.ParseException:
+ self.fail(f'Expected "{txt}" to parse successfully.')
+
+
+if __name__ == "__main__":
+ unittest.main()
+++ /dev/null
-#!/usr/bin/env python3
-
-# © Copyright 2021-2022, Scott Gasch
-
-"""dateparse_utils unittest."""
-
-import datetime
-import random
-import re
-import unittest
-
-import pytz
-
-import pyutils.datetimez.dateparse_utils as du
-import pyutils.unittest_utils as uu
-
-parsable_expressions = [
- ('today', datetime.datetime(2021, 7, 2)),
- ('tomorrow', datetime.datetime(2021, 7, 3)),
- ('yesterday', datetime.datetime(2021, 7, 1)),
- ('21:30', datetime.datetime(2021, 7, 2, 21, 30, 0, 0)),
- (
- '21:30 EST',
- datetime.datetime(2021, 7, 2, 21, 30, 0, 0, tzinfo=pytz.timezone('EST')),
- ),
- (
- '21:30 -0500',
- datetime.datetime(2021, 7, 2, 21, 30, 0, 0, tzinfo=pytz.timezone('EST')),
- ),
- ('12:01am', datetime.datetime(2021, 7, 2, 0, 1, 0, 0)),
- ('12:02p', datetime.datetime(2021, 7, 2, 12, 2, 0, 0)),
- ('0:03', datetime.datetime(2021, 7, 2, 0, 3, 0, 0)),
- ('last wednesday', datetime.datetime(2021, 6, 30)),
- ('this wed', datetime.datetime(2021, 7, 7)),
- ('next wed', datetime.datetime(2021, 7, 14)),
- ('this coming tues', datetime.datetime(2021, 7, 6)),
- ('this past monday', datetime.datetime(2021, 6, 28)),
- ('4 days ago', datetime.datetime(2021, 6, 28)),
- ('4 mondays ago', datetime.datetime(2021, 6, 7)),
- ('4 months ago', datetime.datetime(2021, 3, 2)),
- ('3 days back', datetime.datetime(2021, 6, 29)),
- ('13 weeks from now', datetime.datetime(2021, 10, 1)),
- ('1 year from now', datetime.datetime(2022, 7, 2)),
- ('4 weeks from now', datetime.datetime(2021, 7, 30)),
- ('3 saturdays ago', datetime.datetime(2021, 6, 12)),
- ('4 months from today', datetime.datetime(2021, 11, 2)),
- ('4 years from yesterday', datetime.datetime(2025, 7, 1)),
- ('4 weeks from tomorrow', datetime.datetime(2021, 7, 31)),
- ('april 15, 2005', datetime.datetime(2005, 4, 15)),
- ('april 14', datetime.datetime(2021, 4, 14)),
- ('9:30am on last wednesday', datetime.datetime(2021, 6, 30, 9, 30)),
- ('2005/apr/15', datetime.datetime(2005, 4, 15)),
- ('2005 apr 15', datetime.datetime(2005, 4, 15)),
- ('the 1st wednesday in may', datetime.datetime(2021, 5, 5)),
- ('last sun of june', datetime.datetime(2021, 6, 27)),
- ('this Easter', datetime.datetime(2021, 4, 4)),
- ('last christmas', datetime.datetime(2020, 12, 25)),
- ('last Xmas', datetime.datetime(2020, 12, 25)),
- ('xmas, 1999', datetime.datetime(1999, 12, 25)),
- ('next mlk day', datetime.datetime(2022, 1, 17)),
- ('Halloween, 2020', datetime.datetime(2020, 10, 31)),
- ('5 work days after independence day', datetime.datetime(2021, 7, 12)),
- ('50 working days from last wed', datetime.datetime(2021, 9, 10)),
- ('25 working days before columbus day', datetime.datetime(2021, 9, 3)),
- ('today +1 week', datetime.datetime(2021, 7, 9)),
- ('sunday -3 weeks', datetime.datetime(2021, 6, 13)),
- ('4 weeks before xmas, 1999', datetime.datetime(1999, 11, 27)),
- ('3 days before new years eve, 2000', datetime.datetime(2000, 12, 28)),
- ('july 4th', datetime.datetime(2021, 7, 4)),
- ('the ides of march', datetime.datetime(2021, 3, 15)),
- ('the nones of april', datetime.datetime(2021, 4, 5)),
- ('the kalends of may', datetime.datetime(2021, 5, 1)),
- ('9/11/2001', datetime.datetime(2001, 9, 11)),
- ('4 sundays before veterans\' day', datetime.datetime(2021, 10, 17)),
- ('xmas eve', datetime.datetime(2021, 12, 24)),
- ('this friday at 5pm', datetime.datetime(2021, 7, 9, 17, 0, 0)),
- ('presidents day', datetime.datetime(2021, 2, 15)),
- ('memorial day, 1921', datetime.datetime(1921, 5, 30)),
- ('today -4 wednesdays', datetime.datetime(2021, 6, 9)),
- ('thanksgiving', datetime.datetime(2021, 11, 25)),
- ('2 sun in jun', datetime.datetime(2021, 6, 13)),
- ('easter -40 days', datetime.datetime(2021, 2, 23)),
- ('easter +39 days', datetime.datetime(2021, 5, 13)),
- ('2nd Sunday in May, 2022', datetime.datetime(2022, 5, 8)),
- ('1st tuesday in nov, 2024', datetime.datetime(2024, 11, 5)),
- (
- '2 days before last xmas at 3:14:15.92a',
- datetime.datetime(2020, 12, 23, 3, 14, 15, 92),
- ),
- (
- '3 weeks after xmas, 1995 at midday',
- datetime.datetime(1996, 1, 15, 12, 0, 0),
- ),
- (
- '4 months before easter, 1992 at midnight',
- datetime.datetime(1991, 12, 19),
- ),
- (
- '5 months before halloween, 1995 at noon',
- datetime.datetime(1995, 5, 31, 12),
- ),
- ('4 days before last wednesday', datetime.datetime(2021, 6, 26)),
- ('44 months after today', datetime.datetime(2025, 3, 2)),
- ('44 years before today', datetime.datetime(1977, 7, 2)),
- ('44 weeks ago', datetime.datetime(2020, 8, 28)),
- ('15 minutes to 3am', datetime.datetime(2021, 7, 2, 2, 45)),
- ('quarter past 4pm', datetime.datetime(2021, 7, 2, 16, 15)),
- ('half past 9', datetime.datetime(2021, 7, 2, 9, 30)),
- ('4 seconds to midnight', datetime.datetime(2021, 7, 1, 23, 59, 56)),
- (
- '4 seconds to midnight, tomorrow',
- datetime.datetime(2021, 7, 2, 23, 59, 56),
- ),
- ('2021/apr/15T21:30:44.55', datetime.datetime(2021, 4, 15, 21, 30, 44, 55)),
- (
- '2021/apr/15 at 21:30:44.55',
- datetime.datetime(2021, 4, 15, 21, 30, 44, 55),
- ),
- (
- '2021/4/15 at 21:30:44.55',
- datetime.datetime(2021, 4, 15, 21, 30, 44, 55),
- ),
- (
- '2021/04/15 at 21:30:44.55',
- datetime.datetime(2021, 4, 15, 21, 30, 44, 55),
- ),
- (
- '2021/04/15 at 21:30:44.55Z',
- datetime.datetime(2021, 4, 15, 21, 30, 44, 55, tzinfo=pytz.timezone('UTC')),
- ),
- (
- '2021/04/15 at 21:30:44.55EST',
- datetime.datetime(2021, 4, 15, 21, 30, 44, 55, tzinfo=pytz.timezone('EST')),
- ),
- (
- '13 days after last memorial day at 12 seconds before 4pm',
- datetime.datetime(2020, 6, 7, 15, 59, 48),
- ),
- (
- ' 2 days before yesterday at 9am ',
- datetime.datetime(2021, 6, 29, 9),
- ),
- ('-3 days before today', datetime.datetime(2021, 7, 5)),
- (
- '3 days before yesterday at midnight EST',
- datetime.datetime(2021, 6, 28, tzinfo=pytz.timezone('EST')),
- ),
-]
-
-
-class TestDateparseUtils(unittest.TestCase):
- @uu.check_method_for_perf_regressions
- def test_dateparsing(self):
- dp = du.DateParser(override_now_for_test_purposes=datetime.datetime(2021, 7, 2))
-
- for (txt, expected_dt) in parsable_expressions:
- try:
- actual_dt = dp.parse(txt)
- self.assertIsNotNone(actual_dt)
- self.assertEqual(
- actual_dt,
- expected_dt,
- f'"{txt}", got "{actual_dt}" while expecting "{expected_dt}"',
- )
- except du.ParseException:
- self.fail(f'Expected "{txt}" to parse successfully.')
-
- def test_whitespace_handling(self):
- dp = du.DateParser(override_now_for_test_purposes=datetime.datetime(2021, 7, 2))
-
- for (txt, expected_dt) in parsable_expressions:
- try:
- txt = f' {txt} '
- i = random.randint(2, 5)
- replacement = ' ' * i
- txt = re.sub(r'\s', replacement, txt)
- actual_dt = dp.parse(txt)
- self.assertIsNotNone(actual_dt)
- self.assertEqual(
- actual_dt,
- expected_dt,
- f'"{txt}", got "{actual_dt}" while expecting "{expected_dt}"',
- )
- except du.ParseException:
- self.fail(f'Expected "{txt}" to parse successfully.')
-
-
-if __name__ == '__main__':
- unittest.main()
import unittest
from pyutils import unittest_utils
-from pyutils.typez.centcount import CentCount
+from pyutils.types.centcount import CentCount
class TestCentCount(unittest.TestCase):
amount /= another
def test_equality(self):
- usa = CentCount(1.0, 'USD')
- can = CentCount(1.0, 'CAD')
+ usa = CentCount(1.0, "USD")
+ can = CentCount(1.0, "CAD")
self.assertNotEqual(usa, can)
- eh = CentCount(1.0, 'CAD')
+ eh = CentCount(1.0, "CAD")
self.assertEqual(can, eh)
def test_comparison(self):
self.assertLess(neg_one, one)
self.assertGreater(one, neg_one)
self.assertGreater(three, one)
- looney = CentCount(1.0, 'CAD')
+ looney = CentCount(1.0, "CAD")
with self.assertRaises(TypeError):
print(looney < one)
self.assertTrue(two > one)
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main()
from decimal import Decimal
from pyutils import unittest_utils
-from pyutils.typez.money import Money
+from pyutils.types.money import Money
class TestMoney(unittest.TestCase):
amount /= another
def test_equality(self):
- usa = Money(1.0, 'USD')
- can = Money(1.0, 'CAD')
+ usa = Money(1.0, "USD")
+ can = Money(1.0, "CAD")
self.assertNotEqual(usa, can)
- eh = Money(1.0, 'CAD')
+ eh = Money(1.0, "CAD")
self.assertEqual(can, eh)
def test_comparison(self):
self.assertLess(neg_one, one)
self.assertGreater(one, neg_one)
self.assertGreater(three, one)
- looney = Money(1.0, 'CAD')
+ looney = Money(1.0, "CAD")
with self.assertRaises(TypeError):
print(looney < one)
ten = Money(10.0)
x = ten * 2 / 3
expected = Decimal(6.66)
- expected = expected.quantize(Decimal('.01'))
+ expected = expected.quantize(Decimal(".01"))
self.assertEqual(expected, x.truncate_fractional_cents())
x = ten * 2 / 3
expected = Decimal(6.67)
- expected = expected.quantize(Decimal('.01'))
+ expected = expected.quantize(Decimal(".01"))
self.assertEqual(expected, x.round_fractional_cents())
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main()
import unittest
from pyutils import unittest_utils
-from pyutils.typez.money import Money
-from pyutils.typez.rate import Rate
+from pyutils.types.money import Money
+from pyutils.types.rate import Rate
class TestRate(unittest.TestCase):
self.assertEqual("+50.000%", s)
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main()