self.adjust_task_count(+1)
pickle = make_cloud_pickle(function, *args, **kwargs)
result = self._process_executor.submit(self.run_cloud_pickle, pickle)
- result.add_done_callback(
- lambda _: self.histogram.add_item(time.time() - start)
- )
+ result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
return result
@overrides
self.finished_bundle_timings_per_worker: Dict[
RemoteWorkerRecord, List[float]
] = {}
- self.in_flight_bundles_by_worker: Dict[
- RemoteWorkerRecord, Set[str]
- ] = {}
+ self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
self.finished_bundle_timings: List[float] = []
self.last_periodic_dump: Optional[float] = None
# as a memory fence for modifications to bundle.
self.lock: threading.Lock = threading.Lock()
- def record_acquire_worker(
- self, worker: RemoteWorkerRecord, uuid: str
- ) -> None:
+ def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
with self.lock:
self.record_acquire_worker_already_locked(worker, uuid)
with self.lock:
self.record_bundle_details_already_locked(details)
- def record_bundle_details_already_locked(
- self, details: BundleDetails
- ) -> None:
+ def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
assert self.lock.locked()
self.bundle_details_by_uuid[details.uuid] = details
was_cancelled: bool,
) -> None:
with self.lock:
- self.record_release_worker_already_locked(
- worker, uuid, was_cancelled
- )
+ self.record_release_worker_already_locked(worker, uuid, was_cancelled)
def record_release_worker_already_locked(
self,
ret += f' ...{in_flight} bundles currently in flight:\n'
for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
details = self.bundle_details_by_uuid.get(bundle_uuid, None)
- pid = (
- str(details.pid)
- if (details and details.pid != 0)
- else "TBD"
- )
+ pid = str(details.pid) if (details and details.pid != 0) else "TBD"
if self.start_per_bundle[bundle_uuid] is not None:
sec = ts - self.start_per_bundle[bundle_uuid]
ret += f' (pid={pid}): {details} for {sec:.1f}s so far '
assert self.lock.locked()
self.total_bundles_submitted = total_bundles_submitted
ts = time.time()
- if (
- self.last_periodic_dump is None
- or ts - self.last_periodic_dump > 5.0
- ):
+ if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
print(self)
self.last_periodic_dump = ts
pass
@abstractmethod
- def acquire_worker(
- self, machine_to_avoid=None
- ) -> Optional[RemoteWorkerRecord]:
+ def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
pass
return False
@overrides
- def acquire_worker(
- self, machine_to_avoid=None
- ) -> Optional[RemoteWorkerRecord]:
+ def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
grabbag = []
for worker in self.workers:
for x in range(0, worker.count):
break
for uuid in bundle_uuids:
- bundle = self.status.bundle_details_by_uuid.get(
- uuid, None
- )
+ bundle = self.status.bundle_details_by_uuid.get(uuid, None)
if (
bundle is not None
and bundle.src_bundle is None
logger.critical(msg)
raise Exception(msg)
- def release_worker(
- self, bundle: BundleDetails, *, was_cancelled=True
- ) -> None:
+ def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
worker = bundle.worker
assert worker is not None
logger.debug(f'Released worker {worker}')
# Send input code / data to worker machine if it's not local.
if hostname not in machine:
try:
- cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
+ cmd = (
+ f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
+ )
start_ts = time.time()
logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
run_silently(cmd)
xfer_latency = time.time() - start_ts
- logger.debug(
- f"{bundle}: Copying to {worker} took {xfer_latency:.1f}s."
- )
+ logger.debug(f"{bundle}: Copying to {worker} took {xfer_latency:.1f}s.")
except Exception as e:
self.release_worker(bundle)
if is_original:
f' /home/scott/lib/python_modules/remote_worker.py'
f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
)
- logger.debug(
- f'{bundle}: Executing {cmd} in the background to kick off work...'
- )
+ logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
p = cmd_in_background(cmd, silent=True)
bundle.pid = p.pid
logger.debug(
# Re-raise the exception; the code in wait_for_process may
# decide to emergency_retry_nasty_bundle here.
raise Exception(e)
- logger.debug(
- f'Removing local (master) {code_file} and {result_file}.'
- )
+ logger.debug(f'Removing local (master) {code_file} and {result_file}.')
os.remove(f'{result_file}')
os.remove(f'{code_file}')