Ignore integration test results in code coverage report.
[python_utils.git] / executors.py
index 47b4a89a88d693d535ed2e036c6288829505a005..69330129c9b86ec8c9710b2586b24d7c2dfee8f6 100644 (file)
@@ -1,34 +1,34 @@
 #!/usr/bin/env python3
+# -*- coding: utf-8 -*-
 
 from __future__ import annotations
 
-from abc import ABC, abstractmethod
 import concurrent.futures as fut
-from collections import defaultdict
-from dataclasses import dataclass
 import logging
-import numpy
 import os
 import platform
 import random
 import subprocess
 import threading
 import time
-from typing import Any, Callable, Dict, List, Optional, Set
 import warnings
+from abc import ABC, abstractmethod
+from collections import defaultdict
+from dataclasses import dataclass
+from typing import Any, Callable, Dict, List, Optional, Set
 
 import cloudpickle  # type: ignore
+import numpy
 from overrides import overrides
 
-from ansi import bg, fg, underline, reset
 import argparse_utils
 import config
-from decorator_utils import singleton
-from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
 import histogram as hist
+from ansi import bg, fg, reset, underline
+from decorator_utils import singleton
+from exec_utils import cmd_in_background, cmd_with_timeout, run_silently
 from thread_utils import background_thread
 
