X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;ds=inline;f=executors.py;h=453139a5fbd514525a8152a81cb2bd0673e9acc0;hb=e6f32fdd9b373dfcd100c7accb41f57d83c2f0a1;hp=46812c2b49203c2b23c021978e8e6fe334b80afa;hpb=36fea7f15ed17150691b5b3ead75450e575229ef;p=python_utils.git diff --git a/executors.py b/executors.py index 46812c2..453139a 100644 --- a/executors.py +++ b/executors.py @@ -160,9 +160,7 @@ class ProcessExecutor(BaseExecutor): self.adjust_task_count(+1) pickle = make_cloud_pickle(function, *args, **kwargs) result = self._process_executor.submit(self.run_cloud_pickle, pickle) - result.add_done_callback( - lambda _: self.histogram.add_item(time.time() - start) - ) + result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start)) return result @overrides @@ -258,9 +256,7 @@ class RemoteExecutorStatus: self.finished_bundle_timings_per_worker: Dict[ RemoteWorkerRecord, List[float] ] = {} - self.in_flight_bundles_by_worker: Dict[ - RemoteWorkerRecord, Set[str] - ] = {} + self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {} self.bundle_details_by_uuid: Dict[str, BundleDetails] = {} self.finished_bundle_timings: List[float] = [] self.last_periodic_dump: Optional[float] = None @@ -270,9 +266,7 @@ class RemoteExecutorStatus: # as a memory fence for modifications to bundle. self.lock: threading.Lock = threading.Lock() - def record_acquire_worker( - self, worker: RemoteWorkerRecord, uuid: str - ) -> None: + def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None: with self.lock: self.record_acquire_worker_already_locked(worker, uuid) @@ -290,9 +284,7 @@ class RemoteExecutorStatus: with self.lock: self.record_bundle_details_already_locked(details) - def record_bundle_details_already_locked( - self, details: BundleDetails - ) -> None: + def record_bundle_details_already_locked(self, details: BundleDetails) -> None: assert self.lock.locked() self.bundle_details_by_uuid[details.uuid] = details @@ -303,9 +295,7 @@ class RemoteExecutorStatus: was_cancelled: bool, ) -> None: with self.lock: - self.record_release_worker_already_locked( - worker, uuid, was_cancelled - ) + self.record_release_worker_already_locked(worker, uuid, was_cancelled) def record_release_worker_already_locked( self, @@ -377,11 +367,7 @@ class RemoteExecutorStatus: ret += f' ...{in_flight} bundles currently in flight:\n' for bundle_uuid in self.in_flight_bundles_by_worker[worker]: details = self.bundle_details_by_uuid.get(bundle_uuid, None) - pid = ( - str(details.pid) - if (details and details.pid != 0) - else "TBD" - ) + pid = str(details.pid) if (details and details.pid != 0) else "TBD" if self.start_per_bundle[bundle_uuid] is not None: sec = ts - self.start_per_bundle[bundle_uuid] ret += f' (pid={pid}): {details} for {sec:.1f}s so far ' @@ -412,10 +398,7 @@ class RemoteExecutorStatus: assert self.lock.locked() self.total_bundles_submitted = total_bundles_submitted ts = time.time() - if ( - self.last_periodic_dump is None - or ts - self.last_periodic_dump > 5.0 - ): + if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0: print(self) self.last_periodic_dump = ts @@ -429,9 +412,7 @@ class RemoteWorkerSelectionPolicy(ABC): pass @abstractmethod - def acquire_worker( - self, machine_to_avoid=None - ) -> Optional[RemoteWorkerRecord]: + def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]: pass @@ -444,9 +425,7 @@ class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy): return False @overrides - def acquire_worker( - self, machine_to_avoid=None - ) -> Optional[RemoteWorkerRecord]: + def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]: grabbag = [] for worker in self.workers: for x in range(0, worker.count): @@ -585,9 +564,7 @@ class RemoteExecutor(BaseExecutor): break for uuid in bundle_uuids: - bundle = self.status.bundle_details_by_uuid.get( - uuid, None - ) + bundle = self.status.bundle_details_by_uuid.get(uuid, None) if ( bundle is not None and bundle.src_bundle is None @@ -678,9 +655,7 @@ class RemoteExecutor(BaseExecutor): logger.critical(msg) raise Exception(msg) - def release_worker( - self, bundle: BundleDetails, *, was_cancelled=True - ) -> None: + def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None: worker = bundle.worker assert worker is not None logger.debug(f'Released worker {worker}') @@ -764,14 +739,14 @@ class RemoteExecutor(BaseExecutor): # Send input code / data to worker machine if it's not local. if hostname not in machine: try: - cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}' + cmd = ( + f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}' + ) start_ts = time.time() logger.info(f"{bundle}: Copying work to {worker} via {cmd}.") run_silently(cmd) xfer_latency = time.time() - start_ts - logger.debug( - f"{bundle}: Copying to {worker} took {xfer_latency:.1f}s." - ) + logger.debug(f"{bundle}: Copying to {worker} took {xfer_latency:.1f}s.") except Exception as e: self.release_worker(bundle) if is_original: @@ -804,9 +779,7 @@ class RemoteExecutor(BaseExecutor): f' /home/scott/lib/python_modules/remote_worker.py' f' --code_file {bundle.code_file} --result_file {bundle.result_file}"' ) - logger.debug( - f'{bundle}: Executing {cmd} in the background to kick off work...' - ) + logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...') p = cmd_in_background(cmd, silent=True) bundle.pid = p.pid logger.debug( @@ -935,9 +908,7 @@ class RemoteExecutor(BaseExecutor): # Re-raise the exception; the code in wait_for_process may # decide to emergency_retry_nasty_bundle here. raise Exception(e) - logger.debug( - f'Removing local (master) {code_file} and {result_file}.' - ) + logger.debug(f'Removing local (master) {code_file} and {result_file}.') os.remove(f'{result_file}') os.remove(f'{code_file}')