More cleanup.
authorScott Gasch <[email protected]>
Tue, 8 Feb 2022 22:38:15 +0000 (14:38 -0800)
committerScott Gasch <[email protected]>
Tue, 8 Feb 2022 22:38:15 +0000 (14:38 -0800)
conversion_utils.py
datetime_utils.py
decorator_utils.py
deferred_operand.py
dict_utils.py
directory_filter.py
exceptions.py
exec_utils.py
executors.py
file_utils.py
tests/parallelize_itest.py

index 8e64a875c1ad4e3d89eeee33205693204afd5d87..cf4fcaab93f75291d7e24bf54d55f672eace1578 100644 (file)
@@ -1,6 +1,8 @@
 #!/usr/bin/env python3
 # -*- coding: utf-8 -*-
 
+"""Utilities involving converting between different units."""
+
 from typing import Callable, SupportsFloat
 
 import constants
index 1cee5163a22179d3c634a078433e83c53add8109..fc1ca4ed9e07e407554da7e300f53ca58f98661a 100644 (file)
@@ -159,9 +159,8 @@ def time_to_datetime_today(time: datetime.time) -> datetime.datetime:
     False
 
     """
-    now = now_pacific()
     tz = time.tzinfo
-    return datetime.datetime.combine(now, time, tz)
+    return datetime.datetime.combine(now_pacific(), time, tz)
 
 
 def date_and_time_to_datetime(date: datetime.date, time: datetime.time) -> datetime.datetime:
@@ -248,12 +247,12 @@ class TimeUnit(enum.IntEnum):
 
     @classmethod
     def is_valid(cls, value: Any):
-        if type(value) is int:
-            return value in cls._value2member_map_
-        elif type(value) is TimeUnit:
-            return value.value in cls._value2member_map_
-        elif type(value) is str:
-            return value in cls._member_names_
+        if isinstance(value, int):
+            return cls(value) is not None
+        elif isinstance(value, TimeUnit):
+            return cls(value.value) is not None
+        elif isinstance(value, str):
+            return cls.__members__[value] is not None
         else:
             print(type(value))
             return False
@@ -508,6 +507,7 @@ def datetime_to_string(
         date_time_separator=date_time_separator,
         include_timezone=include_timezone,
         include_dayname=include_dayname,
+        use_month_abbrevs=use_month_abbrevs,
         include_seconds=include_seconds,
         include_fractional=include_fractional,
         twelve_hour=twelve_hour,
@@ -541,6 +541,7 @@ def string_to_datetime(
         date_time_separator=date_time_separator,
         include_timezone=include_timezone,
         include_dayname=include_dayname,
+        use_month_abbrevs=use_month_abbrevs,
         include_seconds=include_seconds,
         include_fractional=include_fractional,
         twelve_hour=twelve_hour,
index 68a9d69633f6babe78c80153753d5c4a41c150af..5d1e779deeeaa3df34547f853ce5e4fc0cdcb87b 100644 (file)
@@ -133,7 +133,7 @@ def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callabl
                 wait_time = min_interval_seconds - elapsed_since_last
             else:
                 wait_time = 0.0
-            logger.debug(f'@{time.time()}> wait_time = {wait_time}')
+            logger.debug('@%.4f> wait_time = %.4f', time.time(), wait_time)
             return wait_time
 
         def wrapper_wrapper_rate_limited(*args, **kargs) -> Any:
@@ -145,10 +145,12 @@ def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callabl
                     ):
                         break
             with cv:
-                logger.debug(f'@{time.time()}> calling it...')
+                logger.debug('@%.4f> calling it...', time.time())
                 ret = func(*args, **kargs)
                 last_invocation_timestamp[0] = time.time()
-                logger.debug(f'@{time.time()}> Last invocation <- {last_invocation_timestamp[0]}')
+                logger.debug(
+                    '@%.4f> Last invocation <- %.4f', time.time(), last_invocation_timestamp[0]
+                )
                 cv.notify()
             return ret
 
@@ -225,6 +227,11 @@ def debug_count_calls(func: Callable) -> Callable:
 
 
 class DelayWhen(enum.IntEnum):
+    """When should we delay: before or after calling the function (or
+    both)?
+
+    """
+
     BEFORE_CALL = 1
     AFTER_CALL = 2
     BEFORE_AND_AFTER = 3
@@ -259,11 +266,11 @@ def delay(
         @functools.wraps(func)
         def wrapper_delay(*args, **kwargs):
             if when & DelayWhen.BEFORE_CALL:
-                logger.debug(f"@delay for {seconds}s BEFORE_CALL to {func.__name__}")
+                logger.debug("@delay for %fs BEFORE_CALL to %s", seconds, func.__name__)
                 time.sleep(seconds)
             retval = func(*args, **kwargs)
             if when & DelayWhen.AFTER_CALL:
-                logger.debug(f"@delay for {seconds}s AFTER_CALL to {func.__name__}")
+                logger.debug("@delay for %fs AFTER_CALL to %s", seconds, func.__name__)
                 time.sleep(seconds)
             return retval
 
@@ -288,7 +295,7 @@ class _SingletonWrapper:
 
     def __call__(self, *args, **kwargs):
         """Returns a single instance of decorated class"""
-        logger.debug(f"@singleton returning global instance of {self.__wrapped__.__name__}")
+        logger.debug('@singleton returning global instance of %s', self.__wrapped__.__name__)
         if self._instance is None:
             self._instance = self.__wrapped__(*args, **kwargs)
         return self._instance
@@ -355,13 +362,13 @@ def memoized(func: Callable) -> Callable:
         cache_key = args + tuple(kwargs.items())
         if cache_key not in wrapper_memoized.cache:
             value = func(*args, **kwargs)
-            logger.debug(f"Memoizing {cache_key} => {value} for {func.__name__}")
+            logger.debug('Memoizing %s => %s for %s', cache_key, value, func.__name__)
             wrapper_memoized.cache[cache_key] = value
         else:
-            logger.debug(f"Returning memoized value for {func.__name__}")
+            logger.debug('Returning memoized value for %s', {func.__name__})
         return wrapper_memoized.cache[cache_key]
 
-    wrapper_memoized.cache = dict()  # type: ignore
+    wrapper_memoized.cache = {}  # type: ignore
     return wrapper_memoized
 
 
@@ -403,7 +410,7 @@ def retry_predicate(
         @functools.wraps(f)
         def f_retry(*args, **kwargs):
             mtries, mdelay = tries, delay_sec  # make mutable
-            logger.debug(f'deco_retry: will make up to {mtries} attempts...')
+            logger.debug('deco_retry: will make up to %d attempts...', mtries)
             retval = f(*args, **kwargs)
             while mtries > 0:
                 if predicate(retval) is True:
@@ -537,7 +544,7 @@ def thunkify(func):
 
 def _raise_exception(exception, error_message: Optional[str]):
     if error_message is None:
-        raise Exception()
+        raise Exception(exception)
     else:
         raise Exception(error_message)
 
@@ -620,6 +627,7 @@ class _Timeout(object):
             if flag:
                 return load
             raise load
+        return None
 
 
 def timeout(
@@ -722,7 +730,8 @@ def call_with_sample_rate(sample_rate: float) -> Callable:
             if random.uniform(0, 1) < sample_rate:
                 return f(*args, **kwargs)
             else:
-                logger.debug(f"@call_with_sample_rate skipping a call to {f.__name__}")
+                logger.debug("@call_with_sample_rate skipping a call to %s", f.__name__)
+                return None
 
         return _call_with_sample_rate
 
index 75e98d923d5bcccc135e4d5c05415c7949fac8a9..70e9d57392dc0a0b44ba7464cf3be9ee1aa797eb 100644 (file)
@@ -1,5 +1,9 @@
 #!/usr/bin/env python3
 
+"""This is a helper class that tries to define every __dunder__ method
+so as to defer that evaluation of an object as long as possible.  It
+is used by smart_future.py as a base class."""
+
 from abc import ABC, abstractmethod
 from typing import Any, Generic, TypeVar
 
index 451a87dadf08d8632ac6f593dfb592116a05779b..ecd23fda0fe0f4624a27ca6a8971dd068ce2958e 100644 (file)
@@ -1,5 +1,7 @@
 #!/usr/bin/env python3
 
+"""Helper functions for dealing with dictionaries."""
+
 from itertools import islice
 from typing import Any, Callable, Dict, Iterator, List, Tuple
 
@@ -42,10 +44,10 @@ def shard(d: Dict[Any, Any], size: int) -> Iterator[Dict[Any, Any]]:
     """
     items = d.items()
     for x in range(0, len(d), size):
