self.adjust_task_count(+1)
pickle = make_cloud_pickle(function, *args, **kwargs)
result = self._process_executor.submit(self.run_cloud_pickle, pickle)
self.adjust_task_count(+1)
pickle = make_cloud_pickle(function, *args, **kwargs)
result = self._process_executor.submit(self.run_cloud_pickle, pickle)
self.finished_bundle_timings_per_worker: Dict[
RemoteWorkerRecord, List[float]
] = {}
self.finished_bundle_timings_per_worker: Dict[
RemoteWorkerRecord, List[float]
] = {}
self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
self.finished_bundle_timings: List[float] = []
self.last_periodic_dump: Optional[float] = None
self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
self.finished_bundle_timings: List[float] = []
self.last_periodic_dump: Optional[float] = None
- def record_acquire_worker(
- self, worker: RemoteWorkerRecord, uuid: str
- ) -> None:
+ def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
- self.record_release_worker_already_locked(
- worker, uuid, was_cancelled
- )
+ self.record_release_worker_already_locked(worker, uuid, was_cancelled)
ret += f' ...{in_flight} bundles currently in flight:\n'
for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
details = self.bundle_details_by_uuid.get(bundle_uuid, None)
ret += f' ...{in_flight} bundles currently in flight:\n'
for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
details = self.bundle_details_by_uuid.get(bundle_uuid, None)
if self.start_per_bundle[bundle_uuid] is not None:
sec = ts - self.start_per_bundle[bundle_uuid]
ret += f' (pid={pid}): {details} for {sec:.1f}s so far '
if self.start_per_bundle[bundle_uuid] is not None:
sec = ts - self.start_per_bundle[bundle_uuid]
ret += f' (pid={pid}): {details} for {sec:.1f}s so far '
grabbag = []
for worker in self.workers:
for x in range(0, worker.count):
grabbag = []
for worker in self.workers:
for x in range(0, worker.count):
- def release_worker(
- self, bundle: BundleDetails, *, was_cancelled=True
- ) -> None:
+ def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
f' /home/scott/lib/python_modules/remote_worker.py'
f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
)
f' /home/scott/lib/python_modules/remote_worker.py'
f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
)
# Re-raise the exception; the code in wait_for_process may
# decide to emergency_retry_nasty_bundle here.
raise Exception(e)
# Re-raise the exception; the code in wait_for_process may
# decide to emergency_retry_nasty_bundle here.
raise Exception(e)