# © Copyright 2021-2022, Scott Gasch
"""This is a module for wrapping around python programs and doing some
-minor setup and tear down work for them. With it, you can break into
-pdb on unhandled top level exceptions, profile your code by passing a
-commandline argument in, audit module import events, examine where
-memory is being used in your program, and so on.
+minor setup and tear down work for them. With it, you will get:
+
+* The ability to break into pdb on unhandled exceptions,
+* automatic support for :file:`config.py` (argument parsing)
+* automatic logging support for :file:`logging.py`,
+* the ability to enable code profiling,
+* the ability to enable module import auditing,
+* optional memory profiling for your program,
+* ability to set random seed via commandline,
+* automatic program timing and reporting,
+* more verbose error handling and reporting,
+
+Most of these are enabled and/or configured via commandline flags
+(see below).
"""
def dump_all_objects() -> None:
+ """Helper code to dump all known python objects."""
+
messages = {}
all_modules = sys.modules
for obj in object.__subclasses__():
def initialize(entry_point):
"""
Remember to initialize config, initialize logging, set/log a random
- seed, etc... before running main.
+ seed, etc... before running main. If you use this decorator around
+ your main, like this::
+
+ import bootstrap
+
+ @bootstrap.initialize
+ def main():
+ whatever
+
+ if __name__ == '__main__':
+ main()
+
+ You get:
+
+ * The ability to break into pdb on unhandled exceptions,
+ * automatic support for :file:`config.py` (argument parsing)
+ * automatic logging support for :file:`logging.py`,
+ * the ability to enable code profiling,
+ * the ability to enable module import auditing,
+ * optional memory profiling for your program,
+ * ability to set random seed via commandline,
+ * automatic program timing and reporting,
+ * more verbose error handling and reporting,
+ Most of these are enabled and/or configured via commandline flags
+ (see below).
"""
@functools.wraps(entry_point)
# © Copyright 2021-2022, Scott Gasch
-"""How's the weather?"""
+"""A cache of weather data for Bellevue, WA.
+Provides the :class:`CachedWeatherData` class, which derives from :class:`Persistent`
+so that, on creation, the decorator transparently pulls in data from
+disk, if possible, to avoid a network request.
+"""
import datetime
import json
@dataclass
class WeatherData:
- date: datetime.date # The date
- high: float # The predicted high in F
- low: float # The predicted low in F
- precipitation_inches: float # Number of inches of precipitation / day
- conditions: List[str] # Conditions per ~3h window
- most_common_condition: str # The most common condition
- icon: str # An icon to represent it
+ date: datetime.date
+ """The date of the forecast"""
+
+ high: float
+ """The predicted high temperature in F"""
+
+ low: float
+ """The predicted low temperature in F"""
+
+ precipitation_inches: float
+ """Number of inches of precipitation / day"""
+
+ conditions: List[str]
+ """Conditions per ~3h window"""
+
+ most_common_condition: str
+ """The most common condition of the day"""
+
+ icon: str
+ """An icon representing the most common condition of the day"""
@persistent.persistent_autoloaded_singleton() # type: ignore
If you set this up and remember to invoke config.parse(), all commandline
arguments will play nicely together. This is done automatically for you
- if you're using the bootstrap module's initialize wrapper.::
+ if you're using the :meth:`bootstrap.initialize` decorator on
+ your program's entry point. See :meth:`python_modules.bootstrap.initialize`
+ for more details::
+
+ import bootstrap
+
+ @bootstrap.initialize
+ def main():
+ whatever
+
+ if __name__ == '__main__':
+ main()
+
+ Either way, you'll get this behavior from the commandline::
% main.py -h
usage: main.py [-h]
import os
import sys
+sys.path.insert(0, os.path.abspath('/home/scott/lib/python_modules'))
sys.path.insert(0, os.path.abspath('../..'))
sys.path.insert(0, os.path.abspath('../../cached'))
sys.path.insert(0, os.path.abspath('../../collect'))
================================================
.. toctree::
- :maxdepth: 3
+ :maxdepth: 2
:caption: Contents:
modules
# © Copyright 2021-2022, Scott Gasch
-"""Some exceptions used elsewhere."""
+"""Some general exceptions used elsewhere in the package."""
# This module is commonly used by others in here and should avoid
# taking any unnecessary dependencies back on them.
def cmd_showing_output(
command: str,
) -> int:
- """Kick off a child process. Capture and print all output that it
- produces on stdout and stderr. Wait for the subprocess to exit
- and return the exit value as the return code of this function.
+ """Kick off a child process. Capture and emit all output that it
+ produces on stdout and stderr in a character by character manner
+ so that we don't have to wait on newlines. This was done to
+ capture the output of a subprocess that created dots to show
+ incremental progress on a task and render it correctly.
+ Args:
+ command: the command to execute
+
+ Returns:
+ the exit status of the subprocess once the subprocess has
+ exited
+
+ Side effects:
+ prints all output of the child process (stdout or stderr)
"""
+
line_enders = set([b'\n', b'\r'])
sel = selectors.DefaultSelector()
with subprocess.Popen(
sel.close()
done = True
if key.fileobj is p.stdout:
- # sys.stdout.buffer.write(char)
os.write(sys.stdout.fileno(), char)
if char in line_enders:
sys.stdout.flush()
else:
- # sys.stderr.buffer.write(char)
os.write(sys.stderr.fileno(), char)
if char in line_enders:
sys.stderr.flush()
return p.returncode
-def cmd_with_timeout(command: str, timeout_seconds: Optional[float]) -> int:
- """Run a command but do not let it run for more than timeout seconds.
- Doesn't capture or rebroadcast command output. Function returns
- the exit value of the command or raises a TimeoutExpired exception
- if the deadline is exceeded.
+def cmd_with_timeout(command: str, timeout_seconds: Optional[float] = None) -> int:
+ """Run a command but do not let it run for more than timeout_seconds.
+ This code doesn't capture or rebroadcast the command's output. It
+ returns the exit value of the command or raises a TimeoutExpired
+ exception if the deadline is exceeded.
+
+ Args:
+ command: the command to run
+ timeout_seconds: the max number of seconds to allow the subprocess
+ to execute or None to indicate no timeout
+
+ Returns:
+ the exit status of the subprocess once the subprocess has
+ exited
>>> cmd_with_timeout('/bin/echo foo', 10.0)
0
- >>> cmd_with_timeout('/bin/sleep 2', 0.1)
+ >>> cmd_with_timeout('/bin/sleep 2', 0.01)
Traceback (most recent call last):
...
- subprocess.TimeoutExpired: Command '['/bin/bash', '-c', '/bin/sleep 2']' timed out after 0.1 seconds
+ subprocess.TimeoutExpired: Command '['/bin/bash', '-c', '/bin/sleep 2']' timed out after 0.01 seconds
"""
return subprocess.check_call(["/bin/bash", "-c", command], timeout=timeout_seconds)
def cmd(command: str, timeout_seconds: Optional[float] = None) -> str:
- """Run a command and capture its output to stdout (only) in a string.
- Return that string as this function's output. Raises
+ """Run a command and capture its output to stdout (only) into a string
+ buffer. Return that string as this function's output. Raises
subprocess.CalledProcessError or TimeoutExpired on error.
+ Args:
+ command: the command to run
+ timeout_seconds: the max number of seconds to allow the subprocess
+ to execute or None to indicate no timeout
+
+ Returns:
+ The captured output of the subprocess' stdout as a string buffer
+
>>> cmd('/bin/echo foo')[:-1]
'foo'
- >>> cmd('/bin/sleep 2', 0.1)
+ >>> cmd('/bin/sleep 2', 0.01)
Traceback (most recent call last):
...
- subprocess.TimeoutExpired: Command '/bin/sleep 2' timed out after 0.1 seconds
+ subprocess.TimeoutExpired: Command '/bin/sleep 2' timed out after 0.01 seconds
"""
ret = subprocess.run(
"""Run a command silently but raise subprocess.CalledProcessError if
it fails.
+ Args:
+ command: the command to run
+ timeout_seconds: the max number of seconds to allow the subprocess
+ to execute or None to indicate no timeout
+
+ Returns:
+ No return value; error conditions (including non-zero child process
+ exits) produce exceptions.
+
>>> run_silently("/usr/bin/true")
>>> run_silently("/usr/bin/false")
def cmd_in_background(command: str, *, silent: bool = False) -> subprocess.Popen:
+ """Spawns a child process in the background and registers an exit
+ handler to make sure we kill it if the parent process (us) is
+ terminated.
+
+ Args:
+ command: the command to run
+ silent: do not allow any output from the child process to be displayed
+ in the parent process' window
+
+ Returns:
+ the :class:`Popen` object that can be used to communicate
+ with the background process.
+ """
args = shlex.split(command)
if silent:
subproc = subprocess.Popen(
SCP = '/usr/bin/scp -C'
-def make_cloud_pickle(fun, *args, **kwargs):
+def _make_cloud_pickle(fun, *args, **kwargs):
+ """Internal helper to create cloud pickles."""
logger.debug("Making cloudpickled bundle at %s", fun.__name__)
return cloudpickle.dumps((fun, args, kwargs))
class BaseExecutor(ABC):
- """The base executor interface definition."""
+ """The base executor interface definition. The interface for
+ :class:`ProcessExecutor`, :class:`RemoteExecutor`, and
+ :class:`ThreadExecutor`.
+ """
def __init__(self, *, title=''):
self.title = title
class ThreadExecutor(BaseExecutor):
- """A threadpool executor instance."""
+ """A threadpool executor. This executor uses python threads to
+ schedule tasks. Note that, at least as of python3.10, because of
+ the global lock in the interpreter itself, these do not
+ parallelize very well so this class is useful mostly for non-CPU
+ intensive tasks.
+
+ See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
+ """
def __init__(self, max_workers: Optional[int] = None):
super().__init__()
class ProcessExecutor(BaseExecutor):
- """A processpool executor."""
+ """An executor which runs tasks in child processes.
+
+ See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
+ """
def __init__(self, max_workers=None):
super().__init__()
raise Exception('Submitted work after shutdown.')
start = time.time()
self.adjust_task_count(+1)
- pickle = make_cloud_pickle(function, *args, **kwargs)
+ pickle = _make_cloud_pickle(function, *args, **kwargs)
result = self._process_executor.submit(ProcessExecutor.run_cloud_pickle, pickle)
result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
result.add_done_callback(lambda _: self.adjust_task_count(-1))
"""A record of info about a remote worker."""
username: str
+ """Username we can ssh into on this machine to run work."""
+
machine: str
+ """Machine address / name."""
+
weight: int
+ """Relative probability for the weighted policy to select this
+ machine for scheduling work."""
+
count: int
+ """If this machine is selected, what is the maximum number of tasks
+ that it can handle?"""
def __hash__(self):
return hash((self.username, self.machine))
"""All info necessary to define some unit of work that needs to be
done, where it is being run, its state, whether it is an original
bundle or a backup bundle, how many times it has failed, etc...
-
"""
pickled_code: bytes
+ """The code to run, cloud pickled"""
+
uuid: str
- fname: str
+ """A unique identifier"""
+
+ function_name: str
+ """The name of the function we pickled"""
+
worker: Optional[RemoteWorkerRecord]
+ """The remote worker running this bundle or None if none (yet)"""
+
username: Optional[str]
+ """The remote username running this bundle or None if none (yet)"""
+
machine: Optional[str]
+ """The remote machine running this bundle or None if none (yet)"""
+
hostname: str
+ """The controller machine"""
+
code_file: str
+ """A unique filename to hold the work to be done"""
+
result_file: str
+ """Where the results should be placed / read from"""
+
pid: int
+ """The process id of the local subprocess watching the ssh connection
+ to the remote machine"""
+
start_ts: float
+ """Starting time"""
+
end_ts: float
+ """Ending time"""
+
slower_than_local_p95: bool
+ """Currently slower than 95% of other bundles on remote host"""
+
slower_than_global_p95: bool
+ """Currently slower than 95% of other bundles globally"""
+
src_bundle: Optional[BundleDetails]
+ """If this is a backup bundle, this points to the original bundle
+ that it's backing up. None otherwise."""
+
is_cancelled: threading.Event
+ """An event that can be signaled to indicate this bundle is cancelled.
+ This is set when another copy (backup or original) of this work has
+ completed successfully elsewhere."""
+
was_cancelled: bool
+ """True if this bundle was cancelled, False if it finished normally"""
+
backup_bundles: Optional[List[BundleDetails]]
+ """If we've created backups of this bundle, this is the list of them"""
+
failure_count: int
+ """How many times has this bundle failed already?"""
def __repr__(self):
uuid = self.uuid
else:
suffix = uuid[-6:]
+ # We colorize the uuid based on some bits from it to make them
+ # stand out in the logging and help a reader correlate log messages
+ # related to the same bundle.
colorz = [
fg('violet red'),
fg('red'),
fg('medium purple'),
]
c = colorz[int(uuid[-2:], 16) % len(colorz)]
- fname = self.fname if self.fname is not None else 'nofname'
+ function_name = self.function_name if self.function_name is not None else 'nofname'
machine = self.machine if self.machine is not None else 'nomachine'
- return f'{c}{suffix}/{fname}/{machine}{reset()}'
+ return f'{c}{suffix}/{function_name}/{machine}{reset()}'
class RemoteExecutorStatus:
- """A status 'scoreboard' for a remote executor."""
+ """A status 'scoreboard' for a remote executor tracking various
+ metrics and able to render a periodic dump of global state.
+ """
def __init__(self, total_worker_count: int) -> None:
+ """C'tor.
+
+ Args:
+ total_worker_count: number of workers in the pool
+
+ """
self.worker_count: int = total_worker_count
self.known_workers: Set[RemoteWorkerRecord] = set()
self.start_time: float = time.time()
self.lock: threading.Lock = threading.Lock()
def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
+ """Record that bundle with uuid is assigned to a particular worker.
+
+ Args:
+ worker: the record of the worker to which uuid is assigned
+ uuid: the uuid of a bundle that has been assigned to a worker
+ """
with self.lock:
self.record_acquire_worker_already_locked(worker, uuid)
def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
+ """Same as above but an entry point that doesn't acquire the lock
+ for codepaths where it's already held."""
assert self.lock.locked()
self.known_workers.add(worker)
self.start_per_bundle[uuid] = None
self.in_flight_bundles_by_worker[worker] = x
def record_bundle_details(self, details: BundleDetails) -> None:
+ """Register the details about a bundle of work."""
with self.lock:
self.record_bundle_details_already_locked(details)
def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
+ """Same as above but for codepaths that already hold the lock."""
assert self.lock.locked()
self.bundle_details_by_uuid[details.uuid] = details
uuid: str,
was_cancelled: bool,
) -> None:
+ """Record that a bundle has released a worker."""
with self.lock:
self.record_release_worker_already_locked(worker, uuid, was_cancelled)
uuid: str,
was_cancelled: bool,
) -> None:
+ """Same as above but for codepaths that already hold the lock."""
assert self.lock.locked()
ts = time.time()
self.end_per_bundle[uuid] = ts
self.finished_bundle_timings.append(bundle_latency)
def record_processing_began(self, uuid: str):
+ """Record when work on a bundle begins."""
with self.lock:
self.start_per_bundle[uuid] = time.time()
def total_in_flight(self) -> int:
+ """How many bundles are in flight currently?"""
assert self.lock.locked()
total_in_flight = 0
for worker in self.known_workers:
return total_in_flight
def total_idle(self) -> int:
+ """How many idle workers are there currently?"""
assert self.lock.locked()
return self.worker_count - self.total_in_flight()
class RemoteExecutor(BaseExecutor):
- """A remote work executor."""
+ """An executor that uses processes on remote machines to do work. This
+ works by creating "bundles" of work with pickled code in each to be
+ executed. Each bundle is assigned a remote worker based on some policy
+ heuristics. Once assigned to a remote worker, a local subprocess is
+ created. It copies the pickled code to the remote machine via ssh/scp
+ and then starts up work on the remote machine again using ssh. When
+ the work is complete it copies the results back to the local machine.
+
+ So there is essentially one "controller" machine (which may also be
+ in the remote executor pool and therefore do task work in addition to
+ controlling) and N worker machines. This code runs on the controller
+ whereas on the worker machines we invoke pickled user code via a
+ shim in :file:`remote_worker.py`.
+
+ Some redundancy and safety provisions are made; e.g. slower than
+ expected tasks have redundant backups created and if a task fails
+ repeatedly we consider it poisoned and give up on it.
+
+ .. warning::
+
+ The network overhead / latency of copying work from the
+ controller machine to the remote workers is relatively high.
+ This executor probably only makes sense to use with
+ computationally expensive tasks such as jobs that will execute
+ for ~30 seconds or longer.
+
+ See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
+ """
def __init__(
self,
workers: List[RemoteWorkerRecord],
policy: RemoteWorkerSelectionPolicy,
) -> None:
+ """C'tor.
+
+ Args:
+ workers: A list of remote workers we can call on to do tasks.
+ policy: A policy for selecting remote workers for tasks.
+ """
+
super().__init__()
self.workers = workers
self.policy = policy
(
self.heartbeat_thread,
self.heartbeat_stop_event,
- ) = self.run_periodic_heartbeat()
+ ) = self._run_periodic_heartbeat()
self.already_shutdown = False
@background_thread
- def run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
+ def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
+ """
+ We create a background thread to invoke :meth:`_heartbeat` regularly
+ while we are scheduling work. It does some accounting such as
+ looking for slow bundles to tag for backup creation, checking for
+ unexpected failures, and printing a fancy message on stdout.
+ """
while not stop_event.is_set():
time.sleep(5.0)
logger.debug('Running periodic heartbeat code...')
- self.heartbeat()
+ self._heartbeat()
logger.debug('Periodic heartbeat thread shutting down.')
- def heartbeat(self) -> None:
+ def _heartbeat(self) -> None:
# Note: this is invoked on a background thread, not an
# executor thread. Be careful what you do with it b/c it
# needs to get back and dump status again periodically.
# Look for bundles to reschedule via executor.submit
if config.config['executors_schedule_remote_backups']:
- self.maybe_schedule_backup_bundles()
+ self._maybe_schedule_backup_bundles()
+
+ def _maybe_schedule_backup_bundles(self):
+ """Maybe schedule backup bundles if we see a very slow bundle."""
- def maybe_schedule_backup_bundles(self):
assert self.status.lock.locked()
num_done = len(self.status.finished_bundle_timings)
num_idle_workers = self.worker_count - self.task_count
# Note: this is all still happening on the heartbeat
# runner thread. That's ok because
- # schedule_backup_for_bundle uses the executor to
+ # _schedule_backup_for_bundle uses the executor to
# submit the bundle again which will cause it to be
# picked up by a worker thread and allow this thread
# to return to run future heartbeats.
bundle_to_backup,
best_score,
)
- self.schedule_backup_for_bundle(bundle_to_backup)
+ self._schedule_backup_for_bundle(bundle_to_backup)
finally:
self.backup_lock.release()
- def is_worker_available(self) -> bool:
+ def _is_worker_available(self) -> bool:
+ """Is there a worker available currently?"""
return self.policy.is_worker_available()
- def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
+ def _acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
+ """Try to acquire a worker."""
return self.policy.acquire_worker(machine_to_avoid)
- def find_available_worker_or_block(self, machine_to_avoid: str = None) -> RemoteWorkerRecord:
+ def _find_available_worker_or_block(self, machine_to_avoid: str = None) -> RemoteWorkerRecord:
+ """Find a worker or block until one becomes available."""
with self.cv:
- while not self.is_worker_available():
+ while not self._is_worker_available():
self.cv.wait()
- worker = self.acquire_worker(machine_to_avoid)
+ worker = self._acquire_worker(machine_to_avoid)
if worker is not None:
return worker
msg = "We should never reach this point in the code"
logger.critical(msg)
raise Exception(msg)
- def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
+ def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
+ """Release a previously acquired worker."""
worker = bundle.worker
assert worker is not None
logger.debug('Released worker %s', worker)
self.cv.notify()
self.adjust_task_count(-1)
- def check_if_cancelled(self, bundle: BundleDetails) -> bool:
+ def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
+ """See if a particular bundle is cancelled. Do not block."""
with self.status.lock:
if bundle.is_cancelled.wait(timeout=0.0):
logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
return True
return False
- def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
+ def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
"""Find a worker for bundle or block until one is available."""
self.adjust_task_count(+1)
avoid_machine = bundle.src_bundle.machine
worker = None
while worker is None:
- worker = self.find_available_worker_or_block(avoid_machine)
+ worker = self._find_available_worker_or_block(avoid_machine)
assert worker is not None
# Ok, found a worker.
# It may have been some time between when it was submitted and
# now due to lack of worker availability and someone else may
# have already finished it.
- if self.check_if_cancelled(bundle):
+ if self._check_if_cancelled(bundle):
try:
- return self.process_work_result(bundle)
+ return self._process_work_result(bundle)
except Exception as e:
logger.warning('%s: bundle says it\'s cancelled upfront but no results?!', bundle)
- self.release_worker(bundle)
+ self._release_worker(bundle)
if is_original:
# Weird. We are the original owner of this
# bundle. For it to have been cancelled, a backup
'no results for this bundle. This is unexpected and bad.',
bundle,
)
- return self.emergency_retry_nasty_bundle(bundle)
+ return self._emergency_retry_nasty_bundle(bundle)
else:
# We're a backup and our bundle is cancelled
# before we even got started. Do nothing and let
xfer_latency = time.time() - start_ts
logger.debug("%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency)
except Exception as e:
- self.release_worker(bundle)
+ self._release_worker(bundle)
if is_original:
# Weird. We tried to copy the code to the worker
# and it failed... And we're the original bundle.
"be a race condition. Attempting an emergency retry...",
bundle,
)
- return self.emergency_retry_nasty_bundle(bundle)
+ return self._emergency_retry_nasty_bundle(bundle)
else:
# This is actually expected; we're a backup.
# There's a race condition where someone else
return None
# Kick off the work. Note that if this fails we let
- # wait_for_process deal with it.
+ # _wait_for_process deal with it.
self.status.record_processing_began(uuid)
cmd = (
f'{SSH} {bundle.username}@{bundle.machine} '
p = cmd_in_background(cmd, silent=True)
bundle.pid = p.pid
logger.debug('%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine)
- return self.wait_for_process(p, bundle, 0)
+ return self._wait_for_process(p, bundle, 0)
- def wait_for_process(
+ def _wait_for_process(
self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
) -> Any:
+ """At this point we've copied the bundle's pickled code to the remote
+ worker and started an ssh process that should be invoking the
+ remote worker to have it execute the user's code. See how
+ that's going and wait for it to complete or fail. Note that
+ this code is recursive: there are codepaths where we decide to
+ stop waiting for an ssh process (because another backup seems
+ to have finished) but then fail to fetch or parse the results
+ from that backup and thus call ourselves to continue waiting
+ on an active ssh process. This is the purpose of the depth
+ argument: to curtail potential infinite recursion by giving up
+ eventually.
+
+ Args:
+ p: the Popen record of the ssh job
+ bundle: the bundle of work being executed remotely
+ depth: how many retries we've made so far. Starts at zero.
+
+ """
+
machine = bundle.machine
assert p is not None
- pid = p.pid
+ pid = p.pid # pid of the ssh process
if depth > 3:
logger.error(
"I've gotten repeated errors waiting on this bundle; giving up on pid=%d", pid
)
p.terminate()
- self.release_worker(bundle)
- return self.emergency_retry_nasty_bundle(bundle)
+ self._release_worker(bundle)
+ return self._emergency_retry_nasty_bundle(bundle)
# Spin until either the ssh job we scheduled finishes the
# bundle or some backup worker signals that they finished it
try:
p.wait(timeout=0.25)
except subprocess.TimeoutExpired:
- if self.check_if_cancelled(bundle):
+ if self._check_if_cancelled(bundle):
logger.info('%s: looks like another worker finished bundle...', bundle)
break
else:
# If we get here we believe the bundle is done; either the ssh
# subprocess finished (hopefully successfully) or we noticed
# that some other worker seems to have completed the bundle
- # and we're bailing out.
+ # before us and we're bailing out.
try:
- ret = self.process_work_result(bundle)
+ ret = self._process_work_result(bundle)
if ret is not None and p is not None:
p.terminate()
return ret
logger.warning(
"%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.", bundle
)
- return self.wait_for_process(p, bundle, depth + 1)
+ return self._wait_for_process(p, bundle, depth + 1)
else:
- self.release_worker(bundle)
- return self.emergency_retry_nasty_bundle(bundle)
+ self._release_worker(bundle)
+ return self._emergency_retry_nasty_bundle(bundle)
+
+ def _process_work_result(self, bundle: BundleDetails) -> Any:
+ """A bundle seems to be completed. Check on the results."""
- def process_work_result(self, bundle: BundleDetails) -> Any:
with self.status.lock:
is_original = bundle.src_bundle is None
was_cancelled = bundle.was_cancelled
)
# If either of these throw they are handled in
- # wait_for_process.
+ # _wait_for_process.
attempts = 0
while True:
try:
except Exception as e:
logger.exception(e)
logger.error('Failed to load %s... this is bad news.', result_file)
- self.release_worker(bundle)
+ self._release_worker(bundle)
- # Re-raise the exception; the code in wait_for_process may
- # decide to emergency_retry_nasty_bundle here.
+ # Re-raise the exception; the code in _wait_for_process may
+ # decide to _emergency_retry_nasty_bundle here.
raise e
logger.debug('Removing local (master) %s and %s.', code_file, result_file)
os.remove(result_file)
'%s: Notifying original %s we beat them to it.', bundle, orig_bundle.uuid
)
orig_bundle.is_cancelled.set()
- self.release_worker(bundle, was_cancelled=was_cancelled)
+ self._release_worker(bundle, was_cancelled=was_cancelled)
return result
- def create_original_bundle(self, pickle, fname: str):
+ def _create_original_bundle(self, pickle, function_name: str):
+ """Creates a bundle that is not a backup of any other bundle but
+ rather represents a user task.
+ """
+
uuid = string_utils.generate_uuid(omit_dashes=True)
code_file = f'/tmp/{uuid}.code.bin'
result_file = f'/tmp/{uuid}.result.bin'
bundle = BundleDetails(
pickled_code=pickle,
uuid=uuid,
- fname=fname,
+ function_name=function_name,
worker=None,
username=None,
machine=None,
logger.debug('%s: Created an original bundle', bundle)
return bundle
- def create_backup_bundle(self, src_bundle: BundleDetails):
+ def _create_backup_bundle(self, src_bundle: BundleDetails):
+ """Creates a bundle that is a backup of another bundle that is
+ running too slowly."""
+
assert self.status.lock.locked()
assert src_bundle.backup_bundles is not None
n = len(src_bundle.backup_bundles)
backup_bundle = BundleDetails(
pickled_code=src_bundle.pickled_code,
uuid=uuid,
- fname=src_bundle.fname,
+ function_name=src_bundle.function_name,
worker=None,
username=None,
machine=None,
logger.debug('%s: Created a backup bundle', backup_bundle)
return backup_bundle
- def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
+ def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
+ """Schedule a backup of src_bundle."""
+
assert self.status.lock.locked()
assert src_bundle is not None
- backup_bundle = self.create_backup_bundle(src_bundle)
+ backup_bundle = self._create_backup_bundle(src_bundle)
logger.debug(
- '%s/%s: Scheduling backup for execution...', backup_bundle.uuid, backup_bundle.fname
+ '%s/%s: Scheduling backup for execution...',
+ backup_bundle.uuid,
+ backup_bundle.function_name,
)
- self._helper_executor.submit(self.launch, backup_bundle)
+ self._helper_executor.submit(self._launch, backup_bundle)
# Results from backups don't matter; if they finish first
# they will move the result_file to this machine and let
# the original pick them up and unpickle them (and return
# a result).
- def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
+ def _emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
+ """Something unexpectedly failed with bundle. Either retry it
+ from the beginning or throw in the towel and give up on it."""
+
is_original = bundle.src_bundle is None
bundle.worker = None
avoid_last_machine = bundle.machine
msg = f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<'
logger.warning(msg)
warnings.warn(msg)
- return self.launch(bundle, avoid_last_machine)
+ return self._launch(bundle, avoid_last_machine)
@overrides
def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
+ """Submit work to be done. This is the user entry point of this
+ class."""
if self.already_shutdown:
raise Exception('Submitted work after shutdown.')
- pickle = make_cloud_pickle(function, *args, **kwargs)
- bundle = self.create_original_bundle(pickle, function.__name__)
+ pickle = _make_cloud_pickle(function, *args, **kwargs)
+ bundle = self._create_original_bundle(pickle, function.__name__)
self.total_bundles_submitted += 1
- return self._helper_executor.submit(self.launch, bundle)
+ return self._helper_executor.submit(self._launch, bundle)
@overrides
def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
+ """Shutdown the executor."""
if not self.already_shutdown:
logging.debug('Shutting down RemoteExecutor %s', self.title)
self.heartbeat_stop_event.set()
class DefaultExecutors(object):
"""A container for a default thread, process and remote executor.
These are not created until needed and we take care to clean up
- before process exit.
+ before process exit automatically for the caller's convenience.
+ Instead of creating your own executor, consider using the one
+ from this pool. e.g.::
+
+ @par.parallelize(method=par.Method.PROCESS)
+ def do_work(
+ solutions: List[Work],
+ shard_num: int,
+ ...
+ ):
+ <do the work>
+
+
+ def start_do_work(all_work: List[Work]):
+ shards = []
+ logger.debug('Sharding work into groups of 10.')
+ for subset in list_utils.shard(all_work, 10):
+ shards.append([x for x in subset])
+ logger.debug('Kicking off helper pool.')
+ try:
+ for n, shard in enumerate(shards):
+ results.append(
+ do_work(
+ shard, n, shared_cache.get_name(), max_letter_pop_per_word
+ )
+ )
+ smart_future.wait_all(results)
+ finally:
+ # Note: if you forget to do this it will clean itself up
+ # during program termination including tearing down any
+ # active ssh connections.
+ executors.DefaultExecutors().process_pool().shutdown()
"""
def __init__(self):
self.remote_executor: Optional[RemoteExecutor] = None
@staticmethod
- def ping(host) -> bool:
+ def _ping(host) -> bool:
logger.debug('RUN> ping -c 1 %s', host)
try:
x = cmd_with_timeout(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
if self.remote_executor is None:
logger.info('Looking for some helper machines...')
pool: List[RemoteWorkerRecord] = []
- if self.ping('cheetah.house'):
+ if self._ping('cheetah.house'):
logger.info('Found cheetah.house')
pool.append(
RemoteWorkerRecord(
count=5,
),
)
- if self.ping('meerkat.cabin'):
+ if self._ping('meerkat.cabin'):
logger.info('Found meerkat.cabin')
pool.append(
RemoteWorkerRecord(
count=2,
),
)
- if self.ping('wannabe.house'):
+ if self._ping('wannabe.house'):
logger.info('Found wannabe.house')
pool.append(
RemoteWorkerRecord(
count=2,
),
)
- if self.ping('puma.cabin'):
+ if self._ping('puma.cabin'):
logger.info('Found puma.cabin')
pool.append(
RemoteWorkerRecord(
count=5,
),
)
- if self.ping('backup.house'):
+ if self._ping('backup.house'):
logger.info('Found backup.house')
pool.append(
RemoteWorkerRecord(
def remove_newlines(x: str) -> str:
+ """Trivial function to be used as a line_transformer in
+ :meth:`slurp_file` to remove newline characters from each line."""
return x.replace('\n', '')
def strip_whitespace(x: str) -> str:
+ """Trivial function to be used as a line_transformer in
+ :meth:`slurp_file` to strip leading / trailing whitespace from
+ each line."""
return x.strip()
def remove_hash_comments(x: str) -> str:
+ """Trivial function to be used as a line_transformer in
+ :meth:`slurp_file` to remove # comments (everything from the first
+ # to the end of the line)."""
return re.sub(r'#.*$', '', x)
skip_blank_lines=False,
line_transformers: Optional[List[Callable[[str], str]]] = None,
):
+ """Reads in a file's contents line-by-line to a memory buffer applying
+ each line transformation in turn.
+
+ Args:
+ filename: file to be read
+ skip_blank_lines: should reading skip blank lines?
+ line_transformers: little string->string transformations
+ """
+
ret = []
+ xforms = []
+ if line_transformers is not None:
+ for x in line_transformers:
+ xforms.append(x)
if not file_is_readable(filename):
raise Exception(f'{filename} can\'t be read.')
with open(filename) as rf:
for line in rf:
- if line_transformers is not None:
- for transformation in line_transformers:
- line = transformation(line)
+ for transformation in xforms:
+ line = transformation(line)
if skip_blank_lines and line == '':
continue
ret.append(line)
"""Deletes a file. Raises if path refers to a directory or a file
that doesn't exist.
+ Args:
+ path: the path of the file to delete
+
>>> import os
>>> filename = '/tmp/file_utils_test_file'
>>> os.system(f'touch {filename}')
>>> remove(filename)
>>> does_file_exist(filename)
False
-
"""
os.remove(path)
def delete(path: str) -> None:
+ """Convenience wrapper around os.remove (same behavior as
+ :meth:`remove`) under an easier-to-remember name.
+ """
os.remove(path)
def without_extension(path: str) -> str:
- """Remove one extension from a file or path.
+ """Remove one (the last) extension from a file or path.
+
+ Args:
+ path: the path from which to remove an extension
+
+ Returns:
+ the path with one extension removed.
>>> without_extension('foobar.txt')
'foobar'
>>> without_extension('/home/scott/frapp.py')
'/home/scott/frapp'
- >>> without_extension('a.b.c.tar.gz')
- 'a.b.c.tar'
+ >>> f = 'a.b.c.tar.gz'
+ >>> while('.' in f):
+ ... f = without_extension(f)
+ ... print(f)
+ a.b.c.tar
+ a.b.c
+ a.b
+ a
>>> without_extension('foobar')
'foobar'
"""Removes all extensions from a path; handles multiple extensions
like foobar.tar.gz -> foobar.
+ Args:
+ path: the path from which to remove all extensions
+
+ Returns:
+ the path with all extensions removed.
+
>>> without_all_extensions('/home/scott/foobar.1.tar.gz')
'/home/scott/foobar'
def get_extension(path: str) -> str:
- """Extract and return one extension from a file or path.
+ """Extract and return one (the last) extension from a file or path.
+
+ Args:
+ path: the path from which to extract an extension
+
+ Returns:
+ The last extension from the file path.
>>> get_extension('this_is_a_test.txt')
'.txt'
def get_all_extensions(path: str) -> List[str]:
"""Return the extensions of a file or path in order.
+ Args:
+ path: the path from which to extract all extensions.
+
+ Returns:
+ a list containing each extension which may be empty.
+
>>> get_all_extensions('/home/scott/foo.tar.gz.1')
['.tar', '.gz', '.1']
+ >>> get_all_extensions('/home/scott/foobar')
+ []
+
"""
ret = []
while True:
def without_path(filespec: str) -> str:
"""Returns the base filename without any leading path.
+ Args:
+ filespec: path to remove leading directories from
+
+ Returns:
+ filespec without leading dir components.
+
>>> without_path('/home/scott/foo.py')
'foo.py'
"""Returns just the path of the filespec by removing the filename and
extension.
+ Args:
+ filespec: path to remove filename / extension(s) from
+
+ Returns:
+ filespec with just the leading directory components and no
+ filename or extension(s)
+
>>> get_path('/home/scott/foobar.py')
'/home/scott'
+ >>> get_path('/home/scott/test.1.2.3.gz')
+ '/home/scott'
+
>>> get_path('~scott/frapp.txt')
'~scott'
def get_canonical_path(filespec: str) -> str:
"""Returns a canonicalized absolute path.
+ Args:
+ filespec: the path to canonicalize
+
+ Returns:
+ the canonicalized path
+
>>> get_canonical_path('/home/scott/../../home/lynn/../scott/foo.txt')
'/usr/home/scott/foo.txt'
return os.path.realpath(filespec)
-def create_path_if_not_exist(path, on_error=None):
+def create_path_if_not_exist(path, on_error=None) -> None:
"""
- Attempts to create path if it does not exist. If on_error is
- specified, it is called with an exception if one occurs, otherwise
- exception is rethrown.
+ Attempts to create path if it does not exist already.
+
+ .. warning::
+
+ Files are created with mode 0o777 (i.e. world read/writeable).
+
+ Args:
+ path: the path to attempt to create
+ on_error: If provided, it is called with the exception when one
+ occurs. Otherwise the exception is re-raised.
>>> import uuid
>>> import os
def does_file_exist(filename: str) -> bool:
"""Returns True if a file exists and is a normal file.
+ Args:
+ filename: filename to check
+
+ Returns:
+ True if filename exists and is a normal file.
+
>>> does_file_exist(__file__)
True
+ >>> does_file_exist('/tmp/2492043r9203r9230r9230r49230r42390r4230')
+ False
"""
return os.path.exists(filename) and os.path.isfile(filename)
def file_is_readable(filename: str) -> bool:
+ """True if file exists, is a normal file and is readable by the
+ current process. False otherwise.
+
+ Args:
+ filename: the filename to check for read access
+ """
return does_file_exist(filename) and os.access(filename, os.R_OK)
def file_is_writable(filename: str) -> bool:
+ """True if file exists, is a normal file and is writable by the
+ current process. False otherwise.
+
+ Args:
+ filename: the file to check for write access.
+ """
return does_file_exist(filename) and os.access(filename, os.W_OK)
def file_is_executable(filename: str) -> bool:
+ """True if file exists, is a normal file and is executable by the
+ current process. False otherwise.
+
+ Args:
+ filename: the file to check for execute access.
+ """
return does_file_exist(filename) and os.access(filename, os.X_OK)
>>> does_directory_exist('/tmp')
True
+ >>> does_directory_exist('/xyzq/21341')
+ False
"""
return os.path.exists(dirname) and os.path.isdir(dirname)
def get_file_size(filename: str) -> int:
- """Returns the size of a file in bytes."""
+ """Returns the size of a file in bytes.
+
+ Args:
+ filename: the filename to size
+
+ Returns:
+ size of filename in bytes
+ """
return os.path.getsize(filename)
def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
- """Stats the file and returns an os.stat_result or None on error."""
+ """Stats the file and returns an os.stat_result or None on error.
+
+ Args:
+ filename: the file whose timestamps to fetch
+
+ Returns:
+ the os.stat_result or None to indicate an error occurred
+ """
try:
return os.stat(filename)
except Exception as e:
return None
-def get_file_raw_timestamp(filename: str, extractor) -> Optional[float]:
+def get_file_raw_timestamp(
+ filename: str, extractor: Callable[[os.stat_result], Optional[float]]
+) -> Optional[float]:
+ """Stat a file and, if successful, use extractor to fetch some
+ subset of the information in the os.stat_result. See also
+ :meth:`get_file_raw_atime`, :meth:`get_file_raw_mtime`, and
+ :meth:`get_file_raw_ctime` which just call this with a lambda
+ extractor.
+
+ Args:
+ filename: the filename to stat
+ extractor: Callable that takes a os.stat_result and produces
+ something useful(?) with it.
+
+ Returns:
+ whatever the extractor produced or None on error.
+ """
tss = get_file_raw_timestamps(filename)
if tss is not None:
return extractor(tss)
def get_file_raw_atime(filename: str) -> Optional[float]:
+ """Get a file's raw access time or None on error.
+
+ See also :meth:`get_file_atime_as_datetime`,
+ :meth:`get_file_atime_timedelta`,
+ and :meth:`get_file_atime_age_seconds`.
+ """
return get_file_raw_timestamp(filename, lambda x: x.st_atime)
def get_file_raw_mtime(filename: str) -> Optional[float]:
+ """Get a file's raw modification time or None on error.
+
+ See also :meth:`get_file_mtime_as_datetime`,
+ :meth:`get_file_mtime_timedelta`,
+ and :meth:`get_file_mtime_age_seconds`.
+ """
return get_file_raw_timestamp(filename, lambda x: x.st_mtime)
def get_file_raw_ctime(filename: str) -> Optional[float]:
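+ """Get a file's raw ctime (st_ctime) or None on error. Note that on
+ Unix st_ctime is the time of the most recent metadata change; on
+ Windows it is the creation time.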
+
+ See also :meth:`get_file_ctime_as_datetime`,
+ :meth:`get_file_ctime_timedelta`,
+ and :meth:`get_file_ctime_age_seconds`.
+ """
return get_file_raw_timestamp(filename, lambda x: x.st_ctime)
def get_file_md5(filename: str) -> str:
- """Hashes filename's contents and returns an MD5."""
+ """Hashes filename's disk contents and returns the MD5 digest.
+
+ Args:
+ filename: the file whose contents to hash
+
+ Returns:
+ the MD5 digest of the file's contents. Raises on errors.
+ """
file_hash = hashlib.md5()
with open(filename, "rb") as f:
chunk = f.read(8192)
def set_file_raw_atime(filename: str, atime: float):
+ """Sets a file's raw access time.
+
+ See also :meth:`get_file_atime_as_datetime`,
+ :meth:`get_file_atime_timedelta`,
+ :meth:`get_file_atime_age_seconds`,
+ and :meth:`get_file_raw_atime`.
+ """
mtime = get_file_raw_mtime(filename)
assert mtime is not None
os.utime(filename, (atime, mtime))
def set_file_raw_mtime(filename: str, mtime: float):
+ """Sets a file's raw modification time.
+
+ See also :meth:`get_file_mtime_as_datetime`,
+ :meth:`get_file_mtime_timedelta`,
+ :meth:`get_file_mtime_age_seconds`,
+ and :meth:`get_file_raw_mtime`.
+ """
atime = get_file_raw_atime(filename)
assert atime is not None
os.utime(filename, (atime, mtime))
def set_file_raw_atime_and_mtime(filename: str, ts: float = None):
+ """Sets both a file's raw modification and access times
+
+ Args:
+ filename: the file whose times to set
+ ts: the raw time to set or None to indicate time should be
+ set to the current time.
+ """
if ts is not None:
os.utime(filename, (ts, ts))
else:
def convert_file_timestamp_to_datetime(filename: str, producer) -> Optional[datetime.datetime]:
+ """Convert a raw file timestamp into a python datetime."""
ts = producer(filename)
if ts is not None:
return datetime.datetime.fromtimestamp(ts)
def get_file_atime_as_datetime(filename: str) -> Optional[datetime.datetime]:
+ """Fetch a file's access time as a python datetime.
+
+ See also :meth:`get_file_atime_timedelta`,
+ :meth:`get_file_atime_age_seconds`,
+ :meth:`describe_file_atime`,
+ and :meth:`get_file_raw_atime`.
+ """
return convert_file_timestamp_to_datetime(filename, get_file_raw_atime)
def get_file_mtime_as_datetime(filename: str) -> Optional[datetime.datetime]:
+ """Fetches a file's modification time as a python datetime.
+
+ See also :meth:`get_file_mtime_timedelta`,
+ :meth:`get_file_mtime_age_seconds`,
+ and :meth:`get_file_raw_mtime`.
+ """
return convert_file_timestamp_to_datetime(filename, get_file_raw_mtime)
def get_file_ctime_as_datetime(filename: str) -> Optional[datetime.datetime]:
+ """Fetches a file's ctime (st_ctime: metadata-change time on Unix,
+ creation time on Windows) as a python datetime.
+
+ See also :meth:`get_file_ctime_timedelta`,
+ :meth:`get_file_ctime_age_seconds`,
+ and :meth:`get_file_raw_ctime`.
+ """
return convert_file_timestamp_to_datetime(filename, get_file_raw_ctime)
def get_file_timestamp_age_seconds(filename: str, extractor) -> Optional[int]:
+ """~Internal helper"""
now = time.time()
ts = get_file_raw_timestamps(filename)
if ts is None:
def get_file_atime_age_seconds(filename: str) -> Optional[int]:
+ """Gets a file's access time as an age in seconds (ago).
+
+ See also :meth:`get_file_atime_as_datetime`,
+ :meth:`get_file_atime_timedelta`,
+ :meth:`get_file_atime_age_seconds`,
+ :meth:`describe_file_atime`,
+ and :meth:`get_file_raw_atime`.
+ """
return get_file_timestamp_age_seconds(filename, lambda x: x.st_atime)
def get_file_ctime_age_seconds(filename: str) -> Optional[int]:
+ """Gets a file's ctime (st_ctime: metadata-change time on Unix,
+ creation time on Windows) as an age in seconds (ago).
+
+ See also :meth:`get_file_ctime_as_datetime`,
+ :meth:`get_file_ctime_timedelta`,
+ :meth:`get_file_ctime_age_seconds`,
+ and :meth:`get_file_raw_ctime`.
+ """
return get_file_timestamp_age_seconds(filename, lambda x: x.st_ctime)
def get_file_mtime_age_seconds(filename: str) -> Optional[int]:
+ """Gets a file's modification time as seconds (ago).
+
+ See also :meth:`get_file_mtime_as_datetime`,
+ :meth:`get_file_mtime_timedelta`,
+ :meth:`get_file_mtime_age_seconds`,
+ and :meth:`get_file_raw_mtime`.
+ """
return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
def get_file_timestamp_timedelta(filename: str, extractor) -> Optional[datetime.timedelta]:
+ """~Internal helper"""
age = get_file_timestamp_age_seconds(filename, extractor)
if age is not None:
return datetime.timedelta(seconds=float(age))
def get_file_atime_timedelta(filename: str) -> Optional[datetime.timedelta]:
+ """How long ago was a file accessed as a timedelta?
+
+ See also :meth:`get_file_atime_as_datetime`,
+ :meth:`get_file_atime_timedelta`,
+ :meth:`get_file_atime_age_seconds`,
+ :meth:`describe_file_atime`,
+ and :meth:`get_file_raw_atime`.
+ """
return get_file_timestamp_timedelta(filename, lambda x: x.st_atime)
def get_file_ctime_timedelta(filename: str) -> Optional[datetime.timedelta]:
+ """How long ago was a file's ctime (st_ctime: metadata-change time
+ on Unix, creation time on Windows) as a timedelta?
+
+ See also :meth:`get_file_ctime_as_datetime`,
+ :meth:`get_file_ctime_timedelta`,
+ :meth:`get_file_ctime_age_seconds`,
+ and :meth:`get_file_raw_ctime`.
+ """
return get_file_timestamp_timedelta(filename, lambda x: x.st_ctime)
def get_file_mtime_timedelta(filename: str) -> Optional[datetime.timedelta]:
+ """
+ Gets a file's modification time as a python timedelta.
+
+ See also :meth:`get_file_mtime_as_datetime`,
+ :meth:`get_file_mtime_timedelta`,
+ :meth:`get_file_mtime_age_seconds`,
+ and :meth:`get_file_raw_mtime`.
+ """
return get_file_timestamp_timedelta(filename, lambda x: x.st_mtime)
def describe_file_timestamp(filename: str, extractor, *, brief=False) -> Optional[str]:
+ """~Internal helper"""
from datetime_utils import describe_duration, describe_duration_briefly
age = get_file_timestamp_age_seconds(filename, extractor)
def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
+ """
+ Describe how long ago a file was accessed.
+
+ See also :meth:`get_file_atime_as_datetime`,
+ :meth:`get_file_atime_timedelta`,
+ :meth:`get_file_atime_age_seconds`,
+ :meth:`describe_file_atime`,
+ and :meth:`get_file_raw_atime`.
+ """
return describe_file_timestamp(filename, lambda x: x.st_atime, brief=brief)
def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
+ """Describes how long ago a file's ctime was (st_ctime:
+ metadata-change time on Unix, creation time on Windows).
+
+ See also :meth:`get_file_ctime_as_datetime`,
+ :meth:`get_file_ctime_timedelta`,
+ :meth:`get_file_ctime_age_seconds`,
+ and :meth:`get_file_raw_ctime`.
+ """
return describe_file_timestamp(filename, lambda x: x.st_ctime, brief=brief)
def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
+ """
+ Describes how long ago a file was modified.
+
+ See also :meth:`get_file_mtime_as_datetime`,
+ :meth:`get_file_mtime_timedelta`,
+ :meth:`get_file_mtime_age_seconds`,
+ and :meth:`get_file_raw_mtime`.
+ """
return describe_file_timestamp(filename, lambda x: x.st_mtime, brief=brief)
def touch_file(filename: str, *, mode: Optional[int] = 0o666):
+ """Like unix "touch" command's semantics: update the timestamp
+ of a file to the current time if the file exists. Create the
+ file if it doesn't exist.
+
+ Args:
+ filename: the filename
+ mode: the mode to create the file with
+ """
pathlib.Path(filename, mode=mode).touch()
def expand_globs(in_filename: str):
+ """Expands shell globs (* and ? wildcards) to the matching files."""
for filename in glob.glob(in_filename):
yield filename
def get_files(directory: str):
+ """Returns the files in a directory as a generator."""
for filename in os.listdir(directory):
full_path = join(directory, filename)
if isfile(full_path) and exists(full_path):
def get_directories(directory: str):
+ """Returns the subdirectories in a directory as a generator."""
for d in os.listdir(directory):
full_path = join(directory, d)
if not isfile(full_path) and exists(full_path):
def get_files_recursive(directory: str):
+ """Find the files and directories under a root recursively."""
for filename in get_files(directory):
yield filename
for subdir in get_directories(directory):
class FileWriter(contextlib.AbstractContextManager):
"""A helper that writes a file to a temporary location and then moves
it atomically to its ultimate destination on close.
-
"""
def __init__(self, filename: str) -> None:
>>> function_identifier(function_identifier)
'function_utils:function_identifier'
-
"""
+
if f.__module__ == '__main__':
from pathlib import Path
# © Copyright 2022, Scott Gasch
"""Wrapper around US Census address geocoder API described here:
-https://www2.census.gov/geo/pdfs/maps-data/data/Census_Geocoder_User_Guide.pdf
-https://geocoding.geo.census.gov/geocoder/Geocoding_Services_API.pdf
-Also try:
+* https://www2.census.gov/geo/pdfs/maps-data/data/Census_Geocoder_User_Guide.pdf
+* https://geocoding.geo.census.gov/geocoder/Geocoding_Services_API.pdf
+
+Also try::
--form benchmark=2020 \
def geocode_address(address: str) -> Optional[Dict[str, Any]]:
- """Send a single address to the US Census geocoding API. The response
- is a parsed JSON chunk of data with N addressMatches in the result
- section and the details of each match within it. Returns None on error.
+ """Send a single address to the US Census geocoding API in order to
+ lookup relevant data about it (including, if possible, its
+ lat/long). The response is a parsed JSON chunk of data with N
+ addressMatches in the result section and the details of each match
+ within it.
+
+ Args:
+ address: the full address to lookup in the form: "STREET
+ ADDRESS, CITY, STATE, ZIPCODE". These components may be
+ omitted and the service will make educated guesses but
+ the commas delimiting each component must be included.
+
+ Returns:
+ A parsed json dict with a bunch of information about the
+ address contained within it. Each 'addressMatch'
+ in the JSON describes the details of a possible match.
+ Returns None if there was an error or the address is
+ not known.
>>> json = geocode_address('4600 Silver Hill Rd,, 20233')
>>> json['result']['addressMatches'][0]['matchedAddress']
>>> json['result']['addressMatches'][0]['coordinates']
{'x': -76.9274328556918, 'y': 38.845989080537514}
-
"""
url = 'https://geocoding.geo.census.gov/geocoder/geographies/onelineaddress'
url += f'?address={address}'
return r.json()
-def batch_geocode_addresses(addresses: List[str]):
- """Send up to addresses for batch geocoding. Each line of the input
- list should be a single address of the form: STREET ADDRESS, CITY,
- STATE, ZIP. Components may be omitted but the commas may not be.
- Result is an array of the same size as the input array with one
- answer record per line. Returns None on error.
+def batch_geocode_addresses(addresses: List[str]) -> Optional[List[str]]:
+ """Send a list of addresses for batch geocoding to a web service
+ operated by the US Census Bureau.
+
+ Args:
+ addresses: a list of addresses to geocode. Each line of the
+ input list should be a single address in the form: "STREET
+ ADDRESS, CITY, STATE, ZIPCODE". Individual address components
+ may be omitted and the service will make educated guesses but
+ the comma delimiters between address components may not be
+ omitted.
+
+ Returns:
+ An array of the same size as the input array with one
+ answer record per line. Returns None on error.
- This code will deal with requests >10k addresses by chunking them
- internally because the census website disallows requests > 10k lines.
+ Note: this code will deal with requests >10k addresses by chunking
+ them internally because the census website disallows requests >
+ 10k lines.
>>> batch_geocode_addresses(
... [
"""A module to serve as a local client library around HTTP calls to
the Google Assistant via a local gateway.
-
"""
import logging
@dataclass
class GoogleResponse:
- """A response wrapper."""
+ """A Google response wrapper dataclass."""
success: bool = False
+ """Did the request succeed (True) or fail (False)?"""
+
response: str = ''
+ """The response as a text string, if available."""
+
audio_url: str = ''
- audio_transcription: Optional[str] = None # None if not available.
+ """A URL that can be used to fetch the raw audio response."""
+
+ audio_transcription: Optional[str] = None
+ """A transcription of the audio response, if available. Otherwise
+ None"""
def __repr__(self):
return f"""
def ask_google(cmd: str, *, recognize_speech=True) -> GoogleResponse:
- """Send a command string to Google via the google_assistant_bridge as the
- user google_assistant_username and return the response. If recognize_speech
- is True, perform speech recognition on the audio response from Google so as
- to translate it into text (best effort, YMMV).
+ """Send a command string to Google via the google_assistant_bridge as
+ the user google_assistant_username and return the response. If
+ recognize_speech is True, perform speech recognition on the audio
+ response from Google so as to translate it into text (best effort,
+ YMMV). e.g.::
+
+ >>> google_assistant.ask_google('What time is it?')
+ success: True
+ response: 9:27 PM.
+ audio_transcription: 9:27 p.m.
+ audio_url: http://kiosk.house:3000/server/audio?v=1653971233030
+
"""
logging.debug("Asking google: '%s'", cmd)
payload = {
"""A collection of details about the internal histogram buckets."""
num_populated_buckets: int = 0
+ """Count of populated buckets"""
+
max_population: Optional[int] = None
+ """The max population in a bucket currently"""
+
last_bucket_start: Optional[int] = None
+ """The last bucket starting point"""
+
lowest_start: Optional[int] = None
+ """The lowest populated bucket's starting point"""
+
highest_end: Optional[int] = None
+ """The highest populated bucket's ending point"""
+
max_label_width: Optional[int] = None
+ """The maximum label width (for display purposes)"""
class SimpleHistogram(Generic[T]):
NEGATIVE_INFINITY = -math.inf
def __init__(self, buckets: List[Tuple[Bound, Bound]]):
+ """C'tor.
+
+ Args:
+ buckets: a list of [start..end] tuples that define the
+ buckets we are counting population in. See also
+ :meth:`n_evenly_spaced_buckets` to generate these
+ buckets more easily.
+ """
from math_utils import NumericPopulation
self.buckets: Dict[Tuple[Bound, Bound], Count] = {}
max_bound: T,
n: int,
) -> List[Tuple[int, int]]:
+ """A helper method for generating the buckets argument to
+ our c'tor provided that you want N evenly spaced buckets.
+
+ Args:
+ min_bound: the minimum possible value
+ max_bound: the maximum possible value
+ n: how many buckets to create
+
+ Returns:
+ A list of bounds that define N evenly spaced buckets
+ """
ret: List[Tuple[int, int]] = []
stride = int((max_bound - min_bound) / n)
if stride <= 0:
return ret
def _get_bucket(self, item: T) -> Optional[Tuple[int, int]]:
+ """Given an item, what bucket is it in?"""
for start_end in self.buckets:
if start_end[0] <= item < start_end[1]:
return start_end
return None
def add_item(self, item: T) -> bool:
+ """Adds a single item to the histogram (resulting in us incrementing
+ the population in the correct bucket).
+
+ Args:
+ item: the item to be added
+
+ Returns:
+ True if the item was successfully added or False if the item
+ is not within the bounds established during class construction.
+ """
bucket = self._get_bucket(item)
if bucket is None:
return False
return True
def add_items(self, lst: Iterable[T]) -> bool:
+ """Adds a collection of items to the histogram and increments
+ the correct bucket's population for each item.
+
+ Args:
+ lst: An iterable of items to be added
+
+ Returns:
+ True if all items were added successfully or False if any
+ item was not able to be added because it was not within the
+ bounds established at object construction.
+ """
all_true = True
for item in lst:
all_true = all_true and self.add_item(item)
return all_true
- def get_bucket_details(self, label_formatter: str) -> BucketDetails:
+ def _get_bucket_details(self, label_formatter: str) -> BucketDetails:
+ """Compute summary details (a BucketDetails) about the histogram's buckets."""
details = BucketDetails()
for (start, end), pop in sorted(self.buckets.items(), key=lambda x: x[0]):
if pop > 0:
return details
def __repr__(self, *, width: int = 80, label_formatter: str = '%d') -> str:
+ """Returns a pretty (text) representation of the histogram and
+ some vital stats about the population in it (min, max, mean,
+ median, mode, stdev, etc...)
+ """
from text_utils import bar_graph
- details = self.get_bucket_details(label_formatter)
+ details = self._get_bucket_details(label_formatter)
txt = ""
if details.num_populated_buckets == 0:
return txt
def get(name: str, *, start=0) -> int:
"""
- Returns a thread safe monotonically increasing id suitable for use
+ Returns a thread-safe, monotonically increasing id suitable for use
as a globally unique identifier.
>>> import id_generator
default_response: str = None,
timeout_seconds: int = None,
) -> Optional[str]: # None if timeout w/o keystroke
- """Get a single keystroke response to a prompt."""
+ """Get a single keystroke response to a prompt and return it.
+
+ Args:
+ valid_responses: a list of strings that are considered to be
+ valid keystrokes to be accepted. If None, we accept
+ anything.
+ prompt: the prompt to print before watching keystrokes. If
+ None, skip this.
+ default_response: the response to return if the timeout
+ expires. If None, skip this.
+ timeout_seconds: number of seconds to wait before timing out
+ and returning the default_response. If None, wait forever.
+
+ Returns:
+ The keystroke the user pressed. If the user pressed a special
+ keystroke like ^C or ^Z, we raise a KeyboardInterrupt exception.
+ """
def _handle_timeout(signum, frame) -> None:
raise exceptions.TimeoutError()
def yn_response(prompt: str = None, *, timeout_seconds=None) -> Optional[str]:
- """Get a Y/N response to a prompt."""
-
+ """Get a Y/N response to a prompt.
+
+ Args:
+ prompt: the user prompt or None to skip this
+ timeout_seconds: the number of seconds to wait for a response or
+ None to wait forever.
+
+ Returns:
+ A lower case 'y' or 'n'. Or None if the timeout expires with
+ no input from the user. Or raises a KeyboardInterrupt if the
+ user pressed a special key such as ^C or ^Z.
+ """
yn = single_keystroke_response(
["y", "n", "Y", "N"], prompt=prompt, timeout_seconds=timeout_seconds
)
def up_down_enter() -> Optional[str]:
+ """Respond to UP, DOWN or ENTER events for simple menus without
+ the need for curses."""
+
os_special_keystrokes = [3, 26] # ^C, ^Z
while True:
key = readchar.readkey()
# © Copyright 2021-2022, Scott Gasch
-"""A simple compression helper for lowercase ascii text."""
+"""A simple toy compression helper for lowercase ascii text."""
import bitstring
"""The contents we'll write to each lock file."""
pid: int
+ """The pid of the process that holds the lock"""
+
commandline: str
+ """The commandline of the process that holds the lock"""
+
expiration_timestamp: Optional[float]
+ """When this lock will expire as seconds since Epoch"""
class LockFile(contextlib.AbstractContextManager):
"""A file locking mechanism that has context-manager support so you
- can use it in a with statement. e.g.
-
- with LockFile('./foo.lock'):
- # do a bunch of stuff... if the process dies we have a signal
- # handler to do cleanup. Other code (in this process or another)
- # that tries to take the same lockfile will block. There is also
- # some logic for detecting stale locks.
+ can use it in a with statement. e.g.::
+
+ with LockFile('./foo.lock'):
+ # do a bunch of stuff... if the process dies we have a signal
+ # handler to do cleanup. Other code (in this process or another)
+ # that tries to take the same lockfile will block. There is also
+ # some logic for detecting stale locks.
"""
def __init__(
expiration_timestamp: Optional[float] = None,
override_command: Optional[str] = None,
) -> None:
+ """C'tor.
+
+ Args:
+ lockfile_path: path of the lockfile to acquire
+ do_signal_cleanup: handle SIGINT and SIGTERM events by
+ releasing the lock before exiting
+ expiration_timestamp: when our lease on the lock should
+ expire (as seconds since the Epoch). None means the
+ lock will not expire until we explicitly release it.
+ override_command: don't use argv to determine our commandline
+ rather use this instead if provided.
+ """
self.is_locked: bool = False
self.lockfile: str = lockfile_path
self.locktime: Optional[int] = None
self.expiration_timestamp = expiration_timestamp
def locked(self):
+ """Is it locked currently?"""
return self.is_locked
def available(self):
+ """Is it available currently?"""
return not os.path.exists(self.lockfile)
def try_acquire_lock_once(self) -> bool:
+ """Attempt to acquire the lock with no blocking.
+
+ Returns:
+ True if the lock was acquired and False otherwise.
+ """
logger.debug("Trying to acquire %s.", self.lockfile)
try:
# Attempt to create the lockfile. These flags cause
backoff_factor: float = 2.0,
max_attempts=5,
) -> bool:
+ """Attempt to acquire the lock repeatedly with retries and backoffs.
+
+ Args:
+ initial_delay: how long to wait before retrying the first time
+ backoff_factor: a float >= 1.0 that multiplies the current retry
+ delay each subsequent time we attempt to acquire and fail
+ to do so.
+ max_attempts: maximum number of times to try before giving up
+ and failing.
+
+ Returns:
+ True if the lock was acquired and False otherwise.
+ """
+
@decorator_utils.retry_if_false(
tries=max_attempts, delay_sec=initial_delay, backoff=backoff_factor
)
return _try_acquire_lock_with_retries()
def release(self):
+ """Release the lock"""
try:
os.unlink(self.lockfile)
except Exception as e:
# © Copyright 2021-2022, Scott Gasch
-"""Utilities related to logging."""
+"""Utilities related to logging. To use it you must invoke
+:meth:`initialize_logging`. If you use the
+:meth:`bootstrap.initialize` decorator on your program's entry point,
+it will call this for you. See :meth:`python_modules.bootstrap.initialize`
+for more details. If you use this you get:
+
+* Ability to set logging level,
+* ability to define the logging format,
+* ability to tee all logging on stderr,
+* ability to tee all logging into a file,
+* ability to rotate said file as it grows,
+* ability to tee all logging into the system log (syslog) and
+ define the facility and level used to do so,
+* easy automatic pid/tid stamp on logging for debugging threads,
+* ability to squelch repeated log messages,
+* ability to log probabilistically in code,
+* ability to only see log messages from a particular module or
+ function,
+* ability to clear logging handlers added by earlier loaded modules.
+
+All of these are controlled via commandline arguments to your program,
+see the code below for details.
+"""
import collections
import contextlib
messages that it produces be squelched (ignored) after it logs the
same message more than N times.
- Note: this decorator affects *ALL* logging messages produced
- within the decorated function. That said, messages must be
- identical in order to be squelched. For example, if the same line
- of code produces different messages (because of, e.g., a format
- string), the messages are considered to be different.
+ .. note::
+
+ This decorator affects *ALL* logging messages produced
+ within the decorated function. That said, messages must be
+ identical in order to be squelched. For example, if the same line
+ of code produces different messages (because of, e.g., a format
+ string), the messages are considered to be different.
"""
class SquelchRepeatedMessagesFilter(logging.Filter):
- """
- A filter that only logs messages from a given site with the same
+ """A filter that only logs messages from a given site with the same
(exact) message at the same logging level N times and ignores
subsequent attempts to log.
- This filter only affects logging messages that repeat more than
- a threshold number of times from functions that are tagged with
- the @logging_utils.squelched_logging_ok decorator; others are
- ignored.
+ This filter only affects logging messages that repeat more than a
+ threshold number of times from functions that are tagged with the
+ @logging_utils.squelched_logging_ok decorator (see above); others
+ are ignored.
This functionality is enabled by default but can be disabled via
- the --no_logging_squelch_repeats commandline flag.
-
+ the :code:`--no_logging_squelch_repeats` commandline flag.
"""
def __init__(self) -> None:
class DynamicPerScopeLoggingLevelFilter(logging.Filter):
"""This filter only allows logging messages from an allow list of
- module names or module:function names. Blocks others.
-
+ module names or module:function names. Blocks all others.
"""
@staticmethod
@overrides
def filter(self, record: logging.LogRecord) -> bool:
+ """Decides whether or not to log based on an allow list."""
+
# First try to find a logging level by scope (--lmodule)
if len(self.level_by_scope) > 0:
min_level = None
def logging_is_probabilistic(probability_of_logging: float) -> Callable:
- """
- A decorator that indicates that all logging statements within the
+ """A decorator that indicates that all logging statements within the
scope of a particular (marked) function are not deterministic
(i.e. they do not always unconditionally log) but rather are
- probabilistic (i.e. they log N% of the time randomly).
-
- Note that this functionality can be disabled (forcing all logged
- messages to produce output) via the --no_logging_probabilistically
- cmdline argument.
+ probabilistic (i.e. they log N% of the time, randomly).
- This affects *ALL* logging statements within the marked function.
+ .. note::
+ This affects *ALL* logging statements within the marked function.
+ Note that this functionality can be disabled (forcing all logged
+ messages to produce output) via the
+ :code:`--no_logging_probabilistically` cmdline argument.
"""
def probabilistic_logging_wrapper(f: Callable):
This filter only affects logging messages from functions that have
been tagged with the @logging_utils.probabilistic_logging decorator.
-
"""
@overrides
class OnlyInfoFilter(logging.Filter):
- """
- A filter that only logs messages produced at the INFO logging
- level. This is used by the logging_info_is_print commandline
- option to select a subset of the logging stream to send to a
- stdout handler.
-
+ """A filter that only logs messages produced at the INFO logging
+ level. This is used by the :code:`--logging_info_is_print`
+ commandline option to select a subset of the logging stream to
+ send to a stdout handler.
"""
@overrides
"""
A formatter for adding milliseconds to log messages which, for
whatever reason, the default python logger doesn't do.
-
"""
converter = datetime.datetime.fromtimestamp # type: ignore
fmt,
facility_name,
):
+ """Some of the initial messages in the debug log are about how we
+ have set up logging itself."""
+
level_name = logging._levelToName.get(default_logging_level, str(default_logging_level))
logger.debug('Initialized global logging; default logging level is %s.', level_name)
if config.config['logging_clear_preexisting_handlers'] and preexisting_handlers_count > 0:
def initialize_logging(logger=None) -> logging.Logger:
+ """Initialize logging for the program. This must be called if you want
+ to use any of the functionality provided by this module such as:
+
+ * Ability to set logging level,
+ * ability to define the logging format,
+ * ability to tee all logging on stderr,
+ * ability to tee all logging into a file,
+ * ability to rotate said file as it grows,
+ * ability to tee all logging into the system log (syslog) and
+ define the facility and level used to do so,
+ * easy automatic pid/tid stamp on logging for debugging threads,
+ * ability to squelch repeated log messages,
+ * ability to log probabilistically in code,
+ * ability to only see log messages from a particular module or
+ function,
+ * ability to clear logging handlers added by earlier loaded modules.
+
+ All of these are controlled via commandline arguments to your program,
+ see the code below for details.
+
+ If you use the
+ :meth:`bootstrap.initialize` decorator on your program's entry point,
+ it will call this for you. See :meth:`python_modules.bootstrap.initialize`
+ for more details.
+ """
global LOGGING_INITIALIZED
if LOGGING_INITIALIZED:
return logging.getLogger()
def get_logger(name: str = ""):
+ """Get the global logger"""
logger = logging.getLogger(name)
return initialize_logging(logger)
"""Legacy function for printing a message augmented with thread id
still needed by some code. Please use --logging_debug_threads in
new code.
-
"""
if config.config['logging_debug_threads']:
from thread_utils import current_thread_id
"""Legacy function used to print to stderr still needed by some code.
Please just use normal logging with --logging_console which
accomplishes the same thing in new code.
-
"""
print(*args, file=sys.stderr, **kwargs)
class OutputMultiplexer(object):
- """
- A class that broadcasts printed messages to several sinks (including
- various logging levels, different files, different file handles,
- the house log, etc...). See also OutputMultiplexerContext for an
- easy usage pattern.
+ """A class that broadcasts printed messages to several sinks
+ (including various logging levels, different files, different file
+ handles, the house log, etc...). See also
+ :class:`OutputMultiplexerContext` for an easy usage pattern.
"""
class Destination(enum.IntEnum):
filenames: Optional[Iterable[str]] = None,
handles: Optional[Iterable[io.TextIOWrapper]] = None,
):
+ """
+ Constructs the OutputMultiplexer instance.
+
+ Args:
+ destination_bitv: a bitvector where each bit represents an
+ output destination. Multiple bits may be set.
+ logger: if LOG_* bits are set, you must pass a logger here.
+ filenames: if FILENAMES bit is set, this should be a list of
+ files you'd like to output into. This code handles opening
+ and closing said files.
+ handles: if FILEHANDLES bit is set, this should be a list of
+ already opened filehandles you'd like to output into. The
+ handles will remain open after the scope of the multiplexer.
+ """
if logger is None:
logger = logging.getLogger(None)
self.logger = logger
self.set_destination_bitv(destination_bitv)
def get_destination_bitv(self):
+ """Where are we outputting?"""
return self.destination_bitv
def set_destination_bitv(self, destination_bitv: int):
+ """Change the output destination_bitv to the one provided."""
if destination_bitv & self.Destination.FILENAMES and self.f is None:
raise ValueError("Filename argument is required if bitv & FILENAMES")
if destination_bitv & self.Destination.FILEHANDLES and self.h is None:
self.destination_bitv = destination_bitv
def print(self, *args, **kwargs):
+ """Produce some output to all sinks."""
from string_utils import sprintf, strip_escape_sequences
end = kwargs.pop("end", None)
hlog(buf)
def close(self):
+ """Close all open files."""
if self.f is not None:
for _ in self.f:
_.close()
class OutputMultiplexerContext(OutputMultiplexer, contextlib.ContextDecorator):
"""
- A context that uses an OutputMultiplexer. e.g.::
+ A context that uses an :class:`OutputMultiplexer`. e.g.::
with OutputMultiplexerContext(
OutputMultiplexer.LOG_INFO |
"""Write a message to the house log (syslog facility local7 priority
info) by calling /usr/bin/logger. This is pretty hacky but used
by a bunch of code. Another way to do this would be to use
- --logging_syslog and --logging_syslog_facility but I can't
- actually say that's easier.
-
+ :code:`--logging_syslog` and :code:`--logging_syslog_facility` but
+ I can't actually say that's easier.
"""
message = message.replace("'", "'\"'\"'")
os.system(f"/usr/bin/logger -p local7.info -- '{message}'")
# © Copyright 2021-2022, Scott Gasch
"""This is a module concerned with the creation of and searching of a
-corpus of documents. The corpus is held in memory for fast
-searching.
-
+corpus of documents. The corpus and index are held in memory.
"""
from __future__ import annotations
class Document:
"""A class representing a searchable document."""
- # A unique identifier for each document.
docid: str = ''
+ """A unique identifier for each document -- must be provided
+ by the caller. See :meth:`python_modules.id_generator.get` or
+ :meth:`python_modules.string_utils.generate_uuid` for potential
+ sources."""
- # A set of tag strings for this document. May be empty.
tags: Set[str] = field(default_factory=set)
+ """A set of tag strings for this document. May be empty. Tags
+ are simply text labels that are associated with a document and
+ may be used to search for it later.
+ """
- # A list of key->value strings for this document. May be empty.
properties: List[Tuple[str, str]] = field(default_factory=list)
+ """A list of key->value strings for this document. May be empty.
+ Properties are more flexible tags that have both a label and a
+ value. e.g. "category:mystery" or "author:smith"."""
- # An optional reference to something else; interpreted only by
- # caller code, ignored here.
reference: Optional[Any] = None
+ """An optional reference to something else for convenience;
+ interpreted only by caller code, ignored here.
+ """
class Operation(enum.Enum):
class Corpus(object):
- """A collection of searchable documents.
+ """A collection of searchable documents. The caller can
+ add documents to it (or edit existing docs) via :meth:`add_doc`,
+ retrieve a document given its docid via :meth:`get_doc`, and
+ perform various lookups of documents. The most interesting
+ lookup is implemented in :meth:`query`.
>>> c = Corpus()
>>> c.add_doc(Document(
distinct docid that will serve as its primary identifier. If
the same Document is added multiple times, only the most
recent addition is indexed. If two distinct documents with
- the same docid are added, the latter klobbers the former in the
- indexes.
+ the same docid are added, the latter clobbers the former in
+ the indexes. See :meth:`python_modules.id_generator.get` or
+ :meth:`python_modules.string_utils.generate_uuid` for potential
+ sources of docids.
Each Document may have an optional set of tags which can be
- used later in expressions to the query method.
+ used later in expressions to the query method. These are simple
+ text labels.
Each Document may have an optional list of key->value tuples
which can be used later in expressions to the query method.
never interpreted by this module. This is meant to allow easy
mapping between Documents in this corpus and external objects
they may represent.
+
+ Args:
+ doc: the document to add or edit
"""
if doc.docid in self.documents_by_docid:
self.docids_with_property[key].add(doc.docid)
def get_docids_by_exact_tag(self, tag: str) -> Set[str]:
- """Return the set of docids that have a particular tag."""
+ """Return the set of docids that have a particular tag.
+
+ Args:
+ tag: the tag for which to search
+
+ Returns:
+ A set containing docids with the provided tag which
+ may be empty."""
return self.docids_by_tag[tag]
def get_docids_by_searching_tags(self, tag: str) -> Set[str]:
- """Return the set of docids with a tag that contains a str"""
+ """Return the set of docids with a tag that contains a str.
+
+ Args:
+ tag: the tag pattern for which to search
+ Returns:
+ A set containing docids with tags that match the pattern
+ provided. e.g., if the arg was "foo" tags "football", "foobar",
+ and "food" all match.
+ """
ret = set()
for search_tag in self.docids_by_tag:
if tag in search_tag:
"""Return the set of docids that have a particular property no matter
what that property's value.
+ Args:
+ key: the key value to search for.
+
+ Returns:
+ A set of docids that contain the key (no matter what value)
+ which may be empty.
"""
return self.docids_with_property[key]
def get_docids_by_property(self, key: str, value: str) -> Set[str]:
"""Return the set of docids that have a particular property with a
- particular value..
+ particular value.
+ Args:
+ key: the key to search for
+ value: the value that key must have in order to match a doc.
+
+ Returns:
+ A set of docids that contain key with value which may be empty.
"""
return self.docids_by_property[(key, value)]
def invert_docid_set(self, original: Set[str]) -> Set[str]:
"""Invert a set of docids."""
-
return {docid for docid in self.documents_by_docid if docid not in original}
def get_doc(self, docid: str) -> Optional[Document]:
- """Given a docid, retrieve the previously added Document."""
+ """Given a docid, retrieve the previously added Document.
+ Args:
+ docid: the docid to retrieve
+
+ Returns:
+ The Document with docid or None to indicate no match.
+ """
return self.documents_by_docid.get(docid, None)
def query(self, query: str) -> Optional[Set[str]]:
"""Query the corpus for documents that match a logical expression.
- Returns a (potentially empty) set of docids for the matching
- (previously added) documents or None on error.
- e.g.
+ Args:
+ query: the logical query expressed using a simple language
+ that understands conjunction (and operator), disjunction
+ (or operator) and inversion (not operator) as well as
+ parentheses. Here are some legal sample queries::
+
+ tag1 and tag2 and not tag3
- tag1 and tag2 and not tag3
+ (tag1 or tag2) and (tag3 or tag4)
- (tag1 or tag2) and (tag3 or tag4)
+ (tag1 and key2:value2) or (tag2 and key1:value1)
- (tag1 and key2:value2) or (tag2 and key1:value1)
+ key:*
- key:*
+ tag1 and key:*
- tag1 and key:*
+ Returns:
+ A (potentially empty) set of docids for the matching
+ (previously added) documents or None on error.
"""
try:
3
>>> pop.get_percentile(60)
7
-
"""
def __init__(self):
self.sorted_copy: Optional[List[float]] = None
def add_number(self, number: float):
- """O(2 log2 n)"""
+ """Adds a number to the population. Runtime complexity of this
+ operation is :math:`O(2 log_2 n)`"""
if not self.highers or number > self.highers[0]:
heappush(self.highers, number)
return self.aggregate / count
def get_mode(self) -> Tuple[float, int]:
- """Returns the mode (most common member)."""
+ """Returns the mode (most common member in the population)
+ in O(n) time."""
count: Dict[float, int] = collections.defaultdict(int)
for n in self.lowers:
def get_percentile(self, n: float) -> float:
"""Returns the number at approximately pn% (i.e. the nth percentile)
- of the distribution in O(n log n) time (expensive, requires a
- complete sort). Not thread safe. Caching does across
- multiple calls without an invocation to add_number.
-
+ of the distribution in O(n log n) time. Not thread-safe;
+ does caching across multiple calls without an invocation to
+ add_number for perf reasons.
"""
if n == 50:
return self.get_median()
def gcd_floats(a: float, b: float) -> float:
+ """Returns the greatest common divisor of a and b."""
if a < b:
return gcd_floats(b, a)
def gcd_float_sequence(lst: List[float]) -> float:
+ """Returns the greatest common divisor of a list of floats."""
if len(lst) <= 0:
raise ValueError("Need at least one number")
elif len(lst) == 1:
def truncate_float(n: float, decimals: int = 2):
- """
- Truncate a float to a particular number of decimals.
+ """Truncate a float to a particular number of decimals.
>>> truncate_float(3.1415927, 3)
3.141
1.45
>>> percentage_to_multiplier(-25)
0.75
-
"""
multiplier = percent / 100
multiplier += 1.0
0.0
>>> multiplier_to_percent(1.99)
99.0
-
"""
percent = multiplier
if percent > 0.0:
False
>>> is_prime(51602981)
True
-
"""
if not isinstance(n, int):
raise TypeError("argument passed to is_prime is not of 'int' type")
def make_orb(color: str) -> None:
+ """Make the orb on my desk a particular color."""
user_machine = config.config['orb_utils_user_machine']
orbfile_path = config.config['orb_utils_file_location']
os.system(f"ssh {user_machine} 'echo \"{color}\" > {orbfile_path}'")
def parallelize(
_funct: typing.Optional[typing.Callable] = None, *, method: Method = Method.THREAD
) -> typing.Callable:
- """Usage::
+ """This is a decorator that was created to make multi-threading,
+ multi-processing and remote machine parallelism simple in python.
+
+ Sample usage::
@parallelize # defaults to thread-mode
def my_function(a, b, c) -> int:
Method.REMOTE: a process on a remote host
The wrapped function returns immediately with a value that is
- wrapped in a SmartFuture. This value will block if it is either
- read directly (via a call to result._resolve) or indirectly (by
- using the result in an expression, printing it, hashing it,
- passing it a function argument, etc...). See comments on the
- SmartFuture class for details.
-
- Note: you may stack @parallelized methods and it will "work".
- That said, having multiple layers of Method.PROCESS or
- Method.REMOTE may prove to be problematic because each process in
- the stack will use its own independent pool which may overload
- your machine with processes or your network with remote processes
- beyond the control mechanisms built into one instance of the pool.
- Be careful.
-
- Also note: there is a non trivial overhead of pickling code and
- scp'ing it over the network when you use Method.REMOTE. There's
- a smaller but still considerable cost of creating a new process
- and passing code to/from it when you use Method.PROCESS.
+ wrapped in a :class:`SmartFuture`. This value will block if it is
+ either read directly (via a call to :meth:`_resolve`) or indirectly
+ (by using the result in an expression, printing it, hashing it,
+ passing it a function argument, etc...). See comments on
+ :class:`SmartFuture` for details.
+
+ .. warning::
+ You may stack @parallelized methods and it will "work".
+ That said, having multiple layers of :code:`Method.PROCESS` or
+ :code:`Method.REMOTE` will prove to be problematic because each process in
+ the stack will use its own independent pool which may overload
+ your machine with processes or your network with remote processes
+ beyond the control mechanisms built into one instance of the pool.
+ Be careful.
+
+ .. note::
+ There is non-trivial overhead of pickling code and
+ copying it over the network when you use :code:`Method.REMOTE`. There's
+ a smaller but still considerable cost of creating a new process
+ and passing code to/from it when you use :code:`Method.PROCESS`.
"""
def wrapper(funct: typing.Callable):
# © Copyright 2021-2022, Scott Gasch
-"""A Persistent is just a class with a load and save method. This
-module defines the Persistent base and a decorator that can be used to
+"""A :class:`Persistent` is just a class with a load and save method. This
+module defines the :class:`Persistent` base and a decorator that can be used to
create a persistent singleton that autoloads and autosaves."""
import atexit
class Persistent(ABC):
"""
A base class of an object with a load/save method. Classes that are
- decorated with @persistent_autoloaded_singleton should subclass this
- and implement their save() and load() methods.
-
+ decorated with :code:`@persistent_autoloaded_singleton` should subclass
+ this and implement their :meth:`save` and :meth:`load` methods.
"""
@abstractmethod
def save(self) -> bool:
"""
Save this thing somewhere that you'll remember when someone calls
- load() later on in a way that makes sense to your code.
+ :meth:`load` later on in a way that makes sense to your code.
"""
pass
@classmethod
@abstractmethod
def load(cls) -> Any:
- """
- Load this thing from somewhere and give back an instance which
- will become the global singleton and which will may (see
- below) be save()d at program exit time.
+ """Load this thing from somewhere and give back an instance which
+ will become the global singleton and which may (see
+ below) be saved (via :meth:`save`) at program exit time.
- Oh, in case this is handy, here's how to write a factory
- method that doesn't call the c'tor in python::
+ Oh, in case this is handy, here's a reminder how to write a
+ factory method that doesn't call the c'tor in python::
@classmethod
def load_from_somewhere(cls, somewhere):
def was_file_written_today(filename: str) -> bool:
- """Returns True if filename was written today.
+ """Convenience wrapper around was_file_written_within_n_seconds.
+
+ Args:
+ filename: filename to check
+
+ Returns:
+ True if filename was written today.
>>> import os
>>> filename = f'/tmp/testing_persistent_py_{os.getpid()}'
filename: str,
limit_seconds: int,
) -> bool:
- """Returns True if filename was written within the pas limit_seconds
- seconds.
+ """Helper for determining persisted state staleness.
+
+ Args:
+ filename: the filename to check
+ limit_seconds: how fresh, in seconds, it must be
+
+ Returns:
+ True if filename was written within the past limit_seconds
+ or False otherwise (or on error).
>>> import os
>>> filename = f'/tmp/testing_persistent_py_{os.getpid()}'
class PersistAtShutdown(enum.Enum):
"""
An enum to describe the conditions under which state is persisted
- to disk. See details below.
+ to disk. This is passed as an argument to the decorator below and
+ is used to indicate when to call :meth:`save` on a :class:`Persistent`
+ subclass.
+
+ * NEVER: never call :meth:`save`
+ * IF_NOT_LOADED: call :meth:`save` as long as we did not successfully
+ :meth:`load` its state.
+ * ALWAYS: always call :meth:`save`
"""
NEVER = (0,)
class persistent_autoloaded_singleton(object):
- """A decorator that can be applied to a Persistent subclass (i.e. a
- class with a save() and load() method. It will intercept attempts
- to instantiate the class via it's c'tor and, instead, invoke the
- class' load() method to give it a chance to read state from
- somewhere persistent.
-
- If load() fails (returns None), the c'tor is invoked with the
+ """A decorator that can be applied to a :class:`Persistent` subclass
+ (i.e. a class with :meth:`save` and :meth:`load` methods). The
+ decorator will intercept attempts to instantiate the class via
+ its c'tor and, instead, invoke the class' :meth:`load` to give it a
+ chance to read state from somewhere persistent (disk, db,
+ whatever). Subsequent calls to construct instances of the wrapped
+ class will return a single, global instance (i.e. the wrapped
+ class is a singleton).
+
+ If :meth:`load` fails (returns None), the c'tor is invoked with the
original args as a fallback.
- Based upon the value of the optional argument persist_at_shutdown,
- (NEVER, IF_NOT_LOADED, ALWAYS), the save() method of the class will
- be invoked just before program shutdown to give the class a chance
- to save its state somewhere.
+ Based upon the value of the optional
+ :code:`persist_at_shutdown` argument (NEVER, IF_NOT_LOADED,
+ ALWAYS), the :meth:`save` method of the class will be invoked just
+ before program shutdown to give the class a chance to save its
+ state somewhere.
+
+ .. note::
+ The implementations of :meth:`save` and :meth:`load` and where the
+ class persists its state are details left to the :class:`Persistent`
+ implementation. Essentially this decorator just handles the
+ plumbing of calling your save/load at appropriate times and
+ creates a transparent global singleton whose state can be
+ persisted between runs.
- The implementations of save() and load() and where the class
- persists its state are details left to the Persistent
- implementation.
"""
def __init__(
# © Copyright 2021-2022, Scott Gasch
-"""A helper to identify and optionally obscure some bad words."""
+"""A helper to identify and optionally obscure some bad words. Not
+perfect but decent. Uses a fuzzy block list rather than ML."""
import logging
import random
>>> _normalize('fucking a whore')
'fuck a whore'
+ >>> _normalize('pu55y')
+ 'pussy'
+
"""
result = text.lower()
result = result.replace("_", " ")
@staticmethod
def tokenize(text: str):
+ """Tokenize text into word-like chunks"""
for x in nltk.word_tokenize(text):
for y in re.split(r'\W+', x):
yield y
return False
def is_bad_word(self, word: str) -> bool:
+ """True if we think word is a bad word."""
return word in self.bad_words or self._normalize(word) in self.bad_words
def obscure_bad_words(self, text: str) -> str:
"""Obscure bad words that are detected by inserting random punctuation
characters.
-
"""
def obscure(word: str):
"""A simple utility to unpickle some code, run it, and pickle the
results.
-
"""
import logging
# © Copyright 2021-2022, Scott Gasch
-"""
-A future that can be treated as a substutute for the result that it
-contains and will not block until it is used. At that point, if the
-underlying value is not yet available yet, it will block until the
-internal result actually becomes available.
-
+"""A :class:`Future` that can be treated as a substitute for the result
+that it contains and will not block until it is used. At that point,
+if the underlying value is not yet available, it will block until
+the internal result actually becomes available.
"""
from __future__ import annotations
# © Copyright 2021-2022, Scott Gasch
"""Several helpers to keep track of internal state via periodic
-polling. StateTracker expects to be invoked periodically to maintain
-state whereas the others automatically update themselves and,
-optionally, expose an event for client code to wait on state changes.
+polling. :class:`StateTracker` expects to be invoked periodically to
+maintain state whereas the others (:class:`AutomaticStateTracker` and
+:class:`WaitableAutomaticStateTracker`) automatically update themselves
+and, optionally, expose an event for client code to wait on state
+changes.
"""
import datetime
update types (unique update_ids) and the periodicity(ies), in
seconds, at which it/they should be invoked.
- Note that, when more than one update is overdue, they will be
- invoked in order by their update_ids so care in choosing these
- identifiers may be in order.
+ .. note::
+ When more than one update is overdue, they will be
+ invoked in order by their update_ids so care in choosing these
+ identifiers may be in order.
+
+ Args:
+ update_ids_to_update_secs: a dict mapping a user-defined
+ update_id into a period (number of seconds) with which
+ we would like this update performed. e.g.::
+
+ update_ids_to_update_secs = {
+ 'refresh_local_state': 10.0,
+ 'refresh_remote_state': 60.0,
+ }
+
+ This would indicate that every 10s we would like to
+ refresh local state whereas every 60s we'd like to
+ refresh remote state.
"""
self.update_ids_to_update_secs = update_ids_to_update_secs
self.last_reminder_ts: Dict[str, Optional[datetime.datetime]] = {}
now: datetime.datetime,
last_invocation: Optional[datetime.datetime],
) -> None:
- """Put whatever you want here. The update_id will be the string
- passed to the c'tor as a key in the Dict. It will only be
- tapped on the shoulder, at most, every update_secs seconds.
- The now param is the approximate current timestamp and the
- last_invocation param is the last time you were invoked (or
- None on the first invocation)
+ """Put whatever you want here to perform your state updates.
+
+ Args:
+ update_id: the string you passed to the c'tor as a key in
+ the update_ids_to_update_secs dict. :meth:`update` will
+ only be invoked, at most, once every update_secs
+ seconds.
+
+ now: the approximate current timestamp at invocation time.
+
+ last_invocation: the last time this operation was invoked
+ (or None on the first invocation).
"""
pass
def heartbeat(self, *, force_all_updates_to_run: bool = False) -> None:
"""Invoke this method to cause the StateTracker instance to identify
and invoke any overdue updates based on the schedule passed to
- the c'tor. In the base StateTracker class, this method must
- be invoked manually with a thread from external code.
+ the c'tor. In the base :class:`StateTracker` class, this method must
+ be invoked manually by a thread from external code. Other subclasses
+ are available that create their own updater threads (see below).
If more than one type of update (update_id) are overdue,
they will be invoked in order based on their update_ids.
class AutomaticStateTracker(StateTracker):
- """Just like HeartbeatCurrentState but you don't need to pump the
- heartbeat; it runs on a background thread. Call .shutdown() to
- terminate the updates.
+ """Just like :class:`StateTracker` but you don't need to pump the
+ :meth:`heartbeat` method periodically because we create a background
+ thread that manages periodic calling. You must call :meth:`shutdown`,
+ though, in order to terminate the update thread.
"""
@background_thread
def pace_maker(self, should_terminate: threading.Event) -> None:
- """Entry point for a background thread to own calling heartbeat()
- at regular intervals so that the main thread doesn't need to do
- so.
+ """Entry point for a background thread to own calling :meth:`heartbeat`
+ at regular intervals so that the main thread doesn't need to
+ do so.
"""
while True:
if should_terminate.is_set():
*,
override_sleep_delay: Optional[float] = None,
) -> None:
+ """Construct an AutomaticStateTracker.
+
+ Args:
+ update_ids_to_update_secs: a dict mapping a user-defined
+ update_id into a period (number of seconds) with which
+ we would like this update performed. e.g.::
+
+ update_ids_to_update_secs = {
+ 'refresh_local_state': 10.0,
+ 'refresh_remote_state': 60.0,
+ }
+
+ This would indicate that every 10s we would like to
+ refresh local state whereas every 60s we'd like to
+ refresh remote state.
+
+ override_sleep_delay: By default, this class determines
+ how long the background thread should sleep between
+ automatic invocations to :meth:`heartbeat` based on the
+ period of each update type in update_ids_to_update_secs.
+ If this argument is non-None, it overrides this computation
+ and uses this period as the sleep in the background thread.
+ """
import math_utils
super().__init__(update_ids_to_update_secs)
"""Terminates the background thread and waits for it to tear down.
This may block for as long as self.sleep_delay.
"""
-
logger.debug('Setting shutdown event and waiting for background thread.')
self.should_terminate.set()
self.updater_thread.join()
*,
override_sleep_delay: Optional[float] = None,
) -> None:
+ """Construct a WaitableAutomaticStateTracker.
+
+ Args:
+ update_ids_to_update_secs: a dict mapping a user-defined
+ update_id into a period (number of seconds) with which
+ we would like this update performed. e.g.::
+
+ update_ids_to_update_secs = {
+ 'refresh_local_state': 10.0,
+ 'refresh_remote_state': 60.0,
+ }
+
+ This would indicate that every 10s we would like to
+ refresh local state whereas every 60s we'd like to
+ refresh remote state.
+
+ override_sleep_delay: By default, this class determines
+ how long the background thread should sleep between
+ automatic invocations to :meth:`heartbeat` based on the
+ period of each update type in update_ids_to_update_secs.
+ If this argument is non-None, it overrides this computation
+ and uses this period as the sleep in the background thread.
+ """
self._something_changed = threading.Event()
super().__init__(update_ids_to_update_secs, override_sleep_delay=override_sleep_delay)
def something_changed(self):
+ """Indicate that something has changed."""
self._something_changed.set()
def did_something_change(self) -> bool:
+ """Indicate whether some state has changed in the background."""
return self._something_changed.is_set()
def reset(self):
+ """Call to clear the 'something changed' bit. See usage above."""
self._something_changed.clear()
def wait(self, *, timeout=None):
+ """Wait for something to change or a timeout to lapse.
+
+ Args:
+ timeout: maximum amount of time to wait. If None, wait
+ forever (until something changes).
+ """
return self._something_changed.wait(timeout=timeout)
def is_none_or_empty(in_str: Optional[str]) -> bool:
"""
- Returns true if the input string is either None or an empty string.
+ Args:
+ in_str: the string to test
+
+ Returns:
+ True if the input string is either None or an empty string,
+ False otherwise.
>>> is_none_or_empty("")
True
def is_string(obj: Any) -> bool:
"""
- Checks if an object is a string.
+ Args:
+ obj: the object to test
+
+ Returns:
+ True if the object is a string and False otherwise.
>>> is_string('test')
True
def is_empty_string(in_str: Any) -> bool:
+ """
+ Args:
+ in_str: the string to test
+
+ Returns:
+ True if the string is empty and False otherwise.
+ """
return is_empty(in_str)
def is_empty(in_str: Any) -> bool:
"""
- Checks if input is a string and empty or only whitespace.
+ Args:
+ in_str: the string to test
+
+ Returns:
+ True if the string is empty or contains only whitespace and False otherwise.
>>> is_empty('')
True
def is_full_string(in_str: Any) -> bool:
"""
- Checks that input is a string and is not empty ('') or only whitespace.
+ Args:
+ in_str: the object to test
+
+ Returns:
+ True if the object is a string and is not empty ('') and
+ is not only composed of whitespace.
>>> is_full_string('test!')
True
def is_number(in_str: str) -> bool:
"""
- Checks if a string is a valid number.
+ Args:
+ in_str: the string to test
+
+ Returns:
+ True if the string contains a valid numeric value and
+ False otherwise.
>>> is_number(100.5)
Traceback (most recent call last):
def is_integer_number(in_str: str) -> bool:
"""
- Checks whether the given string represents an integer or not.
+ Args:
+ in_str: the string to test
- An integer may be signed or unsigned or use a "scientific notation".
+ Returns:
+ True if the string contains a valid (signed or unsigned,
+ decimal, hex, or octal, regular or scientific) integral
+ expression and False otherwise.
>>> is_integer_number('42')
True
def is_hexidecimal_integer_number(in_str: str) -> bool:
"""
- Checks whether a string is a hex integer number.
+ Args:
+ in_str: the string to test
+
+ Returns:
+ True if the string is a hex integer number and False otherwise.
>>> is_hexidecimal_integer_number('0x12345')
True
def is_octal_integer_number(in_str: str) -> bool:
"""
- Checks whether a string is an octal number.
+ Args:
+ in_str: the string to test
+
+ Returns:
+ True if the string is a valid octal integral number and False otherwise.
>>> is_octal_integer_number('0o777')
True
def is_binary_integer_number(in_str: str) -> bool:
"""
- Returns whether a string contains a binary number.
+ Args:
+ in_str: the string to test
+
+ Returns:
+ True if the string contains a binary integral number and False otherwise.
>>> is_binary_integer_number('0b10111')
True
def to_int(in_str: str) -> int:
- """Returns the integral value of the string or raises on error.
+ """
+ Args:
+ in_str: the string to convert
+
+ Returns:
+ The integral value of the string or raises on error.
>>> to_int('1234')
1234
def is_decimal_number(in_str: str) -> bool:
"""
- Checks whether the given string represents a decimal or not.
+ Args:
+ in_str: the string to check
+
+ Returns:
+ True if the given string represents a decimal or False
+ otherwise. A decimal may be signed or unsigned or use
+ a "scientific notation".
- A decimal may be signed or unsigned or use a "scientific notation".
+ .. note::
+ We do not consider integers without a decimal point
+ to be decimals; they return False (see example).
>>> is_decimal_number('42.0')
True
def strip_escape_sequences(in_str: str) -> str:
"""
- Remove escape sequences in the input string.
+ Args:
+ in_str: the string to strip of escape sequences.
+
+ Returns:
+ in_str with escape sequences removed.
+
+ .. note::
+ What is considered to be an "escape sequence" is defined
+ by a regular expression. While this gets common ones,
+ there may exist valid sequences that it doesn't match.
>>> strip_escape_sequences('\e[12;11;22mthis is a test!')
'this is a test!'
def add_thousands_separator(in_str: str, *, separator_char=',', places=3) -> str:
"""
- Add thousands separator to a numeric string. Also handles numbers.
+ Args:
+ in_str: string or number to which to add thousands separator(s)
+ separator_char: the separator character to add (defaults to comma)
+ places: add a separator every N places (defaults to three)
+
+ Returns:
+ A numeric string with thousands separators added appropriately.
>>> add_thousands_separator('12345678')
'12,345,678'
return ret
-# Full url example:
-# scheme://username:password@www.domain.com:8042/folder/subfolder/file.extension?param=value&param2=value2#hash
def is_url(in_str: Any, allowed_schemes: Optional[List[str]] = None) -> bool:
"""
- Check if a string is a valid url.
+ Args:
+ in_str: the string to test
+ allowed_schemes: an optional list of allowed schemes (e.g.
+ ['http', 'https', 'ftp']). If passed, only URLs that
+ begin with one of the schemes passed will be considered
+ to be valid. Otherwise, any scheme:// will be considered
+ valid.
+
+ Returns:
+ True if in_str contains a valid URL and False otherwise.
>>> is_url('http://www.mysite.com')
True
True
>>> is_url('.mysite.com')
False
+ >>> is_url('scheme://username:password@www.domain.com:8042/folder/subfolder/file.extension?param=value&param2=value2#hash')
+ True
"""
if not is_full_string(in_str):
return False
def is_email(in_str: Any) -> bool:
"""
- Check if a string is a valid email.
+ Args:
+ in_str: the email address to check
- Reference: https://tools.ietf.org/html/rfc3696#section-3
+ Returns: True if the in_str contains a valid email (as defined by
+ https://tools.ietf.org/html/rfc3696#section-3) or False
+ otherwise.
True
def suffix_string_to_number(in_str: str) -> Optional[int]:
- """Take a string like "33Gb" and convert it into a number (of bytes)
- like 34603008. Return None if the input string is not valid.
+ """Takes a string like "33Gb" and converts it into a number (of bytes)
+ like 34603008.
+
+ Args:
+ in_str: the string with a suffix to be interpreted and removed.
+
+ Returns:
+ An integer number of bytes or None to indicate an error.
>>> suffix_string_to_number('1Mb')
1048576
def number_to_suffix_string(num: int) -> Optional[str]:
"""Take a number (of bytes) and returns a string like "43.8Gb".
- Returns none if the input is invalid.
+
+ Args:
+ num: an integer number of bytes
+
+ Returns:
+ A string with a suffix representing num bytes concisely or
+ None to indicate an error.
>>> number_to_suffix_string(14066017894)
'13.1Gb'
>>> number_to_suffix_string(1024 * 1024)
'1.0Mb'
-
"""
d = 0.0
suffix = None
def is_credit_card(in_str: Any, card_type: str = None) -> bool:
"""
- Checks if a string is a valid credit card number.
- If card type is provided then it checks against that specific type only,
- otherwise any known credit card number will be accepted.
+ Args:
+ in_str: a string to check
+ card_type: if provided, contains the card type to validate
+ with. Otherwise, all known credit card number types will
+ be accepted.
- Supported card types are the following:
+ Supported card types are the following:
- - VISA
- - MASTERCARD
- - AMERICAN_EXPRESS
- - DINERS_CLUB
- - DISCOVER
- - JCB
+ * VISA
+ * MASTERCARD
+ * AMERICAN_EXPRESS
+ * DINERS_CLUB
+ * DISCOVER
+ * JCB
+
+ Returns:
+ True if in_str is a valid credit card number.
"""
if not is_full_string(in_str):
return False
def is_camel_case(in_str: Any) -> bool:
"""
- Checks if a string is formatted as camel case.
+ Args:
+ in_str: the string to test
- A string is considered camel case when:
+ Returns:
+ True if the string is formatted as camel case and False otherwise.
+ A string is considered camel case when:
- - it's composed only by letters ([a-zA-Z]) and optionally numbers ([0-9])
- - it contains both lowercase and uppercase letters
- - it does not start with a number
+ * it's composed only by letters ([a-zA-Z]) and optionally numbers ([0-9])
+ * it contains both lowercase and uppercase letters
+ * it does not start with a number
"""
return is_full_string(in_str) and CAMEL_CASE_TEST_RE.match(in_str) is not None
def is_snake_case(in_str: Any, *, separator: str = "_") -> bool:
"""
- Checks if a string is formatted as "snake case".
+ Args:
+ in_str: the string to test
- A string is considered snake case when:
+ Returns: True if the string is snake case and False otherwise. A
+ string is considered snake case when:
- - it's composed only by lowercase/uppercase letters and digits
- - it contains at least one underscore (or provided separator)
- - it does not start with a number
+ * it's composed only by lowercase/uppercase letters and digits
+ * it contains at least one underscore (or provided separator)
+ * it does not start with a number
>>> is_snake_case('this_is_a_test')
True
False
>>> is_snake_case('this-is-a-test', separator='-')
True
-
"""
if is_full_string(in_str):
re_map = {"_": SNAKE_CASE_TEST_RE, "-": SNAKE_CASE_TEST_DASH_RE}
def is_json(in_str: Any) -> bool:
"""
- Check if a string is a valid json.
+ Args:
+ in_str: the string to test
+
+ Returns:
+ True if the in_str contains valid JSON and False otherwise.
>>> is_json('{"name": "Peter"}')
True
def is_uuid(in_str: Any, allow_hex: bool = False) -> bool:
"""
- Check if a string is a valid UUID.
+ Args:
+ in_str: the string to test
+
+ Returns:
+ True if the in_str contains a valid UUID and False otherwise.
>>> is_uuid('6f8aa2f9-686c-4ac3-8766-5712354a04cf')
True
def is_ip_v4(in_str: Any) -> bool:
"""
- Checks if a string is a valid ip v4.
+ Args:
+ in_str: the string to test
+
+ Returns:
+ True if in_str contains a valid IPv4 address and False otherwise.
>>> is_ip_v4('255.200.100.75')
True
def extract_ip_v4(in_str: Any) -> Optional[str]:
"""
- Extracts the IPv4 chunk of a string or None.
+ Args:
+ in_str: the string to extract an IPv4 address from.
+
+ Returns:
+ The first extracted IPv4 address from in_str or None if
+ none were found or an error occurred.
>>> extract_ip_v4(' The secret IP address: 127.0.0.1 (use it wisely) ')
'127.0.0.1'
def is_ip_v6(in_str: Any) -> bool:
"""
- Checks if a string is a valid ip v6.
+ Args:
+ in_str: the string to test.
+
+ Returns:
+ True if in_str contains a valid IPv6 address and False otherwise.
>>> is_ip_v6('2001:db8:85a3:0000:0000:8a2e:370:7334')
True
def extract_ip_v6(in_str: Any) -> Optional[str]:
"""
- Extract IPv6 chunk or None.
+ Args:
+ in_str: the string from which to extract an IPv6 address.
+
+ Returns:
+ The first IPv6 address found in in_str or None if no address
+ was found or an error occurred.
>>> extract_ip_v6('IP: 2001:db8:85a3:0000:0000:8a2e:370:7334')
'2001:db8:85a3:0000:0000:8a2e:370:7334'
def is_ip(in_str: Any) -> bool:
"""
- Checks if a string is a valid ip (either v4 or v6).
+ Args:
+ in_str: the string to test.
+
+ Returns:
+ True if in_str contains a valid IP address (either IPv4 or
+ IPv6).
>>> is_ip('255.200.100.75')
True
def extract_ip(in_str: Any) -> Optional[str]:
"""
- Extract the IP address or None.
+ Args:
+ in_str: the string from which to extract an IP address.
+
+ Returns:
+ The first IP address (IPv4 or IPv6) found in in_str or
+ None to indicate none found or an error condition.
>>> extract_ip('Attacker: 255.200.100.75')
'255.200.100.75'
>>> extract_ip('Remote host: 2001:db8:85a3:0000:0000:8a2e:370:7334')
'2001:db8:85a3:0000:0000:8a2e:370:7334'
>>> extract_ip('1.2.3')
-
"""
ip = extract_ip_v4(in_str)
if ip is None:
def is_mac_address(in_str: Any) -> bool:
- """Return True if in_str is a valid MAC address false otherwise.
+ """
+ Args:
+ in_str: the string to test
+
+ Returns:
+ True if in_str is a valid MAC address and False otherwise.
>>> is_mac_address("34:29:8F:12:0D:2F")
True
def extract_mac_address(in_str: Any, *, separator: str = ":") -> Optional[str]:
"""
- Extract the MAC address from in_str.
+ Args:
+ in_str: the string from which to extract a MAC address.
+
+ Returns:
+ The first MAC address found in in_str or None to indicate no
+ match or an error.
>>> extract_mac_address(' MAC Address: 34:29:8F:12:0D:2F')
'34:29:8F:12:0D:2F'
>>> extract_mac_address('? (10.0.0.30) at d8:5d:e2:34:54:86 on em0 expires in 1176 seconds [ethernet]')
'd8:5d:e2:34:54:86'
-
"""
if not is_full_string(in_str):
return None
def is_slug(in_str: Any, separator: str = "-") -> bool:
"""
- Checks if a given string is a slug (as created by `slugify()`).
+ Args:
+ in_str: string to test
+
+ Returns:
+ True if in_str is a slug string and False otherwise.
>>> is_slug('my-blog-post-title')
True
>>> is_slug('My blog post title')
False
-
"""
if not is_full_string(in_str):
return False
def contains_html(in_str: str) -> bool:
"""
- Checks if the given string contains HTML/XML tags.
+ Args:
+ in_str: the string to check for tags in
+
+ Returns:
+ True if the given string contains HTML/XML tags and False
+ otherwise.
- By design, this function matches ANY type of tag, so don't expect to use it
- as an HTML validator, its goal is to detect "malicious" or undesired tags in the text.
+ .. warning::
+ By design, this function matches ANY type of tag, so don't expect
+ to use it as an HTML validator. It's a quick sanity check at
+ best. See something like BeautifulSoup for a more full-featured
+ HTML parser.
>>> contains_html('my string is <strong>bold</strong>')
True
def words_count(in_str: str) -> int:
"""
- Returns the number of words contained into the given string.
+ Args:
+ in_str: the string to count words in
- This method is smart, it does consider only sequence of one or more letter and/or numbers
- as "words", so a string like this: "! @ # % ... []" will return zero!
- Moreover it is aware of punctuation, so the count for a string like "one,two,three.stop"
- will be 4 not 1 (even if there are no spaces in the string).
+ Returns:
+ The number of words contained in the given string.
+
+ .. note::
+
+ This method is "smart" in that it does consider only sequences
+ of one or more letter and/or numbers to be "words". Thus a
+ string like this: "! @ # % ... []" will return zero. Moreover
+ it is aware of punctuation, so the count for a string like
+ "one,two,three.stop" will be 4 not 1 (even if there are no spaces
+ in the string).
>>> words_count('hello world')
2
>>> words_count('one,two,three.stop')
4
-
"""
if not is_string(in_str):
raise ValueError(in_str)
def word_count(in_str: str) -> int:
+ """
+ Args:
+ in_str: the string to count words in
+
+ Returns:
+ The number of words contained in the given string.
+
+ .. note::
+
+ This method is "smart" in that it does consider only sequences
+ of one or more letter and/or numbers to be "words". Thus a
+ string like this: "! @ # % ... []" will return zero. Moreover
+ it is aware of punctuation, so the count for a string like
+ "one,two,three.stop" will be 4 not 1 (even if there are no spaces
+ in the string).
+
+ >>> word_count('hello world')
+ 2
+ >>> word_count('one,two,three.stop')
+ 4
+ """
return words_count(in_str)
def generate_uuid(omit_dashes: bool = False) -> str:
"""
- Generated an UUID string (using `uuid.uuid4()`).
+ Args:
+ omit_dashes: should we omit the dashes in the generated UUID?
+
+ Returns:
+ A generated UUID string (using `uuid.uuid4()`) with or without
+ dashes per the omit_dashes arg.
generate_uuid() # possible output: '97e3a716-6b33-4ab9-9bb1-8128cb24d76b'
generate_uuid(omit_dashes=True) # possible output: '97e3a7166b334ab99bb18128cb24d76b'
-
"""
uid = uuid4()
if omit_dashes:
def generate_random_alphanumeric_string(size: int) -> str:
"""
- Returns a string of the specified size containing random
- characters (uppercase/lowercase ascii letters and digits).
+ Args:
+ size: number of characters to generate
+
+ Returns:
+ A string of the specified size containing random characters
+ (uppercase/lowercase ascii letters and digits).
>>> random.seed(22)
>>> generate_random_alphanumeric_string(9)
'96ipbNClS'
-
"""
if size < 1:
raise ValueError("size must be >= 1")
def reverse(in_str: str) -> str:
"""
- Returns the string with its chars reversed.
+ Args:
+ in_str: the string to reverse
+
+ Returns:
+ The reversed (character by character) string.
>>> reverse('test')
'tset'
-
"""
if not is_string(in_str):
raise ValueError(in_str)
def camel_case_to_snake_case(in_str, *, separator="_"):
"""
- Convert a camel case string into a snake case one.
- (The original string is returned if is not a valid camel case string)
+ Args:
+ in_str: the camel case string to convert
+
+ Returns:
+ A snake case string equivalent to the camel case input or the
+ original string if it is not a valid camel case string or some
+ other error occurs.
>>> camel_case_to_snake_case('MacAddressExtractorFactory')
'mac_address_extractor_factory'
in_str: str, *, upper_case_first: bool = True, separator: str = "_"
) -> str:
"""
- Convert a snake case string into a camel case one.
- (The original string is returned if is not a valid snake case string)
+ Args:
+ in_str: the snake case string to convert
+
+ Returns:
+ A camel case string that is equivalent to the snake case string
+ provided or the original string back again if it is not valid
+ snake case or another error occurs.
>>> snake_case_to_camel_case('this_is_a_test')
'ThisIsATest'
def to_char_list(in_str: str) -> List[str]:
- """Convert a string into a list of chars.
+ """
+ Args:
+ in_str: the string to split into a char list
+
+ Returns:
+ A list of strings of length one each.
>>> to_char_list('test')
['t', 'e', 's', 't']
def from_char_list(in_list: List[str]) -> str:
- """Convert a char list into a string.
+ """
+ Args:
+ in_list: A list of characters to convert into a string.
+
+ Returns:
+ The string resulting from gluing the characters in in_list
+ together.
>>> from_char_list(['t', 'e', 's', 't'])
'test'
return "".join(in_list)
-def shuffle(in_str: str) -> str:
- """Return a new string containing same chars of the given one but in
- a randomized order.
+def shuffle(in_str: str) -> Optional[str]:
"""
- if not is_string(in_str):
- raise ValueError(in_str)
+ Args:
+ in_str: a string to shuffle randomly by character
+
+ Returns:
+ A new string containing same chars of the given one but in
+ a randomized order. Note that in rare cases this could result
+ in the same original string as no check is done. Returns
+ None to indicate error conditions.
- # turn the string into a list of chars
+ >>> random.seed(22)
+ >>> shuffle('awesome')
+ 'meosaew'
+ """
+ if not is_string(in_str):
+ return None
chars = to_char_list(in_str)
random.shuffle(chars)
return from_char_list(chars)
-def scramble(in_str: str) -> str:
+def scramble(in_str: str) -> Optional[str]:
+ """
+ Args:
+ in_str: a string to shuffle randomly by character
+
+ Returns:
+ A new string containing same chars of the given one but in
+ a randomized order. Note that in rare cases this could result
+ in the same original string as no check is done. Returns
+ None to indicate error conditions.
+
+ >>> random.seed(22)
+ >>> scramble('awesome')
+ 'meosaew'
+ """
return shuffle(in_str)
def strip_html(in_str: str, keep_tag_content: bool = False) -> str:
"""
- Remove html code contained into the given string.
+ Args:
+ in_str: the string to strip tags from
+ keep_tag_content: should we keep the inner contents of tags?
+
+ Returns:
+ A string with all HTML tags removed (optionally with tag contents
+ preserved).
+
+ .. note::
+ This method uses simple regular expressions to strip tags and is
+ not a full fledged HTML parser by any means. Consider using
+ something like BeautifulSoup if your needs are more than this
+ simple code can fulfill.
>>> strip_html('test: <a href="foo/bar">click here</a>')
'test: '
def asciify(in_str: str) -> str:
"""
- Force string content to be ascii-only by translating all non-ascii
- chars into the closest possible representation (eg: ó -> o, Ë ->
- E, ç -> c...).
+ Args:
+ in_str: the string to asciify.
+
+ Returns:
+ An output string roughly equivalent to the original string
+ whose content is ASCII-only. This is accomplished
+ by translating all non-ascii chars into their closest possible
+ ASCII representation (eg: ó -> o, Ë -> E, ç -> c...).
- N.B. Some chars may be lost if impossible to translate.
+ .. warning::
+ Some chars may be lost if impossible to translate.
>>> asciify('èéùúòóäåëýñÅÀÁÇÌÍÑÓË')
'eeuuooaaeynAAACIINOE'
def slugify(in_str: str, *, separator: str = "-") -> str:
"""
- Converts a string into a "slug" using provided separator.
- The returned string has the following properties:
+ Args:
+ in_str: the string to slugify
+ separator: the character to use during slugification (default
+ is a dash)
- - it has no spaces
- - all letters are in lower case
- - all punctuation signs and non alphanumeric chars are removed
- - words are divided using provided separator
- - all chars are encoded as ascii (by using `asciify()`)
- - is safe for URL
+ Returns:
+ The converted string. The returned string has the following properties:
+
+ * it has no spaces
+ * all letters are in lower case
+ * all punctuation signs and non alphanumeric chars are removed
+ * words are divided using provided separator
+ * all chars are encoded as ascii (by using :meth:`asciify`)
+ * is safe for URL
>>> slugify('Top 10 Reasons To Love Dogs!!!')
'top-10-reasons-to-love-dogs'
def to_bool(in_str: str) -> bool:
"""
- Turns a string into a boolean based on its content (CASE INSENSITIVE).
+ Args:
+ in_str: the string to convert to boolean
- A positive boolean (True) is returned if the string value is one
- of the following:
+ Returns:
+ A boolean equivalent of the original string based on its contents.
+ All conversion is case insensitive. A positive boolean (True) is
+ returned if the string value is any of the following:
- - "true"
- - "1"
- - "yes"
- - "y"
+ * "true"
+ * "t"
+ * "1"
+ * "yes"
+ * "y"
+ * "on"
- Otherwise False is returned.
+ Otherwise False is returned.
>>> to_bool('True')
True
>>> to_bool('on')
True
-
"""
if not is_string(in_str):
raise ValueError(in_str)
def to_date(in_str: str) -> Optional[datetime.date]:
"""
- Parses a date string. See DateParser docs for details.
+ Args:
+ in_str: the string to convert into a date
+
+ Returns:
+ The datetime.date the string contained or None to indicate
+ an error. This parser is relatively clever; see
+ :class:`python_modules.dateparse.dateparse_utils` docs for
+ details.
+
+ >>> to_date('9/11/2001')
+ datetime.date(2001, 9, 11)
+ >>> to_date('xyzzy')
"""
import dateparse.dateparse_utils as du
return None
-def valid_date(in_str: str) -> bool:
+def is_valid_date(in_str: str) -> bool:
"""
- True if the string represents a valid date.
+ Args:
+ in_str: the string to check
+
+ Returns:
+ True if the string represents a valid date that we can recognize
+ and False otherwise. This parser is relatively clever; see
+ :class:`python_modules.dateparse.dateparse_utils` docs for
+ details.
+
+ >>> is_valid_date('1/2/2022')
+ True
+ >>> is_valid_date('christmas')
+ True
+ >>> is_valid_date('next wednesday')
+ True
+ >>> is_valid_date('xyzzy')
+ False
"""
import dateparse.dateparse_utils as dp
def to_datetime(in_str: str) -> Optional[datetime.datetime]:
"""
- Parses a datetime string. See DateParser docs for more info.
+ Args:
+ in_str: string to parse into a datetime
+
+ Returns:
+ A python datetime parsed from in_str or None to indicate
+ an error. This parser is relatively clever; see
+ :class:`python_modules.dateparse.dateparse_utils` docs for
+ details.
+
+ >>> to_datetime('7/20/1969 02:56 GMT')
+ datetime.datetime(1969, 7, 20, 2, 56, tzinfo=<StaticTzInfo 'GMT'>)
"""
import dateparse.dateparse_utils as dp
dt = d.parse(in_str)
if isinstance(dt, datetime.datetime):
return dt
- except ValueError:
+ except Exception:
msg = f'Unable to parse datetime {in_str}.'
logger.warning(msg)
return None
def valid_datetime(in_str: str) -> bool:
"""
- True if the string represents a valid datetime.
+ Args:
+ in_str: the string to check
+
+ Returns:
+ True if in_str contains a valid datetime and False otherwise.
+ This parser is relatively clever; see
+ :class:`python_modules.dateparse.dateparse_utils` docs for
+ details.
+
+ >>> valid_datetime('next wednesday at noon')
+ True
+ >>> valid_datetime('3 weeks ago at midnight')
+ True
+ >>> valid_datetime('next easter at 5:00 am')
+ True
+ >>> valid_datetime('sometime soon')
+ False
"""
_ = to_datetime(in_str)
if _ is not None:
def squeeze(in_str: str, character_to_squeeze: str = ' ') -> str:
"""
- Squeeze runs of more than one character_to_squeeze into one.
+ Args:
+ in_str: the string to squeeze
+ character_to_squeeze: the character to remove runs of
+ more than one in a row (default = space)
+
+ Returns: A "squeezed string" where runs of more than one
+ character_to_squeeze are collapsed into one.
>>> squeeze(' this is a test ')
' this is a test '
)
-def dedent(in_str: str) -> str:
+def dedent(in_str: str) -> Optional[str]:
"""
- Removes tab indentation from multi line strings (inspired by analogous Scala function).
+ Args:
+ in_str: the string to dedent
+
+ Returns:
+ A string with tab indentation removed or None on error.
+
+ .. note::
+
+ Inspired by analogous Scala function.
+
+ >>> dedent('\t\ttest\\n\t\ting')
+ 'test\\ning'
"""
if not is_string(in_str):
- raise ValueError(in_str)
+ return None
line_separator = '\n'
lines = [MARGIN_RE.sub('', line) for line in in_str.split(line_separator)]
return line_separator.join(lines)
def indent(in_str: str, amount: int) -> str:
"""
- Indents string by prepending amount spaces.
+ Args:
+ in_str: the string to indent
+ amount: count of spaces to indent each line by
+
+ Returns:
+ An indented string created by prepending amount spaces.
>>> indent('This is a test', 4)
' This is a test'
-
"""
if not is_string(in_str):
raise ValueError(in_str)
def sprintf(*args, **kwargs) -> str:
- """String printf, like in C"""
+ """
+ Args:
+ This function uses the same syntax as the builtin print
+ function.
+
+ Returns:
+ An interpolated string capturing print output, like man(3)
+ :code:`sprintf`.
+ """
ret = ""
sep = kwargs.pop("sep", None)
def strip_ansi_sequences(in_str: str) -> str:
- """Strips ANSI sequences out of strings.
+ """
+ Args:
+ in_str: the string to strip
+
+ Returns:
+ in_str with recognized ANSI escape sequences removed.
+
+ .. warning::
+ This method works by using a regular expression.
+ It works for all ANSI escape sequences I've tested with but
+ may miss some; caveat emptor.
>>> import ansi as a
>>> s = a.fg('blue') + 'blue!' + a.reset()
return False
-def capitalize_first_letter(txt: str) -> str:
- """Capitalize the first letter of a string.
+def capitalize_first_letter(in_str: str) -> str:
+ """
+ Args:
+ in_str: the string to capitalize
+
+ Returns:
+ in_str with the first character capitalized.
>>> capitalize_first_letter('test')
'Test'
'ALREADY!'
"""
- return txt[0].upper() + txt[1:]
+ return in_str[0].upper() + in_str[1:]
def it_they(n: int) -> str:
- """It or they?
+ """
+ Args:
+ n: how many of them are there?
+
+ Returns:
+ 'it' if n is one or 'they' otherwise.
+
+ Suggested usage::
+
+ n = num_files_saved_to_tmp()
+ print(f'Saved file{pluralize(n)} successfully.')
+ print(f'{it_they(n)} {is_are(n)} located in /tmp.')
>>> it_they(1)
'it'
>>> it_they(100)
'they'
-
"""
if n == 1:
return "it"
def is_are(n: int) -> str:
- """Is or are?
+ """
+ Args:
+ n: how many of them are there?
+
+ Returns:
+ 'is' if n is one or 'are' otherwise.
+
+ Suggested usage::
+
+ n = num_files_saved_to_tmp()
+ print(f'Saved file{pluralize(n)} successfully.')
+ print(f'{it_they(n)} {is_are(n)} located in /tmp.')
>>> is_are(1)
'is'
def pluralize(n: int) -> str:
- """Add an s?
+ """
+ Args:
+ n: how many of them are there?
+
+ Returns:
+ 's' if n is greater than one otherwise ''.
+
+ Suggested usage::
+
+ n = num_files_saved_to_tmp()
+ print(f'Saved file{pluralize(n)} successfully.')
+ print(f'{it_they(n)} {is_are(n)} located in /tmp.')
>>> pluralize(15)
's'
>>> count = 4
>>> print(f'There {is_are(count)} {count} file{pluralize(count)}.')
There are 4 files.
-
"""
if n == 1:
return ""
def make_contractions(txt: str) -> str:
- """Glue words together to form contractions.
+ """This code glues words in txt together to form (English)
+ contractions.
+
+ Args:
+ txt: the input text to be contractionized.
+
+ Returns:
+ Output text identical to original input except that any
+ recognized contractions are formed.
+
+ .. note::
+ The order in which we create contractions is defined by the
+ implementation and what I thought made more sense when writing
+ this code.
>>> make_contractions('It is nice today.')
"It's nice today."
>>> make_contractions('I said you can not go.')
"I said you can't go."
-
"""
first_second = [
def thify(n: int) -> str:
- """Return the proper cardinal suffix for a number.
+ """
+ Args:
+ n: how many of them are there?
+
+ Returns:
+ The proper ordinal suffix for a number.
+
+ Suggested usage::
+
+ attempt_count = 0
+ while True:
+ attempt_count += 1
+ if try_the_thing():
+ break
+ print(f'The {attempt_count}{thify(attempt_count)} failed, trying again.')
>>> thify(1)
'st'
'rd'
>>> thify(16)
'th'
-
"""
digit = str(n)
assert is_integer_number(digit)
def ngrams(txt: str, n: int):
- """Return the ngrams from a string.
+ """
+ Args:
+ txt: the string from which to create ngrams
+ n: how many words per ngram created?
+
+ Returns:
+ Generates the ngrams from the input string.
>>> [x for x in ngrams('This is a test', 2)]
['This is', 'is a', 'a test']
-
"""
words = txt.split()
for ngram in ngrams_presplit(words, n):
def ngrams_presplit(words: Sequence[str], n: int):
+ """
+ Same as :meth:`ngrams` but with the string pre-split.
+ """
return list_utils.ngrams(words, n)
def bigrams(txt: str):
+ """Generates the bigrams (n=2) of the given string."""
return ngrams(txt, 2)
def trigrams(txt: str):
+ """Generates the trigrams (n=3) of the given string."""
return ngrams(txt, 3)
input_lines: Sequence[str], column_specs: Iterable[Iterable[int]], delim=''
) -> Iterable[str]:
"""Helper to shuffle / parse columnar data and return the results as a
- list. The column_specs argument is an iterable collection of
- numeric sequences that indicate one or more column numbers to
- copy.
+ list.
+
+ Args:
+ input_lines: A sequence of strings that represents text that
+ has been broken into columns by the caller
+ column_specs: an iterable collection of numeric sequences that
+ indicate one or more column numbers to copy to form the Nth
+ position in the output list. See example below.
+ delim: for column_specs that indicate we should copy more than
+ one column from the input into this position, use delim to
+ separate source data. Defaults to ''.
+
+ Returns:
+ A list of strings created by following the instructions set forth
+ in column_specs.
>>> cols = '-rwxr-xr-x 1 scott wheel 3.1K Jul 9 11:34 acl_test.py'.split()
>>> shuffle_columns_into_list(
... cols,
... [ [8], [2, 3], [5, 6, 7] ],
- ... delim=' ',
+ ... delim='!',
... )
- ['acl_test.py', 'scott wheel', 'Jul 9 11:34']
-
+ ['acl_test.py', 'scott!wheel', 'Jul!9!11:34']
"""
out = []
"""Helper to shuffle / parse columnar data and return the results
as a dict.
+ Args:
+ input_lines: a sequence of strings that represents text that
+ has been broken into columns by the caller
+ column_specs: instructions for what dictionary keys to apply
+ to individual or compound input column data. See example
+ below.
+ delim: when forming compound output data by gluing more than
+ one input column together, use this character to separate
+ the source data. Defaults to ''.
+
+ Returns:
+ A dict formed by applying the column_specs instructions.
+
>>> cols = '-rwxr-xr-x 1 scott wheel 3.1K Jul 9 11:34 acl_test.py'.split()
>>> shuffle_columns_into_dict(
... cols,
... [ ('filename', [8]), ('owner', [2, 3]), ('mtime', [5, 6, 7]) ],
- ... delim=' ',
+ ... delim='!',
... )
- {'filename': 'acl_test.py', 'owner': 'scott wheel', 'mtime': 'Jul 9 11:34'}
-
+ {'filename': 'acl_test.py', 'owner': 'scott!wheel', 'mtime': 'Jul!9!11:34'}
"""
out = {}
def interpolate_using_dict(txt: str, values: Dict[str, str]) -> str:
- """Interpolate a string with data from a dict.
+ """
+ Interpolate a string with data from a dict.
+
+ Args:
+ txt: the mad libs template
+ values: what you and your kids chose for each category.
>>> interpolate_using_dict('This is a {adjective} {noun}.',
... {'adjective': 'good', 'noun': 'example'})
'This is a good example.'
-
"""
return sprintf(txt.format(**values), end='')
-def to_ascii(x: str):
- """Encode as ascii bytes string.
+def to_ascii(txt: str):
+ """
+ Args:
+ txt: the input data to encode
+
+ Returns:
+ txt encoded as an ASCII byte string.
>>> to_ascii('test')
b'test'
>>> to_ascii(b'1, 2, 3')
b'1, 2, 3'
-
"""
- if isinstance(x, str):
- return x.encode('ascii')
- if isinstance(x, bytes):
- return x
+ if isinstance(txt, str):
+ return txt.encode('ascii')
+ if isinstance(txt, bytes):
+ return txt
raise Exception('to_ascii works with strings and bytes')
def to_base64(txt: str, *, encoding='utf-8', errors='surrogatepass') -> bytes:
- """Encode txt and then encode the bytes with a 64-character
- alphabet. This is compatible with uudecode.
+ """
+ Args:
+ txt: the input data to encode
+
+ Returns:
+ txt encoded with a 64-character alphabet. Similar to and compatible
+ with uuencode/uudecode.
>>> to_base64('hello?')
b'aGVsbG8/\\n'
-
"""
return base64.encodebytes(txt.encode(encoding, errors))
def is_base64(txt: str) -> bool:
- """Determine whether a string is base64 encoded (with Python's standard
- base64 alphabet which is the same as what uuencode uses).
+ """
+ Args:
+ txt: the string to check
+
+ Returns:
+ True if txt is a valid base64 encoded string. This assumes
+ txt was encoded with Python's standard base64 alphabet which
+ is the same as what uuencode/uudecode uses.
>>> is_base64('test') # all letters in the b64 alphabet
True
def from_base64(b64: bytes, encoding='utf-8', errors='surrogatepass') -> str:
- """Convert base64 encoded string back to normal strings.
+ """
+ Args:
+ b64: bytestring of base64 encoded data to decode / convert.
+
+ Returns:
+ The decoded form of b64 as a normal python string. Similar to
+ and compatible with uuencode / uudecode.
>>> from_base64(b'aGVsbG8/\\n')
'hello?'
-
"""
return base64.decodebytes(b64).decode(encoding, errors)
-def chunk(txt: str, chunk_size):
- """Chunk up a string.
+def chunk(txt: str, chunk_size: int):
+ """
+ Args:
+ txt: a string to be chunked into evenly spaced pieces.
+ chunk_size: the size of each chunk to make
+
+ Returns:
+ The original string chunked into evenly spaced pieces.
>>> ' '.join(chunk('010011011100010110101010101010101001111110101000', 8))
'01001101 11000101 10101010 10101010 10011111 10101000'
-
"""
if len(txt) % chunk_size != 0:
msg = f'String to chunk\'s length ({len(txt)} is not an even multiple of chunk_size ({chunk_size})'
yield txt[x : x + chunk_size]
-def to_bitstring(txt: str, *, delimiter='', encoding='utf-8', errors='surrogatepass') -> str:
- """Encode txt and then chop it into bytes. Note: only bitstrings
- with delimiter='' are interpretable by from_bitstring.
+def to_bitstring(txt: str, *, delimiter='') -> str:
+ """
+ Args:
+ txt: the string to convert into a bitstring
+ delimiter: character to insert between adjacent bytes. Note that
+ only bitstrings with delimiter='' are interpretable by
+ :meth:`from_bitstring`.
+
+ Returns:
+ txt converted to ascii/binary and then chopped into bytes.
>>> to_bitstring('hello?')
'011010000110010101101100011011000110111100111111'
>>> to_bitstring(b'test')
'01110100011001010111001101110100'
-
"""
etxt = to_ascii(txt)
bits = bin(int.from_bytes(etxt, 'big'))
def is_bitstring(txt: str) -> bool:
- """Is this a bitstring?
+ """
+ Args:
+ txt: the string to check
+
+ Returns:
+ True if txt is a recognized bitstring and False otherwise.
+ Note that if delimiter is non empty this code will not
+ recognize the bitstring.
>>> is_bitstring('011010000110010101101100011011000110111100111111')
True
>>> is_bitstring('1234')
False
-
"""
return is_binary_integer_number(f'0b{txt}')
def from_bitstring(bits: str, encoding='utf-8', errors='surrogatepass') -> str:
- """Convert from bitstring back to bytes then decode into a str.
+ """
+ Args:
+ bits: the bitstring to convert back into a python string
+ encoding: the encoding to use
+
+ Returns:
+ The regular python string represented by bits. Note that this
+ code does not work with to_bitstring when delimiter is non-empty.
>>> from_bitstring('011010000110010101101100011011000110111100111111')
'hello?'
-
"""
n = int(bits, 2)
return n.to_bytes((n.bit_length() + 7) // 8, 'big').decode(encoding, errors) or '\0'
def ip_v4_sort_key(txt: str) -> Optional[Tuple[int, ...]]:
- """Turn an IPv4 address into a tuple for sorting purposes.
+ """
+ Args:
+ txt: an IP address to chunk up for sorting purposes
+
+ Returns:
+ A tuple of IP components arranged such that the sorting of
+ IP addresses using a normal comparator will do something sane
+ and desirable.
>>> ip_v4_sort_key('10.0.0.18')
(10, 0, 0, 18)
>>> ips = ['10.0.0.10', '100.0.0.1', '1.2.3.4', '10.0.0.9']
>>> sorted(ips, key=lambda x: ip_v4_sort_key(x))
['1.2.3.4', '10.0.0.9', '10.0.0.10', '100.0.0.1']
-
"""
if not is_ip_v4(txt):
print(f"not IP: {txt}")
def path_ancestors_before_descendants_sort_key(volume: str) -> Tuple[str, ...]:
- """Chunk up a file path so that parent/ancestor paths sort before
- children/descendant paths.
+ """
+ Args:
+ volume: the string to chunk up for sorting purposes
+
+ Returns:
+ A tuple of volume's components such that the sorting of
+ volumes using a normal comparator will do something sane
+ and desirable.
>>> path_ancestors_before_descendants_sort_key('/usr/local/bin')
('usr', 'local', 'bin')
>>> paths = ['/usr/local', '/usr/local/bin', '/usr']
>>> sorted(paths, key=lambda x: path_ancestors_before_descendants_sort_key(x))
['/usr', '/usr/local', '/usr/local/bin']
-
"""
return tuple(x for x in volume.split('/') if len(x) > 0)
def replace_all(in_str: str, replace_set: str, replacement: str) -> str:
- """Execute several replace operations in a row.
+ """
+ Execute several replace operations in a row.
+
+ Args:
+ in_str: the string in which to replace characters
+ replace_set: the set of target characters to replace
+ replacement: the character to replace any member of replace_set
+ with
+
+ Returns:
+ The string with replacements executed.
>>> s = 'this_is a-test!'
>>> replace_all(s, ' _-!', '')
'thisisatest'
-
"""
for char in replace_set:
in_str = in_str.replace(char, replacement)
def replace_nth(in_str: str, source: str, target: str, nth: int):
- """Replaces the nth occurrance of a substring within a string.
+ """
+ Replaces the nth occurrence of a substring within a string.
+
+ Args:
+ in_str: the string in which to run the replacement
+ source: the substring to replace
+ target: the replacement text
+ nth: which occurrence of source to replace?
>>> replace_nth('this is a test', ' ', '-', 3)
'this is a-test'
-
"""
where = [m.start() for m in re.finditer(source, in_str)][nth - 1]
before = in_str[:where]
"""Row + Column"""
rows: int = 0
+ """Number of rows"""
+
columns: int = 0
+ """Number of columns"""
def get_console_rows_columns() -> RowsColumns:
- """Returns the number of rows/columns on the current console."""
-
+ """
+ Returns:
+ The number of rows/columns on the current console or None
+ if we can't tell or an error occurred.
+ """
from exec_utils import cmd
rows: Optional[str] = os.environ.get('LINES', None)
right_end="]",
redraw=True,
) -> None:
- """Draws a progress graph."""
-
+ """Draws a progress graph at the current cursor position.
+
+ Args:
+ current: how many have we done so far?
+ total: how many are there to do total?
+ width: how many columns wide should the progress graph be?
+ fgcolor: what color should the "done" part of the graph be?
+ left_end: the character at the left side of the graph
+ right_end: the character at the right side of the graph
+ redraw: if True, omit a line feed after the carriage return
+ so that subsequent calls to this method redraw the graph
+ iteratively.
+ """
percent = current / total
ret = "\r" if redraw else "\n"
bar = bar_graph(
) -> str:
"""Returns a string containing a bar graph.
+ Args:
+ percentage: fraction complete (0.0..1.0); e.g. 0.5 renders as 50.0%
+ include_text: should we include the percentage text at the end?
+ width: how many columns wide should the progress graph be?
+ fgcolor: what color should the "done" part of the graph be?
+ reset_seq: sequence to use to turn off color
+ left_end: the character at the left side of the graph
+ right_end: the character at the right side of the graph
+
>>> bar_graph(0.5, fgcolor='', reset_seq='')
'[███████████████████████████████████ ] 50.0%'
"""
Makes a "sparkline" little inline histogram graph. Auto scales.
+ Args:
+ numbers: the population over which to create the sparkline
+
+ Returns:
+ a three tuple containing:
+
+ * the minimum number in the population
+ * the maximum number in the population
+ * a string representation of the population in a concise format
+
>>> sparkline([1, 2, 3, 5, 10, 3, 5, 7])
(1, 10, '▁▁▂▄█▂▄▆')
"""
Distributes strings into a line for justified text.
+ Args:
+ strings: a list of string tokens to distribute
+ width: the width of the line to create
+ padding: the padding character to place between string chunks
+
+ Returns:
+ The distributed, justified string.
+
>>> distribute_strings(['this', 'is', 'a', 'test'], width=40)
' this is a test '
-
"""
ret = ' ' + ' '.join(strings) + ' '
assert len(string_utils.strip_ansi_sequences(ret)) < width
return ret
-def justify_string_by_chunk(string: str, width: int = 80, padding: str = " ") -> str:
+def _justify_string_by_chunk(string: str, width: int = 80, padding: str = " ") -> str:
"""
- Justifies a string.
+ Justifies a string chunk by chunk.
+
+ Args:
+ string: the string to be justified
+ width: how wide to make the output
+ padding: what padding character to use between chunks
+
+ Returns:
+ the justified string
- >>> justify_string_by_chunk("This is a test", 40)
+ >>> _justify_string_by_chunk("This is a test", 40)
'This is a test'
- >>> justify_string_by_chunk("This is a test", 20)
+ >>> _justify_string_by_chunk("This is a test", 20)
'This is a test'
"""
def justify_string(
string: str, *, width: int = 80, alignment: str = "c", padding: str = " "
) -> str:
- """Justify a string.
+ """Justify a string to width with left, right, center or justified
+ alignment.
+
+ Args:
+ string: the string to justify
+ width: the width to justify the string to
+ alignment: a single character indicating the desired alignment:
+ * 'c' = centered within the width
+ * 'j' = justified at width
+ * 'l' = left alignment
+ * 'r' = right alignment
+ padding: the padding character to use while justifying
>>> justify_string('This is another test', width=40, alignment='c')
' This is another test '
' This is another test'
>>> justify_string('This is another test', width=40, alignment='j')
'This is another test'
-
"""
alignment = alignment[0]
padding = padding[0]
elif alignment == "r":
string = padding + string
elif alignment == "j":
- return justify_string_by_chunk(string, width=width, padding=padding)
+ return _justify_string_by_chunk(string, width=width, padding=padding)
elif alignment == "c":
if len(string) % 2 == 0:
string += padding
def justify_text(text: str, *, width: int = 80, alignment: str = "c", indent_by: int = 0) -> str:
- """
- Justifies text optionally with initial indentation.
+ """Justifies text with left, right, centered or justified alignment
+ and optionally with initial indentation.
+
+ Args:
+ text: the text to be justified
+ width: the width at which to justify text
+ alignment: a single character indicating the desired alignment:
+ * 'c' = centered within the width
+ * 'j' = justified at width
+ * 'l' = left alignment
+ * 'r' = right alignment
+ indent_by: if non-zero, adds n prefix spaces to indent the text.
+
+ Returns:
+ The justified text.
>>> justify_text('This is a test of the emergency broadcast system. This is only a test.',
... width=40, alignment='j') #doctest: +NORMALIZE_WHITESPACE
def generate_padded_columns(text: List[str]) -> Generator:
+ """Given a list of strings, break them into columns using :meth:`str.split`
+ and then compute the maximum width of each column. Finally,
+ distribute the columnar chunks into the output padding each to
+ the proper width.
+
+ Args:
+ text: a list of strings to chunk into padded columns
+
+ Returns:
+ padded columns based on text.split()
+
+ >>> for x in generate_padded_columns(
+ ... [ 'reading writing arithmetic',
+ ... 'mathematics psychology physics',
+ ... 'communications sociology anthropology' ]):
+ ... print(x.strip())
+ reading writing arithmetic
+ mathematics psychology physics
+ communications sociology anthropology
+ """
max_width: Dict[int, int] = defaultdict(int)
for line in text:
for pos, word in enumerate(line.split()):
def wrap_string(text: str, n: int) -> str:
+ """
+ Args:
+ text: the string to be wrapped
+ n: the width after which to wrap text
+
+ Returns:
+ The wrapped form of text
+ """
chunks = text.split()
out = ''
width = 0
test
-ing
1, 2, 3
-
"""
def __init__(
pad_char: str = ' ',
pad_count: int = 4,
):
+ """Construct an Indenter.
+
+ Args:
+ pad_prefix: an optional prefix to prepend to each line
+ pad_char: the character used to indent
+ pad_count: the number of pad_chars to use to indent
+ """
self.level = -1
if pad_prefix is not None:
self.pad_prefix = pad_prefix
color: Optional[str] = None,
):
"""
- Returns a nice header line with a title.
+ Creates a nice header line with a title.
+
+ Args:
+ title: the title
+ width: how wide to make the header
+ align: "left" or "right"
+ style: "ascii", "solid" or "dashed"
+
+ Returns:
+ The header as a string.
>>> header('title', width=60, style='ascii')
'----[ title ]-----------------------------------------------'
-
"""
if not width:
try:
def box(
title: Optional[str] = None, text: Optional[str] = None, *, width: int = 80, color: str = ''
) -> str:
+ """
+ Make a nice unicode box (optionally with color) around some text.
+
+ Args:
+ title: the title of the box
+ text: the text in the box
+ width: the box's width
+ color: the box's color
+
+ Returns:
+ the box as a string
+
+ >>> print(box('title', 'this is some text', width=20).strip())
+ ╭──────────────────╮
+ │ title │
+ │ │
+ │ this is some │
+ │ text │
+ ╰──────────────────╯
+ """
assert width > 4
if text is not None:
text = justify_text(text, width=width - 4, alignment='l')
def preformatted_box(
title: Optional[str] = None, text: Optional[str] = None, *, width=80, color: str = ''
) -> str:
+ """Creates a nice box with rounded corners and returns it as a string.
+
+ Args:
+ title: the title of the box
+ text: the text inside the box
+ width: the width of the box
+ color: the box's color
+
+ Returns:
+ the box as a string
+
+ >>> print(preformatted_box('title', 'this\\nis\\nsome\\ntext', width=20).strip())
+ ╭──────────────────╮
+ │ title │
+ │ │
+ │ this │
+ │ is │
+ │ some │
+ │ text │
+ ╰──────────────────╯
+ """
assert width > 4
ret = ''
if color == '':
╭────╮
│ OK │
╰────╯
-
"""
print(preformatted_box(title, text, width=width, color=color), end='')
def current_thread_id() -> str:
- """Returns a string composed of the parent process' id, the current
- process' id and the current thread identifier. The former two are
- numbers (pids) whereas the latter is a thread id passed during thread
- creation time.
+ """
+ Returns:
+ a string composed of the parent process' id, the current
+ process' id and the current thread identifier. The former two are
+ numbers (pids) whereas the latter is a thread id passed during thread
+ creation time.
>>> ret = current_thread_id()
>>> (ppid, pid, tid) = ret.split('/')
def is_current_thread_main_thread() -> bool:
- """Returns True is the current (calling) thread is the process' main
- thread and False otherwise.
+ """
+ Returns:
+ True if the current (calling) thread is the process' main
+ thread and False otherwise.
>>> is_current_thread_main_thread()
True
) -> Callable[..., Tuple[threading.Thread, threading.Event]]:
"""A function decorator to create a background thread.
- *** Please note: the decorated function must take an shutdown ***
- *** event as an input parameter and should periodically check ***
- *** it and stop if the event is set. ***
-
Usage::
@background_thread
event.set()
thread.join()
- Note: in addition to any other arguments the function has, it must
- take a stop_event as the last unnamed argument which it should
- periodically check. If the event is set, it means the thread has
- been requested to terminate ASAP.
+ .. warning::
+
+ In addition to any other arguments the function has, it must
+ take a stop_event as the last unnamed argument which it should
+ periodically check. If the event is set, it means the thread has
+ been requested to terminate ASAP.
"""
def wrapper(funct: Callable):
stop_after: Optional[int],
):
"""
- Periodically invoke a decorated function. Stop after N invocations
- (or, if stop_after is None, call forever). Delay period_sec between
- invocations.
+ Periodically invoke the decorated function.
+
+ Args:
+ period_sec: the delay period in seconds between invocations
+ stop_after: total number of invocations to make or, if None,
+ call forever
- Returns a Thread object and an Event that, when signaled, will stop
- the invocations. Note that it is possible to be invoked one time
- after the Event is set. This event can be used to stop infinite
- invocation style or finite invocation style decorations.::
+ Returns:
+ a :class:`Thread` object and an :class:`Event` that, when
+ signaled, will stop the invocations.
+
+ .. note::
+ It is possible to be invoked one time after the :class:`Event`
+ is set. This event can be used to stop infinite
+ invocation style or finite invocation style decorations.
+
+ Usage::
@periodically_invoke(period_sec=0.5, stop_after=None)
def there(name: str, age: int) -> None:
@periodically_invoke(period_sec=1.0, stop_after=3)
def hello(name: str) -> None:
print(f"Hello, {name}")
-
"""
def decorator_repeat(func):
def unwrap_optional(x: Optional[Any]) -> Any:
"""Unwrap an Optional[Type] argument returning a Type value back.
- If the Optional[Type] argument is None, however, raise an exception.
- Use this to satisfy most type checkers that a value that could
- be None isn't so as to drop the Optional typing hint.
+ Use this to satisfy most type checkers that a value that could be
+ None isn't so as to drop the Optional typing hint.
+
+ Args:
+ x: an Optional[Type] argument
+
+ Returns:
+ If the Optional[Type] argument is non-None, return it.
+ If the Optional[Type] argument is None, however, raise an
+ exception.
>>> x: Optional[bool] = True
>>> unwrap_optional(x)
Traceback (most recent call last):
...
AssertionError: Argument to unwrap_optional was unexpectedly None
-
"""
if x is None:
msg = 'Argument to unwrap_optional was unexpectedly None'
# © Copyright 2021-2022, Scott Gasch
-"""Helpers for unittests. Note that when you import this we
-automatically wrap unittest.main() with a call to bootstrap.initialize
-so that we getLogger config, commandline args, logging control,
-etc... this works fine but it's a little hacky so caveat emptor.
+"""Helpers for unittests.
+.. note::
+
+ When you import this we automatically wrap unittest.main()
+ with a call to bootstrap.initialize so that we getLogger
+ config, commandline args, logging control, etc... this works
+ fine but it's a little hacky so caveat emptor.
"""
import contextlib
"""A PresenceDetector that is waitable. This is not part of
base_presence.py because I do not want to bring these dependencies
into that lower-level module (especially state_tracker).
-
"""
import datetime