-        yield {key: value for (key, value) in islice(items, x, x + size)}
+        yield dict(islice(items, x, x + size))
 
 
-def coalesce_by_creating_list(key, new_value, old_value):
+def coalesce_by_creating_list(_, new_value, old_value):
     from list_utils import flatten
 
     return flatten([new_value, old_value])
@@ -55,11 +57,11 @@ def coalesce_by_creating_set(key, new_value, old_value):
     return set(coalesce_by_creating_list(key, new_value, old_value))
 
 
-def coalesce_last_write_wins(key, new_value, old_value):
+def coalesce_last_write_wins(_, new_value, discarded_old_value):
     return new_value
 
 
-def coalesce_first_write_wins(key, new_value, old_value):
+def coalesce_first_write_wins(_, discarded_new_value, old_value):
     return old_value
 
 
index b057f85a1c8c728a497171a37c4a216db231ba30..6985831fc3a6e3d8fe24550c831293e40383b9a6 100644 (file)
@@ -1,5 +1,11 @@
 #!/usr/bin/env python3
 
+"""Two predicates that can help avoid unnecessary disk I/O by
+detecting if a particular file is identical to the contents about to
+be written or if a particular directory already contains a file that
+is identical to the one to be written.  See class docs below for
+examples."""
+
 import hashlib
 import logging
 import os
@@ -58,18 +64,18 @@ class DirectoryFileFilter(object):
         assert mtime is not None
         if self.mtime_by_filename.get(filename, 0) != mtime:
             md5 = file_utils.get_file_md5(filename)
