Get mypy to stop with this fucking "has no attribute" shit when
[python_utils.git] / executors.py
index 28507b0e8a547e9d1edc615daa5de7945edb457d..69330129c9b86ec8c9710b2586b24d7c2dfee8f6 100644 (file)
@@ -1,34 +1,34 @@
 #!/usr/bin/env python3
+# -*- coding: utf-8 -*-
 
 from __future__ import annotations
 
-from abc import ABC, abstractmethod
 import concurrent.futures as fut
-from collections import defaultdict
-from dataclasses import dataclass
 import logging
-import numpy
 import os
 import platform
 import random
 import subprocess
 import threading
 import time
-from typing import Any, Callable, Dict, List, Optional, Set
 import warnings
+from abc import ABC, abstractmethod
+from collections import defaultdict
+from dataclasses import dataclass
+from typing import Any, Callable, Dict, List, Optional, Set
 
 import cloudpickle  # type: ignore
+import numpy
 from overrides import overrides
 
-from ansi import bg, fg, underline, reset
 import argparse_utils
 import config
-from decorator_utils import singleton
-from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
 import histogram as hist
+from ansi import bg, fg, reset, underline
+from decorator_utils import singleton
+from exec_utils import cmd_in_background, cmd_with_timeout, run_silently
 from thread_utils import background_thread
 
