From: Scott Gasch Date: Fri, 19 Nov 2021 04:39:12 +0000 (-0800) Subject: Make smart futures avoid polling. X-Git-Url: https://wannabe.guru.org/gitweb/?a=commitdiff_plain;h=ed8fa2b10b0177b15b7423263bdd390efde2f0c8;p=python_utils.git Make smart futures avoid polling. Much fucking around with the backup strategy in executors. Tweaks to presence. --- diff --git a/executors.py b/executors.py index b446822..c11bd54 100644 --- a/executors.py +++ b/executors.py @@ -22,8 +22,8 @@ from overrides import overrides from ansi import bg, fg, underline, reset import argparse_utils import config -from exec_utils import run_silently, cmd_in_background, cmd_with_timeout from decorator_utils import singleton +from exec_utils import run_silently, cmd_in_background, cmd_with_timeout import histogram as hist logger = logging.getLogger(__name__) @@ -230,8 +230,8 @@ class BundleDetails: pid: int start_ts: float end_ts: float - too_slow: bool - super_slow: bool + slower_than_local_p95: bool + slower_than_global_p95: bool src_bundle: BundleDetails is_cancelled: threading.Event was_cancelled: bool @@ -419,21 +419,19 @@ class RemoteExecutorStatus: if qworker is not None: if sec > qworker[1]: ret += f'{bg("red")}>💻p95{reset()} ' - elif sec > qworker[0]: - ret += f'{fg("red")}>💻p50{reset()} ' - if qall is not None: - if sec > qall[1] * 1.5: - ret += f'{bg("red")}!!!{reset()}' if details is not None: - logger.debug(f'Flagging {details} for another backup') - details.super_slow = True - elif sec > qall[1]: + details.slower_than_local_p95 = True + else: + if details is not None: + details.slower_than_local_p95 = False + + if qall is not None: + if sec > qall[1]: ret += f'{bg("red")}>∀p95{reset()} ' if details is not None: - logger.debug(f'Flagging {details} for a backup') - details.too_slow = True - elif sec > qall[0]: - ret += f'{fg("red")}>∀p50{reset()}' + details.slower_than_global_p95 = True + else: + details.slower_than_global_p95 = False ret += '\n' return ret @@ -451,7 +449,6 @@ class RemoteExecutorStatus: class RemoteWorkerSelectionPolicy(ABC): def register_worker_pool(self, workers): - random.seed() self.workers = workers @abstractmethod @@ -467,12 +464,14 @@ class RemoteWorkerSelectionPolicy(ABC): class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy): + @overrides def is_worker_available(self) -> bool: for worker in self.workers: if worker.count > 0: return True return False + @overrides def acquire_worker( self, machine_to_avoid = None @@ -499,12 +498,14 @@ class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy): def __init__(self) -> None: self.index = 0 + @overrides def is_worker_available(self) -> bool: for worker in self.workers: if worker.count > 0: return True return False + @overrides def acquire_worker( self, machine_to_avoid: str = None @@ -551,6 +552,8 @@ class RemoteExecutor(BaseExecutor): ) self.status = RemoteExecutorStatus(self.worker_count) self.total_bundles_submitted = 0 + self.backup_lock = threading.Lock() + self.last_backup = None def is_worker_available(self) -> bool: return self.policy.is_worker_available() @@ -588,37 +591,84 @@ class RemoteExecutor(BaseExecutor): # Look for bundles to reschedule. num_done = len(self.status.finished_bundle_timings) - if num_done > 7 or (num_done > 5 and self.is_worker_available()): - for worker, bundle_uuids in self.status.in_flight_bundles_by_worker.items(): - for uuid in bundle_uuids: - bundle = self.status.bundle_details_by_uuid.get(uuid, None) - if ( - bundle is not None and - bundle.too_slow and - bundle.src_bundle is None and - config.config['executors_schedule_remote_backups'] - ): - self.consider_backup_for_bundle(bundle) - - def consider_backup_for_bundle(self, bundle: BundleDetails) -> None: - assert self.status.lock.locked() - if ( - bundle.too_slow - and len(bundle.backup_bundles) == 0 # one backup per - ): - msg = f"*** Rescheduling {bundle} (first backup) ***" - logger.debug(msg) - self.schedule_backup_for_bundle(bundle) - return - elif ( - bundle.super_slow - and len(bundle.backup_bundles) < 2 # two backups in dire situations - and self.status.total_idle() > 4 - ): - msg = f"*** Rescheduling {bundle} (second backup) ***" - logger.debug(msg) - self.schedule_backup_for_bundle(bundle) - return + num_idle_workers = self.worker_count - self.task_count + now = time.time() + if ( + config.config['executors_schedule_remote_backups'] + and num_done > 2 + and num_idle_workers > 1 + and (self.last_backup is None or (now - self.last_backup > 1.0)) + and self.backup_lock.acquire(blocking=False) + ): + try: + assert self.backup_lock.locked() + + bundle_to_backup = None + best_score = None + for worker, bundle_uuids in self.status.in_flight_bundles_by_worker.items(): + # Prefer to schedule backups of bundles on slower machines. + base_score = 0 + for record in self.workers: + if worker.machine == record.machine: + base_score = float(record.weight) + base_score = 1.0 / base_score + base_score *= 200.0 + base_score = int(base_score) + break + + for uuid in bundle_uuids: + bundle = self.status.bundle_details_by_uuid.get(uuid, None) + if ( + bundle is not None + and bundle.src_bundle is None + and bundle.backup_bundles is not None + ): + score = base_score + + # Schedule backups of bundles running longer; especially those + # that are unexpectedly slow. + start_ts = self.status.start_per_bundle[uuid] + if start_ts is not None: + runtime = now - start_ts + score += runtime + logger.debug(f'score[{bundle}] => {score} # latency boost') + + if bundle.slower_than_local_p95: + score += runtime / 2 + logger.debug(f'score[{bundle}] => {score} # >worker p95') + + if bundle.slower_than_global_p95: + score += runtime / 2 + logger.debug(f'score[{bundle}] => {score} # >global p95') + + # Prefer backups of bundles that don't have backups already. + backup_count = len(bundle.backup_bundles) + if backup_count == 0: + score *= 2 + elif backup_count == 1: + score /= 2 + elif backup_count == 2: + score /= 8 + else: + score = 0 + logger.debug(f'score[{bundle}] => {score} # {backup_count} dup backup factor') + + if ( + score != 0 + and (best_score is None or score > best_score) + ): + bundle_to_backup = bundle + assert bundle is not None + assert bundle.backup_bundles is not None + assert bundle.src_bundle is None + best_score = score + + if bundle_to_backup is not None: + self.last_backup = now + logger.info(f'=====> SCHEDULING BACKUP {bundle_to_backup} (score={best_score:.1f}) <=====') + self.schedule_backup_for_bundle(bundle_to_backup) + finally: + self.backup_lock.release() def check_if_cancelled(self, bundle: BundleDetails) -> bool: with self.status.lock: @@ -921,8 +971,8 @@ class RemoteExecutor(BaseExecutor): pid = 0, start_ts = time.time(), end_ts = 0.0, - too_slow = False, - super_slow = False, + slower_than_local_p95 = False, + slower_than_global_p95 = False, src_bundle = None, is_cancelled = threading.Event(), was_cancelled = False, @@ -951,8 +1001,8 @@ class RemoteExecutor(BaseExecutor): pid = 0, start_ts = time.time(), end_ts = 0.0, - too_slow = False, - super_slow = False, + slower_than_local_p95 = False, + slower_than_global_p95 = False, src_bundle = src_bundle, is_cancelled = threading.Event(), was_cancelled = False, @@ -967,6 +1017,7 @@ class RemoteExecutor(BaseExecutor): def schedule_backup_for_bundle(self, src_bundle: BundleDetails): assert self.status.lock.locked() + assert src_bundle is not None backup_bundle = self.create_backup_bundle(src_bundle) logger.debug( f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...' @@ -1095,16 +1146,6 @@ class DefaultExecutors(object): count = 2, ), ) - if self.ping('backup.house'): - logger.info('Found backup.house') - pool.append( - RemoteWorkerRecord( - username = 'scott', - machine = 'backup.house', - weight = 1, - count = 4, - ), - ) if self.ping('kiosk.house'): logger.info('Found kiosk.house') pool.append( diff --git a/presence.py b/presence.py index b310183..d7db416 100755 --- a/presence.py +++ b/presence.py @@ -51,16 +51,14 @@ class PresenceDetection(object): Person.SCOTT: [ "3C:28:6D:10:6D:41", # pixel3 "6C:40:08:AE:DC:2E", # laptop -# "D4:61:2E:88:18:09", # watch -# "14:7D:DA:6A:20:D7", # work laptop ], Person.LYNN: [ - "08:CC:27:63:26:14", - "B8:31:B5:9A:4F:19", + "08:CC:27:63:26:14", # motog7 + "B8:31:B5:9A:4F:19", # laptop ], Person.ALEX: [ - "0C:CB:85:0C:8B:AE", - "D0:C6:37:E3:36:9A", + "0C:CB:85:0C:8B:AE", # phone + "D0:C6:37:E3:36:9A", # laptop ], Person.AARON_AND_DANA: [ "98:B6:E9:E5:5A:7C", @@ -149,6 +147,7 @@ class PresenceDetection(object): def where_is_person_now(self, name: Person) -> Location: import dict_utils + logger.debug(f'Looking for {name}...') if name is Person.UNKNOWN: if self.weird_mac_at_cabin: @@ -159,26 +158,30 @@ class PresenceDetection(object): tiebreaks: Dict[Location, datetime.datetime] = {} credit = 10000 for mac in self.devices_by_person[name]: - logger.debug(f'Looking for {name}... check for mac {mac}') if mac not in self.names_by_mac: continue + mac_name = self.names_by_mac[mac] + logger.debug(f'Looking for {name}... check for mac {mac} ({mac_name})') for location in self.location_ts_by_mac: if mac in self.location_ts_by_mac[location]: ts = (self.location_ts_by_mac[location])[mac] - logger.debug(f'I saw {mac} at {location} at {ts}') + logger.debug(f'Seen {mac} ({mac_name}) at {location} since {ts}') tiebreaks[location] = ts - location = dict_utils.key_with_min_value(tiebreaks) - v = votes.get(location, 0) - votes[location] = v + credit - logger.debug(f'{name}: {location} gets {credit} votes.') + + (most_recent_location, first_seen_ts) = dict_utils.item_with_max_value(tiebreaks) + bonus = credit + v = votes.get(most_recent_location, 0) + votes[most_recent_location] = v + bonus + logger.debug(f'{name}: {location} gets {bonus} votes.') credit = int( - credit * 0.667 + credit * 0.2 ) # Note: list most important devices first if credit <= 0: credit = 1 if len(votes) > 0: - item = dict_utils.item_with_max_value(votes) - return item[0] + (location, value) = dict_utils.item_with_max_value(votes) + if value > 2001: + return location return Location.UNKNOWN diff --git a/smart_future.py b/smart_future.py index f11be17..c097d53 100644 --- a/smart_future.py +++ b/smart_future.py @@ -1,9 +1,8 @@ #!/usr/bin/env python3 from __future__ import annotations -from collections.abc import Mapping +import concurrent import concurrent.futures as fut -import time from typing import Callable, List, TypeVar from overrides import overrides @@ -17,35 +16,33 @@ T = TypeVar('T') def wait_any(futures: List[SmartFuture], *, callback: Callable = None): - finished: Mapping[int, bool] = {} - x = 0 - while True: - future = futures[x] - if not finished.get(future.get_id(), False): - if future.is_ready(): - finished[future.get_id()] = True - yield future - else: - if callback is not None: - callback() - time.sleep(0.1) - x += 1 - if x >= len(futures): - x = 0 - if len(finished) == len(futures): + real_futures = [] + smart_future_by_real_future = {} + completed_futures = set() + for _ in futures: + real_futures.append(_.wrapped_future) + smart_future_by_real_future[_.wrapped_future] = _ + while len(completed_futures) != len(real_futures): + newly_completed_futures = concurrent.futures.as_completed(real_futures) + for f in newly_completed_futures: if callback is not None: callback() - return + completed_futures.add(f) + yield smart_future_by_real_future[f] + if callback is not None: + callback() + return def wait_all(futures: List[SmartFuture]) -> None: - done_set = set() - while len(done_set) < len(futures): - for future in futures: - i = future.get_id() - if i not in done_set and future.wrapped_future.done(): - done_set.add(i) - time.sleep(0.1) + real_futures = [x.wrapped_future for x in futures] + (done, not_done) = concurrent.futures.wait( + real_futures, + timeout=None, + return_when=concurrent.futures.ALL_COMPLETED + ) + assert len(done) == len(real_futures) + assert len(not_done) == 0 class SmartFuture(DeferredOperand):