From 2c54bfde335f3631f045a871c540c9d63c5bb081 Mon Sep 17 00:00:00 2001 From: Scott Gasch Date: Sun, 14 Nov 2021 22:48:04 -0800 Subject: [PATCH 01/16] Tighten up the remote executor. --- exec_utils.py | 7 +- executors.py | 253 +++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 214 insertions(+), 46 deletions(-) diff --git a/exec_utils.py b/exec_utils.py index 7e9dae5..89cfbd7 100644 --- a/exec_utils.py +++ b/exec_utils.py @@ -1,11 +1,15 @@ #!/usr/bin/env python3 import atexit +import logging import shlex import subprocess from typing import List, Optional +logger = logging.getLogger(__file__) + + def cmd_with_timeout(command: str, timeout_seconds: Optional[float]) -> int: """ Run a command but do not let it run for more than timeout seconds. @@ -72,6 +76,7 @@ def cmd_in_background( stderr=subprocess.DEVNULL) else: subproc = subprocess.Popen(args, stdin=subprocess.DEVNULL) + def kill_subproc() -> None: try: if subproc.poll() is None: @@ -79,7 +84,7 @@ def cmd_in_background( subproc.terminate() subproc.wait(timeout=10.0) except BaseException as be: - log.error(be) + logger.exception(be) atexit.register(kill_subproc) return subproc diff --git a/executors.py b/executors.py index 6723bb9..b16ad92 100644 --- a/executors.py +++ b/executors.py @@ -275,7 +275,7 @@ class RemoteExecutorStatus: ) -> None: assert self.lock.locked() self.known_workers.add(worker) - self.start_per_bundle[uuid] = time.time() + self.start_per_bundle[uuid] = None x = self.in_flight_bundles_by_worker.get(worker, set()) x.add(uuid) self.in_flight_bundles_by_worker[worker] = x @@ -292,6 +292,17 @@ class RemoteExecutorStatus: assert self.lock.locked() self.bundle_details_by_uuid[details.uuid] = details + def record_release_worker( + self, + worker: RemoteWorkerRecord, + uuid: str, + was_cancelled: bool, + ) -> None: + with self.lock: + self.record_release_worker_already_locked( + worker, uuid, was_cancelled + ) + def record_release_worker_already_locked( self, worker: RemoteWorkerRecord, @@ -309,6 +320,10 @@ class RemoteExecutorStatus: self.finished_bundle_timings_per_worker[worker] = x self.finished_bundle_timings.append(bundle_latency) + def record_processing_began(self, uuid: str): + with self.lock: + self.start_per_bundle[uuid] = time.time() + def total_in_flight(self) -> int: assert self.lock.locked() total_in_flight = 0 @@ -361,8 +376,13 @@ class RemoteExecutorStatus: None ) pid = str(details.pid) if details is not None else "TBD" - sec = ts - self.start_per_bundle[bundle_uuid] - ret += f' (pid={pid}): {bundle_uuid} for {sec:.1f}s so far ' + if self.start_per_bundle[bundle_uuid] is not None: + sec = ts - self.start_per_bundle[bundle_uuid] + ret += f' (pid={pid}): {bundle_uuid} for {sec:.1f}s so far ' + else: + ret += f' {bundle_uuid} setting up / copying data...' 
+            sec = 0.0
+
         if qworker is not None:
             if sec > qworker[1]:
                 ret += f'{bg("red")}>💻p95{reset()} '
@@ -491,6 +511,7 @@ class RemoteExecutor(BaseExecutor):
             raise Exception(msg)
         self.policy.register_worker_pool(self.workers)
         self.cv = threading.Condition()
+        logger.debug(f'Creating {self.worker_count} local threads, one per remote worker.')
         self._helper_executor = fut.ThreadPoolExecutor(
             thread_name_prefix="remote_executor_helper",
             max_workers=self.worker_count,
@@ -536,7 +557,8 @@ class RemoteExecutor(BaseExecutor):
         self.status.periodic_dump(self.total_bundles_submitted)

         # Look for bundles to reschedule
-        if len(self.status.finished_bundle_timings) > 7:
+        num_done = len(self.status.finished_bundle_timings)
+        if num_done > 7 or (num_done > 5 and self.is_worker_available()):
             for worker, bundle_uuids in self.status.in_flight_bundles_by_worker.items():
                 for uuid in bundle_uuids:
                     bundle = self.status.bundle_details_by_uuid.get(uuid, None)
@@ -576,25 +598,28 @@ class RemoteExecutor(BaseExecutor):
                 return True
         return False

-    def launch(self, bundle: BundleDetails) -> Any:
+    def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
         """Find a worker for bundle or block until one is available."""
         self.adjust_task_count(+1)
         uuid = bundle.uuid
         hostname = bundle.hostname
-        avoid_machine = None
+        avoid_machine = override_avoid_machine
+        is_original = bundle.src_bundle is None

         # Try not to schedule a backup on the same host as the original.
-        if bundle.src_bundle is not None:
+        if avoid_machine is None and bundle.src_bundle is not None:
             avoid_machine = bundle.src_bundle.machine
         worker = None
         while worker is None:
             worker = self.find_available_worker_or_block(avoid_machine)
+
+        # Ok, found a worker.
         bundle.worker = worker
         machine = bundle.machine = worker.machine
         username = bundle.username = worker.username
         fname = bundle.fname
         self.status.record_acquire_worker(worker, uuid)
-        logger.debug(f'Running bundle {uuid} on {worker}...')
+        logger.debug(f'{uuid}/{fname}: Running bundle on {worker}...')

         # Before we do any work, make sure the bundle is still viable.
         if self.check_if_cancelled(bundle):
@@ -602,54 +627,151 @@ class RemoteExecutor(BaseExecutor):
             return self.post_launch_work(bundle)
         except Exception as e:
             logger.exception(e)
-            logger.info(f"{uuid}/{fname}: bundle seems to have failed?!")
-            if bundle.failure_count < config.config['executors_max_bundle_failures']:
-                return self.launch(bundle)
+            logger.error(
+                f'{uuid}/{fname}: bundle says it\'s cancelled upfront but no results?!'
+            )
+            assert bundle.worker is not None
+            self.status.record_release_worker(
+                bundle.worker,
+                bundle.uuid,
+                True,
+            )
+            self.release_worker(bundle.worker)
+            self.adjust_task_count(-1)
+            if is_original:
+                # Weird. We are the original owner of this
+                # bundle. For it to have been cancelled, a backup
+                # must have already started and completed before
+                # we even got started. Moreover, the backup says
+                # it is done but we can't find the results it
+                # should have copied over. Reschedule the whole
+                # thing.
+                return self.emergency_retry_nasty_bundle(bundle)
             else:
-                logger.info(f"{uuid}/{fname}: bundle is poison, giving up on it.")
+                # Expected(?). We're a backup and our bundle is
+                # cancelled before we even got started. Something
+                # went bad in post_launch_work (I actually don't
+                # see what?) but probably not worth worrying
+                # about.
                 return None

-        # Send input to machine if it's not local.
+        # Send input code / data to worker machine if it's not local.
if hostname not in machine: - cmd = f'{RSYNC} {bundle.code_file} {username}@{machine}:{bundle.code_file}' - logger.info(f"{uuid}/{fname}: Copying work to {worker} via {cmd}") - run_silently(cmd) + try: + cmd = f'{RSYNC} {bundle.code_file} {username}@{machine}:{bundle.code_file}' + start_ts = time.time() + logger.info(f"{uuid}/{fname}: Copying work to {worker} via {cmd}.") + run_silently(cmd) + xfer_latency = time.time() - start_ts + logger.info(f"{uuid}/{fname}: Copying done in {xfer_latency}s.") + except Exception as e: + logger.exception(e) + logger.error( + f'{uuid}/{fname}: failed to send instructions to worker machine?!?' + ) + assert bundle.worker is not None + self.status.record_release_worker( + bundle.worker, + bundle.uuid, + True, + ) + self.release_worker(bundle.worker) + self.adjust_task_count(-1) + if is_original: + # Weird. We tried to copy the code to the worker and it failed... + # And we're the original bundle. We have to retry. + return self.emergency_retry_nasty_bundle(bundle) + else: + # This is actually expected; we're a backup. + # There's a race condition where someone else + # already finished the work and removed the source + # code file before we could copy it. No biggie. + return None - # Do it. + # Kick off the work. Note that if this fails we let + # wait_for_process deal with it. + self.status.record_processing_began(uuid) cmd = (f'{SSH} {bundle.username}@{bundle.machine} ' f'"source py39-venv/bin/activate &&' f' /home/scott/lib/python_modules/remote_worker.py' f' --code_file {bundle.code_file} --result_file {bundle.result_file}"') + logger.debug(f'{uuid}/{fname}: Executing {cmd} in the background to kick off work...') p = cmd_in_background(cmd, silent=True) bundle.pid = pid = p.pid - logger.info(f"{uuid}/{fname}: Start training on {worker} via {cmd} (background pid {pid})") + logger.debug(f'{uuid}/{fname}: Local ssh process pid={pid}; remote worker is {machine}.') + return self.wait_for_process(p, bundle, 0) + + def wait_for_process(self, p: subprocess.Popen, bundle: BundleDetails, depth: int) -> Any: + uuid = bundle.uuid + machine = bundle.machine + fname = bundle.fname + pid = p.pid + if depth > 3: + logger.error( + f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}." + ) + p.terminate() + self.status.record_release_worker( + bundle.worker, + bundle.uuid, + True, + ) + self.release_worker(bundle.worker) + self.adjust_task_count(-1) + return self.emergency_retry_nasty_bundle(bundle) + # Spin until either the ssh job we scheduled finishes the + # bundle or some backup worker signals that they finished it + # before we could. while True: try: p.wait(timeout=0.25) except subprocess.TimeoutExpired: self.heartbeat() - - # Both source and backup bundles can be cancelled by - # the other depending on which finishes first. if self.check_if_cancelled(bundle): - p.terminate() + logger.info( + f'{uuid}/{fname}: another worker finished bundle, checking it out...' + ) break else: - logger.debug( - f"{uuid}/{fname}: pid {pid} has finished its work normally." + logger.info( + f"{uuid}/{fname}: pid {pid} ({machine}) our ssh finished, checking it out..." ) + p = None break + # If we get here we believe the bundle is done; either the ssh + # subprocess finished (hopefully successfully) or we noticed + # that some other worker seems to have completed the bundle + # and we're bailing out. 
        try:
-            return self.post_launch_work(bundle)
+            ret = self.post_launch_work(bundle)
+            if ret is not None and p is not None:
+                p.terminate()
+            return ret
+
+        # Something went wrong; e.g. we could not copy the results
+        # back, clean up after ourselves on the remote machine, or
+        # unpickle the results we got from the remote machine. If we
+        # still have an active ssh subprocess, keep waiting on it.
+        # Otherwise, time for an emergency reschedule.
         except Exception as e:
             logger.exception(e)
-            logger.info(f"{uuid}: Bundle seems to have failed?!")
-            if bundle.failure_count < config.config['executors_max_bundle_failures']:
-                return self.launch(bundle)
-            logger.info(f"{uuid}: Bundle is poison, giving up on it.")
-            return None
+            logger.error(f'{uuid}/{fname}: Something unexpected just happened...')
+            if p is not None:
+                logger.warning(
+                    f"{uuid}/{fname}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
+                )
+                return self.wait_for_process(p, bundle, depth + 1)
+            else:
+                self.status.record_release_worker(
+                    bundle.worker,
+                    bundle.uuid,
+                    True,
+                )
+                self.release_worker(bundle.worker)
+                self.adjust_task_count(-1)
+                return self.emergency_retry_nasty_bundle(bundle)

     def post_launch_work(self, bundle: BundleDetails) -> Any:
         with self.status.lock:
@@ -673,20 +795,14 @@ class RemoteExecutor(BaseExecutor):
             logger.info(
                 f"{uuid}/{fname}: Fetching results from {username}@{machine} via {cmd}"
             )
-            try:
-                run_silently(cmd)
-            except subprocess.CalledProcessError:
-                logger.critical(f'Failed to copy {username}@{machine}:{result_file}!')
+
+            # If either of these throw they are handled in
+            # wait_for_process.
+            run_silently(cmd)
             run_silently(f'{SSH} {username}@{machine}'
                          f' "/bin/rm -f {code_file} {result_file}"')
             dur = bundle.end_ts - bundle.start_ts
             self.histogram.add_item(dur)
-        assert bundle.worker is not None
-        self.status.record_release_worker_already_locked(
-            bundle.worker,
-            bundle.uuid,
-            was_cancelled
-        )

         # Only the original worker should unpickle the file contents
         # though since it's the only one whose result matters. The
@@ -701,11 +817,22 @@ class RemoteExecutor(BaseExecutor):
                     serialized = rb.read()
                 result = cloudpickle.loads(serialized)
             except Exception as e:
-                msg = f'Failed to load {result_file}'
+                msg = f'Failed to load {result_file}, this is bad news.'
                 logger.critical(msg)
-                bundle.failure_count += 1
+                self.status.record_release_worker(
+                    bundle.worker,
+                    bundle.uuid,
+                    True,
+                )
                 self.release_worker(bundle.worker)
+
+                # Re-raise the exception; the code in wait_for_process may
+                # decide to emergency_retry_nasty_bundle here.
                 raise Exception(e)
+
+            logger.debug(
+                f'Removing local (master) {code_file} and {result_file}.'
+            )
             os.remove(f'{result_file}')
             os.remove(f'{code_file}')

@@ -720,7 +847,8 @@ class RemoteExecutor(BaseExecutor):
                     )
                     backup.is_cancelled.set()

-        # This is a backup job.
+        # This is a backup job and, by now, we have already fetched
+        # the bundle results.
         else:
             # Backup results don't matter, they just need to leave the
             # result file in the right place for their originals to
@@ -730,11 +858,16 @@ class RemoteExecutor(BaseExecutor):
             # Tell the original to stop if we finished first.
             if not was_cancelled:
                 logger.debug(
-                    f'{uuid}/{fname}: Notifying original {bundle.src_bundle.uuid} that it\'s cancelled'
+                    f'{uuid}/{fname}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
) bundle.src_bundle.is_cancelled.set() assert bundle.worker is not None + self.status.record_release_worker( + bundle.worker, + bundle.uuid, + was_cancelled, + ) self.release_worker(bundle.worker) self.adjust_task_count(-1) return result @@ -818,6 +951,36 @@ class RemoteExecutor(BaseExecutor): # they will move the result_file to this machine and let # the original pick them up and unpickle them. + def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> fut.Future: + uuid = bundle.uuid + is_original = bundle.src_bundle is None + bundle.worker = None + avoid_last_machine = bundle.machine + bundle.machine = None + bundle.username = None + bundle.failure_count += 1 + if is_original: + retry_limit = 3 + else: + retry_limit = 2 + + if bundle.failure_count > retry_limit: + logger.error( + f'{uuid}: Tried this bundle too many times already ({retry_limit}x); giving up.' + ) + if is_original: + logger.critical( + f'{uuid}: This is the original of the bundle; results will be incomplete.' + ) + else: + logger.error(f'{uuid}: At least it\'s only a backup; better luck with the others.') + return None + else: + logger.warning( + f'>>> Emergency rescheduling {uuid} because of unexected errors (wtf?!) <<<' + ) + return self.launch(bundle, avoid_last_machine) + @overrides def submit(self, function: Callable, @@ -862,8 +1025,8 @@ class DefaultExecutors(object): return self.process_executor def remote_pool(self) -> RemoteExecutor: - logger.info('Looking for some helper machines...') if self.remote_executor is None: + logger.info('Looking for some helper machines...') pool: List[RemoteWorkerRecord] = [] if self.ping('cheetah.house'): logger.info('Found cheetah.house') -- 2.45.2 From bef486c8c06e8d743a98b89910658a615acc8bbc Mon Sep 17 00:00:00 2001 From: Scott Gasch Date: Mon, 15 Nov 2021 16:55:26 -0800 Subject: [PATCH 02/16] Making remote training work better. --- executors.py | 18 ++++++----- logging_utils.py | 2 +- ml/model_trainer.py | 2 +- parallelize.py | 4 --- remote_worker.py | 74 ++++++++++++++++++++++++++++----------------- 5 files changed, 58 insertions(+), 42 deletions(-) diff --git a/executors.py b/executors.py index b16ad92..fe8d9d0 100644 --- a/executors.py +++ b/executors.py @@ -197,6 +197,11 @@ class ProcessExecutor(BaseExecutor): return state +class RemoteExecutorException(Exception): + """Thrown when a bundle cannot be executed despite several retries.""" + pass + + @dataclass class RemoteWorkerRecord: username: str @@ -508,7 +513,7 @@ class RemoteExecutor(BaseExecutor): if self.worker_count <= 0: msg = f"We need somewhere to schedule work; count was {self.worker_count}" logger.critical(msg) - raise Exception(msg) + raise RemoteExecutorException(msg) self.policy.register_worker_pool(self.workers) self.cv = threading.Condition() logger.debug(f'Creating {self.worker_count} local threads, one per remote worker.') @@ -518,9 +523,6 @@ class RemoteExecutor(BaseExecutor): ) self.status = RemoteExecutorStatus(self.worker_count) self.total_bundles_submitted = 0 - logger.debug( - f'Creating remote processpool with {self.worker_count} remote worker threads.' - ) def is_worker_available(self) -> bool: return self.policy.is_worker_available() @@ -556,7 +558,7 @@ class RemoteExecutor(BaseExecutor): # Regular progress report self.status.periodic_dump(self.total_bundles_submitted) - # Look for bundles to reschedule + # Look for bundles to reschedule. 
num_done = len(self.status.finished_bundle_timings) if num_done > 7 or (num_done > 5 and self.is_worker_available()): for worker, bundle_uuids in self.status.in_flight_bundles_by_worker.items(): @@ -663,7 +665,7 @@ class RemoteExecutor(BaseExecutor): logger.info(f"{uuid}/{fname}: Copying work to {worker} via {cmd}.") run_silently(cmd) xfer_latency = time.time() - start_ts - logger.info(f"{uuid}/{fname}: Copying done in {xfer_latency}s.") + logger.info(f"{uuid}/{fname}: Copying done in {xfer_latency:.1f}s.") except Exception as e: logger.exception(e) logger.error( @@ -969,8 +971,8 @@ class RemoteExecutor(BaseExecutor): f'{uuid}: Tried this bundle too many times already ({retry_limit}x); giving up.' ) if is_original: - logger.critical( - f'{uuid}: This is the original of the bundle; results will be incomplete.' + raise RemoteExecutorException( + f'{uuid}: This bundle can\'t be completed despite several backups and retries' ) else: logger.error(f'{uuid}: At least it\'s only a backup; better luck with the others.') diff --git a/logging_utils.py b/logging_utils.py index 7be31e3..819e3d3 100644 --- a/logging_utils.py +++ b/logging_utils.py @@ -76,7 +76,7 @@ cfg.add_argument( cfg.add_argument( '--logging_filename_count', type=int, - default=2, + default=7, metavar='COUNT', help='The number of logging_filename copies to keep before deleting.' ) diff --git a/ml/model_trainer.py b/ml/model_trainer.py index 9435351..f9e132e 100644 --- a/ml/model_trainer.py +++ b/ml/model_trainer.py @@ -218,7 +218,7 @@ class TrainingBlueprint(ABC): line = line.strip() try: (key, value) = line.split(self.spec.key_value_delimiter) - except Exception as e: + except Exception: logger.debug(f"WARNING: bad line in file {filename} '{line}', skipped") continue diff --git a/parallelize.py b/parallelize.py index 0822095..d9c202f 100644 --- a/parallelize.py +++ b/parallelize.py @@ -6,10 +6,6 @@ from enum import Enum import functools import typing -ps_count = 0 -thread_count = 0 -remote_count = 0 - class Method(Enum): THREAD = 1 diff --git a/remote_worker.py b/remote_worker.py index 43b8415..84f8d56 100755 --- a/remote_worker.py +++ b/remote_worker.py @@ -4,11 +4,11 @@ results. """ +import logging import os -import platform import signal -import sys import threading +import sys import time import cloudpickle # type: ignore @@ -20,6 +20,8 @@ import config from thread_utils import background_thread +logger = logging.getLogger(__file__) + cfg = config.add_commandline_args( f"Remote Worker ({__file__})", "Helper to run pickled code remotely and return results", @@ -54,49 +56,65 @@ def watch_for_cancel(terminate_event: threading.Event) -> None: ancestors = p.parents() for ancestor in ancestors: name = ancestor.name() - if 'ssh' in name or 'Ssh' in name: + if 'ssh' in name.lower(): saw_sshd = True break if not saw_sshd: os.system('pstree') os.kill(os.getpid(), signal.SIGTERM) + time.sleep(5.0) + os.kill(os.getpid(), signal.SIGKILL) + sys.exit(-1) if terminate_event.is_set(): return time.sleep(1.0) -if __name__ == '__main__': - @bootstrap.initialize - def main() -> None: - hostname = platform.node() - - # Windows-Linux is retarded. 
- # if ( - # hostname != 'VIDEO-COMPUTER' and - # config.config['watch_for_cancel'] - # ): - # (thread, terminate_event) = watch_for_cancel() - - in_file = config.config['code_file'] - out_file = config.config['result_file'] +@bootstrap.initialize +def main() -> None: + in_file = config.config['code_file'] + out_file = config.config['result_file'] + logger.debug(f'Reading {in_file}.') + try: with open(in_file, 'rb') as rb: serialized = rb.read() + except Exception as e: + logger.exception(e) + logger.critical(f'Problem reading {in_file}. Aborting.') + sys.exit(-1) + logger.debug(f'Deserializing {in_file}.') + try: fun, args, kwargs = cloudpickle.loads(serialized) - print(fun) - print(args) - print(kwargs) - print("Invoking the code...") - ret = fun(*args, **kwargs) - + except Exception as e: + logger.exception(e) + logger.critical(f'Problem deserializing {in_file}. Aborting.') + sys.exit(-1) + + logger.debug('Invoking user code...') + start = time.time() + ret = fun(*args, **kwargs) + end = time.time() + logger.debug(f'User code took {end - start:.1f}s') + + logger.debug('Serializing results') + try: serialized = cloudpickle.dumps(ret) + except Exception as e: + logger.exception(e) + logger.critical(f'Could not serialize result ({type(ret)}). Aborting.') + sys.exit(-1) + + logger.debug(f'Writing {out_file}.') + try: with open(out_file, 'wb') as wb: wb.write(serialized) + except Exception as e: + logger.exception(e) + logger.critical(f'Error writing {out_file}. Aborting.') + sys.exit(-1) - # Windows-Linux is retarded. - # if hostname != 'VIDEO-COMPUTER': - # terminate_event.set() - # thread.join() - sys.exit(0) + +if __name__ == '__main__': main() -- 2.45.2 From 9821d383ba3de886f8d11d00a588e49c2c280579 Mon Sep 17 00:00:00 2001 From: Scott Gasch Date: Wed, 17 Nov 2021 14:40:34 -0800 Subject: [PATCH 03/16] Make remote workers die if no longer needed; cleanups in executors. 
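
The crux of the "die if no longer needed" behavior is remote_worker.py's
watch_for_cancel thread: it walks the process ancestry with psutil and,
if no ssh/sshd ancestor remains, assumes the controlling executor hung
up and kills itself (SIGTERM first, then SIGKILL). A minimal standalone
sketch of the same idea, with illustrative names that are not the
module's actual API:

    import os
    import signal
    import time

    import psutil

    def controlling_ssh_alive() -> bool:
        # If some ancestor process is ssh/sshd, the remote executor
        # that spawned us is still connected.
        me = psutil.Process(os.getpid())
        return any('ssh' in p.name().lower() for p in me.parents())

    def die_if_orphaned() -> None:
        if not controlling_ssh_alive():
            os.kill(os.getpid(), signal.SIGTERM)  # ask politely first
            time.sleep(5.0)
            os.kill(os.getpid(), signal.SIGKILL)  # ...then insist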
--- executors.py | 79 +++++++++++++++++++++++++++++++----------------- remote_worker.py | 17 +++++++++++ 2 files changed, 68 insertions(+), 28 deletions(-) diff --git a/executors.py b/executors.py index fe8d9d0..a027326 100644 --- a/executors.py +++ b/executors.py @@ -22,7 +22,7 @@ from overrides import overrides from ansi import bg, fg, underline, reset import argparse_utils import config -from exec_utils import run_silently, cmd_in_background +from exec_utils import run_silently, cmd_in_background, cmd_with_timeout from decorator_utils import singleton import histogram as hist @@ -524,6 +524,27 @@ class RemoteExecutor(BaseExecutor): self.status = RemoteExecutorStatus(self.worker_count) self.total_bundles_submitted = 0 + def bundle_prefix(self, bundle: BundleDetails) -> str: + colorz = [ + fg('violet red'), + fg('red'), + fg('orange'), + fg('peach orange'), + fg('yellow'), + fg('marigold yellow'), + fg('green yellow'), + fg('tea green'), + fg('cornflower blue'), + fg('turquoise blue'), + fg('tropical blue'), + fg('lavender purple'), + fg('medium purple'), + ] + c = colorz[int(bundle.uuid[-2:], 16) % len(colorz)] + fname = bundle.fname if bundle.fname is not None else 'nofname' + machine = bundle.machine if bundle.machine is not None else 'nomachine' + return f'{c}{bundle.uuid[-8:]}/{fname}/{machine}{reset()}' + def is_worker_available(self) -> bool: return self.policy.is_worker_available() @@ -621,7 +642,7 @@ class RemoteExecutor(BaseExecutor): username = bundle.username = worker.username fname = bundle.fname self.status.record_acquire_worker(worker, uuid) - logger.debug(f'{uuid}/{fname}: Running bundle on {worker}...') + logger.debug(f'{self.bundle_prefix(bundle)}: Running bundle on {worker}...') # Before we do any work, make sure the bundle is still viable. if self.check_if_cancelled(bundle): @@ -630,7 +651,7 @@ class RemoteExecutor(BaseExecutor): except Exception as e: logger.exception(e) logger.error( - f'{uuid}/{fname}: bundle says it\'s cancelled upfront but no results?!' + f'{self.bundle_prefix(bundle)}: bundle says it\'s cancelled upfront but no results?!' ) assert bundle.worker is not None self.status.record_release_worker( @@ -662,14 +683,14 @@ class RemoteExecutor(BaseExecutor): try: cmd = f'{RSYNC} {bundle.code_file} {username}@{machine}:{bundle.code_file}' start_ts = time.time() - logger.info(f"{uuid}/{fname}: Copying work to {worker} via {cmd}.") + logger.info(f"{self.bundle_prefix(bundle)}: Copying work to {worker} via {cmd}.") run_silently(cmd) xfer_latency = time.time() - start_ts - logger.info(f"{uuid}/{fname}: Copying done in {xfer_latency:.1f}s.") + logger.info(f"{self.bundle_prefix(bundle)}: Copying done to {worker} in {xfer_latency:.1f}s.") except Exception as e: logger.exception(e) logger.error( - f'{uuid}/{fname}: failed to send instructions to worker machine?!?' + f'{self.bundle_prefix(bundle)}: failed to send instructions to worker machine?!?' 
) assert bundle.worker is not None self.status.record_release_worker( @@ -697,10 +718,10 @@ class RemoteExecutor(BaseExecutor): f'"source py39-venv/bin/activate &&' f' /home/scott/lib/python_modules/remote_worker.py' f' --code_file {bundle.code_file} --result_file {bundle.result_file}"') - logger.debug(f'{uuid}/{fname}: Executing {cmd} in the background to kick off work...') + logger.debug(f'{self.bundle_prefix(bundle)}: Executing {cmd} in the background to kick off work...') p = cmd_in_background(cmd, silent=True) bundle.pid = pid = p.pid - logger.debug(f'{uuid}/{fname}: Local ssh process pid={pid}; remote worker is {machine}.') + logger.debug(f'{self.bundle_prefix(bundle)}: Local ssh process pid={pid}; remote worker is {machine}.') return self.wait_for_process(p, bundle, 0) def wait_for_process(self, p: subprocess.Popen, bundle: BundleDetails, depth: int) -> Any: @@ -732,12 +753,12 @@ class RemoteExecutor(BaseExecutor): self.heartbeat() if self.check_if_cancelled(bundle): logger.info( - f'{uuid}/{fname}: another worker finished bundle, checking it out...' + f'{self.bundle_prefix(bundle)}: another worker finished bundle, checking it out...' ) break else: logger.info( - f"{uuid}/{fname}: pid {pid} ({machine}) our ssh finished, checking it out..." + f"{self.bundle_prefix(bundle)}: pid {pid} ({machine}) our ssh finished, checking it out..." ) p = None break @@ -759,10 +780,10 @@ class RemoteExecutor(BaseExecutor): # Otherwise, time for an emergency reschedule. except Exception as e: logger.exception(e) - logger.error(f'{uuid}/{fname}: Something unexpected just happened...') + logger.error(f'{self.bundle_prefix(bundle)}: Something unexpected just happened...') if p is not None: logger.warning( - f"{uuid}/{fname}: Failed to wrap up \"done\" bundle, re-waiting on active ssh." + f"{self.bundle_prefix(bundle)}: Failed to wrap up \"done\" bundle, re-waiting on active ssh." ) return self.wait_for_process(p, bundle, depth + 1) else: @@ -795,7 +816,7 @@ class RemoteExecutor(BaseExecutor): if bundle.hostname not in bundle.machine: cmd = f'{RSYNC} {username}@{machine}:{result_file} {result_file} 2>/dev/null' logger.info( - f"{uuid}/{fname}: Fetching results from {username}@{machine} via {cmd}" + f"{self.bundle_prefix(bundle)}: Fetching results from {username}@{machine} via {cmd}" ) # If either of these throw they are handled in @@ -813,7 +834,7 @@ class RemoteExecutor(BaseExecutor): # if one of the backups finished first; it still must read the # result from disk. if is_original: - logger.debug(f"{uuid}/{fname}: Unpickling {result_file}.") + logger.debug(f"{self.bundle_prefix(bundle)}: Unpickling {result_file}.") try: with open(f'{result_file}', 'rb') as rb: serialized = rb.read() @@ -845,7 +866,7 @@ class RemoteExecutor(BaseExecutor): if bundle.backup_bundles is not None: for backup in bundle.backup_bundles: logger.debug( - f'{uuid}/{fname}: Notifying backup {backup.uuid} that it\'s cancelled' + f'{self.bundle_prefix(bundle)}: Notifying backup {backup.uuid} that it\'s cancelled' ) backup.is_cancelled.set() @@ -860,7 +881,7 @@ class RemoteExecutor(BaseExecutor): # Tell the original to stop if we finished first. if not was_cancelled: logger.debug( - f'{uuid}/{fname}: Notifying original {bundle.src_bundle.uuid} we beat them to it.' + f'{self.bundle_prefix(bundle)}: Notifying original {bundle.src_bundle.uuid} we beat them to it.' 
) bundle.src_bundle.is_cancelled.set() @@ -906,7 +927,7 @@ class RemoteExecutor(BaseExecutor): failure_count = 0, ) self.status.record_bundle_details(bundle) - logger.debug(f'{uuid}/{fname}: Created original bundle') + logger.debug(f'{self.bundle_prefix(bundle)}: Created an original bundle') return bundle def create_backup_bundle(self, src_bundle: BundleDetails): @@ -937,7 +958,7 @@ class RemoteExecutor(BaseExecutor): ) src_bundle.backup_bundles.append(backup_bundle) self.status.record_bundle_details_already_locked(backup_bundle) - logger.debug(f'{uuid}/{src_bundle.fname}: Created backup bundle') + logger.debug(f'{self.bundle_prefix(bundle)}: Created a backup bundle') return backup_bundle def schedule_backup_for_bundle(self, @@ -968,18 +989,18 @@ class RemoteExecutor(BaseExecutor): if bundle.failure_count > retry_limit: logger.error( - f'{uuid}: Tried this bundle too many times already ({retry_limit}x); giving up.' + f'{self.bundle_prefix(bundle)}: Tried this bundle too many times already ({retry_limit}x); giving up.' ) if is_original: raise RemoteExecutorException( - f'{uuid}: This bundle can\'t be completed despite several backups and retries' + f'{self.bundle_prefix(bundle)}: This bundle can\'t be completed despite several backups and retries' ) else: - logger.error(f'{uuid}: At least it\'s only a backup; better luck with the others.') + logger.error(f'{self.bundle_prefix(bundle)}: At least it\'s only a backup; better luck with the others.') return None else: logger.warning( - f'>>> Emergency rescheduling {uuid} because of unexected errors (wtf?!) <<<' + f'>>> Emergency rescheduling {self.bundle_prefix(bundle)} because of unexected errors (wtf?!) <<<' ) return self.launch(bundle, avoid_last_machine) @@ -1009,12 +1030,14 @@ class DefaultExecutors(object): def ping(self, host) -> bool: logger.debug(f'RUN> ping -c 1 {host}') - command = ['ping', '-c', '1', host] - return subprocess.call( - command, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) == 0 + try: + x = cmd_with_timeout( + f'ping -c 1 {host} >/dev/null 2>/dev/null', + timeout_seconds=1.0 + ) + return x == 0 + except Exception: + return False def thread_pool(self) -> ThreadExecutor: if self.thread_executor is None: diff --git a/remote_worker.py b/remote_worker.py index 84f8d56..bf8de6c 100755 --- a/remote_worker.py +++ b/remote_worker.py @@ -6,6 +6,7 @@ results. import logging import os +import platform import signal import threading import sys @@ -50,16 +51,23 @@ cfg.add_argument( @background_thread def watch_for_cancel(terminate_event: threading.Event) -> None: + if platform.node() == 'VIDEO-COMPUTER': + logger.warning('Background thread not allowed on retarded computers, sorry.') + return + logger.debug('Starting up background thread...') p = psutil.Process(os.getpid()) while True: saw_sshd = False ancestors = p.parents() for ancestor in ancestors: name = ancestor.name() + pid = ancestor.pid + logger.debug(f'Ancestor process {name} (pid={pid})') if 'ssh' in name.lower(): saw_sshd = True break if not saw_sshd: + logger.error('Did not see sshd in our ancestors list?! Committing suicide.') os.system('pstree') os.kill(os.getpid(), signal.SIGTERM) time.sleep(5.0) @@ -75,6 +83,8 @@ def main() -> None: in_file = config.config['code_file'] out_file = config.config['result_file'] + (thread, stop_thread) = watch_for_cancel() + logger.debug(f'Reading {in_file}.') try: with open(in_file, 'rb') as rb: @@ -82,6 +92,7 @@ def main() -> None: except Exception as e: logger.exception(e) logger.critical(f'Problem reading {in_file}. 
Aborting.') + stop_thread.set() sys.exit(-1) logger.debug(f'Deserializing {in_file}.') @@ -90,6 +101,7 @@ def main() -> None: except Exception as e: logger.exception(e) logger.critical(f'Problem deserializing {in_file}. Aborting.') + stop_thread.set() sys.exit(-1) logger.debug('Invoking user code...') @@ -104,6 +116,7 @@ def main() -> None: except Exception as e: logger.exception(e) logger.critical(f'Could not serialize result ({type(ret)}). Aborting.') + stop_thread.set() sys.exit(-1) logger.debug(f'Writing {out_file}.') @@ -113,8 +126,12 @@ def main() -> None: except Exception as e: logger.exception(e) logger.critical(f'Error writing {out_file}. Aborting.') + stop_thread.set() sys.exit(-1) + stop_thread.set() + thread.join() + if __name__ == '__main__': main() -- 2.45.2 From 3f4818bf9d1f3071c4e2906896d422810b5776bc Mon Sep 17 00:00:00 2001 From: Scott Gasch Date: Wed, 17 Nov 2021 15:26:43 -0800 Subject: [PATCH 04/16] Bugfixes in executors. --- executors.py | 114 ++++++++++++++++++++++++++------------------------- 1 file changed, 58 insertions(+), 56 deletions(-) diff --git a/executors.py b/executors.py index a027326..b446822 100644 --- a/executors.py +++ b/executors.py @@ -238,6 +238,34 @@ class BundleDetails: backup_bundles: Optional[List[BundleDetails]] failure_count: int + def __repr__(self): + uuid = self.uuid + if uuid[-9:-2] == '_backup': + uuid = uuid[:-9] + suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}' + else: + suffix = uuid[-6:] + + colorz = [ + fg('violet red'), + fg('red'), + fg('orange'), + fg('peach orange'), + fg('yellow'), + fg('marigold yellow'), + fg('green yellow'), + fg('tea green'), + fg('cornflower blue'), + fg('turquoise blue'), + fg('tropical blue'), + fg('lavender purple'), + fg('medium purple'), + ] + c = colorz[int(uuid[-2:], 16) % len(colorz)] + fname = self.fname if self.fname is not None else 'nofname' + machine = self.machine if self.machine is not None else 'nomachine' + return f'{c}{suffix}/{fname}/{machine}{reset()}' + class RemoteExecutorStatus: def __init__(self, total_worker_count: int) -> None: @@ -380,12 +408,12 @@ class RemoteExecutorStatus: bundle_uuid, None ) - pid = str(details.pid) if details is not None else "TBD" + pid = str(details.pid) if (details and details.pid != 0) else "TBD" if self.start_per_bundle[bundle_uuid] is not None: sec = ts - self.start_per_bundle[bundle_uuid] - ret += f' (pid={pid}): {bundle_uuid} for {sec:.1f}s so far ' + ret += f' (pid={pid}): {details} for {sec:.1f}s so far ' else: - ret += f' {bundle_uuid} setting up / copying data...' + ret += f' {details} setting up / copying data...' 
sec = 0.0 if qworker is not None: @@ -397,12 +425,12 @@ class RemoteExecutorStatus: if sec > qall[1] * 1.5: ret += f'{bg("red")}!!!{reset()}' if details is not None: - logger.debug(f'Flagging {details.uuid} for another backup') + logger.debug(f'Flagging {details} for another backup') details.super_slow = True elif sec > qall[1]: ret += f'{bg("red")}>∀p95{reset()} ' if details is not None: - logger.debug(f'Flagging {details.uuid} for a backup') + logger.debug(f'Flagging {details} for a backup') details.too_slow = True elif sec > qall[0]: ret += f'{fg("red")}>∀p50{reset()}' @@ -524,27 +552,6 @@ class RemoteExecutor(BaseExecutor): self.status = RemoteExecutorStatus(self.worker_count) self.total_bundles_submitted = 0 - def bundle_prefix(self, bundle: BundleDetails) -> str: - colorz = [ - fg('violet red'), - fg('red'), - fg('orange'), - fg('peach orange'), - fg('yellow'), - fg('marigold yellow'), - fg('green yellow'), - fg('tea green'), - fg('cornflower blue'), - fg('turquoise blue'), - fg('tropical blue'), - fg('lavender purple'), - fg('medium purple'), - ] - c = colorz[int(bundle.uuid[-2:], 16) % len(colorz)] - fname = bundle.fname if bundle.fname is not None else 'nofname' - machine = bundle.machine if bundle.machine is not None else 'nomachine' - return f'{c}{bundle.uuid[-8:]}/{fname}/{machine}{reset()}' - def is_worker_available(self) -> bool: return self.policy.is_worker_available() @@ -599,7 +606,7 @@ class RemoteExecutor(BaseExecutor): bundle.too_slow and len(bundle.backup_bundles) == 0 # one backup per ): - msg = f"*** Rescheduling {bundle.pid}/{bundle.uuid} ***" + msg = f"*** Rescheduling {bundle} (first backup) ***" logger.debug(msg) self.schedule_backup_for_bundle(bundle) return @@ -608,7 +615,7 @@ class RemoteExecutor(BaseExecutor): and len(bundle.backup_bundles) < 2 # two backups in dire situations and self.status.total_idle() > 4 ): - msg = f"*** Rescheduling {bundle.pid}/{bundle.uuid} ***" + msg = f"*** Rescheduling {bundle} (second backup) ***" logger.debug(msg) self.schedule_backup_for_bundle(bundle) return @@ -640,9 +647,9 @@ class RemoteExecutor(BaseExecutor): bundle.worker = worker machine = bundle.machine = worker.machine username = bundle.username = worker.username - fname = bundle.fname + self.status.record_acquire_worker(worker, uuid) - logger.debug(f'{self.bundle_prefix(bundle)}: Running bundle on {worker}...') + logger.debug(f'{bundle}: Running bundle on {worker}...') # Before we do any work, make sure the bundle is still viable. if self.check_if_cancelled(bundle): @@ -651,7 +658,7 @@ class RemoteExecutor(BaseExecutor): except Exception as e: logger.exception(e) logger.error( - f'{self.bundle_prefix(bundle)}: bundle says it\'s cancelled upfront but no results?!' + f'{bundle}: bundle says it\'s cancelled upfront but no results?!' ) assert bundle.worker is not None self.status.record_release_worker( @@ -683,14 +690,14 @@ class RemoteExecutor(BaseExecutor): try: cmd = f'{RSYNC} {bundle.code_file} {username}@{machine}:{bundle.code_file}' start_ts = time.time() - logger.info(f"{self.bundle_prefix(bundle)}: Copying work to {worker} via {cmd}.") + logger.info(f"{bundle}: Copying work to {worker} via {cmd}.") run_silently(cmd) xfer_latency = time.time() - start_ts - logger.info(f"{self.bundle_prefix(bundle)}: Copying done to {worker} in {xfer_latency:.1f}s.") + logger.info(f"{bundle}: Copying done to {worker} in {xfer_latency:.1f}s.") except Exception as e: logger.exception(e) logger.error( - f'{self.bundle_prefix(bundle)}: failed to send instructions to worker machine?!?' 
+ f'{bundle}: failed to send instructions to worker machine?!?' ) assert bundle.worker is not None self.status.record_release_worker( @@ -718,16 +725,14 @@ class RemoteExecutor(BaseExecutor): f'"source py39-venv/bin/activate &&' f' /home/scott/lib/python_modules/remote_worker.py' f' --code_file {bundle.code_file} --result_file {bundle.result_file}"') - logger.debug(f'{self.bundle_prefix(bundle)}: Executing {cmd} in the background to kick off work...') + logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...') p = cmd_in_background(cmd, silent=True) bundle.pid = pid = p.pid - logger.debug(f'{self.bundle_prefix(bundle)}: Local ssh process pid={pid}; remote worker is {machine}.') + logger.debug(f'{bundle}: Local ssh process pid={pid}; remote worker is {machine}.') return self.wait_for_process(p, bundle, 0) def wait_for_process(self, p: subprocess.Popen, bundle: BundleDetails, depth: int) -> Any: - uuid = bundle.uuid machine = bundle.machine - fname = bundle.fname pid = p.pid if depth > 3: logger.error( @@ -753,12 +758,12 @@ class RemoteExecutor(BaseExecutor): self.heartbeat() if self.check_if_cancelled(bundle): logger.info( - f'{self.bundle_prefix(bundle)}: another worker finished bundle, checking it out...' + f'{bundle}: another worker finished bundle, checking it out...' ) break else: logger.info( - f"{self.bundle_prefix(bundle)}: pid {pid} ({machine}) our ssh finished, checking it out..." + f"{bundle}: pid {pid} ({machine}) our ssh finished, checking it out..." ) p = None break @@ -780,10 +785,10 @@ class RemoteExecutor(BaseExecutor): # Otherwise, time for an emergency reschedule. except Exception as e: logger.exception(e) - logger.error(f'{self.bundle_prefix(bundle)}: Something unexpected just happened...') + logger.error(f'{bundle}: Something unexpected just happened...') if p is not None: logger.warning( - f"{self.bundle_prefix(bundle)}: Failed to wrap up \"done\" bundle, re-waiting on active ssh." + f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh." ) return self.wait_for_process(p, bundle, depth + 1) else: @@ -804,8 +809,6 @@ class RemoteExecutor(BaseExecutor): machine = bundle.machine result_file = bundle.result_file code_file = bundle.code_file - fname = bundle.fname - uuid = bundle.uuid # Whether original or backup, if we finished first we must # fetch the results if the computation happened on a @@ -816,7 +819,7 @@ class RemoteExecutor(BaseExecutor): if bundle.hostname not in bundle.machine: cmd = f'{RSYNC} {username}@{machine}:{result_file} {result_file} 2>/dev/null' logger.info( - f"{self.bundle_prefix(bundle)}: Fetching results from {username}@{machine} via {cmd}" + f"{bundle}: Fetching results from {username}@{machine} via {cmd}" ) # If either of these throw they are handled in @@ -834,7 +837,7 @@ class RemoteExecutor(BaseExecutor): # if one of the backups finished first; it still must read the # result from disk. 
        if is_original:
-            logger.debug(f"{self.bundle_prefix(bundle)}: Unpickling {result_file}.")
+            logger.debug(f"{bundle}: Unpickling {result_file}.")
             try:
                 with open(f'{result_file}', 'rb') as rb:
                     serialized = rb.read()
@@ -866,7 +869,7 @@ class RemoteExecutor(BaseExecutor):
             if bundle.backup_bundles is not None:
                 for backup in bundle.backup_bundles:
                     logger.debug(
-                        f'{self.bundle_prefix(bundle)}: Notifying backup {backup.uuid} that it\'s cancelled'
+                        f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
                     )
                     backup.is_cancelled.set()

@@ -881,7 +884,7 @@ class RemoteExecutor(BaseExecutor):
             # Tell the original to stop if we finished first.
             if not was_cancelled:
                 logger.debug(
-                    f'{self.bundle_prefix(bundle)}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
+                    f'{bundle}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
                 )
                 bundle.src_bundle.is_cancelled.set()

@@ -927,7 +930,7 @@ class RemoteExecutor(BaseExecutor):
             failure_count = 0,
         )
         self.status.record_bundle_details(bundle)
-        logger.debug(f'{self.bundle_prefix(bundle)}: Created an original bundle')
+        logger.debug(f'{bundle}: Created an original bundle')
         return bundle

     def create_backup_bundle(self, src_bundle: BundleDetails):
@@ -958,7 +961,7 @@ class RemoteExecutor(BaseExecutor):
         )
         src_bundle.backup_bundles.append(backup_bundle)
         self.status.record_bundle_details_already_locked(backup_bundle)
-        logger.debug(f'{self.bundle_prefix(bundle)}: Created a backup bundle')
+        logger.debug(f'{backup_bundle}: Created a backup bundle')
         return backup_bundle

     def schedule_backup_for_bundle(self,
@@ -975,7 +978,6 @@ class RemoteExecutor(BaseExecutor):
         # the original pick them up and unpickle them.

     def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> fut.Future:
-        uuid = bundle.uuid
         is_original = bundle.src_bundle is None
         bundle.worker = None
         avoid_last_machine = bundle.machine
@@ -989,18 +991,18 @@ class RemoteExecutor(BaseExecutor):

         if bundle.failure_count > retry_limit:
             logger.error(
-                f'{self.bundle_prefix(bundle)}: Tried this bundle too many times already ({retry_limit}x); giving up.'
+                f'{bundle}: Tried this bundle too many times already ({retry_limit}x); giving up.'
             )
             if is_original:
                 raise RemoteExecutorException(
-                    f'{self.bundle_prefix(bundle)}: This bundle can\'t be completed despite several backups and retries'
+                    f'{bundle}: This bundle can\'t be completed despite several backups and retries'
                 )
             else:
-                logger.error(f'{self.bundle_prefix(bundle)}: At least it\'s only a backup; better luck with the others.')
+                logger.error(f'{bundle}: At least it\'s only a backup; better luck with the others.')
                 return None
         else:
             logger.warning(
-                f'>>> Emergency rescheduling {self.bundle_prefix(bundle)} because of unexected errors (wtf?!) <<<'
+                f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
             )
             return self.launch(bundle, avoid_last_machine)

@@ -1059,7 +1061,7 @@ class DefaultExecutors(object):
                 RemoteWorkerRecord(
                     username = 'scott',
                     machine = 'cheetah.house',
-                    weight = 12,
+                    weight = 14,
                     count = 4,
                 ),
             )
-- 
2.45.2

From 6f688ff9bacee93679f6af45a301b4308e19764c Mon Sep 17 00:00:00 2001
From: Scott Gasch
Date: Wed, 17 Nov 2021 22:14:37 -0800
Subject: [PATCH 05/16] WaitableState thingy and some stuff in the ML world.
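
The new WaitableAutomaticStateTracker lets a consumer block on a
threading.Event instead of polling for changes. A hedged usage sketch
follows; the subclass, the update id, and the update() signature are
assumed here for illustration — only something_changed(),
did_something_change(), reset(), and wait() come from this change:

    class MyTracker(WaitableAutomaticStateTracker):
        def update(self, update_id, now, last_invocation):
            # ... refresh state here; if it actually changed:
            self.something_changed()  # wakes any waiter

    tracker = MyTracker({'check_stuff': 15.0})  # re-check every 15s
    while True:
        tracker.wait(timeout=60.0)  # block instead of polling
        if tracker.did_something_change():
            tracker.reset()
            # ... react to the new state ...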
--- ml/model_trainer.py | 13 +++++++------ state_tracker.py | 32 +++++++++++++++++++++++++++++++- 2 files changed, 38 insertions(+), 7 deletions(-) diff --git a/ml/model_trainer.py b/ml/model_trainer.py index f9e132e..f61b8e7 100644 --- a/ml/model_trainer.py +++ b/ml/model_trainer.py @@ -356,6 +356,13 @@ class TrainingBlueprint(ABC): import input_utils import string_utils + now: datetime.datetime = datetime_utils.now_pacific() + info = f"""Timestamp: {datetime_utils.datetime_to_string(now)} +Model params: {params} +Training examples: {num_examples} +Training set score: {training_score:.2f}% +Testing set score: {test_score:.2f}%""" + print(f'\n{info}\n') if ( (self.spec.persist_percentage_threshold is not None and test_score > self.spec.persist_percentage_threshold) @@ -376,12 +383,6 @@ class TrainingBlueprint(ABC): print(msg) logger.info(msg) model_info_filename = f"{self.spec.basename}_model_info.txt" - now: datetime.datetime = datetime_utils.now_pacific() - info = f"""Timestamp: {datetime_utils.datetime_to_string(now)} -Model params: {params} -Training examples: {num_examples} -Training set score: {training_score:.2f}% -Testing set score: {test_score:.2f}%""" with open(model_info_filename, "w") as f: f.write(info) msg = f"Wrote {model_info_filename}:" diff --git a/state_tracker.py b/state_tracker.py index 11ce4c3..4f77ff4 100644 --- a/state_tracker.py +++ b/state_tracker.py @@ -4,6 +4,7 @@ from abc import ABC, abstractmethod import datetime import logging import time +import threading from typing import Dict, Optional import pytz @@ -112,7 +113,7 @@ class AutomaticStateTracker(StateTracker): logger.debug('pace_maker noticed event; shutting down') return self.heartbeat() - logger.debug(f'page_maker is sleeping for {self.sleep_delay}s') + logger.debug(f'pace_maker is sleeping for {self.sleep_delay}s') time.sleep(self.sleep_delay) def __init__( @@ -144,3 +145,32 @@ class AutomaticStateTracker(StateTracker): self.should_terminate.set() self.updater_thread.join() logger.debug('Background thread terminated.') + + +class WaitableAutomaticStateTracker(AutomaticStateTracker): + + def __init__( + self, + update_ids_to_update_secs: Dict[str, float], + *, + override_sleep_delay: Optional[float] = None, + ) -> None: + self._something_changed = threading.Event() + super().__init__(update_ids_to_update_secs, + override_sleep_delay=override_sleep_delay) + + def something_changed(self): + self._something_changed.set() + + def did_something_change(self) -> bool: + return self._something_changed.is_set() + + def reset(self): + self._something_changed.clear() + + def wait(self, + *, + timeout=None): + return self._something_changed.wait( + timeout=timeout + ) -- 2.45.2 From ed8fa2b10b0177b15b7423263bdd390efde2f0c8 Mon Sep 17 00:00:00 2001 From: Scott Gasch Date: Thu, 18 Nov 2021 20:39:12 -0800 Subject: [PATCH 06/16] Make smart futures avoid polling. Much fucking around with the backup strategy in executors. Tweaks to presence. 
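
Two ideas in here. First, smart_future's wait_any/wait_all now lean on
concurrent.futures' blocking primitives (as_completed and wait) rather
than a 0.1s sleep loop, so waiters wake exactly when a future
completes. The kernel of the change is just:

    import concurrent.futures

    def yield_as_ready(real_futures):
        # Blocks inside the library; yields each future as it
        # finishes instead of polling done() on a timer.
        for f in concurrent.futures.as_completed(real_futures):
            yield f

Second, backup scheduling moves from fixed latency thresholds to a
score per in-flight bundle: roughly 200 / worker weight as a base,
plus the bundle's runtime so far, plus runtime/2 for each of the
worker-local and global p95 latencies it exceeds, doubled if it has
no backup yet. For example, a bundle 60s into a weight-10 machine
that is past its worker's p95 scores about (20 + 60 + 30) * 2 = 220;
the highest-scoring bundle wins, and at most one backup is scheduled
per second.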
--- executors.py | 163 ++++++++++++++++++++++++++++++------------------ presence.py | 33 +++++----- smart_future.py | 49 +++++++-------- 3 files changed, 143 insertions(+), 102 deletions(-) diff --git a/executors.py b/executors.py index b446822..c11bd54 100644 --- a/executors.py +++ b/executors.py @@ -22,8 +22,8 @@ from overrides import overrides from ansi import bg, fg, underline, reset import argparse_utils import config -from exec_utils import run_silently, cmd_in_background, cmd_with_timeout from decorator_utils import singleton +from exec_utils import run_silently, cmd_in_background, cmd_with_timeout import histogram as hist logger = logging.getLogger(__name__) @@ -230,8 +230,8 @@ class BundleDetails: pid: int start_ts: float end_ts: float - too_slow: bool - super_slow: bool + slower_than_local_p95: bool + slower_than_global_p95: bool src_bundle: BundleDetails is_cancelled: threading.Event was_cancelled: bool @@ -419,21 +419,19 @@ class RemoteExecutorStatus: if qworker is not None: if sec > qworker[1]: ret += f'{bg("red")}>💻p95{reset()} ' - elif sec > qworker[0]: - ret += f'{fg("red")}>💻p50{reset()} ' - if qall is not None: - if sec > qall[1] * 1.5: - ret += f'{bg("red")}!!!{reset()}' if details is not None: - logger.debug(f'Flagging {details} for another backup') - details.super_slow = True - elif sec > qall[1]: + details.slower_than_local_p95 = True + else: + if details is not None: + details.slower_than_local_p95 = False + + if qall is not None: + if sec > qall[1]: ret += f'{bg("red")}>∀p95{reset()} ' if details is not None: - logger.debug(f'Flagging {details} for a backup') - details.too_slow = True - elif sec > qall[0]: - ret += f'{fg("red")}>∀p50{reset()}' + details.slower_than_global_p95 = True + else: + details.slower_than_global_p95 = False ret += '\n' return ret @@ -451,7 +449,6 @@ class RemoteExecutorStatus: class RemoteWorkerSelectionPolicy(ABC): def register_worker_pool(self, workers): - random.seed() self.workers = workers @abstractmethod @@ -467,12 +464,14 @@ class RemoteWorkerSelectionPolicy(ABC): class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy): + @overrides def is_worker_available(self) -> bool: for worker in self.workers: if worker.count > 0: return True return False + @overrides def acquire_worker( self, machine_to_avoid = None @@ -499,12 +498,14 @@ class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy): def __init__(self) -> None: self.index = 0 + @overrides def is_worker_available(self) -> bool: for worker in self.workers: if worker.count > 0: return True return False + @overrides def acquire_worker( self, machine_to_avoid: str = None @@ -551,6 +552,8 @@ class RemoteExecutor(BaseExecutor): ) self.status = RemoteExecutorStatus(self.worker_count) self.total_bundles_submitted = 0 + self.backup_lock = threading.Lock() + self.last_backup = None def is_worker_available(self) -> bool: return self.policy.is_worker_available() @@ -588,37 +591,84 @@ class RemoteExecutor(BaseExecutor): # Look for bundles to reschedule. 
num_done = len(self.status.finished_bundle_timings) - if num_done > 7 or (num_done > 5 and self.is_worker_available()): - for worker, bundle_uuids in self.status.in_flight_bundles_by_worker.items(): - for uuid in bundle_uuids: - bundle = self.status.bundle_details_by_uuid.get(uuid, None) - if ( - bundle is not None and - bundle.too_slow and - bundle.src_bundle is None and - config.config['executors_schedule_remote_backups'] - ): - self.consider_backup_for_bundle(bundle) - - def consider_backup_for_bundle(self, bundle: BundleDetails) -> None: - assert self.status.lock.locked() - if ( - bundle.too_slow - and len(bundle.backup_bundles) == 0 # one backup per - ): - msg = f"*** Rescheduling {bundle} (first backup) ***" - logger.debug(msg) - self.schedule_backup_for_bundle(bundle) - return - elif ( - bundle.super_slow - and len(bundle.backup_bundles) < 2 # two backups in dire situations - and self.status.total_idle() > 4 - ): - msg = f"*** Rescheduling {bundle} (second backup) ***" - logger.debug(msg) - self.schedule_backup_for_bundle(bundle) - return + num_idle_workers = self.worker_count - self.task_count + now = time.time() + if ( + config.config['executors_schedule_remote_backups'] + and num_done > 2 + and num_idle_workers > 1 + and (self.last_backup is None or (now - self.last_backup > 1.0)) + and self.backup_lock.acquire(blocking=False) + ): + try: + assert self.backup_lock.locked() + + bundle_to_backup = None + best_score = None + for worker, bundle_uuids in self.status.in_flight_bundles_by_worker.items(): + # Prefer to schedule backups of bundles on slower machines. + base_score = 0 + for record in self.workers: + if worker.machine == record.machine: + base_score = float(record.weight) + base_score = 1.0 / base_score + base_score *= 200.0 + base_score = int(base_score) + break + + for uuid in bundle_uuids: + bundle = self.status.bundle_details_by_uuid.get(uuid, None) + if ( + bundle is not None + and bundle.src_bundle is None + and bundle.backup_bundles is not None + ): + score = base_score + + # Schedule backups of bundles running longer; especially those + # that are unexpectedly slow. + start_ts = self.status.start_per_bundle[uuid] + if start_ts is not None: + runtime = now - start_ts + score += runtime + logger.debug(f'score[{bundle}] => {score} # latency boost') + + if bundle.slower_than_local_p95: + score += runtime / 2 + logger.debug(f'score[{bundle}] => {score} # >worker p95') + + if bundle.slower_than_global_p95: + score += runtime / 2 + logger.debug(f'score[{bundle}] => {score} # >global p95') + + # Prefer backups of bundles that don't have backups already. 
+ backup_count = len(bundle.backup_bundles) + if backup_count == 0: + score *= 2 + elif backup_count == 1: + score /= 2 + elif backup_count == 2: + score /= 8 + else: + score = 0 + logger.debug(f'score[{bundle}] => {score} # {backup_count} dup backup factor') + + if ( + score != 0 + and (best_score is None or score > best_score) + ): + bundle_to_backup = bundle + assert bundle is not None + assert bundle.backup_bundles is not None + assert bundle.src_bundle is None + best_score = score + + if bundle_to_backup is not None: + self.last_backup = now + logger.info(f'=====> SCHEDULING BACKUP {bundle_to_backup} (score={best_score:.1f}) <=====') + self.schedule_backup_for_bundle(bundle_to_backup) + finally: + self.backup_lock.release() def check_if_cancelled(self, bundle: BundleDetails) -> bool: with self.status.lock: @@ -921,8 +971,8 @@ class RemoteExecutor(BaseExecutor): pid = 0, start_ts = time.time(), end_ts = 0.0, - too_slow = False, - super_slow = False, + slower_than_local_p95 = False, + slower_than_global_p95 = False, src_bundle = None, is_cancelled = threading.Event(), was_cancelled = False, @@ -951,8 +1001,8 @@ class RemoteExecutor(BaseExecutor): pid = 0, start_ts = time.time(), end_ts = 0.0, - too_slow = False, - super_slow = False, + slower_than_local_p95 = False, + slower_than_global_p95 = False, src_bundle = src_bundle, is_cancelled = threading.Event(), was_cancelled = False, @@ -967,6 +1017,7 @@ class RemoteExecutor(BaseExecutor): def schedule_backup_for_bundle(self, src_bundle: BundleDetails): assert self.status.lock.locked() + assert src_bundle is not None backup_bundle = self.create_backup_bundle(src_bundle) logger.debug( f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...' @@ -1095,16 +1146,6 @@ class DefaultExecutors(object): count = 2, ), ) - if self.ping('backup.house'): - logger.info('Found backup.house') - pool.append( - RemoteWorkerRecord( - username = 'scott', - machine = 'backup.house', - weight = 1, - count = 4, - ), - ) if self.ping('kiosk.house'): logger.info('Found kiosk.house') pool.append( diff --git a/presence.py b/presence.py index b310183..d7db416 100755 --- a/presence.py +++ b/presence.py @@ -51,16 +51,14 @@ class PresenceDetection(object): Person.SCOTT: [ "3C:28:6D:10:6D:41", # pixel3 "6C:40:08:AE:DC:2E", # laptop -# "D4:61:2E:88:18:09", # watch -# "14:7D:DA:6A:20:D7", # work laptop ], Person.LYNN: [ - "08:CC:27:63:26:14", - "B8:31:B5:9A:4F:19", + "08:CC:27:63:26:14", # motog7 + "B8:31:B5:9A:4F:19", # laptop ], Person.ALEX: [ - "0C:CB:85:0C:8B:AE", - "D0:C6:37:E3:36:9A", + "0C:CB:85:0C:8B:AE", # phone + "D0:C6:37:E3:36:9A", # laptop ], Person.AARON_AND_DANA: [ "98:B6:E9:E5:5A:7C", @@ -149,6 +147,7 @@ class PresenceDetection(object): def where_is_person_now(self, name: Person) -> Location: import dict_utils + logger.debug(f'Looking for {name}...') if name is Person.UNKNOWN: if self.weird_mac_at_cabin: @@ -159,26 +158,30 @@ class PresenceDetection(object): tiebreaks: Dict[Location, datetime.datetime] = {} credit = 10000 for mac in self.devices_by_person[name]: - logger.debug(f'Looking for {name}... check for mac {mac}') if mac not in self.names_by_mac: continue + mac_name = self.names_by_mac[mac] + logger.debug(f'Looking for {name}... 
check for mac {mac} ({mac_name})') for location in self.location_ts_by_mac: if mac in self.location_ts_by_mac[location]: ts = (self.location_ts_by_mac[location])[mac] - logger.debug(f'I saw {mac} at {location} at {ts}') + logger.debug(f'Seen {mac} ({mac_name}) at {location} since {ts}') tiebreaks[location] = ts - location = dict_utils.key_with_min_value(tiebreaks) - v = votes.get(location, 0) - votes[location] = v + credit - logger.debug(f'{name}: {location} gets {credit} votes.') + + (most_recent_location, first_seen_ts) = dict_utils.item_with_max_value(tiebreaks) + bonus = credit + v = votes.get(most_recent_location, 0) + votes[most_recent_location] = v + bonus + logger.debug(f'{name}: {location} gets {bonus} votes.') credit = int( - credit * 0.667 + credit * 0.2 ) # Note: list most important devices first if credit <= 0: credit = 1 if len(votes) > 0: - item = dict_utils.item_with_max_value(votes) - return item[0] + (location, value) = dict_utils.item_with_max_value(votes) + if value > 2001: + return location return Location.UNKNOWN diff --git a/smart_future.py b/smart_future.py index f11be17..c097d53 100644 --- a/smart_future.py +++ b/smart_future.py @@ -1,9 +1,8 @@ #!/usr/bin/env python3 from __future__ import annotations -from collections.abc import Mapping +import concurrent import concurrent.futures as fut -import time from typing import Callable, List, TypeVar from overrides import overrides @@ -17,35 +16,33 @@ T = TypeVar('T') def wait_any(futures: List[SmartFuture], *, callback: Callable = None): - finished: Mapping[int, bool] = {} - x = 0 - while True: - future = futures[x] - if not finished.get(future.get_id(), False): - if future.is_ready(): - finished[future.get_id()] = True - yield future - else: - if callback is not None: - callback() - time.sleep(0.1) - x += 1 - if x >= len(futures): - x = 0 - if len(finished) == len(futures): + real_futures = [] + smart_future_by_real_future = {} + completed_futures = set() + for _ in futures: + real_futures.append(_.wrapped_future) + smart_future_by_real_future[_.wrapped_future] = _ + while len(completed_futures) != len(real_futures): + newly_completed_futures = concurrent.futures.as_completed(real_futures) + for f in newly_completed_futures: if callback is not None: callback() - return + completed_futures.add(f) + yield smart_future_by_real_future[f] + if callback is not None: + callback() + return def wait_all(futures: List[SmartFuture]) -> None: - done_set = set() - while len(done_set) < len(futures): - for future in futures: - i = future.get_id() - if i not in done_set and future.wrapped_future.done(): - done_set.add(i) - time.sleep(0.1) + real_futures = [x.wrapped_future for x in futures] + (done, not_done) = concurrent.futures.wait( + real_futures, + timeout=None, + return_when=concurrent.futures.ALL_COMPLETED + ) + assert len(done) == len(real_futures) + assert len(not_done) == 0 class SmartFuture(DeferredOperand): -- 2.45.2 From b29be4f1750fd20bd2eada88e751dfae85817882 Mon Sep 17 00:00:00 2001 From: Scott Date: Sat, 4 Dec 2021 21:23:11 -0800 Subject: [PATCH 07/16] Various changes. 
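
Worth calling out among these: exec_utils grows cmd_showing_output(),
which streams a child process's output line by line instead of
buffering it until exit. A usage sketch (the command string is
illustrative; stderr is merged into stdout with 2>&1 because only
stdout is read back):

    from exec_utils import cmd_showing_output

    # Echoes each line of output as it is produced.
    cmd_showing_output('arp-scan --localnet 2>&1')

Arper, meanwhile, now prefers arp-scan over the kernel arp cache and
sanity-checks that it found enough entries before trusting the result.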
--- arper.py | 43 ++++++++++++++++++++++++++++++++++--------- exec_utils.py | 14 ++++++++++++++ executors.py | 30 +++++++++++++++++++++++++----- input_utils.py | 8 ++++++++ presence.py | 19 ++++++++++++++----- remote_worker.py | 3 ++- smart_home/cameras.py | 4 ++-- smart_home/lights.py | 1 + string_utils.py | 21 ++++++++++++++++++++- text_utils.py | 15 +++++++++++++-- 10 files changed, 133 insertions(+), 25 deletions(-) diff --git a/arper.py b/arper.py index 4d6a3a2..2171e77 100644 --- a/arper.py +++ b/arper.py @@ -33,7 +33,7 @@ cfg.add_argument( cfg.add_argument( '--arper_cache_max_staleness', type=argparse_utils.valid_duration, - default=datetime.timedelta(seconds=60 * 60), + default=datetime.timedelta(seconds=60 * 15), metavar='DURATION', help='Max acceptable age of the kernel arp table cache' ) @@ -56,19 +56,44 @@ class Arper(persistent.Persistent): self.state = cached_state else: logger.debug('No usable cached state; calling /usr/sbin/arp') - self.update() + self.update_from_arp_scan() + self.update_from_arp() + if len(self.state) < config.config['arper_min_entries_to_be_valid']: + raise Exception('Arper didn\'t find enough entries; only got {len(self.state)}.') - def update(self): - output = exec_utils.cmd( - '/usr/sbin/arp -a', - timeout_seconds=5.0 - ) + def update_from_arp_scan(self): + network_spec = site_config.get_config().network + try: + output = exec_utils.cmd( + f'/usr/local/bin/arp-scan --retry=6 --timeout 350 --backoff=1.4 --random --numeric --plain --ignoredups {network_spec}', + timeout_seconds=10.0 + ) + except Exception as e: + logger.exception(e) + return + for line in output.split('\n'): + ip = string_utils.extract_ip_v4(line) + mac = string_utils.extract_mac_address(line) + if ip is not None and mac is not None and mac != 'UNKNOWN' and ip != 'UNKNOWN': + mac = mac.lower() + logger.debug(f'ARPER: {mac} => {ip}') + self.state[mac] = ip + + def update_from_arp(self): + try: + output = exec_utils.cmd( + '/usr/sbin/arp -a', + timeout_seconds=10.0 + ) + except Exception as e: + logger.exception(e) + return for line in output.split('\n'): ip = string_utils.extract_ip_v4(line) mac = string_utils.extract_mac_address(line) - if ip is not None and mac is not None: + if ip is not None and mac is not None and mac != 'UNKNOWN' and ip != 'UNKNOWN': mac = mac.lower() - logger.debug(f' {mac} => {ip}') + logger.debug(f'ARPER: {mac} => {ip}') self.state[mac] = ip def get_ip_by_mac(self, mac: str) -> Optional[str]: diff --git a/exec_utils.py b/exec_utils.py index 89cfbd7..b52f52f 100644 --- a/exec_utils.py +++ b/exec_utils.py @@ -10,6 +10,20 @@ from typing import List, Optional logger = logging.getLogger(__file__) +def cmd_showing_output(command: str) -> None: + p = subprocess.Popen( + command, + shell=True, + bufsize=0, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + for line in iter(p.stdout.readline, b''): + print(line.decode('utf-8'), end='') + p.stdout.close() + p.wait() + + def cmd_with_timeout(command: str, timeout_seconds: Optional[float]) -> int: """ Run a command but do not let it run for more than timeout seconds. 
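A usage note on cmd_showing_output() above: it is meant for commands
whose output you want echoed to the console as it is produced, rather
than captured and returned; it blocks until the subprocess exits.  A
minimal sketch (the command is illustrative only):

    from exec_utils import cmd_showing_output

    # Prints each line of output as soon as it appears; returns None.
    cmd_showing_output('ping -c 3 localhost')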
diff --git a/executors.py b/executors.py index c11bd54..92c5b34 100644 --- a/executors.py +++ b/executors.py @@ -1113,7 +1113,7 @@ class DefaultExecutors(object): username = 'scott', machine = 'cheetah.house', weight = 14, - count = 4, + count = 6, ), ) if self.ping('video.house'): @@ -1126,12 +1126,12 @@ class DefaultExecutors(object): count = 4, ), ) - if self.ping('wannabe.house'): - logger.info('Found wannabe.house') + if self.ping('gorilla.house'): + logger.info('Found gorilla.house') pool.append( RemoteWorkerRecord( username = 'scott', - machine = 'wannabe.house', + machine = 'gorilla.house', weight = 2, count = 4, ), @@ -1156,6 +1156,16 @@ class DefaultExecutors(object): count = 2, ), ) + if self.ping('hero.house'): + logger.info('Found hero.house') + pool.append( + RemoteWorkerRecord( + username = 'scott', + machine = 'hero.house', + weight = 30, + count = 10, + ), + ) if self.ping('puma.cabin'): logger.info('Found puma.cabin') pool.append( @@ -1163,7 +1173,17 @@ class DefaultExecutors(object): username = 'scott', machine = 'puma.cabin', weight = 12, - count = 4, + count = 6, + ), + ) + if self.ping('puma.house'): + logger.info('Found puma.house') + pool.append( + RemoteWorkerRecord( + username = 'scott', + machine = 'puma.house', + weight = 12, + count = 6, ), ) diff --git a/input_utils.py b/input_utils.py index a989b2d..e0b457d 100644 --- a/input_utils.py +++ b/input_utils.py @@ -2,6 +2,7 @@ """Utilities related to user input.""" +import logging import signal import sys from typing import List @@ -11,6 +12,9 @@ import readchar # type: ignore import exceptions +logger = logging.getLogger(__file__) + + def single_keystroke_response( valid_responses: List[str], *, @@ -34,6 +38,7 @@ def single_keystroke_response( try: while True: response = readchar.readchar() + logger.debug(f'Keystroke: {ord(response)}') if response in valid_responses: break if ord(response) in os_special_keystrokes: @@ -50,6 +55,9 @@ def single_keystroke_response( response = _single_keystroke_response_internal( valid_responses, timeout_seconds ) + if ord(response) == 3: + raise KeyboardInterrupt('User pressed ^C in input_utils.') + except exceptions.TimeoutError: if default_response is not None: response = default_response diff --git a/presence.py b/presence.py index d7db416..5fad457 100755 --- a/presence.py +++ b/presence.py @@ -5,6 +5,7 @@ from collections import defaultdict import enum import logging import re +import sys from typing import Dict, List # Note: this module is fairly early loaded. Be aware of dependencies. @@ -81,16 +82,24 @@ class PresenceDetection(object): self.update() def update(self) -> None: - from exec_utils import cmd + from exec_utils import cmd_with_timeout try: persisted_macs = config.config['presence_macs_file'] except KeyError: persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt' self.read_persisted_macs_file(persisted_macs, Location.HOUSE) - raw = cmd( - "ssh scott@meerkat.cabin 'cat /home/scott/cron/persisted_mac_addresses.txt'" - ) - self.parse_raw_macs_file(raw, Location.CABIN) + try: + raw = cmd_with_timeout( + "ssh scott@meerkat.cabin 'cat /home/scott/cron/persisted_mac_addresses.txt'", + timeout_seconds=10.0, + ) + self.parse_raw_macs_file(raw, Location.CABIN) + except Exception as e: + logger.exception(e) + logger.error( + 'Unable to fetch MAC Addresses from meerkat; can\'t do proper presence detection.' 
+ ) + sys.exit(1) def read_persisted_macs_file( self, filename: str, location: Location diff --git a/remote_worker.py b/remote_worker.py index bf8de6c..c04ac65 100755 --- a/remote_worker.py +++ b/remote_worker.py @@ -83,7 +83,8 @@ def main() -> None: in_file = config.config['code_file'] out_file = config.config['result_file'] - (thread, stop_thread) = watch_for_cancel() + if config.config['watch_for_cancel']: + (thread, stop_thread) = watch_for_cancel() logger.debug(f'Reading {in_file}.') try: diff --git a/smart_home/cameras.py b/smart_home/cameras.py index 40850a9..8137012 100644 --- a/smart_home/cameras.py +++ b/smart_home/cameras.py @@ -15,7 +15,7 @@ class BaseCamera(dev.Device): 'outside_backyard_camera': 'backyard', 'outside_driveway_camera': 'driveway', 'outside_doorbell_camera': 'doorbell', - 'outside_front_door_camera': 'frontdoor', + 'outside_front_door_camera': 'front_door', } def __init__(self, name: str, mac: str, keywords: str = "") -> None: @@ -24,4 +24,4 @@ class BaseCamera(dev.Device): def get_stream_url(self) -> str: assert self.camera_name is not None - return f'http://10.0.0.56:81/mjpg/{self.camera_name}/video.mjpg?h=1024&q=99' + return f'http://10.0.0.226:8080/Umtxxf1uKMBniFblqeQ9KRbb6DDzN4/mp4/GKlT2FfiSQ/{self.camera_name}/s.mp4' diff --git a/smart_home/lights.py b/smart_home/lights.py index dd211eb..1c4081c 100644 --- a/smart_home/lights.py +++ b/smart_home/lights.py @@ -334,6 +334,7 @@ class TPLinkLight(BaseLight): def get_info(self) -> Optional[Dict]: cmd = self.get_cmdline() + "-c info" out = subprocess.getoutput(cmd) + logger.debug(f'RAW OUT> {out}') out = re.sub("Sent:.*\n", "", out) out = re.sub("Received: *", "", out) try: diff --git a/string_utils.py b/string_utils.py index 9a38d25..aca4a5e 100644 --- a/string_utils.py +++ b/string_utils.py @@ -1503,12 +1503,16 @@ def from_bitstring(bits: str, encoding='utf-8', errors='surrogatepass') -> str: return n.to_bytes((n.bit_length() + 7) // 8, 'big').decode(encoding, errors) or '\0' -def ip_v4_sort_key(txt: str) -> str: +def ip_v4_sort_key(txt: str) -> Tuple[int]: """Turn an IPv4 address into a tuple for sorting purposes. >>> ip_v4_sort_key('10.0.0.18') (10, 0, 0, 18) + >>> ips = ['10.0.0.10', '100.0.0.1', '1.2.3.4', '10.0.0.9'] + >>> sorted(ips, key=lambda x: ip_v4_sort_key(x)) + ['1.2.3.4', '10.0.0.9', '10.0.0.10', '100.0.0.1'] + """ if not is_ip_v4(txt): print(f"not IP: {txt}") @@ -1516,6 +1520,21 @@ def ip_v4_sort_key(txt: str) -> str: return tuple([int(x) for x in txt.split('.')]) +def path_ancestors_before_descendants_sort_key(volume: str) -> Tuple[str]: + """Chunk up a file path so that parent/ancestor paths sort before + children/descendant paths. 
+ + >>> path_ancestors_before_descendants_sort_key('/usr/local/bin') + ('usr', 'local', 'bin') + + >>> paths = ['/usr/local', '/usr/local/bin', '/usr'] + >>> sorted(paths, key=lambda x: path_ancestors_before_descendants_sort_key(x)) + ['/usr', '/usr/local', '/usr/local/bin'] + + """ + return tuple([x for x in volume.split('/') if len(x) > 0]) + + if __name__ == '__main__': import doctest doctest.testmod() diff --git a/text_utils.py b/text_utils.py index 36cfe2f..9a9eb54 100644 --- a/text_utils.py +++ b/text_utils.py @@ -3,6 +3,7 @@ """Utilities for dealing with "text".""" from collections import defaultdict +import logging import math import sys from typing import List, NamedTuple, Optional @@ -10,6 +11,9 @@ from typing import List, NamedTuple, Optional from ansi import fg, reset +logger = logging.getLogger(__file__) + + class RowsColumns(NamedTuple): rows: int columns: int @@ -18,8 +22,15 @@ class RowsColumns(NamedTuple): def get_console_rows_columns() -> RowsColumns: """Returns the number of rows/columns on the current console.""" - from exec_utils import cmd - rows, columns = cmd("stty size").split() + from exec_utils import cmd_with_timeout + try: + rows, columns = cmd_with_timeout( + "stty size", + timeout_seconds=5.0, + ).split() + except Exception as e: + logger.exception(e) + raise Exception('Can\'t determine console size?!') return RowsColumns(int(rows), int(columns)) -- 2.45.2 From c63d439fb7e1d6f50e849b580f8bc6cfe88a81b6 Mon Sep 17 00:00:00 2001 From: Scott Date: Sun, 5 Dec 2021 20:50:59 -0800 Subject: [PATCH 08/16] Make presence detection work from cabin or house and deal with a bad VPN link between them better. --- locations.py | 9 +++++++ people.py | 14 +++++++++++ presence.py | 65 ++++++++++++++++++++++++++++++++------------------ site_config.py | 44 ++++++++++++++++++++++++---------- 4 files changed, 96 insertions(+), 36 deletions(-) create mode 100644 locations.py create mode 100644 people.py diff --git a/locations.py b/locations.py new file mode 100644 index 0000000..744f63a --- /dev/null +++ b/locations.py @@ -0,0 +1,9 @@ +#!/usr/bin/env python3 + +import enum + +@enum.unique +class Location(enum.Enum): + UNKNOWN = 0 + HOUSE = 1 + CABIN = 2 diff --git a/people.py b/people.py new file mode 100644 index 0000000..1dc0421 --- /dev/null +++ b/people.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python3 + +import enum + + +class Person(enum.Enum): + UNKNOWN = 0 + SCOTT = 1 + LYNN = 2 + ALEX = 3 + AARON_AND_DANA = 4 + AARON = 4 + DANA = 4 + diff --git a/presence.py b/presence.py index 5fad457..477b14f 100755 --- a/presence.py +++ b/presence.py @@ -2,16 +2,18 @@ import datetime from collections import defaultdict -import enum import logging import re -import sys -from typing import Dict, List +from typing import Dict, List, Set # Note: this module is fairly early loaded. Be aware of dependencies. import argparse_utils import bootstrap import config +from locations import Location +from people import Person +import site_config + logger = logging.getLogger(__name__) @@ -28,23 +30,6 @@ cfg.add_argument( ) -class Person(enum.Enum): - UNKNOWN = 0 - SCOTT = 1 - LYNN = 2 - ALEX = 3 - AARON_AND_DANA = 4 - AARON = 4 - DANA = 4 - - -@enum.unique -class Location(enum.Enum): - UNKNOWN = 0 - HOUSE = 1 - CABIN = 2 - - class PresenceDetection(object): def __init__(self) -> None: # Note: list most important devices first. 
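For context on the voting scheme PresenceDetection uses (unchanged by
this patch): each of a person's devices votes for the location where it
was seen most recently, the first-listed device with 10000 credits and
each subsequent device with a fifth of the previous credit, and a
location must collect more than 2001 votes to be reported.  A worked
sketch of the decay, with fabricated device names:

    credit = 10000
    for device in ['pixel3', 'laptop', 'tablet']:  # hypothetical; most important first
        print(f'{device} is worth {credit} votes')
        credit = max(1, int(credit * 0.2))
    # pixel3: 10000, laptop: 2000, tablet: 400 -- only the first device
    # alone clears the > 2001 acceptance threshold.

This is why the "list most important devices first" comment matters.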
@@ -74,14 +59,26 @@ class PresenceDetection(object): "96:69:2C:88:7A:C3", ], } + self.run_location = site_config.get_location() + logger.debug(f"run_location is {self.run_location}") self.weird_mac_at_cabin = False self.location_ts_by_mac: Dict[ Location, Dict[str, datetime.datetime] ] = defaultdict(dict) self.names_by_mac: Dict[str, str] = {} + self.dark_locations: Set[Location] = set() self.update() def update(self) -> None: + self.dark_locations = set() + if self.run_location is Location.HOUSE: + self.update_from_house() + elif self.run_location is Location.CABIN: + self.update_from_cabin() + else: + raise Exception("Where the hell is this running?!") + + def update_from_house(self) -> None: from exec_utils import cmd_with_timeout try: persisted_macs = config.config['presence_macs_file'] @@ -96,10 +93,26 @@ class PresenceDetection(object): self.parse_raw_macs_file(raw, Location.CABIN) except Exception as e: logger.exception(e) - logger.error( - 'Unable to fetch MAC Addresses from meerkat; can\'t do proper presence detection.' + logger.warning("Can't see the cabin right now; presence detection impared.") + self.dark_locations.add(Location.CABIN) + + def update_from_cabin(self) -> None: + from exec_utils import cmd_with_timeout + try: + persisted_macs = config.config['presence_macs_file'] + except KeyError: + persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt' + self.read_persisted_macs_file(persisted_macs, Location.CABIN) + try: + raw = cmd_with_timeout( + "ssh scott@wennabe.house 'cat /home/scott/cron/persisted_mac_addresses.txt'", + timeout_seconds=10.0, ) - sys.exit(1) + self.parse_raw_macs_file(raw, Location.HOUSE) + except Exception as e: + logger.exception(e) + logger.warning(f"Can't see the house right now; presence detection impared.") + self.dark_locations.add(Location.HOUSE) def read_persisted_macs_file( self, filename: str, location: Location @@ -145,6 +158,8 @@ class PresenceDetection(object): self.weird_mac_at_cabin = True def is_anyone_in_location_now(self, location: Location) -> bool: + if location in self.dark_locations: + raise Exception("Can't see {location} right now; answer undefined.") for person in Person: if person is not None: loc = self.where_is_person_now(person) @@ -156,6 +171,10 @@ class PresenceDetection(object): def where_is_person_now(self, name: Person) -> Location: import dict_utils + if len(self.dark_locations) > 0: + logger.warning( + f"Can't see {self.dark_locations} right now; answer confidence impacted" + ) logger.debug(f'Looking for {name}...') if name is Person.UNKNOWN: diff --git a/site_config.py b/site_config.py index e3b186d..2d0c4c3 100644 --- a/site_config.py +++ b/site_config.py @@ -7,9 +7,10 @@ from typing import Callable # Note: this module is fairly early loaded. Be aware of dependencies. import config -import presence +from locations import Location logger = logging.getLogger(__name__) + args = config.add_commandline_args( f'({__file__})', 'Args related to __file__' @@ -26,28 +27,43 @@ args.add_argument( @dataclass class SiteConfig(object): - location: str + location_name: str + location: Location network: str network_netmask: str network_router_ip: str - presence_location: presence.Location + presence_location: Location is_anyone_present: Callable[None, bool] arper_minimum_device_count: int -def get_location(): +def get_location_name(): """ Where are we? 
- >>> location = get_location() + >>> location = get_location_name() >>> location == 'HOUSE' or location == 'CABIN' True + """ + return get_config().location_name + + +def get_location(): + """ + Returns location as an enum instead of a string. + + >>> from locations import Location + >>> location = get_location() + >>> location == Location.HOUSE or location == Location.CABIN + True + """ return get_config().location -def is_anyone_present_wrapper(location: presence.Location): +def is_anyone_present_wrapper(location: Location): + import presence p = presence.PresenceDetection() return p.is_anyone_in_location_now(location) @@ -58,7 +74,7 @@ def get_config(): site-specific including the current running location. >>> cfg = get_config() - >>> cfg.location == 'HOUSE' or cfg.location == 'CABIN' + >>> cfg.location_name == 'HOUSE' or cfg.location_name == 'CABIN' True """ @@ -74,22 +90,24 @@ def get_config(): location = 'CABIN' if location == 'HOUSE': return SiteConfig( - location = 'HOUSE', + location_name = 'HOUSE', + location = Location.HOUSE, network = '10.0.0.0/24', network_netmask = '255.255.255.0', network_router_ip = '10.0.0.1', - presence_location = presence.Location.HOUSE, - is_anyone_present = lambda x=presence.Location.HOUSE: is_anyone_present_wrapper(x), + presence_location = Location.HOUSE, + is_anyone_present = lambda x=Location.HOUSE: is_anyone_present_wrapper(x), arper_minimum_device_count = 50, ) elif location == 'CABIN': return SiteConfig( - location = 'CABIN', + location_name = 'CABIN', + location = Location.CABIN, network = '192.168.0.0/24', network_netmask = '255.255.255.0', network_router_ip = '192.168.0.1', - presence_location = presence.Location.CABIN, - is_anyone_present = lambda x=presence.Location.CABIN: is_anyone_present_wrapper(x), + presence_location = Location.CABIN, + is_anyone_present = lambda x=Location.CABIN: is_anyone_present_wrapper(x), arper_minimum_device_count = 15, ) else: -- 2.45.2 From 4f11c12a1afb209eb1ba52a4632c5f49234cc0dc Mon Sep 17 00:00:00 2001 From: Scott Date: Sun, 5 Dec 2021 21:43:24 -0800 Subject: [PATCH 09/16] Make PresenceDetection self-update periodically. --- presence.py | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/presence.py b/presence.py index 477b14f..b32a218 100755 --- a/presence.py +++ b/presence.py @@ -28,6 +28,13 @@ cfg.add_argument( metavar="FILENAME", help="The location of persisted_mac_addresses.txt to use." ) +cfg.add_argument( + '--presence_tolerable_staleness_seconds', + type=argparse_utils.valid_duration, + default=datetime.timedelta(seconds=60 * 5), + metavar='DURATION', + help='Max acceptable age of location data before auto-refreshing' +) class PresenceDetection(object): @@ -67,7 +74,19 @@ class PresenceDetection(object): ] = defaultdict(dict) self.names_by_mac: Dict[str, str] = {} self.dark_locations: Set[Location] = set() - self.update() + self.last_update = None + + def maybe_update(self) -> None: + if self.last_update is None: + self.update() + else: + now = datetime.datetime.now() + delta = now - self.last_update + if delta.total_seconds() > config.config['presence_tolerable_staleness_seconds'].total_seconds(): + logger.debug( + f"It's been {delta.total_seconds()}s since last update; refreshing now." 
+ ) + self.update() def update(self) -> None: self.dark_locations = set() @@ -77,6 +96,7 @@ class PresenceDetection(object): self.update_from_cabin() else: raise Exception("Where the hell is this running?!") + self.last_update = datetime.datetime.now() def update_from_house(self) -> None: from exec_utils import cmd_with_timeout @@ -111,7 +131,7 @@ class PresenceDetection(object): self.parse_raw_macs_file(raw, Location.HOUSE) except Exception as e: logger.exception(e) - logger.warning(f"Can't see the house right now; presence detection impared.") + logger.warning("Can't see the house right now; presence detection impared.") self.dark_locations.add(Location.HOUSE) def read_persisted_macs_file( @@ -158,6 +178,7 @@ class PresenceDetection(object): self.weird_mac_at_cabin = True def is_anyone_in_location_now(self, location: Location) -> bool: + self.maybe_update() if location in self.dark_locations: raise Exception("Can't see {location} right now; answer undefined.") for person in Person: @@ -170,7 +191,7 @@ class PresenceDetection(object): return False def where_is_person_now(self, name: Person) -> Location: - import dict_utils + self.maybe_update() if len(self.dark_locations) > 0: logger.warning( f"Can't see {self.dark_locations} right now; answer confidence impacted" @@ -182,6 +203,8 @@ class PresenceDetection(object): return Location.CABIN else: return Location.UNKNOWN + + import dict_utils votes: Dict[Location, int] = {} tiebreaks: Dict[Location, datetime.datetime] = {} credit = 10000 -- 2.45.2 From ba223f821df1e9b8abbb6f6d23d5ba92c5a70b05 Mon Sep 17 00:00:00 2001 From: Scott Date: Mon, 6 Dec 2021 15:43:17 -0800 Subject: [PATCH 10/16] Move stuff around. --- lockfile.py | 1 - presence.py | 250 ------------------------------ site_config.py | 8 +- string_utils.py | 18 +++ locations.py => type/locations.py | 0 people.py => type/people.py | 0 6 files changed, 22 insertions(+), 255 deletions(-) delete mode 100755 presence.py rename locations.py => type/locations.py (100%) rename people.py => type/people.py (100%) diff --git a/lockfile.py b/lockfile.py index 1e0516b..b6a832e 100644 --- a/lockfile.py +++ b/lockfile.py @@ -37,7 +37,6 @@ class LockFile(object): # some logic for detecting stale locks. """ - def __init__( self, lockfile_path: str, diff --git a/presence.py b/presence.py deleted file mode 100755 index b32a218..0000000 --- a/presence.py +++ /dev/null @@ -1,250 +0,0 @@ -#!/usr/bin/env python3 - -import datetime -from collections import defaultdict -import logging -import re -from typing import Dict, List, Set - -# Note: this module is fairly early loaded. Be aware of dependencies. -import argparse_utils -import bootstrap -import config -from locations import Location -from people import Person -import site_config - - -logger = logging.getLogger(__name__) - -cfg = config.add_commandline_args( - f"Presence Detection ({__file__})", - "Args related to detection of human beings in locations.", -) -cfg.add_argument( - "--presence_macs_file", - type=argparse_utils.valid_filename, - default = "/home/scott/cron/persisted_mac_addresses.txt", - metavar="FILENAME", - help="The location of persisted_mac_addresses.txt to use." -) -cfg.add_argument( - '--presence_tolerable_staleness_seconds', - type=argparse_utils.valid_duration, - default=datetime.timedelta(seconds=60 * 5), - metavar='DURATION', - help='Max acceptable age of location data before auto-refreshing' -) - - -class PresenceDetection(object): - def __init__(self) -> None: - # Note: list most important devices first. 
- self.devices_by_person: Dict[Person, List[str]] = { - Person.SCOTT: [ - "3C:28:6D:10:6D:41", # pixel3 - "6C:40:08:AE:DC:2E", # laptop - ], - Person.LYNN: [ - "08:CC:27:63:26:14", # motog7 - "B8:31:B5:9A:4F:19", # laptop - ], - Person.ALEX: [ - "0C:CB:85:0C:8B:AE", # phone - "D0:C6:37:E3:36:9A", # laptop - ], - Person.AARON_AND_DANA: [ - "98:B6:E9:E5:5A:7C", - "D6:2F:37:CA:B2:9B", - "6C:E8:5C:ED:17:26", - "90:E1:7B:13:7C:E5", - "6E:DC:7C:75:02:1B", - "B2:16:1A:93:7D:50", - "18:65:90:DA:3A:35", - "22:28:C8:7D:3C:85", - "B2:95:23:69:91:F8", - "96:69:2C:88:7A:C3", - ], - } - self.run_location = site_config.get_location() - logger.debug(f"run_location is {self.run_location}") - self.weird_mac_at_cabin = False - self.location_ts_by_mac: Dict[ - Location, Dict[str, datetime.datetime] - ] = defaultdict(dict) - self.names_by_mac: Dict[str, str] = {} - self.dark_locations: Set[Location] = set() - self.last_update = None - - def maybe_update(self) -> None: - if self.last_update is None: - self.update() - else: - now = datetime.datetime.now() - delta = now - self.last_update - if delta.total_seconds() > config.config['presence_tolerable_staleness_seconds'].total_seconds(): - logger.debug( - f"It's been {delta.total_seconds()}s since last update; refreshing now." - ) - self.update() - - def update(self) -> None: - self.dark_locations = set() - if self.run_location is Location.HOUSE: - self.update_from_house() - elif self.run_location is Location.CABIN: - self.update_from_cabin() - else: - raise Exception("Where the hell is this running?!") - self.last_update = datetime.datetime.now() - - def update_from_house(self) -> None: - from exec_utils import cmd_with_timeout - try: - persisted_macs = config.config['presence_macs_file'] - except KeyError: - persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt' - self.read_persisted_macs_file(persisted_macs, Location.HOUSE) - try: - raw = cmd_with_timeout( - "ssh scott@meerkat.cabin 'cat /home/scott/cron/persisted_mac_addresses.txt'", - timeout_seconds=10.0, - ) - self.parse_raw_macs_file(raw, Location.CABIN) - except Exception as e: - logger.exception(e) - logger.warning("Can't see the cabin right now; presence detection impared.") - self.dark_locations.add(Location.CABIN) - - def update_from_cabin(self) -> None: - from exec_utils import cmd_with_timeout - try: - persisted_macs = config.config['presence_macs_file'] - except KeyError: - persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt' - self.read_persisted_macs_file(persisted_macs, Location.CABIN) - try: - raw = cmd_with_timeout( - "ssh scott@wennabe.house 'cat /home/scott/cron/persisted_mac_addresses.txt'", - timeout_seconds=10.0, - ) - self.parse_raw_macs_file(raw, Location.HOUSE) - except Exception as e: - logger.exception(e) - logger.warning("Can't see the house right now; presence detection impared.") - self.dark_locations.add(Location.HOUSE) - - def read_persisted_macs_file( - self, filename: str, location: Location - ) -> None: - if location is Location.UNKNOWN: - return - with open(filename, "r") as rf: - lines = rf.read() - self.parse_raw_macs_file(lines, location) - - def parse_raw_macs_file(self, raw: str, location: Location) -> None: - lines = raw.split("\n") - - # CC:F4:11:D7:FA:EE, 2240, 10.0.0.22 (side_deck_high_home), Google, 1611681990 - cabin_count = 0 - for line in lines: - line = line.strip() - if len(line) == 0: - continue - logger.debug(f'{location}> {line}') - if "cabin_" in line: - continue - if location == Location.CABIN: - logger.debug('Cabin count: 
{cabin_count}') - cabin_count += 1 - try: - (mac, count, ip_name, mfg, ts) = line.split(",") - except Exception as e: - logger.error(f'SKIPPED BAD LINE> {line}') - logger.exception(e) - continue - mac = mac.strip() - (self.location_ts_by_mac[location])[ - mac - ] = datetime.datetime.fromtimestamp(int(ts.strip())) - ip_name = ip_name.strip() - match = re.match(r"(\d+\.\d+\.\d+\.\d+) +\(([^\)]+)\)", ip_name) - if match is not None: - name = match.group(2) - self.names_by_mac[mac] = name - if cabin_count > 0: - logger.debug('Weird MAC at the cabin') - self.weird_mac_at_cabin = True - - def is_anyone_in_location_now(self, location: Location) -> bool: - self.maybe_update() - if location in self.dark_locations: - raise Exception("Can't see {location} right now; answer undefined.") - for person in Person: - if person is not None: - loc = self.where_is_person_now(person) - if location == loc: - return True - if location == location.CABIN and self.weird_mac_at_cabin: - return True - return False - - def where_is_person_now(self, name: Person) -> Location: - self.maybe_update() - if len(self.dark_locations) > 0: - logger.warning( - f"Can't see {self.dark_locations} right now; answer confidence impacted" - ) - logger.debug(f'Looking for {name}...') - - if name is Person.UNKNOWN: - if self.weird_mac_at_cabin: - return Location.CABIN - else: - return Location.UNKNOWN - - import dict_utils - votes: Dict[Location, int] = {} - tiebreaks: Dict[Location, datetime.datetime] = {} - credit = 10000 - for mac in self.devices_by_person[name]: - if mac not in self.names_by_mac: - continue - mac_name = self.names_by_mac[mac] - logger.debug(f'Looking for {name}... check for mac {mac} ({mac_name})') - for location in self.location_ts_by_mac: - if mac in self.location_ts_by_mac[location]: - ts = (self.location_ts_by_mac[location])[mac] - logger.debug(f'Seen {mac} ({mac_name}) at {location} since {ts}') - tiebreaks[location] = ts - - (most_recent_location, first_seen_ts) = dict_utils.item_with_max_value(tiebreaks) - bonus = credit - v = votes.get(most_recent_location, 0) - votes[most_recent_location] = v + bonus - logger.debug(f'{name}: {location} gets {bonus} votes.') - credit = int( - credit * 0.2 - ) # Note: list most important devices first - if credit <= 0: - credit = 1 - if len(votes) > 0: - (location, value) = dict_utils.item_with_max_value(votes) - if value > 2001: - return location - return Location.UNKNOWN - - -@bootstrap.initialize -def main() -> None: - p = PresenceDetection() - for person in Person: - print(f'{person} => {p.where_is_person_now(person)}') - print() - for location in Location: - print(f'{location} => {p.is_anyone_in_location_now(location)}') - - -if __name__ == '__main__': - main() diff --git a/site_config.py b/site_config.py index 2d0c4c3..4968523 100644 --- a/site_config.py +++ b/site_config.py @@ -7,7 +7,7 @@ from typing import Callable # Note: this module is fairly early loaded. Be aware of dependencies. import config -from locations import Location +from type.locations import Location logger = logging.getLogger(__name__) @@ -21,7 +21,7 @@ args.add_argument( const='NONE', nargs='?', choices=('HOUSE', 'CABIN', 'NONE'), - help='Where are we, HOUSE, CABIN?', + help='Where are we, HOUSE, CABIN? 
Overrides standard detection code.', ) @@ -63,8 +63,8 @@ def get_location(): def is_anyone_present_wrapper(location: Location): - import presence - p = presence.PresenceDetection() + import base_presence + p = base_presence.PresenceDetection() return p.is_anyone_in_location_now(location) diff --git a/string_utils.py b/string_utils.py index aca4a5e..097dc1b 100644 --- a/string_utils.py +++ b/string_utils.py @@ -1141,6 +1141,24 @@ def valid_datetime(in_str: str) -> bool: return False +def squeeze(in_str: str, character_to_squeeze: str = ' ') -> str: + """ + Squeeze runs of more than one character_to_squeeze into one. + + >>> squeeze(' this is a test ') + ' this is a test ' + + >>> squeeze('one|!||!|two|!||!|three', character_to_squeeze='|!|') + 'one|!|two|!|three' + + """ + return re.sub( + r'(' + re.escape(character_to_squeeze) + r')+', + character_to_squeeze, + in_str + ) + + def dedent(in_str: str) -> str: """ Removes tab indentation from multi line strings (inspired by analogous Scala function). diff --git a/locations.py b/type/locations.py similarity index 100% rename from locations.py rename to type/locations.py diff --git a/people.py b/type/people.py similarity index 100% rename from people.py rename to type/people.py -- 2.45.2 From 5f75cf834725ac26b289cc5f157af0cb71cd5f0e Mon Sep 17 00:00:00 2001 From: Scott Date: Thu, 6 Jan 2022 12:13:34 -0800 Subject: [PATCH 11/16] A bunch of changes... --- .gitignore | 1 + arper.py | 2 +- base_presence.py | 250 +++++++++++++++++++++++++++++++++++ cached/weather_forecast.py | 22 +-- camera_utils.py | 36 +++-- dateparse/dateparse_utils.py | 14 ++ executors.py | 73 +++++----- list_utils.py | 66 ++++++++- lockfile.py | 27 +++- logging_utils.py | 28 +++- ml/model_trainer.py | 4 +- pip_install.sh | 2 + remote_worker.py | 6 +- site_config.py | 4 +- smart_home/cameras.py | 9 +- smart_home/lights.py | 5 +- smart_home/outlets.py | 87 +++++++++++- smart_home/registry.py | 3 + smart_home/thermometers.py | 50 +++++++ waitable_presence.py | 101 ++++++++++++++ 20 files changed, 698 insertions(+), 92 deletions(-) create mode 100755 base_presence.py create mode 100644 smart_home/thermometers.py create mode 100644 waitable_presence.py diff --git a/.gitignore b/.gitignore index 28e68dd..221b64b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +scott_secrets.py __pycache__/* */__pycache__/* .mypy_cache/* diff --git a/arper.py b/arper.py index 2171e77..8f419f9 100644 --- a/arper.py +++ b/arper.py @@ -48,7 +48,7 @@ cfg.add_argument( @persistent.persistent_autoloaded_singleton() class Arper(persistent.Persistent): def __init__( - self, cached_state: Optional[BiDict[str, str]] = None + self, cached_state: Optional[BiDict] = None ) -> None: self.state = BiDict() if cached_state is not None: diff --git a/base_presence.py b/base_presence.py new file mode 100755 index 0000000..cd62bb7 --- /dev/null +++ b/base_presence.py @@ -0,0 +1,250 @@ +#!/usr/bin/env python3 + +import datetime +from collections import defaultdict +import logging +import re +from typing import Dict, List, Set + +# Note: this module is fairly early loaded. Be aware of dependencies. 
+import argparse_utils +import bootstrap +import config +from type.locations import Location +from type.people import Person +import site_config + + +logger = logging.getLogger(__name__) + +cfg = config.add_commandline_args( + f"Presence Detection ({__file__})", + "Args related to detection of human beings in locations.", +) +cfg.add_argument( + "--presence_macs_file", + type=argparse_utils.valid_filename, + default = "/home/scott/cron/persisted_mac_addresses.txt", + metavar="FILENAME", + help="The location of persisted_mac_addresses.txt to use." +) +cfg.add_argument( + '--presence_tolerable_staleness_seconds', + type=argparse_utils.valid_duration, + default=datetime.timedelta(seconds=60 * 5), + metavar='DURATION', + help='Max acceptable age of location data before auto-refreshing' +) + + +class PresenceDetection(object): + def __init__(self) -> None: + # Note: list most important devices first. + self.devices_by_person: Dict[Person, List[str]] = { + Person.SCOTT: [ + "3C:28:6D:10:6D:41", # pixel3 + "6C:40:08:AE:DC:2E", # laptop + ], + Person.LYNN: [ + "08:CC:27:63:26:14", # motog7 + "B8:31:B5:9A:4F:19", # laptop + ], + Person.ALEX: [ + "0C:CB:85:0C:8B:AE", # phone + "D0:C6:37:E3:36:9A", # laptop + ], + Person.AARON_AND_DANA: [ + "98:B6:E9:E5:5A:7C", + "D6:2F:37:CA:B2:9B", + "6C:E8:5C:ED:17:26", + "90:E1:7B:13:7C:E5", + "6E:DC:7C:75:02:1B", + "B2:16:1A:93:7D:50", + "18:65:90:DA:3A:35", + "22:28:C8:7D:3C:85", + "B2:95:23:69:91:F8", + "96:69:2C:88:7A:C3", + ], + } + self.run_location = site_config.get_location() + logger.debug(f"run_location is {self.run_location}") + self.weird_mac_at_cabin = False + self.location_ts_by_mac: Dict[ + Location, Dict[str, datetime.datetime] + ] = defaultdict(dict) + self.names_by_mac: Dict[str, str] = {} + self.dark_locations: Set[Location] = set() + self.last_update = None + + def maybe_update(self) -> None: + if self.last_update is None: + self.update() + else: + now = datetime.datetime.now() + delta = now - self.last_update + if delta.total_seconds() > config.config['presence_tolerable_staleness_seconds'].total_seconds(): + logger.debug( + f"It's been {delta.total_seconds()}s since last update; refreshing now." 
+                )
+                self.update()
+
+    def update(self) -> None:
+        self.dark_locations = set()
+        if self.run_location is Location.HOUSE:
+            self.update_from_house()
+        elif self.run_location is Location.CABIN:
+            self.update_from_cabin()
+        else:
+            raise Exception("Where the hell is this running?!")
+        self.last_update = datetime.datetime.now()
+
+    def update_from_house(self) -> None:
+        from exec_utils import cmd
+        try:
+            persisted_macs = config.config['presence_macs_file']
+        except KeyError:
+            persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt'
+        self.read_persisted_macs_file(persisted_macs, Location.HOUSE)
+        try:
+            raw = cmd(
+                "ssh scott@meerkat.cabin 'cat /home/scott/cron/persisted_mac_addresses.txt'",
+                timeout_seconds=10.0,
+            )
+            self.parse_raw_macs_file(raw, Location.CABIN)
+        except Exception as e:
+            logger.exception(e)
+            logger.warning("Can't see the cabin right now; presence detection impaired.")
+            self.dark_locations.add(Location.CABIN)
+
+    def update_from_cabin(self) -> None:
+        from exec_utils import cmd
+        try:
+            persisted_macs = config.config['presence_macs_file']
+        except KeyError:
+            persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt'
+        self.read_persisted_macs_file(persisted_macs, Location.CABIN)
+        try:
+            raw = cmd(
+                "ssh scott@wennabe.house 'cat /home/scott/cron/persisted_mac_addresses.txt'",
+                timeout_seconds=10.0,
+            )
+            self.parse_raw_macs_file(raw, Location.HOUSE)
+        except Exception as e:
+            logger.exception(e)
+            logger.warning("Can't see the house right now; presence detection impaired.")
+            self.dark_locations.add(Location.HOUSE)
+
+    def read_persisted_macs_file(
+        self, filename: str, location: Location
+    ) -> None:
+        if location is Location.UNKNOWN:
+            return
+        with open(filename, "r") as rf:
+            lines = rf.read()
+        self.parse_raw_macs_file(lines, location)
+
+    def parse_raw_macs_file(self, raw: str, location: Location) -> None:
+        lines = raw.split("\n")
+
+        # CC:F4:11:D7:FA:EE, 2240, 10.0.0.22 (side_deck_high_home), Google, 1611681990
+        cabin_count = 0
+        for line in lines:
+            line = line.strip()
+            if len(line) == 0:
+                continue
+            logger.debug(f'{location}> {line}')
+            if "cabin_" in line:
+                continue
+            if location == Location.CABIN:
+                logger.debug(f'Cabin count: {cabin_count}')
+                cabin_count += 1
+            try:
+                (mac, count, ip_name, mfg, ts) = line.split(",")
+            except Exception as e:
+                logger.error(f'SKIPPED BAD LINE> {line}')
+                logger.exception(e)
+                continue
+            mac = mac.strip()
+            (self.location_ts_by_mac[location])[
+                mac
+            ] = datetime.datetime.fromtimestamp(int(ts.strip()))
+            ip_name = ip_name.strip()
+            match = re.match(r"(\d+\.\d+\.\d+\.\d+) +\(([^\)]+)\)", ip_name)
+            if match is not None:
+                name = match.group(2)
+                self.names_by_mac[mac] = name
+        if cabin_count > 0:
+            logger.debug('Weird MAC at the cabin')
+            self.weird_mac_at_cabin = True
+
+    def is_anyone_in_location_now(self, location: Location) -> bool:
+        self.maybe_update()
+        if location in self.dark_locations:
+            raise Exception(f"Can't see {location} right now; answer undefined.")
+        for person in Person:
+            if person is not None:
+                loc = self.where_is_person_now(person)
+                if location == loc:
+                    return True
+        if location == location.CABIN and self.weird_mac_at_cabin:
+            return True
+        return False
+
+    def where_is_person_now(self, name: Person) -> Location:
+        self.maybe_update()
+        if len(self.dark_locations) > 0:
+            logger.warning(
+                f"Can't see {self.dark_locations} right now; answer confidence impacted"
+            )
+        logger.debug(f'Looking for {name}...')
+
+        if name is Person.UNKNOWN:
+            if self.weird_mac_at_cabin:
+                return Location.CABIN
+            else:
+                return Location.UNKNOWN
+
+        import dict_utils
+        votes: Dict[Location, int] = {}
+        tiebreaks: Dict[Location, datetime.datetime] = {}
+        credit = 10000
+        for mac in self.devices_by_person[name]:
+            if mac not in self.names_by_mac:
+                continue
+            mac_name = self.names_by_mac[mac]
+            logger.debug(f'Looking for {name}... check for mac {mac} ({mac_name})')
+            for location in self.location_ts_by_mac:
+                if mac in self.location_ts_by_mac[location]:
+                    ts = (self.location_ts_by_mac[location])[mac]
+                    logger.debug(f'Seen {mac} ({mac_name}) at {location} since {ts}')
+                    tiebreaks[location] = ts
+
+            (most_recent_location, first_seen_ts) = dict_utils.item_with_max_value(tiebreaks)
+            bonus = credit
+            v = votes.get(most_recent_location, 0)
+            votes[most_recent_location] = v + bonus
+            logger.debug(f'{name}: {most_recent_location} gets {bonus} votes.')
+            credit = int(
+                credit * 0.2
+            )  # Note: list most important devices first
+            if credit <= 0:
+                credit = 1
+        if len(votes) > 0:
+            (location, value) = dict_utils.item_with_max_value(votes)
+            if value > 2001:
+                return location
+        return Location.UNKNOWN
+
+
+@bootstrap.initialize
+def main() -> None:
+    p = PresenceDetection()
+    for person in Person:
+        print(f'{person} => {p.where_is_person_now(person)}')
+    print()
+    for location in Location:
+        print(f'{location} => {p.is_anyone_in_location_now(location)}')
+
+
+if __name__ == '__main__':
+    main()
diff --git a/cached/weather_forecast.py b/cached/weather_forecast.py
index d1e7540..b343938 100644
--- a/cached/weather_forecast.py
+++ b/cached/weather_forecast.py
@@ -19,6 +19,8 @@ import datetime_utils
 import dateparse.dateparse_utils as dp
 import persistent
 import text_utils
+import smart_home.thermometers as temps
+
 
 logger = logging.getLogger(__name__)
 
@@ -61,23 +63,9 @@ class CachedDetailedWeatherForecast(persistent.Persistent):
         self.forecasts = {}
 
         # Ask the raspberry pi about the outside temperature.
-        www = None
-        try:
-            www = urllib.request.urlopen(
-                "http://10.0.0.75/~pi/outside_temp",
-                timeout=2,
-            )
-            current_temp = www.read().decode("utf-8")
-            current_temp = float(current_temp)
-            current_temp *= (9/5)
-            current_temp += 32.0
-            current_temp = round(current_temp)
-        except Exception:
-            logger.warning('Timed out reading 10.0.0.75/~pi/outside_temp?!')
-            current_temp = None
-        finally:
-            if www is not None:
-                www.close()
+        current_temp = temps.ThermometerRegistry().read_temperature(
+            'house_outside', convert_to_fahrenheit=True
+        )
 
         # Get a weather forecast for Bellevue.
www = urllib.request.urlopen( diff --git a/camera_utils.py b/camera_utils.py index e85bd6e..acf760d 100644 --- a/camera_utils.py +++ b/camera_utils.py @@ -12,6 +12,7 @@ import numpy as np import requests import decorator_utils +import exceptions logger = logging.getLogger(__name__) @@ -23,28 +24,35 @@ class RawJpgHsv(NamedTuple): hsv: Optional[np.ndarray] -class BlueIrisImageMetadata(NamedTuple): +class SanityCheckImageMetadata(NamedTuple): """Is a Blue Iris image bad (big grey borders around it) or infrared?""" is_bad_image: bool is_infrared_image: bool -def analyze_blue_iris_image(hsv: np.ndarray) -> BlueIrisImageMetadata: +def sanity_check_image(hsv: np.ndarray) -> SanityCheckImageMetadata: """See if a Blue Iris image is bad and infrared.""" + def is_near(a, b) -> bool: + return abs(a - b) < 3 + rows, cols, _ = hsv.shape num_pixels = rows * cols + weird_orange_count = 0 hs_zero_count = 0 - gray_count = 0 for r in range(rows): for c in range(cols): pixel = hsv[(r, c)] - if pixel[0] == 0 and pixel[1] == 0: + if ( + is_near(pixel[0], 16) and + is_near(pixel[1], 117) and + is_near(pixel[2], 196) + ): + weird_orange_count += 1 + elif (is_near(pixel[0], 0) and is_near(pixel[1], 0)): hs_zero_count += 1 - if abs(pixel[2] - 64) <= 10: - gray_count += 1 - logger.debug(f"gray#={gray_count}, hs0#={hs_zero_count}") - return BlueIrisImageMetadata( - gray_count > (num_pixels * 0.33), hs_zero_count > (num_pixels * 0.75) + logger.debug(f"hszero#={hs_zero_count}, weird_orange={weird_orange_count}") + return SanityCheckImageMetadata( + hs_zero_count > (num_pixels * 0.75), weird_orange_count > (num_pixels * 0.75) ) @@ -55,15 +63,19 @@ def fetch_camera_image_from_video_server( """Fetch the raw webcam image from the video server.""" camera_name = camera_name.replace(".house", "") camera_name = camera_name.replace(".cabin", "") - url = f"http://10.0.0.56:81/image/{camera_name}?w={width}&q={quality}" + url = f"http://10.0.0.226:8080/Umtxxf1uKMBniFblqeQ9KRbb6DDzN4/jpeg/GKlT2FfiSQ/{camera_name}/s.jpg" try: response = requests.get(url, stream=False, timeout=10.0) if response.ok: raw = response.content + logger.debug(f'Read {len(response.content)} byte image from HTTP server') tmp = np.frombuffer(raw, dtype="uint8") + logger.debug(f'Translated raw content into {tmp.shape} {type(tmp)} with element type {type(tmp[0])}.') jpg = cv2.imdecode(tmp, cv2.IMREAD_COLOR) + logger.debug(f'Decoded into {jpg.shape} jpeg {type(jpg)} with element type {type(jpg[0][0])}') hsv = cv2.cvtColor(jpg, cv2.COLOR_BGR2HSV) - (is_bad_image, _) = analyze_blue_iris_image(hsv) + logger.debug(f'Converted JPG into HSV {hsv.shape} HSV {type(hsv)} with element type {type(hsv[0][0])}') + (_, is_bad_image) = sanity_check_image(hsv) if not is_bad_image: logger.debug(f"Got a good image from {url}") return raw @@ -156,5 +168,5 @@ def fetch_camera_image( ) -> RawJpgHsv: try: return _fetch_camera_image(camera_name, width=width, quality=quality) - except decorator_utils.TimeoutError: + except exceptions.TimeoutError: return RawJpgHsv(None, None, None) diff --git a/dateparse/dateparse_utils.py b/dateparse/dateparse_utils.py index 026a513..21fdb83 100755 --- a/dateparse/dateparse_utils.py +++ b/dateparse/dateparse_utils.py @@ -1,5 +1,10 @@ #!/usr/bin/env python3 +""" +Parse dates in a variety of formats. 
+ +""" + import datetime import functools import holidays # type: ignore @@ -249,6 +254,7 @@ class DateParser(dateparse_utilsListener): self.datetime: Optional[datetime.datetime] = None self.context: Dict[str, Any] = {} self.timedelta = datetime.timedelta(seconds=0) + self.saw_overt_year = False @staticmethod def _normalize_special_day_name(name: str) -> str: @@ -300,8 +306,10 @@ class DateParser(dateparse_utilsListener): next_last = self.context.get('special_next_last', '') if next_last == 'next': year += 1 + self.saw_overt_year = True elif next_last == 'last': year -= 1 + self.saw_overt_year = True # Holiday names if name == 'easte': @@ -360,6 +368,9 @@ class DateParser(dateparse_utilsListener): raise ParseException('Missing day') if 'year' not in self.context: self.context['year'] = self.today.year + self.saw_overt_year = False + else: + self.saw_overt_year = True # Handling "ides" and "nones" requires both the day and month. if ( @@ -610,11 +621,13 @@ class DateParser(dateparse_utilsListener): self.context['day'] = self.now_datetime.day self.context['month'] = self.now_datetime.month self.context['year'] = self.now_datetime.year + self.saw_overt_year = True elif txt[:4] == 'last': self.context['delta_int'] = -1 self.context['day'] = self.now_datetime.day self.context['month'] = self.now_datetime.month self.context['year'] = self.now_datetime.year + self.saw_overt_year = True else: raise ParseException(f'Bad next/last: {ctx.getText()}') @@ -843,6 +856,7 @@ class DateParser(dateparse_utilsListener): except Exception: raise ParseException(f'Bad year expression: {ctx.getText()}') else: + self.saw_overt_year = True self.context['year'] = year def exitSpecialDateMaybeYearExpr( diff --git a/executors.py b/executors.py index 92c5b34..cdbb811 100644 --- a/executors.py +++ b/executors.py @@ -745,10 +745,6 @@ class RemoteExecutor(BaseExecutor): xfer_latency = time.time() - start_ts logger.info(f"{bundle}: Copying done to {worker} in {xfer_latency:.1f}s.") except Exception as e: - logger.exception(e) - logger.error( - f'{bundle}: failed to send instructions to worker machine?!?' - ) assert bundle.worker is not None self.status.record_release_worker( bundle.worker, @@ -760,19 +756,30 @@ class RemoteExecutor(BaseExecutor): if is_original: # Weird. We tried to copy the code to the worker and it failed... # And we're the original bundle. We have to retry. + logger.exception(e) + logger.error( + f'{bundle}: Failed to send instructions to the worker machine?! ' + + 'This is not expected; we\'re the original bundle so this shouldn\'t ' + + 'be a race condition. Attempting an emergency retry...' + ) return self.emergency_retry_nasty_bundle(bundle) else: # This is actually expected; we're a backup. # There's a race condition where someone else # already finished the work and removed the source # code file before we could copy it. No biggie. + logger.warning( + f'{bundle}: Failed to send instructions to the worker machine... ' + + 'We\'re a backup and this may be caused by the original (or some ' + + 'other backup) already finishing this work. Ignoring this.' + ) return None # Kick off the work. Note that if this fails we let # wait_for_process deal with it. 
self.status.record_processing_began(uuid) cmd = (f'{SSH} {bundle.username}@{bundle.machine} ' - f'"source py39-venv/bin/activate &&' + f'"source py38-venv/bin/activate &&' f' /home/scott/lib/python_modules/remote_worker.py' f' --code_file {bundle.code_file} --result_file {bundle.result_file}"') logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...') @@ -889,7 +896,7 @@ class RemoteExecutor(BaseExecutor): if is_original: logger.debug(f"{bundle}: Unpickling {result_file}.") try: - with open(f'{result_file}', 'rb') as rb: + with open(result_file, 'rb') as rb: serialized = rb.read() result = cloudpickle.loads(serialized) except Exception as e: @@ -1112,30 +1119,10 @@ class DefaultExecutors(object): RemoteWorkerRecord( username = 'scott', machine = 'cheetah.house', - weight = 14, + weight = 25, count = 6, ), ) - if self.ping('video.house'): - logger.info('Found video.house') - pool.append( - RemoteWorkerRecord( - username = 'scott', - machine = 'video.house', - weight = 1, - count = 4, - ), - ) - if self.ping('gorilla.house'): - logger.info('Found gorilla.house') - pool.append( - RemoteWorkerRecord( - username = 'scott', - machine = 'gorilla.house', - weight = 2, - count = 4, - ), - ) if self.ping('meerkat.cabin'): logger.info('Found meerkat.cabin') pool.append( @@ -1146,16 +1133,16 @@ class DefaultExecutors(object): count = 2, ), ) - if self.ping('kiosk.house'): - logger.info('Found kiosk.house') - pool.append( - RemoteWorkerRecord( - username = 'pi', - machine = 'kiosk.house', - weight = 1, - count = 2, - ), - ) + # if self.ping('kiosk.house'): + # logger.info('Found kiosk.house') + # pool.append( + # RemoteWorkerRecord( + # username = 'pi', + # machine = 'kiosk.house', + # weight = 1, + # count = 2, + # ), + # ) if self.ping('hero.house'): logger.info('Found hero.house') pool.append( @@ -1172,18 +1159,18 @@ class DefaultExecutors(object): RemoteWorkerRecord( username = 'scott', machine = 'puma.cabin', - weight = 12, + weight = 25, count = 6, ), ) - if self.ping('puma.house'): - logger.info('Found puma.house') + if self.ping('backup.house'): + logger.info('Found backup.house') pool.append( RemoteWorkerRecord( username = 'scott', - machine = 'puma.house', - weight = 12, - count = 6, + machine = 'backup.house', + weight = 3, + count = 2, ), ) diff --git a/list_utils.py b/list_utils.py index 05512b5..992f1ae 100644 --- a/list_utils.py +++ b/list_utils.py @@ -2,7 +2,7 @@ from collections import Counter from itertools import islice -from typing import Any, Iterator, List, Mapping, Sequence +from typing import Any, Iterator, List, Mapping, Sequence, Tuple def shard(lst: List[Any], size: int) -> Iterator[Any]: @@ -200,6 +200,70 @@ def ngrams(lst: Sequence[Any], n): yield lst[i:i + n] +def permute(seq: Sequence[Any]): + """ + Returns all permutations of a sequence; takes O(N^2) time. + + >>> for x in permute('cat'): + ... print(x) + cat + cta + act + atc + tca + tac + + """ + yield from _permute(seq, "") + +def _permute(seq: Sequence[Any], path): + if len(seq) == 0: + yield path + + for i in range(len(seq)): + car = seq[i] + left = seq[0:i] + right = seq[i + 1:] + cdr = left + right + yield from _permute(cdr, path + car) + + +def binary_search(lst: Sequence[Any], target:Any) -> Tuple[bool, int]: + """Performs a binary search on lst (which must already be sorted). + Returns a Tuple composed of a bool which indicates whether the + target was found and an int which indicates the index closest to + target whether it was found or not. 
+ + >>> a = [1, 4, 5, 6, 7, 9, 10, 11] + >>> binary_search(a, 4) + (True, 1) + + >>> binary_search(a, 12) + (False, 8) + + >>> binary_search(a, 3) + (False, 1) + + >>> binary_search(a, 2) + (False, 1) + + """ + return _binary_search(lst, target, 0, len(lst) - 1) + + +def _binary_search(lst: Sequence[Any], target: Any, low: int, high: int) -> Tuple[bool, int]: + if high >= low: + mid = (high + low) // 2 + if lst[mid] == target: + return (True, mid) + elif lst[mid] > target: + return _binary_search(lst, target, low, mid - 1) + else: + return _binary_search(lst, target, mid + 1, high) + else: + return (False, low) + + if __name__ == '__main__': import doctest doctest.testmod() diff --git a/lockfile.py b/lockfile.py index b6a832e..ca190df 100644 --- a/lockfile.py +++ b/lockfile.py @@ -9,9 +9,20 @@ import signal import sys from typing import Optional +import config import decorator_utils +cfg = config.add_commandline_args( + f'Lockfile ({__file__})', + 'Args related to lockfiles') +cfg.add_argument( + '--lockfile_held_duration_warning_threshold_sec', + type=float, + default=10.0, + metavar='SECONDS', + help='If a lock is held for longer than this threshold we log a warning' +) logger = logging.getLogger(__name__) @@ -75,7 +86,7 @@ class LockFile(object): return True except OSError: pass - logger.debug(f'Failed; I could not acquire {self.lockfile}.') + logger.warning(f'Could not acquire {self.lockfile}.') return False def acquire_with_retries( @@ -108,12 +119,19 @@ class LockFile(object): def __enter__(self): if self.acquire_with_retries(): + self.locktime = datetime.datetime.now().timestamp() return self msg = f"Couldn't acquire {self.lockfile}; giving up." logger.warning(msg) raise LockFileException(msg) def __exit__(self, type, value, traceback): + if self.locktime: + ts = datetime.datetime.now().timestamp() + duration = ts - self.locktime + if duration >= config.config['lockfile_held_duration_warning_threshold_sec']: + str_duration = datetime_utils.describe_duration_briefly(duration) + logger.warning(f'Held {self.lockfile} for {str_duration}') self.release() def __del__(self): @@ -151,15 +169,16 @@ class LockFile(object): try: os.kill(contents.pid, 0) except OSError: - logger.debug('The pid seems stale; killing the lock.') + logger.warning(f'Lockfile {self.lockfile}\'s pid ({contents.pid}) is stale; ' + + 'force acquiring') self.release() # Has the lock expiration expired? if contents.expiration_timestamp is not None: now = datetime.datetime.now().timestamp() if now > contents.expiration_datetime: - logger.debug('The expiration time has passed; ' + - 'killing the lock') + logger.warning(f'Lockfile {self.lockfile} expiration time has passed; ' + + 'force acquiring') self.release() except Exception: pass diff --git a/logging_utils.py b/logging_utils.py index 819e3d3..278cbf0 100644 --- a/logging_utils.py +++ b/logging_utils.py @@ -43,8 +43,8 @@ cfg.add_argument( cfg.add_argument( '--logging_format', type=str, - default='%(levelname).1s:%(asctime)s: %(message)s', - help='The format for lines logged via the logger module.' + default=None, + help='The format for lines logged via the logger module. See: https://docs.python.org/3/library/logging.html#formatter-objects' ) cfg.add_argument( '--logging_date_format', @@ -86,6 +86,16 @@ cfg.add_argument( default=False, help='Should we log to localhost\'s syslog.' 
 )
+cfg.add_argument(
+    '--logging_syslog_facility',
+    type=str,
+    default = 'USER',
+    choices=['NOTSET', 'AUTH', 'AUTH_PRIV', 'CRON', 'DAEMON', 'FTP', 'KERN', 'LPR', 'MAIL', 'NEWS',
+             'SYSLOG', 'USER', 'UUCP', 'LOCAL0', 'LOCAL1', 'LOCAL2', 'LOCAL3', 'LOCAL4', 'LOCAL5',
+             'LOCAL6', 'LOCAL7'],
+    metavar='SYSLOG_FACILITY',
+    help='The default syslog message facility identifier',
+)
 cfg.add_argument(
     '--logging_debug_threads',
     action=argparse_utils.ActionNoYes,
@@ -382,7 +392,14 @@ def initialize_logging(logger=None) -> logging.Logger:
     if not isinstance(default_logging_level, int):
         raise ValueError('Invalid level: %s' % config.config['logging_level'])
 
-    fmt = config.config['logging_format']
+    if config.config['logging_format']:
+        fmt = config.config['logging_format']
+    else:
+        if config.config['logging_syslog']:
+            fmt = '%(levelname).1s:%(filename)s[%(process)d]: %(message)s'
+        else:
+            fmt = '%(levelname).1s:%(asctime)s: %(message)s'
+
     if config.config['logging_debug_threads']:
         fmt = f'%(process)d.%(thread)d|{fmt}'
     if config.config['logging_debug_modules']:
@@ -390,7 +407,12 @@
     if config.config['logging_syslog']:
         if sys.platform not in ('win32', 'cygwin'):
-            handler = SysLogHandler()
+            if config.config['logging_syslog_facility']:
+                facility_name = 'LOG_' + config.config['logging_syslog_facility']
+                facility = SysLogHandler.__dict__.get(facility_name, SysLogHandler.LOG_USER)
+            else:
+                facility = SysLogHandler.LOG_USER
+            handler = SysLogHandler(facility=facility, address='/dev/log')
             handler.setFormatter(
                 MillisecondAwareFormatter(
                     fmt=fmt,
diff --git a/ml/model_trainer.py b/ml/model_trainer.py
index f61b8e7..acd7218 100644
--- a/ml/model_trainer.py
+++ b/ml/model_trainer.py
@@ -246,12 +246,12 @@ class TrainingBlueprint(ABC):
                 y.pop()
 
             if self.spec.delete_bad_inputs:
-                msg = f"WARNING: {filename}: missing features or label. DELETING."
+                msg = f"WARNING: {filename}: missing features or label; expected {self.spec.feature_count} but saw {len(x)}. DELETING."
                 print(msg, file=sys.stderr)
                 logger.warning(msg)
                 os.remove(filename)
             else:
-                msg = f"WARNING: {filename}: missing features or label. Skipped."
+                msg = f"WARNING: {filename}: missing features or label; expected {self.spec.feature_count} but saw {len(x)}. Skipping."
                 print(msg, file=sys.stderr)
                 logger.warning(msg)
         return (X, y)
diff --git a/pip_install.sh b/pip_install.sh
index 832ae9c..1ee08c9 100755
--- a/pip_install.sh
+++ b/pip_install.sh
@@ -1,5 +1,7 @@
 #!/bin/bash
 
+# Install a bunch of pip modules that the scott library depends upon.
+ set -e python3 -m ensurepip --upgrade diff --git a/remote_worker.py b/remote_worker.py index c04ac65..0086c40 100755 --- a/remote_worker.py +++ b/remote_worker.py @@ -83,6 +83,7 @@ def main() -> None: in_file = config.config['code_file'] out_file = config.config['result_file'] + stop_thread = None if config.config['watch_for_cancel']: (thread, stop_thread) = watch_for_cancel() @@ -130,8 +131,9 @@ def main() -> None: stop_thread.set() sys.exit(-1) - stop_thread.set() - thread.join() + if stop_thread is not None: + stop_thread.set() + thread.join() if __name__ == '__main__': diff --git a/site_config.py b/site_config.py index 4968523..caaf3d8 100644 --- a/site_config.py +++ b/site_config.py @@ -33,7 +33,7 @@ class SiteConfig(object): network_netmask: str network_router_ip: str presence_location: Location - is_anyone_present: Callable[None, bool] + is_anyone_present: Callable arper_minimum_device_count: int @@ -53,7 +53,7 @@ def get_location(): """ Returns location as an enum instead of a string. - >>> from locations import Location + >>> from type.locations import Location >>> location = get_location() >>> location == Location.HOUSE or location == Location.CABIN True diff --git a/smart_home/cameras.py b/smart_home/cameras.py index 8137012..51a95e9 100644 --- a/smart_home/cameras.py +++ b/smart_home/cameras.py @@ -16,6 +16,7 @@ class BaseCamera(dev.Device): 'outside_driveway_camera': 'driveway', 'outside_doorbell_camera': 'doorbell', 'outside_front_door_camera': 'front_door', + 'crawlspace_camera': 'crawlspace', } def __init__(self, name: str, mac: str, keywords: str = "") -> None: @@ -23,5 +24,9 @@ class BaseCamera(dev.Device): self.camera_name = BaseCamera.camera_mapping.get(name, None) def get_stream_url(self) -> str: - assert self.camera_name is not None - return f'http://10.0.0.226:8080/Umtxxf1uKMBniFblqeQ9KRbb6DDzN4/mp4/GKlT2FfiSQ/{self.camera_name}/s.mp4' + name = self.camera_name + assert name is not None + if name == 'driveway': + return f'http://10.0.0.226:8080/Umtxxf1uKMBniFblqeQ9KRbb6DDzN4/mjpeg/GKlT2FfiSQ/driveway' + else: + return f'http://10.0.0.226:8080/Umtxxf1uKMBniFblqeQ9KRbb6DDzN4/mp4/GKlT2FfiSQ/{name}/s.mp4' diff --git a/smart_home/lights.py b/smart_home/lights.py index 1c4081c..e23569a 100644 --- a/smart_home/lights.py +++ b/smart_home/lights.py @@ -318,7 +318,10 @@ class TPLinkLight(BaseLight): @overrides def is_on(self) -> bool: - return self.get_on_duration_seconds() > 0 + self.info = self.get_info() + if self.info is None: + raise Exception('Unable to get info?') + return self.info.get("relay_state", "0") == "1" @overrides def is_off(self) -> bool: diff --git a/smart_home/outlets.py b/smart_home/outlets.py index f34d574..68dfd2b 100644 --- a/smart_home/outlets.py +++ b/smart_home/outlets.py @@ -3,6 +3,8 @@ """Utilities for dealing with the smart outlets.""" from abc import abstractmethod +import asyncio +import atexit import datetime import json import logging @@ -10,11 +12,16 @@ import os import re import subprocess import sys -from typing import Dict, List, Optional +from typing import Any, Dict, List, Optional + +from meross_iot.http_api import MerossHttpClient +from meross_iot.manager import MerossManager import argparse_utils import config +import decorator_utils import logging_utils +import scott_secrets import smart_home.device as dev from google_assistant import ask_google, GoogleResponse from decorator_utils import timeout, memoized @@ -227,3 +234,81 @@ class GoogleOutlet(BaseOutlet): def is_off(self) -> bool: return not self.is_on() + + 
+@decorator_utils.singleton
+class MerossWrapper(object):
+    """Note that instantiating this class causes HTTP traffic with an
+    external Meross server. Meross blocks customers who hit their
+    servers too aggressively so MerossOutlet is lazy about creating
+    instances of this class.
+
+    """
+
+    def __init__(self):
+        self.loop = asyncio.get_event_loop()
+        self.email = os.environ.get('MEROSS_EMAIL') or scott_secrets.MEROSS_EMAIL
+        self.password = os.environ.get('MEROSS_PASSWORD') or scott_secrets.MEROSS_PASSWORD
+        self.devices = self.loop.run_until_complete(self.find_meross_devices())
+        atexit.register(self.loop.close)
+
+    async def find_meross_devices(self) -> List[Any]:
+        http_api_client = await MerossHttpClient.async_from_user_password(
+            email=self.email, password=self.password
+        )
+
+        # Set up and start the device manager
+        manager = MerossManager(http_client=http_api_client)
+        await manager.async_init()
+
+        # Discover devices
+        await manager.async_device_discovery()
+        devices = manager.find_devices()
+        for device in devices:
+            await device.async_update()
+        return devices
+
+    def get_meross_device_by_name(self, name: str) -> Optional[Any]:
+        name = name.lower()
+        name = name.replace('_', ' ')
+        for device in self.devices:
+            if device.name.lower() == name:
+                return device
+        return None
+
+
+class MerossOutlet(BaseOutlet):
+    def __init__(self, name: str, mac: str, keywords: str = '') -> None:
+        super().__init__(name, mac, keywords)
+        self.meross_wrapper = None
+        self.device = None
+
+    def lazy_initialize_device(self):
+        """If we make too many calls to Meross they will block us; only talk
+        to them when someone actually wants to control a device."""
+        if self.meross_wrapper is None:
+            self.meross_wrapper = MerossWrapper()
+            self.device = self.meross_wrapper.get_meross_device_by_name(self.name)
+            if self.device is None:
+                raise Exception(f'{self.name} is not a known Meross device?!')
+
+    def turn_on(self) -> bool:
+        self.lazy_initialize_device()
+        self.meross_wrapper.loop.run_until_complete(
+            self.device.async_turn_on()
+        )
+        return True
+
+    def turn_off(self) -> bool:
+        self.lazy_initialize_device()
+        self.meross_wrapper.loop.run_until_complete(
+            self.device.async_turn_off()
+        )
+        return True
+
+    def is_on(self) -> bool:
+        self.lazy_initialize_device()
+        return self.device.is_on()
+
+    def is_off(self) -> bool:
+        return not self.is_on()
diff --git a/smart_home/registry.py b/smart_home/registry.py
index ae57a73..23584e1 100644
--- a/smart_home/registry.py
+++ b/smart_home/registry.py
@@ -165,6 +165,9 @@ class SmartHomeRegistry(object):
         else:
             logger.debug(' ...a TPLinkOutlet')
             return outlets.TPLinkOutlet(name, mac, kws)
+        elif 'meross' in kws.lower():
+            logger.debug(' ...a MerossOutlet')
+            return outlets.MerossOutlet(name, mac, kws)
         elif 'goog' in kws.lower():
             logger.debug(' ...a GoogleOutlet')
             return outlets.GoogleOutlet(name, mac, kws)
diff --git a/smart_home/thermometers.py b/smart_home/thermometers.py
new file mode 100644
index 0000000..fe5eed1
--- /dev/null
+++ b/smart_home/thermometers.py
@@ -0,0 +1,51 @@
+#!/usr/bin/env python3
+
+import logging
+from typing import Optional
+import urllib.request
+
+logger = logging.getLogger()
+
+
+class ThermometerRegistry(object):
+    def __init__(self):
+        self.thermometers = {
+            'house_outside': ('10.0.0.75', 'outside_temp'),
+            'house_inside_downstairs': ('10.0.0.75', 'inside_downstairs_temp'),
+            'house_inside_upstairs': ('10.0.0.75', 'inside_upstairs_temp'),
+            'house_computer_closet': ('10.0.0.75', 'computer_closet_temp'),
+            'house_crawlspace': ('10.0.0.75', 'crawlspace_temp'),
+            'cabin_outside': ('192.168.0.107', 'outside_temp'),
+            'cabin_inside': ('192.168.0.107', 'inside_temp'),
+            'cabin_crawlspace': ('192.168.0.107', 'crawlspace_temp'),
+            'cabin_hottub': ('192.168.0.107', 'hottub_temp'),
+        }
+
+    def read_temperature(
+        self, location: str, *, convert_to_fahrenheit=False
+    ) -> Optional[float]:
+        record = self.thermometers.get(location, None)
+        if record is None:
+            logger.error(
+                f'Location {location} is not known. Valid locations are {self.thermometers.keys()}.'
+            )
+            return None
+        url = f'http://{record[0]}/~pi/{record[1]}'
+        logger.debug(f'Constructed URL: {url}')
+        www = None  # Stays None if urlopen raises; checked in the finally below.
+        try:
+            www = urllib.request.urlopen(url, timeout=3)
+            temp = www.read().decode('utf-8')
+            temp = float(temp)
+            if convert_to_fahrenheit:
+                temp *= (9/5)
+                temp += 32.0
+                temp = round(temp)
+        except Exception as e:
+            logger.exception(e)
+            logger.error(f'Failed to read temperature at URL: {url}')
+            temp = None
+        finally:
+            if www is not None:
+                www.close()
+        return temp
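Hypothetical usage of the new ThermometerRegistry, assuming the pi-hosted endpoints above serve a bare Celsius reading as plain text (which is what the convert_to_fahrenheit flag implies); the location keys and hosts are the ones hardcoded in its constructor:

registry = ThermometerRegistry()
celsius = registry.read_temperature('house_outside')
fahrenheit = registry.read_temperature('cabin_hottub', convert_to_fahrenheit=True)
if fahrenheit is not None:
    print(f'The hot tub is currently {fahrenheit}F')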
diff --git a/waitable_presence.py b/waitable_presence.py
new file mode 100644
index 0000000..9e0a9d0
--- /dev/null
+++ b/waitable_presence.py
@@ -0,0 +1,101 @@
+#!/usr/bin/env python3
+
+"""A PresenceDetector that is waitable. This is not part of
+base_presence.py because I do not want to bring these dependencies
+into that lower-level module (especially state_tracker).
+
+"""
+
+import datetime
+import logging
+from typing import Optional, Tuple
+
+from overrides import overrides
+
+import base_presence
+from type.locations import Location
+import site_config
+import state_tracker
+
+logger = logging.getLogger(__name__)
+
+
+class WaitablePresenceDetectorWithMemory(state_tracker.WaitableAutomaticStateTracker):
+    """
+    This is a waitable class that keeps a PresenceDetector internally
+    and periodically polls it to detect changes in presence in a
+    particular location.
Example suggested usage pattern: + + detector = waitable_presence.WaitablePresenceDetectorWithMemory(60.0) + while True: + changed = detector.wait(timeout=60 * 5) # or, None for "forever" + (someone_is_home, since) = detector.is_someone_home() + if changed: + detector.reset() + logger.debug( + f'someone_is_home={someone_is_home}, since={since}, changed={changed}' + ) + """ + + def __init__( + self, + override_update_interval_sec: float = 60.0, + override_location: Location = site_config.get_location(), + ) -> None: + self.last_someone_is_home: Optional[bool] = None + self.someone_is_home: Optional[bool] = None + self.everyone_gone_since: Optional[datetime.datetime] = None + self.someone_home_since: Optional[datetime.datetime] = None + self.location = override_location + self.detector: base_presence.PresenceDetection = base_presence.PresenceDetection() + super().__init__( + { + 'poll_presence': override_update_interval_sec, + 'check_detector': override_update_interval_sec * 5, + } + ) + + @overrides + def update( + self, + update_id: str, + now: datetime.datetime, + last_invocation: Optional[datetime.datetime], + ) -> None: + if update_id == 'poll_presence': + self.poll_presence(now) + elif update_id == 'check_detector': + self.check_detector() + else: + raise Exception(f'Unknown update type {update_id} in {__file__}') + + def poll_presence(self, now: datetime.datetime) -> None: + logger.debug(f'Checking presence in {self.location} now...') + self.detector.update() + if self.detector.is_anyone_in_location_now(self.location): + self.someone_is_home = True + self.someone_home_since = now + else: + self.someone_is_home = False + self.everyone_gone_since = now + if self.someone_is_home != self.last_someone_is_home: + self.something_changed() + self.last_someone_is_home = self.someone_is_home + + def check_detector(self) -> None: + if len(self.detector.dark_locations) > 0: + logger.debug('PresenceDetector is incomplete; trying to reinitialize...') + self.detector = base_presence.PresenceDetection() + + def is_someone_home(self) -> Tuple[bool, datetime.datetime]: + """Returns a tuple of a bool that indicates whether someone is home + and a datetime that indicates how long either someone has been + home or no one has been home. + + """ + if self.someone_is_home is None: + raise Exception("Too Soon!") + if self.someone_is_home: + return (True, self.someone_home_since) + else: + return (False, self.everyone_gone_since) -- 2.45.2 From e8671a716da868332d3ac1f66d4d2f7f8d33fc28 Mon Sep 17 00:00:00 2001 From: Scott Date: Fri, 7 Jan 2022 11:14:29 -0800 Subject: [PATCH 12/16] Make logging optionally remove global handlers added by (shitty) pip modules. Make config optionally halt on unrecognized arguments. Make profanity filter smarter. 
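For context before the diff: "remove global handlers" amounts to draining the root logger's handler list before this library installs its own handlers, which is what the --logging_clear_spammy_handlers flag added below enables. A minimal sketch of the idea, assuming the root logger (note that for non-root loggers hasHandlers() also consults ancestors, so looping on the logger's own handler list is the safer guard):

import logging

logger = logging.getLogger()  # the root logger
cleared = 0
while logger.handlers:
    logger.removeHandler(logger.handlers[0])
    cleared += 1
# 'cleared' now counts how many preexisting global handlers were dropped.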
---
 bootstrap.py          |  2 +-
 config.py             | 16 ++++++++++++++
 list_utils.py         |  3 ++-
 logging_utils.py      | 49 ++++++++++++++++++++++++++++++++++++++++++-
 profanity_filter.py   |  6 ++++++
 site_config.py        |  4 ++--
 smart_home/outlets.py | 10 ++++-----
 7 files changed, 80 insertions(+), 10 deletions(-)

diff --git a/bootstrap.py b/bootstrap.py
index 03bb505..7ed8b40 100644
--- a/bootstrap.py
+++ b/bootstrap.py
@@ -1,7 +1,6 @@
 #!/usr/bin/env python3
 
 import functools
-import importlib
 import logging
 import os
 from inspect import stack
@@ -55,6 +54,7 @@ args.add_argument(
 
 original_hook = sys.excepthook
 
+
 def handle_uncaught_exception(exc_type, exc_value, exc_tb):
     """
     Top-level exception handler for exceptions that make it past any exception
diff --git a/config.py b/config.py
index ea5f68a..dc0042d 100644
--- a/config.py
+++ b/config.py
@@ -133,6 +133,16 @@ group.add_argument(
     default=None,
     help='Populate config file compatible with --config_loadfile to save global config for later use.',
 )
+group.add_argument(
+    '--config_rejects_unrecognized_arguments',
+    default=False,
+    action='store_true',
+    help=(
+        'If present, config will raise an exception if it doesn\'t recognize an argument. The '
+        + 'default behavior is to ignore this so as to allow interoperability with programs that '
+        + 'want to use their own argparse calls to parse their own, separate commandline args.'
+    )
+)
 
 
 def is_flag_already_in_argv(var: str):
@@ -249,6 +259,12 @@ def parse(entry_module: Optional[str]) -> Dict[str, Any]:
     # future argument parsers. For example, unittest_main in python
     # has some of its own flags. If we didn't recognize it, maybe
     # someone else will.
+    if len(unknown) > 0:
+        if config['config_rejects_unrecognized_arguments']:
+            raise Exception(
+                f'Encountered unrecognized config argument(s) {unknown} with --config_rejects_unrecognized_arguments enabled; halting.'
+            )
+        saved_messages.append(f'Config encountered unrecognized commandline arguments: {unknown}')
     sys.argv = sys.argv[:1] + unknown
 
     # Check for savefile and populate it if requested.
diff --git a/list_utils.py b/list_utils.py
index 992f1ae..88c436b 100644
--- a/list_utils.py
+++ b/list_utils.py
@@ -216,6 +216,7 @@ def permute(seq: Sequence[Any]):
     """
     yield from _permute(seq, "")
 
+
 def _permute(seq: Sequence[Any], path):
     if len(seq) == 0:
         yield path
@@ -228,7 +229,7 @@ def _permute(seq: Sequence[Any], path):
             yield from _permute(cdr, path + car)
 
 
-def binary_search(lst: Sequence[Any], target:Any) -> Tuple[bool, int]:
+def binary_search(lst: Sequence[Any], target: Any) -> Tuple[bool, int]:
     """Performs a binary search on lst (which must already be sorted).
     Returns a Tuple composed of a bool which indicates whether the
     target was found and an int which indicates the index closest to
diff --git a/logging_utils.py b/logging_utils.py
index 278cbf0..005761a 100644
--- a/logging_utils.py
+++ b/logging_utils.py
@@ -143,6 +143,17 @@ cfg.add_argument(
         'module:function, or :function and is a logging level (e.g. INFO, DEBUG...)'
     )
 )
+cfg.add_argument(
+    '--logging_clear_spammy_handlers',
+    action=argparse_utils.ActionNoYes,
+    default=False,
+    help=(
+        'Should logging code clear preexisting global logging handlers and thus insist that it '
+        + 'alone can add handlers. Use this to work around annoying modules that insert global '
+        + 'handlers with formats and logging levels you might not want. Caveat emptor, this may '
+        + 'cause you to miss logging messages.'
+    )
+)
 
 built_in_print = print
 
@@ -377,6 +388,12 @@ def initialize_logging(logger=None) -> logging.Logger:
     if logger is None:
         logger = logging.getLogger()  # Root logger
 
+    spammy_handlers = 0
+    if config.config['logging_clear_spammy_handlers']:
+        while logger.hasHandlers():
+            logger.removeHandler(logger.handlers[0])
+            spammy_handlers += 1
+
     if config.config['logging_config_file'] is not None:
         logging.config.fileConfig('logging.conf')
         return logger
@@ -410,7 +427,7 @@ def initialize_logging(logger=None) -> logging.Logger:
         if config.config['logging_syslog_facility']:
             facility_name = 'LOG_' + config.config['logging_syslog_facility']
             facility = SysLogHandler.__dict__.get(facility_name, SysLogHandler.LOG_USER)
-            handler = SysLogHandler(facility=SysLogHandler.LOG_CRON, address='/dev/log')
+            handler = SysLogHandler(facility=facility, address='/dev/log')
         handler.setFormatter(
             MillisecondAwareFormatter(
                 fmt=fmt,
@@ -485,6 +502,36 @@ def initialize_logging(logger=None) -> logging.Logger:
             built_in_print(*arg, **kwarg)
         builtins.print = print_and_also_log
 
+    logger.debug(f'Initialized logger; default logging level is {default_logging_level}.')
+    if config.config['logging_clear_spammy_handlers'] and spammy_handlers > 0:
+        logger.warning(
+            f'Logging cleared {spammy_handlers} global handlers (--logging_clear_spammy_handlers)'
+        )
+    logger.debug(f'Logging format is "{fmt}"')
+    if config.config['logging_syslog']:
+        logger.debug(f'Logging to syslog as {facility_name} with normal severity mapping')
+    if config.config['logging_filename']:
+        logger.debug(f'Logging to filename {config.config["logging_filename"]} with rotation')
+    if config.config['logging_console']:
+        logger.debug(f'Logging to the console.')
+    if config.config['logging_info_is_print']:
+        logger.debug(
+            'Logging logger.info messages will be repeated on stdout (--logging_info_is_print)'
+        )
+    if config.config['logging_squelch_repeats_enabled']:
+        logger.debug(
+            'Logging code is allowed to request repeated messages be squelched (--logging_squelch_repeats_enabled)'
+        )
+    if config.config['logging_probabilistically_enabled']:
+        logger.debug(
+            'Logging code is allowed to request probabilistic logging (--logging_probabilistically_enabled)'
+        )
+    if config.config['lmodule']:
+        logger.debug(
+            f'Logging dynamic per-module logging enabled (--lmodule={config.config["lmodule"]})'
+        )
+    if config.config['logging_captures_prints']:
+        logger.debug('Logging will capture printed messages (--logging_captures_prints)')
     return logger
diff --git a/profanity_filter.py b/profanity_filter.py
index 5621cef..fe54221 100755
--- a/profanity_filter.py
+++ b/profanity_filter.py
@@ -347,6 +347,7 @@ class ProfanityFilter(object):
         'poop chute',
         'poopchute',
         'porn',
+        'pron',
         'pornhub',
         'porno',
         'pornographi',
@@ -471,6 +472,11 @@ class ProfanityFilter(object):
     def _normalize(self, text: str) -> str:
         result = text.lower()
         result = result.replace("_", " ")
+        result = result.replace('0', 'o')
+        result = result.replace('1', 'l')
+        result = result.replace('4', 'a')
+        result = result.replace('5', 's')
+        result = result.replace('3', 'e')
         for x in string.punctuation:
             result = result.replace(x, "")
         chunks = [
diff --git a/site_config.py b/site_config.py
index caaf3d8..492623f 100644
--- a/site_config.py
+++ b/site_config.py
@@ -12,8 +12,8 @@ from type.locations import Location
 logger = logging.getLogger(__name__)
 
 args = config.add_commandline_args(
-    f'({__file__})',
-    'Args related to __file__'
+    f'Global Site Config ({__file__})',
+    f'Args related to global site-specific configuration'
) args.add_argument( '--site_config_override_location', diff --git a/smart_home/outlets.py b/smart_home/outlets.py index 68dfd2b..8fd0948 100644 --- a/smart_home/outlets.py +++ b/smart_home/outlets.py @@ -238,13 +238,13 @@ class GoogleOutlet(BaseOutlet): @decorator_utils.singleton class MerossWrapper(object): - """Note that instantiating this class causes HTTP traffic with an - external Meross server. Meross blocks customers who hit their - servers too aggressively so MerossOutlet is lazy about creating - instances of this class. + """Global singleton helper class for MerossOutlets. Note that + instantiating this class causes HTTP traffic with an external + Meross server. Meross blocks customers who hit their servers too + aggressively so MerossOutlet is lazy about creating instances of + this class. """ - def __init__(self): self.loop = asyncio.get_event_loop() self.email = os.environ.get('MEROSS_EMAIL') or scott_secrets.MEROSS_EMAIL -- 2.45.2 From 48a2eb5656508e2d1d16e6f22e979bed5ee682a6 Mon Sep 17 00:00:00 2001 From: Scott Date: Sat, 8 Jan 2022 13:01:47 -0800 Subject: [PATCH 13/16] Cleanup logging module. --- logging_utils.py | 111 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 78 insertions(+), 33 deletions(-) diff --git a/logging_utils.py b/logging_utils.py index 005761a..de69046 100644 --- a/logging_utils.py +++ b/logging_utils.py @@ -10,12 +10,12 @@ import io import logging from logging.handlers import RotatingFileHandler, SysLogHandler import os -import pytz import random import sys from typing import Callable, Iterable, Mapping, Optional from overrides import overrides +import pytz # This module is commonly used by others in here and should avoid # taking any unnecessary dependencies back on them. @@ -115,13 +115,13 @@ cfg.add_argument( help='logging.info also prints to stdout.' ) cfg.add_argument( - '--logging_squelch_repeats_enabled', + '--logging_squelch_repeats', action=argparse_utils.ActionNoYes, default=True, help='Do we allow code to indicate that it wants to squelch repeated logging messages or should we always log?' ) cfg.add_argument( - '--logging_probabilistically_enabled', + '--logging_probabilistically', action=argparse_utils.ActionNoYes, default=True, help='Do we allow probabilistic logging (for code that wants it) or should we always log?' @@ -144,7 +144,7 @@ cfg.add_argument( ) ) cfg.add_argument( - '--logging_clear_spammy_handlers', + '--logging_clear_preexisting_handlers', action=argparse_utils.ActionNoYes, default=False, help=( @@ -155,8 +155,8 @@ cfg.add_argument( ) ) - built_in_print = print +logging_initialized = False def function_identifier(f: Callable) -> str: @@ -214,7 +214,11 @@ class SquelchRepeatedMessagesFilter(logging.Filter): This filter only affects logging messages that repeat more than a threshold number of times from functions that are tagged with - the @logging_utils.squelched_logging_ok decorator. + the @logging_utils.squelched_logging_ok decorator; others are + ignored. + + This functionality is enabled by default but can be disabled via + the --no_logging_squelch_repeats commandline flag. """ def __init__(self) -> None: @@ -234,8 +238,8 @@ class SquelchRepeatedMessagesFilter(logging.Filter): class DynamicPerScopeLoggingLevelFilter(logging.Filter): - """Only interested in seeing logging messages from an allow list of - module names or module:function names. Block others. + """This filter only allows logging messages from an allow list of + module names or module:function names. Blocks others. 
""" @staticmethod @@ -322,6 +326,10 @@ def logging_is_probabilistic(probability_of_logging: float) -> Callable: (i.e. they do not always unconditionally log) but rather are probabilistic (i.e. they log N% of the time randomly). + Note that this functionality can be disabled (forcing all logged + messages to produce output) via the --no_logging_probabilistically + cmdline argument. + This affects *ALL* logging statements within the marked function. """ @@ -365,7 +373,8 @@ class OnlyInfoFilter(logging.Filter): class MillisecondAwareFormatter(logging.Formatter): """ - A formatter for adding milliseconds to log messages. + A formatter for adding milliseconds to log messages which, for + whatever reason, the default python logger doesn't do. """ converter = datetime.datetime.fromtimestamp @@ -384,15 +393,20 @@ class MillisecondAwareFormatter(logging.Formatter): def initialize_logging(logger=None) -> logging.Logger: - assert config.has_been_parsed() + global logging_initialized + if logging_initialized: + return + logging_initialized = True + if logger is None: - logger = logging.getLogger() # Root logger + logger = logging.getLogger() - spammy_handlers = 0 - if config.config['logging_clear_spammy_handlers']: + preexisting_handlers_count = 0 + assert config.has_been_parsed() + if config.config['logging_clear_preexisting_handlers']: while logger.hasHandlers(): logger.removeHandler(logger.handlers[0]) - spammy_handlers += 1 + preexisting_handlers_count += 1 if config.config['logging_config_file'] is not None: logging.config.fileConfig('logging.conf') @@ -416,7 +430,6 @@ def initialize_logging(logger=None) -> logging.Logger: fmt = '%(levelname).1s:%(filename)s[%(process)d]: %(message)s' else: fmt = '%(levelname).1s:%(asctime)s: %(message)s' - if config.config['logging_debug_threads']: fmt = f'%(process)d.%(thread)d|{fmt}' if config.config['logging_debug_modules']: @@ -471,11 +484,11 @@ def initialize_logging(logger=None) -> logging.Logger: handler.addFilter(OnlyInfoFilter()) logger.addHandler(handler) - if config.config['logging_squelch_repeats_enabled']: + if config.config['logging_squelch_repeats']: for handler in handlers: handler.addFilter(SquelchRepeatedMessagesFilter()) - if config.config['logging_probabilistically_enabled']: + if config.config['logging_probabilistically']: for handler in handlers: handler.addFilter(ProbabilisticFilter()) @@ -502,36 +515,56 @@ def initialize_logging(logger=None) -> logging.Logger: built_in_print(*arg, **kwarg) builtins.print = print_and_also_log - logger.debug(f'Initialized logger; default logging level is {default_logging_level}.') - if config.config['logging_clear_spammy_handlers'] and spammy_handlers > 0: + # At this point the logger is ready, handlers are set up, + # etc... so log about the logging configuration. + + level_name = logging._levelToName.get(default_logging_level, str(default_logging_level)) + logger.debug( + f'Initialized global logging; default logging level is {level_name}.' 
+    )
+    if config.config['logging_clear_preexisting_handlers'] and preexisting_handlers_count > 0:
         logger.warning(
-            f'Logging cleared {spammy_handlers} global handlers (--logging_clear_spammy_handlers)'
+            f'Logging cleared {preexisting_handlers_count} global handlers (--logging_clear_preexisting_handlers)'
         )
-    logger.debug(f'Logging format is "{fmt}"')
+    logger.debug(f'Logging format specification is "{fmt}"')
+    if config.config['logging_debug_threads']:
+        logger.debug('...Logging format spec captures tid/pid (--logging_debug_threads)')
+    if config.config['logging_debug_modules']:
+        logger.debug('...Logging format spec captures files/functions/lineno (--logging_debug_modules)')
     if config.config['logging_syslog']:
-        logger.debug(f'Logging to syslog as {facility_name} with normal severity mapping')
+        logger.debug(f'Logging to syslog as {facility_name} with priority mapping based on level')
     if config.config['logging_filename']:
-        logger.debug(f'Logging to filename {config.config["logging_filename"]} with rotation')
+        logger.debug(f'Logging to filename {config.config["logging_filename"]}')
+        logger.debug(f'...with {config.config["logging_filename_maxsize"]} bytes max file size.')
+        logger.debug(f'...and {config.config["logging_filename_count"]} rotating backup file count.')
     if config.config['logging_console']:
-        logger.debug(f'Logging to the console.')
+        logger.debug('Logging to the console (stderr).')
     if config.config['logging_info_is_print']:
         logger.debug(
             'Logging logger.info messages will be repeated on stdout (--logging_info_is_print)'
         )
-    if config.config['logging_squelch_repeats_enabled']:
+    if config.config['logging_squelch_repeats']:
+        logger.debug(
+            'Logging code allowed to request repeated messages be squelched (--logging_squelch_repeats)'
+        )
+    else:
         logger.debug(
-            'Logging code is allowed to request repeated messages be squelched (--logging_squelch_repeats_enabled)'
+            'Logging code forbidden to request messages be squelched; all messages logged (--no_logging_squelch_repeats)'
         )
-    if config.config['logging_probabilistically_enabled']:
+    if config.config['logging_probabilistically']:
         logger.debug(
-            'Logging code is allowed to request probabilistic logging (--logging_probabilistically_enabled)'
+            'Logging code is allowed to request probabilistic logging (--logging_probabilistically)'
+        )
+    else:
+        logger.debug(
+            'Logging code is forbidden to request probabilistic logging; messages always logged (--no_logging_probabilistically)'
         )
     if config.config['lmodule']:
         logger.debug(
             f'Logging dynamic per-module logging enabled (--lmodule={config.config["lmodule"]})'
         )
     if config.config['logging_captures_prints']:
-        logger.debug('Logging will capture printed messages (--logging_captures_prints)')
+        logger.debug('Logging will capture printed data as logger.info messages (--logging_captures_prints)')
     return logger
@@ -541,8 +574,11 @@ def get_logger(name: str = ""):
 
 
 def tprint(*args, **kwargs) -> None:
-    """Legacy function for printing a message augmented with thread id."""
+    """Legacy function for printing a message augmented with thread id
+    still needed by some code. Please use --logging_debug_threads in
+    new code.
+    """
     if config.config['logging_debug_threads']:
         from thread_utils import current_thread_id
         print(f'{current_thread_id()}', end="")
@@ -552,8 +588,11 @@ def tprint(*args, **kwargs) -> None:
 
 
 def dprint(*args, **kwargs) -> None:
-    """Legacy function used to print to stderr."""
+    """Legacy function used to print to stderr still needed by some code.
+ Please just use normal logging with --logging_console which + accomplishes the same thing in new code. + """ print(*args, file=sys.stderr, **kwargs) @@ -561,7 +600,8 @@ class OutputMultiplexer(object): """ A class that broadcasts printed messages to several sinks (including various logging levels, different files, different file handles, - the house log, etc...) + the house log, etc...). See also OutputMultiplexerContext for an + easy usage pattern. """ class Destination(enum.IntEnum): @@ -720,8 +760,13 @@ class OutputMultiplexerContext(OutputMultiplexer, contextlib.ContextDecorator): def hlog(message: str) -> None: - """Write a message to the house log.""" + """Write a message to the house log (syslog facility local7 priority + info) by calling /usr/bin/logger. This is pretty hacky but used + by a bunch of code. Another way to do this would be to use + --logging_syslog and --logging_syslog_facility but I can't + actually say that's easier. + """ message = message.replace("'", "'\"'\"'") os.system(f"/usr/bin/logger -p local7.info -- '{message}'") -- 2.45.2 From f5015d539e319ffcd8a4b9a7e8891c67d0d754b3 Mon Sep 17 00:00:00 2001 From: Scott Date: Sat, 8 Jan 2022 15:21:25 -0800 Subject: [PATCH 14/16] Add rate limiter decorator. --- decorator_utils.py | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/decorator_utils.py b/decorator_utils.py index 4f98a6d..480543a 100644 --- a/decorator_utils.py +++ b/decorator_utils.py @@ -2,7 +2,6 @@ """Decorators.""" -import datetime import enum import functools import inspect @@ -15,7 +14,7 @@ import sys import threading import time import traceback -from typing import Callable, Optional +from typing import Any, Callable, Optional import warnings # This module is commonly used by others in here and should avoid @@ -58,6 +57,31 @@ def invocation_logged(func: Callable) -> Callable: return wrapper_invocation_logged +def rate_limited(n_per_second: int) -> Callable: + """Limit invocation of a wrapped function to n calls per second. + Thread safe. + + """ + min_interval = 1.0 / float(n_per_second) + + def wrapper_rate_limited(func: Callable) -> Callable: + last_invocation_time = [0.0] + + def wrapper_wrapper_rate_limited(*args, **kargs) -> Any: + while True: + elapsed = time.clock_gettime(0) - last_invocation_time[0] + wait_time = min_interval - elapsed + if wait_time > 0.0: + time.sleep(wait_time) + else: + break + ret = func(*args, **kargs) + last_invocation_time[0] = time.clock_gettime(0) + return ret + return wrapper_wrapper_rate_limited + return wrapper_rate_limited + + def debug_args(func: Callable) -> Callable: """Print the function signature and return value at each call.""" -- 2.45.2 From 287360114f0a9d61d5dc3c3f168344df856ffbd5 Mon Sep 17 00:00:00 2001 From: Scott Date: Sun, 9 Jan 2022 22:15:45 -0800 Subject: [PATCH 15/16] Make rate_limited use cvs. 
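For orientation before the diff, a hypothetical use of the decorator as reworked below (assuming decorator_utils.rate_limited with the new keyword-only signature): at most two calls per second are allowed through, across all threads, and surplus callers block on the condition variable until permitted.

import time
from decorator_utils import rate_limited

@rate_limited(2, per_period_in_seconds=1.0)
def poll() -> float:
    return time.time()

stamps = [poll() for _ in range(4)]
deltas = [b - a for a, b in zip(stamps, stamps[1:])]
# Each delta should be at least ~0.5s: per_period_in_seconds / n_calls.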
--- decorator_utils.py | 40 +++++++++++++++++++++++++++------------- 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/decorator_utils.py b/decorator_utils.py index 480543a..70a88d3 100644 --- a/decorator_utils.py +++ b/decorator_utils.py @@ -14,7 +14,7 @@ import sys import threading import time import traceback -from typing import Any, Callable, Optional +from typing import Any, Callable, Optional, Tuple import warnings # This module is commonly used by others in here and should avoid @@ -57,26 +57,40 @@ def invocation_logged(func: Callable) -> Callable: return wrapper_invocation_logged -def rate_limited(n_per_second: int) -> Callable: - """Limit invocation of a wrapped function to n calls per second. - Thread safe. +def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callable: + """Limit invocation of a wrapped function to n calls per period. + Thread safe. In testing this was relatively fair with multiple + threads using it though that hasn't been measured. """ - min_interval = 1.0 / float(n_per_second) + min_interval_seconds = per_period_in_seconds / float(n_calls) def wrapper_rate_limited(func: Callable) -> Callable: - last_invocation_time = [0.0] + cv = threading.Condition() + last_invocation_timestamp = [0.0] + + def may_proceed() -> float: + now = time.time() + last_invocation = last_invocation_timestamp[0] + if last_invocation != 0.0: + elapsed_since_last = now - last_invocation + wait_time = min_interval_seconds - elapsed_since_last + else: + wait_time = 0.0 + return wait_time def wrapper_wrapper_rate_limited(*args, **kargs) -> Any: - while True: - elapsed = time.clock_gettime(0) - last_invocation_time[0] - wait_time = min_interval - elapsed - if wait_time > 0.0: - time.sleep(wait_time) - else: + with cv: + while True: + cv.wait_for( + lambda: may_proceed() <= 0.0, + timeout=may_proceed(), + ) break ret = func(*args, **kargs) - last_invocation_time[0] = time.clock_gettime(0) + with cv: + last_invocation_timestamp[0] = time.time() + cv.notify() return ret return wrapper_wrapper_rate_limited return wrapper_rate_limited -- 2.45.2 From 44a7740bf6fe18673f8636658256a227e0622880 Mon Sep 17 00:00:00 2001 From: Scott Date: Mon, 10 Jan 2022 09:45:31 -0800 Subject: [PATCH 16/16] Experiment with audit events in bootstrap. --- bootstrap.py | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/bootstrap.py b/bootstrap.py index 7ed8b40..c445aef 100644 --- a/bootstrap.py +++ b/bootstrap.py @@ -141,10 +141,16 @@ class ImportInterceptor(object): return [] -# TODO: test this with python 3.8+ -def audit_import_events(event, args): - print(event) - print(args) +# # TODO: test this with python 3.8+ +# def audit_import_events(event, args): +# if event == 'import': +# module = args[0] +# filename = args[1] +# sys_path = args[2] +# sys_meta_path = args[3] +# sys_path_hooks = args[4] +# logger.debug(msg) +# print(msg) # Audit import events? 
Note: this runs early in the lifetime of the @@ -158,15 +164,14 @@ def audit_import_events(event, args): import_interceptor = None for arg in sys.argv: if arg == '--audit_import_events': - if not hasattr(sys, 'frozen'): - if ( - sys.version_info[0] == 3 - and sys.version_info[1] < 8 - ): - import_interceptor = ImportInterceptor() - sys.meta_path = [import_interceptor] + sys.meta_path - else: - sys.addaudithook(audit_import_events) + import_interceptor = ImportInterceptor() + sys.meta_path = [import_interceptor] + sys.meta_path + # if not hasattr(sys, 'frozen'): + # if ( + # sys.version_info[0] == 3 + # and sys.version_info[1] >= 8 + # ): + # sys.addaudithook(audit_import_events) def dump_all_objects() -> None: -- 2.45.2
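As a closing note, a sketch of the audit-hook variant that the commented-out code above experiments with, assuming Python 3.8+ (PEP 578); unlike the meta_path interceptor this series keeps, an audit hook cannot be removed once installed.

import sys

def audit_import_events(event, args) -> None:
    if event == 'import':
        # Per PEP 578, the 'import' event carries:
        # (module, filename, sys.path, sys.meta_path, sys.path_hooks)
        module, filename, *_ = args
        print(f'audit: importing {module} (from {filename})')

if sys.version_info >= (3, 8) and not hasattr(sys, 'frozen'):
    sys.addaudithook(audit_import_events)

import json  # noqa: E402 -- this import now triggers the hook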