-
 logger = logging.getLogger(__name__)
 
 parser = config.add_commandline_args(
@@ -84,10 +84,10 @@ class BaseExecutor(ABC):
         pass
 
     @abstractmethod
-    def shutdown(self, wait: bool = True) -> None:
+    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
         pass
 
-    def shutdown_if_idle(self) -> bool:
+    def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
         """Shutdown the executor and return True if the executor is idle
         (i.e. there are no pending or active tasks).  Return False
         otherwise.  Note: this should only be called by the launcher
@@ -95,7 +95,7 @@ class BaseExecutor(ABC):
 
         """
         if self.task_count == 0:
-            self.shutdown()
+            self.shutdown(wait=True, quiet=quiet)
             return True
         return False
 
@@ -147,19 +147,18 @@ class ThreadExecutor(BaseExecutor):
         for arg in args:
             newargs.append(arg)
         start = time.time()
-        result = self._thread_pool_executor.submit(
-            self.run_local_bundle, *newargs, **kwargs
-        )
+        result = self._thread_pool_executor.submit(self.run_local_bundle, *newargs, **kwargs)
         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
         result.add_done_callback(lambda _: self.adjust_task_count(-1))
         return result
 
     @overrides
-    def shutdown(self, wait=True) -> None:
+    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
         if not self.already_shutdown:
             logger.debug(f'Shutting down threadpool executor {self.title}')
-            print(self.histogram.__repr__(label_formatter='%d'))
             self._thread_pool_executor.shutdown(wait)
+            if not quiet:
+                print(self.histogram.__repr__(label_formatter='%ds'))
             self.already_shutdown = True
 
 
@@ -197,11 +196,12 @@ class ProcessExecutor(BaseExecutor):
         return result
 
     @overrides
-    def shutdown(self, wait=True) -> None:
+    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
         if not self.already_shutdown:
             logger.debug(f'Shutting down processpool executor {self.title}')
             self._process_executor.shutdown(wait)
-            print(self.histogram.__repr__(label_formatter='%d'))
+            if not quiet:
+                print(self.histogram.__repr__(label_formatter='%ds'))
             self.already_shutdown = True
 
     def __getstate__(self):
@@ -246,7 +246,7 @@ class BundleDetails:
     end_ts: float
     slower_than_local_p95: bool
     slower_than_global_p95: bool
-    src_bundle: BundleDetails
+    src_bundle: Optional[BundleDetails]
     is_cancelled: threading.Event
     was_cancelled: bool
     backup_bundles: Optional[List[BundleDetails]]
@@ -286,11 +286,9 @@ class RemoteExecutorStatus:
         self.worker_count: int = total_worker_count
         self.known_workers: Set[RemoteWorkerRecord] = set()
         self.start_time: float = time.time()
-        self.start_per_bundle: Dict[str, float] = defaultdict(float)
+        self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
         self.end_per_bundle: Dict[str, float] = defaultdict(float)
-        self.finished_bundle_timings_per_worker: Dict[
-            RemoteWorkerRecord, List[float]
-        ] = {}
+        self.finished_bundle_timings_per_worker: Dict[RemoteWorkerRecord, List[float]] = {}
         self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
         self.finished_bundle_timings: List[float] = []
@@ -305,9 +303,7 @@ class RemoteExecutorStatus:
         with self.lock:
             self.record_acquire_worker_already_locked(worker, uuid)
 
-    def record_acquire_worker_already_locked(
-        self, worker: RemoteWorkerRecord, uuid: str
-    ) -> None:
+    def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
         assert self.lock.locked()
         self.known_workers.add(worker)
         self.start_per_bundle[uuid] = None
@@ -343,7 +339,9 @@ class RemoteExecutorStatus:
         self.end_per_bundle[uuid] = ts
         self.in_flight_bundles_by_worker[worker].remove(uuid)
         if not was_cancelled:
-            bundle_latency = ts - self.start_per_bundle[uuid]
+            start = self.start_per_bundle[uuid]
+            assert start is not None
+            bundle_latency = ts - start
             x = self.finished_bundle_timings_per_worker.get(worker, list())
             x.append(bundle_latency)
             self.finished_bundle_timings_per_worker[worker] = x
@@ -469,9 +467,7 @@ class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
                         grabbag.append(worker)
 
         if len(grabbag) == 0:
-            logger.debug(
-                f'There are no available workers that avoid {machine_to_avoid}...'
-            )
+            logger.debug(f'There are no available workers that avoid {machine_to_avoid}...')
             for worker in self.workers:
                 if worker.count > 0:
                     for _ in range(worker.count * worker.weight):
@@ -500,9 +496,7 @@ class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
         return False
 
     @overrides
-    def acquire_worker(
-        self, machine_to_avoid: str = None
-    ) -> Optional[RemoteWorkerRecord]:
+    def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
         x = self.index
         while True:
             worker = self.workers[x]
@@ -541,9 +535,7 @@ class RemoteExecutor(BaseExecutor):
             raise RemoteExecutorException(msg)
         self.policy.register_worker_pool(self.workers)
         self.cv = threading.Condition()
-        logger.debug(
-            f'Creating {self.worker_count} local threads, one per remote worker.'
-        )
+        logger.debug(f'Creating {self.worker_count} local threads, one per remote worker.')
         self._helper_executor = fut.ThreadPoolExecutor(
             thread_name_prefix="remote_executor_helper",
             max_workers=self.worker_count,
@@ -625,21 +617,15 @@ class RemoteExecutor(BaseExecutor):
                             if start_ts is not None:
                                 runtime = now - start_ts
                                 score += runtime
-                                logger.debug(
-                                    f'score[{bundle}] => {score}  # latency boost'
-                                )
+                                logger.debug(f'score[{bundle}] => {score}  # latency boost')
 
                                 if bundle.slower_than_local_p95:
                                     score += runtime / 2
-                                    logger.debug(
-                                        f'score[{bundle}] => {score}  # >worker p95'
-                                    )
+                                    logger.debug(f'score[{bundle}] => {score}  # >worker p95')
 
                                 if bundle.slower_than_global_p95:
                                     score += runtime / 4
-                                    logger.debug(
-                                        f'score[{bundle}] => {score}  # >global p95'
-                                    )
+                                    logger.debug(f'score[{bundle}] => {score}  # >global p95')
 
                             # Prefer backups of bundles that don't
                             # have backups already.
@@ -656,9 +642,7 @@ class RemoteExecutor(BaseExecutor):
                                 f'score[{bundle}] => {score}  # {backup_count} dup backup factor'
                             )
 
-                            if score != 0 and (
-                                best_score is None or score > best_score
-                            ):
+                            if score != 0 and (best_score is None or score > best_score):
                                 bundle_to_backup = bundle
                                 assert bundle is not None
                                 assert bundle.backup_bundles is not None
@@ -683,14 +667,10 @@ class RemoteExecutor(BaseExecutor):
     def is_worker_available(self) -> bool:
         return self.policy.is_worker_available()
 
-    def acquire_worker(
-        self, machine_to_avoid: str = None
-    ) -> Optional[RemoteWorkerRecord]:
+    def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
         return self.policy.acquire_worker(machine_to_avoid)
 
-    def find_available_worker_or_block(
-        self, machine_to_avoid: str = None
-    ) -> RemoteWorkerRecord:
+    def find_available_worker_or_block(self, machine_to_avoid: str = None) -> RemoteWorkerRecord:
         with self.cv:
             while not self.is_worker_available():
                 self.cv.wait()
@@ -737,7 +717,7 @@ class RemoteExecutor(BaseExecutor):
         worker = None
         while worker is None:
             worker = self.find_available_worker_or_block(avoid_machine)
-        assert worker
+        assert worker is not None
 
         # Ok, found a worker.
         bundle.worker = worker
@@ -754,9 +734,7 @@ class RemoteExecutor(BaseExecutor):
             try:
                 return self.process_work_result(bundle)
             except Exception as e:
-                logger.warning(
-                    f'{bundle}: bundle says it\'s cancelled upfront but no results?!'
-                )
+                logger.warning(f'{bundle}: bundle says it\'s cancelled upfront but no results?!')
                 self.release_worker(bundle)
                 if is_original:
                     # Weird.  We are the original owner of this
@@ -785,9 +763,7 @@ class RemoteExecutor(BaseExecutor):
         # Send input code / data to worker machine if it's not local.
         if hostname not in machine:
             try:
-                cmd = (
-                    f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
-                )
+                cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
                 start_ts = time.time()
                 logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
                 run_silently(cmd)
@@ -828,15 +804,14 @@ class RemoteExecutor(BaseExecutor):
         logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
         p = cmd_in_background(cmd, silent=True)
         bundle.pid = p.pid
-        logger.debug(
-            f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.'
-        )
+        logger.debug(f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.')
         return self.wait_for_process(p, bundle, 0)
 
     def wait_for_process(
-        self, p: subprocess.Popen, bundle: BundleDetails, depth: int
+        self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
     ) -> Any:
         machine = bundle.machine
+        assert p is not None
         pid = p.pid
         if depth > 3:
             logger.error(
@@ -854,9 +829,7 @@ class RemoteExecutor(BaseExecutor):
                 p.wait(timeout=0.25)
             except subprocess.TimeoutExpired:
                 if self.check_if_cancelled(bundle):
-                    logger.info(
-                        f'{bundle}: looks like another worker finished bundle...'
-                    )
+                    logger.info(f'{bundle}: looks like another worker finished bundle...')
                     break
             else:
                 logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
@@ -924,12 +897,9 @@ class RemoteExecutor(BaseExecutor):
                             break
 
                     run_silently(
-                        f'{SSH} {username}@{machine}'
-                        f' "/bin/rm -f {code_file} {result_file}"'
-                    )
-                    logger.debug(
-                        f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.'
+                        f'{SSH} {username}@{machine}' f' "/bin/rm -f {code_file} {result_file}"'
                     )
+                    logger.debug(f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.')
                 dur = bundle.end_ts - bundle.start_ts
                 self.histogram.add_item(dur)
 
@@ -964,9 +934,7 @@ class RemoteExecutor(BaseExecutor):
             # backup.
             if bundle.backup_bundles is not None:
                 for backup in bundle.backup_bundles:
-                    logger.debug(
-                        f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
-                    )
+                    logger.debug(f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled')
                     backup.is_cancelled.set()
 
         # This is a backup job and, by now, we have already fetched
@@ -979,10 +947,10 @@ class RemoteExecutor(BaseExecutor):
 
             # Tell the original to stop if we finished first.
             if not was_cancelled:
-                logger.debug(
-                    f'{bundle}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
-                )
-                bundle.src_bundle.is_cancelled.set()
+                orig_bundle = bundle.src_bundle
+                assert orig_bundle is not None
+                logger.debug(f'{bundle}: Notifying original {orig_bundle.uuid} we beat them to it.')
+                orig_bundle.is_cancelled.set()
         self.release_worker(bundle, was_cancelled=was_cancelled)
         return result
 
@@ -1066,7 +1034,7 @@ class RemoteExecutor(BaseExecutor):
         # they will move the result_file to this machine and let
         # the original pick them up and unpickle them.
 
-    def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> fut.Future:
+    def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
         is_original = bundle.src_bundle is None
         bundle.worker = None
         avoid_last_machine = bundle.machine
@@ -1107,13 +1075,14 @@ class RemoteExecutor(BaseExecutor):
         return self._helper_executor.submit(self.launch, bundle)
 
     @overrides
-    def shutdown(self, wait=True) -> None:
+    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
         if not self.already_shutdown:
             logging.debug(f'Shutting down RemoteExecutor {self.title}')
             self.heartbeat_stop_event.set()
             self.heartbeat_thread.join()
             self._helper_executor.shutdown(wait)
-            print(self.histogram.__repr__(label_formatter='%d'))
+            if not quiet:
+                print(self.histogram.__repr__(label_formatter='%ds'))
             self.already_shutdown = True
 
 
@@ -1127,9 +1096,7 @@ class DefaultExecutors(object):
     def ping(self, host) -> bool:
         logger.debug(f'RUN> ping -c 1 {host}')
         try:
-            x = cmd_with_timeout(
-                f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
-            )
+            x = cmd_with_timeout(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
             return x == 0
         except Exception:
             return False
@@ -1212,11 +1179,11 @@ class DefaultExecutors(object):
 
     def shutdown(self) -> None:
         if self.thread_executor is not None:
-            self.thread_executor.shutdown()
+            self.thread_executor.shutdown(wait=True, quiet=True)
             self.thread_executor = None
         if self.process_executor is not None:
-            self.process_executor.shutdown()
+            self.process_executor.shutdown(wait=True, quiet=True)
             self.process_executor = None
         if self.remote_executor is not None:
-            self.remote_executor.shutdown()
+            self.remote_executor.shutdown(wait=True, quiet=True)
             self.remote_executor = None