-            logger.debug(f'Computed/stored {filename}\'s MD5 at ts={mtime} ({md5})')
+            logger.debug('Computed/stored %s\'s MD5 at ts=%.2f (%s)', filename, mtime, md5)
             self.mtime_by_filename[filename] = mtime
             self.md5_by_filename[filename] = md5
 
     def apply(self, item: Any, filename: str) -> bool:
         self._update_file(filename)
         file_md5 = self.md5_by_filename.get(filename, 0)
-        logger.debug(f'{filename}\'s checksum is {file_md5}')
+        logger.debug('%s\'s checksum is %s', filename, file_md5)
         mem_hash = hashlib.md5()
         mem_hash.update(item)
         md5 = mem_hash.hexdigest()
-        logger.debug(f'Item\'s checksum is {md5}')
+        logger.debug('Item\'s checksum is %s', md5)
         return md5 != file_md5
 
 
index 82a82a566b1b1da50af1942364761722c09f047c..aa0aecbef67abd8320db01c71446cc7ca5a24617 100644 (file)
@@ -1,19 +1,28 @@
 #!/usr/bin/env python3
 
+"""Some exceptions used elsewhere."""
+
 # This module is commonly used by others in here and should avoid
 # taking any unnecessary dependencies back on them.
 
 
 class PreconditionException(AssertionError):
+    """Use to indicate function preconditions violated."""
+
     pass
 
 
 class PostconditionException(AssertionError):
+    """Use to indicate function postconditions violated."""
+
     pass
 
 
 class TimeoutError(Exception):
+    """Use to indicate an operation that timed out."""
+
     def __init__(self, value: str = "Timed out"):
+        super().__init__()
         self.value = value
 
     def __str__(self):
index df273352bb793d59f19717c9cb6806134dd5a628..a440de5375c1546d0bf46835bc00ecada06b7d9d 100644 (file)
@@ -1,5 +1,7 @@
 #!/usr/bin/env python3
 
+"""Helper methods concerned with executing subprocesses."""
+
 import atexit
 import logging
 import os
@@ -137,7 +139,7 @@ def cmd_in_background(command: str, *, silent: bool = False) -> subprocess.Popen
     def kill_subproc() -> None:
         try:
             if subproc.poll() is None:
-                logger.info(f'At exit handler: killing {subproc} ({command})')
+                logger.info('At exit handler: killing %s (%s)', subproc, command)
                 subproc.terminate()
                 subproc.wait(timeout=10.0)
         except BaseException as be:
index 69330129c9b86ec8c9710b2586b24d7c2dfee8f6..2d80c3bdd92f80862d7b2be19678c004393f3ff3 100644 (file)
@@ -1,6 +1,14 @@
 #!/usr/bin/env python3
 # -*- coding: utf-8 -*-
 
+"""Defines three executors: a thread executor for doing work using a
+threadpool, a process executor for doing work in other processes on
+the same machine and a remote executor for farming out work to other
+machines.
+
+Also defines DefaultExecutors which is a container for references to
+global executors / worker pools with automatic shutdown semantics."""
+
 from __future__ import annotations
 
 import concurrent.futures as fut
@@ -67,11 +75,13 @@ SCP = '/usr/bin/scp -C'
 
 
 def make_cloud_pickle(fun, *args, **kwargs):
-    logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
+    logger.debug("Making cloudpickled bundle at %s", fun.__name__)
     return cloudpickle.dumps((fun, args, kwargs))
 
 
 class BaseExecutor(ABC):