-
 logger = logging.getLogger(__name__)
 
 parser = config.add_commandline_args(
@@ -84,10 +84,10 @@ class BaseExecutor(ABC):
         pass
 
     @abstractmethod
-    def shutdown(self, wait: bool = True) -> None:
+    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
         pass
 
-    def shutdown_if_idle(self) -> bool:
+    def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
         """Shutdown the executor and return True if the executor is idle
         (i.e. there are no pending or active tasks).  Return False
         otherwise.  Note: this should only be called by the launcher
@@ -95,7 +95,7 @@ class BaseExecutor(ABC):
 
         """
         if self.task_count == 0:
-            self.shutdown()
+            self.shutdown(wait=True, quiet=quiet)
             return True
         return False
 
@@ -147,19 +147,18 @@ class ThreadExecutor(BaseExecutor):
         for arg in args:
             newargs.append(arg)
         start = time.time()
-        result = self._thread_pool_executor.submit(
-            self.run_local_bundle, *newargs, **kwargs
-        )
+        result = self._thread_pool_executor.submit(self.run_local_bundle, *newargs, **kwargs)
         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
         result.add_done_callback(lambda _: self.adjust_task_count(-1))
         return result
 
     @overrides
-    def shutdown(self, wait=True) -> None:
+    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
         if not self.already_shutdown:
             logger.debug(f'Shutting down threadpool executor {self.title}')
-            print(self.histogram)
             self._thread_pool_executor.shutdown(wait)
+            if not quiet:
+                print(self.histogram.__repr__(label_formatter='%ds'))
             self.already_shutdown = True
 
 
@@ -197,11 +196,12 @@ class ProcessExecutor(BaseExecutor):
         return result
 
     @overrides
-    def shutdown(self, wait=True) -> None:
+    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
         if not self.already_shutdown:
             logger.debug(f'Shutting down processpool executor {self.title}')
             self._process_executor.shutdown(wait)
-            print(self.histogram)
+            if not quiet:
+                print(self.histogram.__repr__(label_formatter='%ds'))
             self.already_shutdown = True
 
     def __getstate__(self):
@@ -246,7 +246,7 @@ class BundleDetails:
     end_ts: float
     slower_than_local_p95: bool
     slower_than_global_p95: bool
-    src_bundle: BundleDetails
+    src_bundle: Optional[BundleDetails]
     is_cancelled: threading.Event
     was_cancelled: bool
     backup_bundles: Optional[List[BundleDetails]]
@@ -286,11 +286,9 @@ class RemoteExecutorStatus:
         self.worker_count: int = total_worker_count
         self.known_workers: Set[RemoteWorkerRecord] = set()
         self.start_time: float = time.time()
-        self.start_per_bundle: Dict[str, float] = defaultdict(float)
+        self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
         self.end_per_bundle: Dict[str, float] = defaultdict(float)
-        self.finished_bundle_timings_per_worker: Dict[
-            RemoteWorkerRecord, List[float]
-        ] = {}
+        self.finished_bundle_timings_per_worker: Dict[RemoteWorkerRecord, List[float]] = {}
         self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
         self.finished_bundle_timings: List[float] = []
@@ -305,9 +303,7 @@ class RemoteExecutorStatus:
         with self.lock:
             self.record_acquire_worker_already_locked(worker, uuid)
 
-    def record_acquire_worker_already_locked(
-        self, worker: RemoteWorkerRecord, uuid: str
-    ) -> None:
+    def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
         assert self.lock.locked()
         self.known_workers.add(worker)
         self.start_per_bundle[uuid] = None
@@ -343,7 +339,9 @@ class RemoteExecutorStatus:
         self.end_per_bundle[uuid] = ts
         self.in_flight_bundles_by_worker[worker].remove(uuid)
         if not was_cancelled:
-            bundle_latency = ts - self.start_per_bundle[uuid]
+            start = self.start_per_bundle[uuid]
+            assert start is not None
+            bundle_latency = ts - start
             x = self.finished_bundle_timings_per_worker.get(worker, list())
             x.append(bundle_latency)
             self.finished_bundle_timings_per_worker[worker] = x
@@ -469,9 +467,7 @@ class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
                         grabbag.append(worker)
 
         if len(grabbag) == 0:
-            logger.debug(
-                f'There are no available workers that avoid {machine_to_avoid}...'
-            )
+            logger.debug(f'There are no available workers that avoid {machine_to_avoid}...')
             for worker in self.workers:
                 if worker.count > 0:
                     for _ in range(worker.count * worker.weight):
@@ -500,9 +496,7 @@ class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
         return False
 
     @overrides
-    def acquire_worker(
-        self, machine_to_avoid: str = None
-    ) -> Optional[RemoteWorkerRecord]:
+    def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
         x = self.index
         while True:
             worker = self.workers[x]
@@ -541,9 +535,7 @@ class RemoteExecutor(BaseExecutor):
             raise RemoteExecutorException(msg)
         self.policy.register_worker_pool(self.workers)
         self.cv = threading.Condition()
-        logger.debug(
-            f'Creating {self.worker_count} local threads, one per remote worker.'
-        )
+        logger.debug(f'Creating {self.worker_count} local threads, one per remote worker.')
         self._helper_executor = fut.ThreadPoolExecutor(
             thread_name_prefix="remote_executor_helper",
             max_workers=self.worker_count,
@@ -625,21 +617,15 @@ class RemoteExecutor(BaseExecutor):
                             if start_ts is not None:
                                 runtime = now - start_ts
                                 score += runtime
-                                logger.debug(
-                                    f'score[{bundle}] => {score}  # latency boost'
-                                )
+                                logger.debug(f'score[{bundle}] => {score}  # latency boost')
 
                                 if bundle.slower_than_local_p95:
                                     score += runtime / 2
-                                    logger.debug(
-                                        f'score[{bundle}] => {score}  # >worker p95'
-                                    )
+                                    logger.debug(f'score[{bundle}] => {score}  # >worker p95')
 
                                 if bundle.slower_than_global_p95:
                                     score += runtime / 4
-                                    logger.debug(
-                                        f'score[{bundle}] => {score}  # >global p95'
-                                    )
+                                    logger.debug(f'score[{bundle}] => {score}  # >global p95')
 
                             # Prefer backups of bundles that don't
                             # have backups already.
@@ -656,9 +642,7 @@ class RemoteExecutor(BaseExecutor):
                                 f'score[{bundle}] => {score}  # {backup_count} dup backup factor'
                             )
 
-                            if score != 0 and (
-                                best_score is None or score > best_score
-                            ):
+                            if score != 0 and (best_score is None or score > best_score):
                                 bundle_to_backup = bundle
                                 assert bundle is not None
                                 assert bundle.backup_bundles is not None
@@ -683,14 +667,10 @@ class RemoteExecutor(BaseExecutor):
     def is_worker_available(self) -> bool:
         return self.policy.is_worker_available()
 
-    def acquire_worker(
-        self, machine_to_avoid: str = None
-    ) -> Optional[RemoteWorkerRecord]:
+    def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
         return self.policy.acquire_worker(machine_to_avoid)
 
-    def find_available_worker_or_block(
-        self, machine_to_avoid: str = None
-    ) -> RemoteWorkerRecord:
+    def find_available_worker_or_block(self, machine_to_avoid: str = None) -> RemoteWorkerRecord:
         with self.cv:
             while not self.is_worker_available():
                 self.cv.wait()
@@ -737,7 +717,7 @@ class RemoteExecutor(BaseExecutor):
         worker = None
         while worker is None:
             worker = self.find_available_worker_or_block(avoid_machine)
-        assert worker
+        assert worker is not None
 
         # Ok, found a worker.
         bundle.worker = worker
@@ -754,9 +734,7 @@ class RemoteExecutor(BaseExecutor):
             try:
                 return self.process_work_result(bundle)
             except Exception as e:
-                logger.warning(
-                    f'{bundle}: bundle says it\'s cancelled upfront but no results?!'
-                )
+                logger.warning(f'{bundle}: bundle says it\'s cancelled upfront but no results?!')
                 self.release_worker(bundle)
                 if is_original:
                     # Weird.  We are the original owner of this
@@ -785,9 +763,7 @@ class RemoteExecutor(BaseExecutor):
         # Send input code / data to worker machine if it's not local.
         if hostname not in machine:
             try:
-                cmd = (
-                    f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
-                )
+                cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
                 start_ts = time.time()
                 logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
                 run_silently(cmd)
@@ -828,15 +804,14 @@ class RemoteExecutor(BaseExecutor):
         logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
         p = cmd_in_background(cmd, silent=True)
         bundle.pid = p.pid
-        logger.debug(
-            f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.'
-        )
+        logger.debug(f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.')
         return self.wait_for_process(p, bundle, 0)
 
     def wait_for_process(
-        self, p: subprocess.Popen, bundle: BundleDetails, depth: int
+        self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
     ) -> Any:
         machine = bundle.machine
+        assert p is not None
         pid = p.pid
         if depth > 3:
             logger.error(
@@ -854,9 +829,7 @@ class RemoteExecutor(BaseExecutor):
                 p.wait(timeout=0.25)
             except subprocess.TimeoutExpired:
                 if self.check_if_cancelled(bundle):
-                    logger.info(
-                        f'{bundle}: looks like another worker finished bundle...'
-                    )
+                    logger.info(f'{bundle}: looks like another worker finished bundle...')
                     break
             else:
                 logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
@@ -924,12 +897,9 @@ class RemoteExecutor(BaseExecutor):
                             break
 
                     run_silently(
-                        f'{SSH} {username}@{machine}'
-                        f' "/bin/rm -f {code_file} {result_file}"'
-                    )
-                    logger.debug(
-                        f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.'
+                        f'{SSH} {username}@{machine}' f' "/bin/rm -f {code_file} {result_file}"'
                     )
+                    logger.debug(f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.')
                 dur = bundle.end_ts - bundle.start_ts
                 self.histogram.add_item(dur)
 
@@ -964,9 +934,7 @@ class RemoteExecutor(BaseExecutor):
             # backup.
             if bundle.backup_bundles is not None:
                 for backup in bundle.backup_bundles:
-                    logger.debug(
-                        f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
-                    )
+                    logger.debug(f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled')
                     backup.is_cancelled.set()
 
         # This is a backup job and, by now, we have already fetched
@@ -979,10 +947,10 @@ class RemoteExecutor(BaseExecutor):
 
             # Tell the original to stop if we finished first.
             if not was_cancelled:
-                logger.debug(
-                    f'{bundle}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
-                )
-                bundle.src_bundle.is_cancelled.set()
+                orig_bundle = bundle.src_bundle
+                assert orig_bundle is not None
+                logger.debug(f'{bundle}: Notifying original {orig_bundle.uuid} we beat them to it.')
+                orig_bundle.is_cancelled.set()
         self.release_worker(bundle, was_cancelled=was_cancelled)
         return result
 
@@ -1066,7 +1034,7 @@ class RemoteExecutor(BaseExecutor):
         # they will move the result_file to this machine and let
         # the original pick them up and unpickle them.
 
-    def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> fut.Future:
+    def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
         is_original = bundle.src_bundle is None
         bundle.worker = None
         avoid_last_machine = bundle.machine
@@ -1107,13 +1075,14 @@ class RemoteExecutor(BaseExecutor):
         return self._helper_executor.submit(self.launch, bundle)
 
     @overrides
-    def shutdown(self, wait=True) -> None:
+    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
         if not self.already_shutdown:
             logging.debug(f'Shutting down RemoteExecutor {self.title}')
             self.heartbeat_stop_event.set()
             self.heartbeat_thread.join()
             self._helper_executor.shutdown(wait)
-            print(self.histogram)
+            if not quiet:
+                print(self.histogram.__repr__(label_formatter='%ds'))
             self.already_shutdown = True
 
 
@@ -1127,9 +1096,7 @@ class DefaultExecutors(object):
     def ping(self, host) -> bool:
         logger.debug(f'RUN> ping -c 1 {host}')
         try:
-            x = cmd_with_timeout(
-                f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
-            )
+            x = cmd_with_timeout(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
             return x == 0
         except Exception:
             return False
@@ -1212,11 +1179,11 @@ class DefaultExecutors(object):
 
     def shutdown(self) -> None:
         if self.thread_executor is not None:
-            self.thread_executor.shutdown()
+            self.thread_executor.shutdown(wait=True, quiet=True)
             self.thread_executor = None
         if self.process_executor is not None:
-            self.process_executor.shutdown()
+            self.process_executor.shutdown(wait=True, quiet=True)
             self.process_executor = None
         if self.remote_executor is not None:
-            self.remote_executor.shutdown()
+            self.remote_executor.shutdown(wait=True, quiet=True)
             self.remote_executor = None