Change flake8 and black settings to allow longer lines.
[python_utils.git] / executors.py
index b4cb06b4c9ad80816f0bb13e05e23863846fa8c1..2f735b7c568638f6d8419d3b4cc2476aeb3f3b9f 100644 (file)
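Note: judging from the joined lines below, the new limit is 100 columns. The settings change itself is not shown in this diff; it was presumably something along these lines (a sketch of the usual flake8/black knobs, not the committed files):

    # setup.cfg or .flake8 (assumed location)
    [flake8]
    max-line-length = 100

    # pyproject.toml (assumed location)
    [tool.black]
    line-length = 100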
@@ -146,9 +146,7 @@ class ThreadExecutor(BaseExecutor):
         for arg in args:
             newargs.append(arg)
         start = time.time()
-        result = self._thread_pool_executor.submit(
-            self.run_local_bundle, *newargs, **kwargs
-        )
+        result = self._thread_pool_executor.submit(self.run_local_bundle, *newargs, **kwargs)
         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
         result.add_done_callback(lambda _: self.adjust_task_count(-1))
         return result
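
Reviewer note: the add_done_callback pair above is the standard concurrent.futures idiom for attaching bookkeeping to a future. A minimal standalone sketch of the same timing pattern (histogram and adjust_task_count are this repo's helpers, so a print stands in for them):

    import time
    from concurrent.futures import ThreadPoolExecutor

    def work(n: int) -> int:
        time.sleep(0.1)
        return n * n

    pool = ThreadPoolExecutor(max_workers=4)
    start = time.time()
    future = pool.submit(work, 7)
    # Fires once the future completes, raises, or is cancelled.
    future.add_done_callback(lambda _: print(f'took {time.time() - start:.3f}s'))
    print(future.result())  # 49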
@@ -289,9 +287,7 @@ class RemoteExecutorStatus:
         self.start_time: float = time.time()
         self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
         self.end_per_bundle: Dict[str, float] = defaultdict(float)
-        self.finished_bundle_timings_per_worker: Dict[
-            RemoteWorkerRecord, List[float]
-        ] = {}
+        self.finished_bundle_timings_per_worker: Dict[RemoteWorkerRecord, List[float]] = {}
         self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
         self.finished_bundle_timings: List[float] = []
@@ -306,9 +302,7 @@ class RemoteExecutorStatus:
         with self.lock:
             self.record_acquire_worker_already_locked(worker, uuid)
 
-    def record_acquire_worker_already_locked(
-        self, worker: RemoteWorkerRecord, uuid: str
-    ) -> None:
+    def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
         assert self.lock.locked()
         self.known_workers.add(worker)
         self.start_per_bundle[uuid] = None
@@ -472,9 +466,7 @@ class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
                         grabbag.append(worker)
 
         if len(grabbag) == 0:
-            logger.debug(
-                f'There are no available workers that avoid {machine_to_avoid}...'
-            )
+            logger.debug(f'There are no available workers that avoid {machine_to_avoid}...')
             for worker in self.workers:
                 if worker.count > 0:
                     for _ in range(worker.count * worker.weight):
@@ -503,9 +495,7 @@ class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
         return False
 
     @overrides
-    def acquire_worker(
-        self, machine_to_avoid: str = None
-    ) -> Optional[RemoteWorkerRecord]:
+    def acquire_worker(self, machine_to_avoid: Optional[str] = None) -> Optional[RemoteWorkerRecord]:
         x = self.index
         while True:
             worker = self.workers[x]
@@ -544,9 +534,7 @@ class RemoteExecutor(BaseExecutor):
             raise RemoteExecutorException(msg)
         self.policy.register_worker_pool(self.workers)
         self.cv = threading.Condition()
-        logger.debug(
-            f'Creating {self.worker_count} local threads, one per remote worker.'
-        )
+        logger.debug(f'Creating {self.worker_count} local threads, one per remote worker.')
         self._helper_executor = fut.ThreadPoolExecutor(
             thread_name_prefix="remote_executor_helper",
             max_workers=self.worker_count,
@@ -628,21 +616,15 @@ class RemoteExecutor(BaseExecutor):
                             if start_ts is not None:
                                 runtime = now - start_ts
                                 score += runtime
-                                logger.debug(
-                                    f'score[{bundle}] => {score}  # latency boost'
-                                )
+                                logger.debug(f'score[{bundle}] => {score}  # latency boost')
 
                                 if bundle.slower_than_local_p95:
                                     score += runtime / 2
-                                    logger.debug(
-                                        f'score[{bundle}] => {score}  # >worker p95'
-                                    )
+                                    logger.debug(f'score[{bundle}] => {score}  # >worker p95')
 
                                 if bundle.slower_than_global_p95:
                                     score += runtime / 4
-                                    logger.debug(
-                                        f'score[{bundle}] => {score}  # >global p95'
-                                    )
+                                    logger.debug(f'score[{bundle}] => {score}  # >global p95')
 
                             # Prefer backups of bundles that don't
                             # have backups already.
@@ -659,9 +641,7 @@ class RemoteExecutor(BaseExecutor):
                                 f'score[{bundle}] => {score}  # {backup_count} dup backup factor'
                             )
 
-                            if score != 0 and (
-                                best_score is None or score > best_score
-                            ):
+                            if score != 0 and (best_score is None or score > best_score):
                                 bundle_to_backup = bundle
                                 assert bundle is not None
                                 assert bundle.backup_bundles is not None
@@ -686,14 +666,10 @@ class RemoteExecutor(BaseExecutor):
     def is_worker_available(self) -> bool:
         return self.policy.is_worker_available()
 
-    def acquire_worker(
-        self, machine_to_avoid: str = None
-    ) -> Optional[RemoteWorkerRecord]:
+    def acquire_worker(self, machine_to_avoid: Optional[str] = None) -> Optional[RemoteWorkerRecord]:
         return self.policy.acquire_worker(machine_to_avoid)
 
-    def find_available_worker_or_block(
-        self, machine_to_avoid: str = None
-    ) -> RemoteWorkerRecord:
+    def find_available_worker_or_block(self, machine_to_avoid: Optional[str] = None) -> RemoteWorkerRecord:
         with self.cv:
             while not self.is_worker_available():
                 self.cv.wait()
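
Reviewer note: find_available_worker_or_block is the textbook condition-variable consumer: re-check the predicate in a loop around cv.wait() so spurious wakeups are harmless. Both sides of the handshake in isolation (names invented):

    import threading

    cv = threading.Condition()
    free_workers = []

    def consumer():
        with cv:
            while not free_workers:  # loop guards against spurious wakeups
                cv.wait()
            worker = free_workers.pop()
        print('acquired', worker)

    def producer():
        with cv:
            free_workers.append('worker-1')
            cv.notify()  # wake one waiter

    t = threading.Thread(target=consumer)
    t.start()
    producer()
    t.join()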
@@ -757,9 +733,7 @@ class RemoteExecutor(BaseExecutor):
             try:
                 return self.process_work_result(bundle)
             except Exception as e:
-                logger.warning(
-                    f'{bundle}: bundle says it\'s cancelled upfront but no results?!'
-                )
+                logger.warning(f'{bundle}: bundle says it\'s cancelled upfront but no results?!')
                 self.release_worker(bundle)
                 if is_original:
                     # Weird.  We are the original owner of this
@@ -788,9 +762,7 @@ class RemoteExecutor(BaseExecutor):
         # Send input code / data to worker machine if it's not local.
         if hostname not in machine:
             try:
-                cmd = (
-                    f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
-                )
+                cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
                 start_ts = time.time()
                 logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
                 run_silently(cmd)
@@ -831,9 +803,7 @@ class RemoteExecutor(BaseExecutor):
         logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
         p = cmd_in_background(cmd, silent=True)
         bundle.pid = p.pid
-        logger.debug(
-            f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.'
-        )
+        logger.debug(f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.')
         return self.wait_for_process(p, bundle, 0)
 
     def wait_for_process(
@@ -858,9 +828,7 @@ class RemoteExecutor(BaseExecutor):
                 p.wait(timeout=0.25)
             except subprocess.TimeoutExpired:
                 if self.check_if_cancelled(bundle):
-                    logger.info(
-                        f'{bundle}: looks like another worker finished bundle...'
-                    )
+                    logger.info(f'{bundle}: looks like another worker finished bundle...')
                     break
             else:
                 logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
@@ -928,12 +896,9 @@ class RemoteExecutor(BaseExecutor):
                             break
 
                     run_silently(
-                        f'{SSH} {username}@{machine}'
-                        f' "/bin/rm -f {code_file} {result_file}"'
-                    )
-                    logger.debug(
-                        f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.'
+                        f'{SSH} {username}@{machine} "/bin/rm -f {code_file} {result_file}"'
                     )
+                    logger.debug(f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.')
                 dur = bundle.end_ts - bundle.start_ts
                 self.histogram.add_item(dur)
 
@@ -968,9 +933,7 @@ class RemoteExecutor(BaseExecutor):
             # backup.
             if bundle.backup_bundles is not None:
                 for backup in bundle.backup_bundles:
-                    logger.debug(
-                        f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
-                    )
+                    logger.debug(f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled')
                     backup.is_cancelled.set()
 
         # This is a backup job and, by now, we have already fetched
@@ -985,9 +948,7 @@ class RemoteExecutor(BaseExecutor):
             if not was_cancelled:
                 orig_bundle = bundle.src_bundle
                 assert orig_bundle is not None
-                logger.debug(
-                    f'{bundle}: Notifying original {orig_bundle.uuid} we beat them to it.'
-                )
+                logger.debug(f'{bundle}: Notifying original {orig_bundle.uuid} we beat them to it.')
                 orig_bundle.is_cancelled.set()
         self.release_worker(bundle, was_cancelled=was_cancelled)
         return result
@@ -1072,9 +1033,7 @@ class RemoteExecutor(BaseExecutor):
         # they will move the result_file to this machine and let
         # the original pick them up and unpickle them.
 
-    def emergency_retry_nasty_bundle(
-        self, bundle: BundleDetails
-    ) -> Optional[fut.Future]:
+    def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
         is_original = bundle.src_bundle is None
         bundle.worker = None
         avoid_last_machine = bundle.machine
@@ -1136,9 +1095,7 @@ class DefaultExecutors(object):
     def ping(self, host) -> bool:
         logger.debug(f'RUN> ping -c 1 {host}')
         try:
-            x = cmd_with_timeout(
-                f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
-            )
+            x = cmd_with_timeout(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
             return x == 0
         except Exception:
             return False
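
Reviewer note: cmd_with_timeout is this repo's shell helper; for readers without the library, a rough stdlib equivalent of the probe (a sketch, not the actual helper):

    import subprocess

    def ping(host: str) -> bool:
        try:
            rc = subprocess.call(
                f'ping -c 1 {host} >/dev/null 2>/dev/null',
                shell=True,
                timeout=1.0,
            )
            return rc == 0
        except Exception:
            return False

    print(ping('localhost'))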