X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=executors.py;h=e95ed716043b4962cd939b6d25885fd87826466a;hb=679b41526d9327f5bcecc702a928a72b68a1e0c8;hp=5b77a42dc3d29ca6f42673a369e23f0962343c62;hpb=5d04ec30aabad127c4eb9494a7e1be72847612e3;p=python_utils.git diff --git a/executors.py b/executors.py index 5b77a42..e95ed71 100644 --- a/executors.py +++ b/executors.py @@ -84,10 +84,10 @@ class BaseExecutor(ABC): pass @abstractmethod - def shutdown(self, wait: bool = True) -> None: + def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None: pass - def shutdown_if_idle(self) -> bool: + def shutdown_if_idle(self, *, quiet: bool = False) -> bool: """Shutdown the executor and return True if the executor is idle (i.e. there are no pending or active tasks). Return False otherwise. Note: this should only be called by the launcher @@ -95,7 +95,7 @@ class BaseExecutor(ABC): """ if self.task_count == 0: - self.shutdown() + self.shutdown(wait=True, quiet=quiet) return True return False @@ -152,13 +152,15 @@ class ThreadExecutor(BaseExecutor): ) result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start)) result.add_done_callback(lambda _: self.adjust_task_count(-1)) + return result @overrides - def shutdown(self, wait=True) -> None: + def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None: if not self.already_shutdown: logger.debug(f'Shutting down threadpool executor {self.title}') - print(self.histogram) self._thread_pool_executor.shutdown(wait) + if not quiet: + print(self.histogram.__repr__(label_formatter='%ds')) self.already_shutdown = True @@ -196,11 +198,12 @@ class ProcessExecutor(BaseExecutor): return result @overrides - def shutdown(self, wait=True) -> None: + def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None: if not self.already_shutdown: logger.debug(f'Shutting down processpool executor {self.title}') self._process_executor.shutdown(wait) - print(self.histogram) + if not quiet: + print(self.histogram.__repr__(label_formatter='%ds')) self.already_shutdown = True def __getstate__(self): @@ -1106,13 +1109,14 @@ class RemoteExecutor(BaseExecutor): return self._helper_executor.submit(self.launch, bundle) @overrides - def shutdown(self, wait=True) -> None: + def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None: if not self.already_shutdown: logging.debug(f'Shutting down RemoteExecutor {self.title}') self.heartbeat_stop_event.set() self.heartbeat_thread.join() self._helper_executor.shutdown(wait) - print(self.histogram) + if not quiet: + print(self.histogram.__repr__(label_formatter='%ds')) self.already_shutdown = True @@ -1211,11 +1215,11 @@ class DefaultExecutors(object): def shutdown(self) -> None: if self.thread_executor is not None: - self.thread_executor.shutdown() + self.thread_executor.shutdown(wait=True, quiet=True) self.thread_executor = None if self.process_executor is not None: - self.process_executor.shutdown() + self.process_executor.shutdown(wait=True, quiet=True) self.process_executor = None if self.remote_executor is not None: - self.remote_executor.shutdown() + self.remote_executor.shutdown(wait=True, quiet=True) self.remote_executor = None