+    """The base executor interface definition."""
+
     def __init__(self, *, title=''):
         self.title = title
         self.histogram = hist.SimpleHistogram(
@@ -106,7 +116,7 @@ class BaseExecutor(ABC):
 
         """
         self.task_count += delta
-        logger.debug(f'Adjusted task count by {delta} to {self.task_count}')
+        logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
 
     def get_task_count(self) -> int:
         """Change the task count.  Note: do not call this method from a
@@ -118,6 +128,8 @@ class BaseExecutor(ABC):
 
 
 class ThreadExecutor(BaseExecutor):
+    """A threadpool executor instance."""
+
     def __init__(self, max_workers: Optional[int] = None):
         super().__init__()
         workers = None
@@ -125,15 +137,16 @@ class ThreadExecutor(BaseExecutor):
             workers = max_workers
         elif 'executors_threadpool_size' in config.config:
             workers = config.config['executors_threadpool_size']
-        logger.debug(f'Creating threadpool executor with {workers} workers')
+        logger.debug('Creating threadpool executor with %d workers', workers)
         self._thread_pool_executor = fut.ThreadPoolExecutor(
             max_workers=workers, thread_name_prefix="thread_executor_helper"
         )
         self.already_shutdown = False
 
     # This is run on a different thread; do not adjust task count here.
-    def run_local_bundle(self, fun, *args, **kwargs):
-        logger.debug(f"Running local bundle at {fun.__name__}")
+    @staticmethod
+    def run_local_bundle(fun, *args, **kwargs):
+        logger.debug("Running local bundle at %s", fun.__name__)
         result = fun(*args, **kwargs)
         return result
 
@@ -147,7 +160,9 @@ class ThreadExecutor(BaseExecutor):
         for arg in args:
             newargs.append(arg)
         start = time.time()
-        result = self._thread_pool_executor.submit(self.run_local_bundle, *newargs, **kwargs)
+        result = self._thread_pool_executor.submit(
+            ThreadExecutor.run_local_bundle, *newargs, **kwargs
+        )
         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
         result.add_done_callback(lambda _: self.adjust_task_count(-1))
         return result
@@ -155,7 +170,7 @@ class ThreadExecutor(BaseExecutor):
     @overrides
     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
         if not self.already_shutdown:
-            logger.debug(f'Shutting down threadpool executor {self.title}')
+            logger.debug('Shutting down threadpool executor %s', self.title)
             self._thread_pool_executor.shutdown(wait)
             if not quiet:
                 print(self.histogram.__repr__(label_formatter='%ds'))
@@ -163,6 +178,8 @@ class ThreadExecutor(BaseExecutor):
 
 
 class ProcessExecutor(BaseExecutor):
+    """A processpool executor."""
+
     def __init__(self, max_workers=None):
         super().__init__()
         workers = None
@@ -170,16 +187,17 @@ class ProcessExecutor(BaseExecutor):
             workers = max_workers
         elif 'executors_processpool_size' in config.config:
             workers = config.config['executors_processpool_size']
-        logger.debug(f'Creating processpool executor with {workers} workers.')
+        logger.debug('Creating processpool executor with %d workers.', workers)
         self._process_executor = fut.ProcessPoolExecutor(
             max_workers=workers,
         )
         self.already_shutdown = False
 
     # This is run in another process; do not adjust task count here.
-    def run_cloud_pickle(self, pickle):
+    @staticmethod
+    def run_cloud_pickle(pickle):
         fun, args, kwargs = cloudpickle.loads(pickle)
-        logger.debug(f"Running pickled bundle at {fun.__name__}")
+        logger.debug("Running pickled bundle at %s", fun.__name__)
         result = fun(*args, **kwargs)
         return result
 
@@ -190,7 +208,7 @@ class ProcessExecutor(BaseExecutor):
         start = time.time()
         self.adjust_task_count(+1)
         pickle = make_cloud_pickle(function, *args, **kwargs)
-        result = self._process_executor.submit(self.run_cloud_pickle, pickle)
+        result = self._process_executor.submit(ProcessExecutor.run_cloud_pickle, pickle)
         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
         result.add_done_callback(lambda _: self.adjust_task_count(-1))
         return result
@@ -198,7 +216,7 @@ class ProcessExecutor(BaseExecutor):
     @overrides
     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
         if not self.already_shutdown:
-            logger.debug(f'Shutting down processpool executor {self.title}')
+            logger.debug('Shutting down processpool executor %s', self.title)
             self._process_executor.shutdown(wait)
             if not quiet:
                 print(self.histogram.__repr__(label_formatter='%ds'))
@@ -218,6 +236,8 @@ class RemoteExecutorException(Exception):
 
 @dataclass
 class RemoteWorkerRecord:
+    """A record of info about a remote worker."""
+
     username: str
     machine: str
     weight: int
@@ -232,6 +252,12 @@ class RemoteWorkerRecord:
 
 @dataclass
 class BundleDetails:
+    """All info necessary to define some unit of work that needs to be
+    done, where it is being run, its state, whether it is an original
+    bundle of a backup bundle, how many times it has failed, etc...
+
+    """
+
     pickled_code: bytes
     uuid: str
     fname: str
@@ -282,6 +308,8 @@ class BundleDetails:
 
 
 class RemoteExecutorStatus:
+    """A status 'scoreboard' for a remote executor."""
+
     def __init__(self, total_worker_count: int) -> None:
         self.worker_count: int = total_worker_count
         self.known_workers: Set[RemoteWorkerRecord] = set()
@@ -342,7 +370,7 @@ class RemoteExecutorStatus:
             start = self.start_per_bundle[uuid]
             assert start is not None
             bundle_latency = ts - start
-            x = self.finished_bundle_timings_per_worker.get(worker, list())
+            x = self.finished_bundle_timings_per_worker.get(worker, [])
             x.append(bundle_latency)
             self.finished_bundle_timings_per_worker[worker] = x
             self.finished_bundle_timings.append(bundle_latency)
@@ -437,7 +465,12 @@ class RemoteExecutorStatus:
 
 
 class RemoteWorkerSelectionPolicy(ABC):
-    def register_worker_pool(self, workers):
+    """A policy for selecting a remote worker base class."""
+
+    def __init__(self):
+        self.workers: Optional[List[RemoteWorkerRecord]] = None
+
+    def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
         self.workers = workers
 
     @abstractmethod
@@ -450,28 +483,33 @@ class RemoteWorkerSelectionPolicy(ABC):
 
 
 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
+    """A remote worker selector that uses weighted RNG."""
+
     @overrides
     def is_worker_available(self) -> bool:
-        for worker in self.workers:
-            if worker.count > 0:
-                return True
+        if self.workers:
+            for worker in self.workers:
+                if worker.count > 0:
+                    return True
         return False
 
     @overrides
     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
         grabbag = []
-        for worker in self.workers:
-            if worker.machine != machine_to_avoid:
-                if worker.count > 0:
-                    for _ in range(worker.count * worker.weight):
-                        grabbag.append(worker)
+        if self.workers:
+            for worker in self.workers:
+                if worker.machine != machine_to_avoid:
+                    if worker.count > 0:
+                        for _ in range(worker.count * worker.weight):
+                            grabbag.append(worker)
 
         if len(grabbag) == 0:
-            logger.debug(f'There are no available workers that avoid {machine_to_avoid}...')
-            for worker in self.workers:
-                if worker.count > 0:
-                    for _ in range(worker.count * worker.weight):
-                        grabbag.append(worker)
+            logger.debug('There are no available workers that avoid %s', machine_to_avoid)
+            if self.workers:
+                for worker in self.workers:
+                    if worker.count > 0:
+                        for _ in range(worker.count * worker.weight):
+                            grabbag.append(worker)
 
         if len(grabbag) == 0:
             logger.warning('There are no available workers?!')
@@ -480,44 +518,51 @@ class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
         worker = random.sample(grabbag, 1)[0]
         assert worker.count > 0
         worker.count -= 1
-        logger.debug(f'Chose worker {worker}')
+        logger.debug('Selected worker %s', worker)
         return worker
 
 
 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
+    """A remote worker selector that just round robins."""
+
     def __init__(self) -> None:
+        super().__init__()
         self.index = 0
 
     @overrides
     def is_worker_available(self) -> bool:
-        for worker in self.workers:
-            if worker.count > 0:
-                return True
+        if self.workers:
+            for worker in self.workers:
+                if worker.count > 0:
+                    return True
         return False
 
     @overrides
     def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
-        x = self.index
-        while True:
-            worker = self.workers[x]
-            if worker.count > 0:
-                worker.count -= 1
+        if self.workers:
+            x = self.index
+            while True:
+                worker = self.workers[x]
+                if worker.count > 0:
+                    worker.count -= 1
+                    x += 1
+                    if x >= len(self.workers):
+                        x = 0
+                    self.index = x
+                    logger.debug('Selected worker %s', worker)
+                    return worker
                 x += 1
                 if x >= len(self.workers):
                     x = 0
-                self.index = x
-                logger.debug(f'Selected worker {worker}')
-                return worker
-            x += 1
-            if x >= len(self.workers):
-                x = 0
-            if x == self.index:
-                msg = 'Unexpectedly could not find a worker, retrying...'
-                logger.warning(msg)
-                return None
+                if x == self.index:
+                    logger.warning('Unexpectedly could not find a worker, retrying...')
+                    return None
+        return None
 
 
 class RemoteExecutor(BaseExecutor):
+    """A remote work executor."""
+
     def __init__(
         self,
         workers: List[RemoteWorkerRecord],
@@ -535,7 +580,7 @@ class RemoteExecutor(BaseExecutor):
             raise RemoteExecutorException(msg)
         self.policy.register_worker_pool(self.workers)
         self.cv = threading.Condition()
-        logger.debug(f'Creating {self.worker_count} local threads, one per remote worker.')
+        logger.debug('Creating %d local threads, one per remote worker.', self.worker_count)
         self._helper_executor = fut.ThreadPoolExecutor(
             thread_name_prefix="remote_executor_helper",
             max_workers=self.worker_count,
@@ -617,15 +662,15 @@ class RemoteExecutor(BaseExecutor):
                             if start_ts is not None:
                                 runtime = now - start_ts
                                 score += runtime
-                                logger.debug(f'score[{bundle}] => {score}  # latency boost')
+                                logger.debug('score[%s] => %.1f  # latency boost', bundle, score)
 
                                 if bundle.slower_than_local_p95:
                                     score += runtime / 2
-                                    logger.debug(f'score[{bundle}] => {score}  # >worker p95')
+                                    logger.debug('score[%s] => %.1f  # >worker p95', bundle, score)
 
                                 if bundle.slower_than_global_p95:
                                     score += runtime / 4
-                                    logger.debug(f'score[{bundle}] => {score}  # >global p95')
+                                    logger.debug('score[%s] => %.1f  # >global p95', bundle, score)
 
                             # Prefer backups of bundles that don't
                             # have backups already.
@@ -639,7 +684,9 @@ class RemoteExecutor(BaseExecutor):
                             else:
                                 score = 0
                             logger.debug(
-                                f'score[{bundle}] => {score}  # {backup_count} dup backup factor'
+                                'score[%s] => %.1f  # {backup_count} dup backup factor',
+                                bundle,
+                                score,
                             )
 
                             if score != 0 and (best_score is None or score > best_score):
@@ -658,7 +705,9 @@ class RemoteExecutor(BaseExecutor):
                 if bundle_to_backup is not None:
                     self.last_backup = now
                     logger.info(
-                        f'=====> SCHEDULING BACKUP {bundle_to_backup} (score={best_score:.1f}) <====='
+                        '=====> SCHEDULING BACKUP %s (score=%.1f}) <=====',
+                        bundle_to_backup,
+                        best_score,
                     )
                     self.schedule_backup_for_bundle(bundle_to_backup)
             finally:
@@ -684,7 +733,7 @@ class RemoteExecutor(BaseExecutor):
     def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
         worker = bundle.worker
         assert worker is not None
-        logger.debug(f'Released worker {worker}')
+        logger.debug('Released worker %s', worker)
         self.status.record_release_worker(
             worker,
             bundle.uuid,
@@ -698,7 +747,7 @@ class RemoteExecutor(BaseExecutor):
     def check_if_cancelled(self, bundle: BundleDetails) -> bool:
         with self.status.lock:
             if bundle.is_cancelled.wait(timeout=0.0):
-                logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
+                logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
                 bundle.was_cancelled = True
                 return True
         return False
@@ -724,7 +773,7 @@ class RemoteExecutor(BaseExecutor):
         machine = bundle.machine = worker.machine
         username = bundle.username = worker.username
         self.status.record_acquire_worker(worker, uuid)
-        logger.debug(f'{bundle}: Running bundle on {worker}...')
+        logger.debug('%s: Running bundle on %s...', bundle, worker)
 
         # Before we do any work, make sure the bundle is still viable.
         # It may have been some time between when it was submitted and
@@ -734,7 +783,7 @@ class RemoteExecutor(BaseExecutor):
             try:
                 return self.process_work_result(bundle)
             except Exception as e:
-                logger.warning(f'{bundle}: bundle says it\'s cancelled upfront but no results?!')
+                logger.warning('%s: bundle says it\'s cancelled upfront but no results?!', bundle)
                 self.release_worker(bundle)
                 if is_original:
                     # Weird.  We are the original owner of this
@@ -746,8 +795,9 @@ class RemoteExecutor(BaseExecutor):
                     # thing.
                     logger.exception(e)
                     logger.error(
-                        f'{bundle}: We are the original owner thread and yet there are '
-                        + 'no results for this bundle.  This is unexpected and bad.'
+                        '%s: We are the original owner thread and yet there are '
+                        'no results for this bundle.  This is unexpected and bad.',
+                        bundle,
                     )
                     return self.emergency_retry_nasty_bundle(bundle)
                 else:
@@ -765,10 +815,10 @@ class RemoteExecutor(BaseExecutor):
             try:
                 cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
                 start_ts = time.time()
-                logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
+                logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
                 run_silently(cmd)
                 xfer_latency = time.time() - start_ts
-                logger.debug(f"{bundle}: Copying to {worker} took {xfer_latency:.1f}s.")
+                logger.debug("%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency)
             except Exception as e:
                 self.release_worker(bundle)
                 if is_original:
@@ -776,9 +826,10 @@ class RemoteExecutor(BaseExecutor):
                     # And we're the original bundle.  We have to retry.
                     logger.exception(e)
                     logger.error(
-                        f"{bundle}: Failed to send instructions to the worker machine?! "
-                        + "This is not expected; we\'re the original bundle so this shouldn\'t "
-                        + "be a race condition.  Attempting an emergency retry..."
+                        "%s: Failed to send instructions to the worker machine?! "
+                        "This is not expected; we\'re the original bundle so this shouldn\'t "
+                        "be a race condition.  Attempting an emergency retry...",
+                        bundle,
                     )
                     return self.emergency_retry_nasty_bundle(bundle)
                 else:
@@ -786,10 +837,12 @@ class RemoteExecutor(BaseExecutor):
                     # There's a race condition where someone else
                     # already finished the work and removed the source
                     # code file before we could copy it.  No biggie.
-                    msg = f'{bundle}: Failed to send instructions to the worker machine... '
-                    msg += 'We\'re a backup and this may be caused by the original (or some '
-                    msg += 'other backup) already finishing this work.  Ignoring this.'
-                    logger.warning(msg)
+                    logger.warning(
+                        '%s: Failed to send instructions to the worker machine... '
+                        'We\'re a backup and this may be caused by the original (or '
+                        'some other backup) already finishing this work.  Ignoring.',
+                        bundle,
+                    )
                     return None
 
         # Kick off the work.  Note that if this fails we let
@@ -801,10 +854,10 @@ class RemoteExecutor(BaseExecutor):
             f' /home/scott/lib/python_modules/remote_worker.py'
             f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
         )
-        logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
+        logger.debug('%s: Executing %s in the background to kick off work...', bundle, cmd)
         p = cmd_in_background(cmd, silent=True)
         bundle.pid = p.pid
-        logger.debug(f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.')
+        logger.debug('%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine)
         return self.wait_for_process(p, bundle, 0)
 
     def wait_for_process(
@@ -815,7 +868,7 @@ class RemoteExecutor(BaseExecutor):
         pid = p.pid
         if depth > 3:
             logger.error(
-                f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
+                "I've gotten repeated errors waiting on this bundle; giving up on pid=%d", pid
             )
             p.terminate()
             self.release_worker(bundle)
@@ -829,10 +882,10 @@ class RemoteExecutor(BaseExecutor):
                 p.wait(timeout=0.25)
             except subprocess.TimeoutExpired:
                 if self.check_if_cancelled(bundle):
-                    logger.info(f'{bundle}: looks like another worker finished bundle...')
+                    logger.info('%s: looks like another worker finished bundle...', bundle)
                     break
             else:
-                logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
+                logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
                 p = None
                 break
 
@@ -853,10 +906,11 @@ class RemoteExecutor(BaseExecutor):
         # Otherwise, time for an emergency reschedule.
         except Exception as e:
             logger.exception(e)
-            logger.error(f'{bundle}: Something unexpected just happened...')
+            logger.error('%s: Something unexpected just happened...', bundle)
             if p is not None:
-                msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
-                logger.warning(msg)
+                logger.warning(
+                    "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.", bundle
+                )
                 return self.wait_for_process(p, bundle, depth + 1)
             else:
                 self.release_worker(bundle)
@@ -880,7 +934,11 @@ class RemoteExecutor(BaseExecutor):
                 if bundle.hostname not in bundle.machine:
                     cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
                     logger.info(
-                        f"{bundle}: Fetching results back from {username}@{machine} via {cmd}"
+                        "%s: Fetching results back from %s@%s via %s",
+                        bundle,
+                        username,
+                        machine,
+                        cmd,
                     )
 
                     # If either of these throw they are handled in
@@ -892,14 +950,14 @@ class RemoteExecutor(BaseExecutor):
                         except Exception as e:
                             attempts += 1
                             if attempts >= 3:
-                                raise (e)
+                                raise e
                         else:
                             break
 
                     run_silently(
                         f'{SSH} {username}@{machine}' f' "/bin/rm -f {code_file} {result_file}"'
                     )
-                    logger.debug(f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.')
+                    logger.debug('Fetching results back took %.2fs', time.time() - bundle.end_ts)
                 dur = bundle.end_ts - bundle.start_ts
                 self.histogram.add_item(dur)
 
@@ -910,23 +968,22 @@ class RemoteExecutor(BaseExecutor):
         # if one of the backups finished first; it still must read the
         # result from disk.
         if is_original:
-            logger.debug(f"{bundle}: Unpickling {result_file}.")
+            logger.debug("%s: Unpickling %s.", bundle, result_file)
             try:
                 with open(result_file, 'rb') as rb:
                     serialized = rb.read()
                 result = cloudpickle.loads(serialized)
             except Exception as e:
                 logger.exception(e)
-                msg = f'Failed to load {result_file}... this is bad news.'
-                logger.critical(msg)
+                logger.error('Failed to load %s... this is bad news.', result_file)
                 self.release_worker(bundle)
 
                 # Re-raise the exception; the code in wait_for_process may
                 # decide to emergency_retry_nasty_bundle here.
-                raise Exception(e)
-            logger.debug(f'Removing local (master) {code_file} and {result_file}.')
-            os.remove(f'{result_file}')
-            os.remove(f'{code_file}')
+                raise e
+            logger.debug('Removing local (master) %s and %s.', code_file, result_file)
+            os.remove(result_file)
+            os.remove(code_file)
 
             # Notify any backups that the original is done so they
             # should stop ASAP.  Do this whether or not we
@@ -934,7 +991,9 @@ class RemoteExecutor(BaseExecutor):
             # backup.
             if bundle.backup_bundles is not None:
                 for backup in bundle.backup_bundles:
-                    logger.debug(f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled')
+                    logger.debug(
+                        '%s: Notifying backup %s that it\'s cancelled', bundle, backup.uuid
+                    )
                     backup.is_cancelled.set()
 
         # This is a backup job and, by now, we have already fetched
@@ -949,7 +1008,9 @@ class RemoteExecutor(BaseExecutor):
             if not was_cancelled:
                 orig_bundle = bundle.src_bundle
                 assert orig_bundle is not None
-                logger.debug(f'{bundle}: Notifying original {orig_bundle.uuid} we beat them to it.')
+                logger.debug(
+                    '%s: Notifying original %s we beat them to it.', bundle, orig_bundle.uuid
+                )
                 orig_bundle.is_cancelled.set()
         self.release_worker(bundle, was_cancelled=was_cancelled)
         return result
@@ -961,8 +1022,8 @@ class RemoteExecutor(BaseExecutor):
         code_file = f'/tmp/{uuid}.code.bin'
         result_file = f'/tmp/{uuid}.result.bin'
 
-        logger.debug(f'Writing pickled code to {code_file}')
-        with open(f'{code_file}', 'wb') as wb:
+        logger.debug('Writing pickled code to %s', code_file)
+        with open(code_file, 'wb') as wb:
             wb.write(pickle)
 
         bundle = BundleDetails(
@@ -987,7 +1048,7 @@ class RemoteExecutor(BaseExecutor):
             failure_count=0,
         )
         self.status.record_bundle_details(bundle)
-        logger.debug(f'{bundle}: Created an original bundle')
+        logger.debug('%s: Created an original bundle', bundle)
         return bundle
 
     def create_backup_bundle(self, src_bundle: BundleDetails):
@@ -1018,7 +1079,7 @@ class RemoteExecutor(BaseExecutor):
         )
         src_bundle.backup_bundles.append(backup_bundle)
         self.status.record_bundle_details_already_locked(backup_bundle)
-        logger.debug(f'{backup_bundle}: Created a backup bundle')
+        logger.debug('%s: Created a backup bundle', backup_bundle)
         return backup_bundle
 
     def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
@@ -1026,7 +1087,7 @@ class RemoteExecutor(BaseExecutor):
         assert src_bundle is not None
         backup_bundle = self.create_backup_bundle(src_bundle)
         logger.debug(
-            f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
+            '%s/%s: Scheduling backup for execution...', backup_bundle.uuid, backup_bundle.fname
         )
         self._helper_executor.submit(self.launch, backup_bundle)
 
@@ -1048,15 +1109,17 @@ class RemoteExecutor(BaseExecutor):
 
         if bundle.failure_count > retry_limit:
             logger.error(
-                f'{bundle}: Tried this bundle too many times already ({retry_limit}x); giving up.'
+                '%s: Tried this bundle too many times already (%dx); giving up.',
+                bundle,
+                retry_limit,
             )
             if is_original:
                 raise RemoteExecutorException(
-                    f'{bundle}: This bundle can\'t be completed despite several backups and retries'
+                    f'{bundle}: This bundle can\'t be completed despite several backups and retries',
                 )
             else:
                 logger.error(
-                    f'{bundle}: At least it\'s only a backup; better luck with the others.'
+                    '%s: At least it\'s only a backup; better luck with the others.', bundle
                 )
             return None
         else:
@@ -1077,7 +1140,7 @@ class RemoteExecutor(BaseExecutor):
     @overrides
     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
         if not self.already_shutdown:
-            logging.debug(f'Shutting down RemoteExecutor {self.title}')
+            logging.debug('Shutting down RemoteExecutor %s', self.title)
             self.heartbeat_stop_event.set()
             self.heartbeat_thread.join()
             self._helper_executor.shutdown(wait)
@@ -1088,13 +1151,20 @@ class RemoteExecutor(BaseExecutor):
 
 @singleton
 class DefaultExecutors(object):
+    """A container for a default thread, process and remote executor.
+    These are not created until needed and we take care to clean up
+    before process exit.
+
+    """
+
     def __init__(self):
         self.thread_executor: Optional[ThreadExecutor] = None
         self.process_executor: Optional[ProcessExecutor] = None
         self.remote_executor: Optional[RemoteExecutor] = None
 
-    def ping(self, host) -> bool:
-        logger.debug(f'RUN> ping -c 1 {host}')
+    @staticmethod
+    def ping(host) -> bool:
+        logger.debug('RUN> ping -c 1 %s', host)
         try:
             x = cmd_with_timeout(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
             return x == 0
@@ -1169,7 +1239,7 @@ class DefaultExecutors(object):
             # The controller machine has a lot to do; go easy on it.
             for record in pool:
                 if record.machine == platform.node() and record.count > 1:
-                    logger.info(f'Reducing workload for {record.machine}.')
+                    logger.info('Reducing workload for %s.', record.machine)
                     record.count = 1
 
             policy = WeightedRandomRemoteWorkerSelectionPolicy()
index 6bcfc75db47efa6d2da2327cdd42284c589901d4..d1e2eff1fac729b6251179ad7dfc584a0674215f 100644 (file)
@@ -6,7 +6,6 @@ import datetime
 import errno
 import glob
 import hashlib
-import io
 import logging
 import os
 import pathlib
@@ -190,7 +189,7 @@ def create_path_if_not_exist(path, on_error=None):
     >>> os.path.exists(path)
     True
     """
-    logger.debug(f"Creating path {path}")
+    logger.debug("Creating path %s", path)
     previous_umask = os.umask(0)
     try:
         os.makedirs(path)
@@ -462,6 +461,11 @@ def get_files_recursive(directory: str):
 
 
 class FileWriter(object):
+    """A helper that writes a file to a temporary location and then moves
+    it atomically to its ultimate destination on close.
+
+    """
+
     def __init__(self, filename: str) -> None:
         self.filename = filename
         uuid = uuid4()
index d09e9f39acb95db3f7a592d182aa046c6306ac20..409b8dc174078b1c5810098e290c042a1e9f38c3 100755 (executable)
@@ -1,5 +1,6 @@
 #!/usr/bin/env python3
 
+import logging
 import sys
 
 import bootstrap
@@ -8,6 +9,8 @@ import executors
 import parallelize as p
 import smart_future
 
+logger = logging.getLogger(__name__)
+
 
 @p.parallelize(method=p.Method.THREAD)
 def compute_factorial_thread(n):
@@ -77,4 +80,8 @@ def main() -> None:
 
 
 if __name__ == '__main__':
-    main()
+    try:
+        main()
+    except Exception as e:
+        logger.exception(e)
+        sys.exit(1)