from typing import Any, Callable, Dict, List, Optional, Set
import cloudpickle # type: ignore
+from overrides import overrides
from ansi import bg, fg, underline, reset
import argparse_utils
self.histogram.add_item(duration)
return result
+ @overrides
def submit(self,
function: Callable,
*args,
*newargs,
**kwargs)
+ @overrides
def shutdown(self,
wait = True) -> None:
logger.debug(f'Shutting down threadpool executor {self.title}')
self.adjust_task_count(-1)
return result
+ @overrides
def submit(self,
function: Callable,
*args,
)
return result
+ @overrides
def shutdown(self, wait=True) -> None:
logger.debug(f'Shutting down processpool executor {self.title}')
self._process_executor.shutdown(wait)
# they will move the result_file to this machine and let
# the original pick them up and unpickle them.
+ @overrides
def submit(self,
function: Callable,
*args,
self.total_bundles_submitted += 1
return self._helper_executor.submit(self.launch, bundle)
+ @overrides
def shutdown(self, wait=True) -> None:
self._helper_executor.shutdown(wait)
logging.debug(f'Shutting down RemoteExecutor {self.title}')
self.remote_executor: Optional[RemoteExecutor] = None
def ping(self, host) -> bool:
+ logger.debug(f'RUN> ping -c 1 {host}')
command = ['ping', '-c', '1', host]
return subprocess.call(
command,
return self.process_executor
def remote_pool(self) -> RemoteExecutor:
+ logger.info('Looking for some helper machines...')
if self.remote_executor is None:
pool: List[RemoteWorkerRecord] = []
if self.ping('cheetah.house'):
+ logger.info('Found cheetah.house')
pool.append(
RemoteWorkerRecord(
username = 'scott',
),
)
if self.ping('video.house'):
+ logger.info('Found video.house')
pool.append(
RemoteWorkerRecord(
username = 'scott',
),
)
if self.ping('wannabe.house'):
+ logger.info('Found wannabe.house')
pool.append(
RemoteWorkerRecord(
username = 'scott',
),
)
if self.ping('meerkat.cabin'):
+ logger.info('Found meerkat.cabin')
pool.append(
RemoteWorkerRecord(
username = 'scott',
),
)
if self.ping('backup.house'):
+ logger.info('Found backup.house')
pool.append(
RemoteWorkerRecord(
username = 'scott',
count = 4,
),
)
+ if self.ping('kiosk.house'):
+ logger.info('Found kiosk.house')
+ pool.append(
+ RemoteWorkerRecord(
+ username = 'pi',
+ machine = 'kiosk.house',
+ weight = 1,
+ count = 2,
+ ),
+ )
if self.ping('puma.cabin'):
+ logger.info('Found puma.cabin')
pool.append(
RemoteWorkerRecord(
username = 'scott',
count = 4,
),
)
+
+ # The controller machine has a lot to do; go easy on it.
+ for record in pool:
+ if record.machine == platform.node() and record.count > 1:
+ logger.info(f'Reducing workload for {record.machine}.')
+ record.count = 1
+
policy = WeightedRandomRemoteWorkerSelectionPolicy()
policy.register_worker_pool(pool)
self.remote_executor = RemoteExecutor(pool, policy)