From 351e77c767c9084aa486eedbdc9902c635b06261 Mon Sep 17 00:00:00 2001 From: Scott Gasch Date: Tue, 19 Oct 2021 16:40:34 -0700 Subject: [PATCH] Bugfixes. --- cached/weather_forecast.py | 28 ++++++++++++++++---------- executors.py | 40 ++++++++++++++++++++++---------------- id_generator.py | 8 ++++++-- input_utils.py | 3 ++- list_utils.py | 25 +++++++++++++++++++++++- ml/model_trainer.py | 2 +- 6 files changed, 74 insertions(+), 32 deletions(-) diff --git a/cached/weather_forecast.py b/cached/weather_forecast.py index 6e2f5f9..a413d9f 100644 --- a/cached/weather_forecast.py +++ b/cached/weather_forecast.py @@ -59,15 +59,23 @@ class CachedDetailedWeatherForecast(object): self.forecasts = {} # Ask the raspberry pi about the outside temperature. - www = urllib.request.urlopen( - "http://10.0.0.75/~pi/outside_temp" - ) - current_temp = www.read().decode("utf-8") - current_temp = float(current_temp) - current_temp *= (9/5) - current_temp += 32.0 - current_temp = round(current_temp) - www.close() + www = None + try: + www = urllib.request.urlopen( + "http://10.0.0.75/~pi/outside_temp", + timeout=2, + ) + current_temp = www.read().decode("utf-8") + current_temp = float(current_temp) + current_temp *= (9/5) + current_temp += 32.0 + current_temp = round(current_temp) + except Exception: + logger.warning('Timed out reading 10.0.0.75/~pi/outside_temp?!') + current_temp = None + finally: + if www is not None: + www.close() # Get a weather forecast for Bellevue. www = urllib.request.urlopen( @@ -102,7 +110,7 @@ class CachedDetailedWeatherForecast(object): sunrise = s['sunrise'] sunset = s['sunset'] - if dt.date() == now.date() and not said_temp: + if dt.date() == now.date() and not said_temp and current_temp is not None: blurb = f'{day.get_text()}: The current outside tempterature is {current_temp}. ' blurb += txt.get_text() said_temp = True diff --git a/executors.py b/executors.py index ddd62f1..0b4d80e 100644 --- a/executors.py +++ b/executors.py @@ -210,6 +210,7 @@ class RemoteWorkerRecord: class BundleDetails: pickled_code: bytes uuid: str + fname: str worker: Optional[RemoteWorkerRecord] username: Optional[str] machine: Optional[str] @@ -586,6 +587,7 @@ class RemoteExecutor(BaseExecutor): bundle.worker = worker machine = bundle.machine = worker.machine username = bundle.username = worker.username + fname = bundle.fname self.status.record_acquire_worker(worker, uuid) logger.debug(f'Running bundle {uuid} on {worker}...') @@ -595,17 +597,17 @@ class RemoteExecutor(BaseExecutor): return self.post_launch_work(bundle) except Exception as e: logger.exception(e) - logger.info(f"Bundle {uuid} seems to have failed?!") + logger.info(f"{uuid}/{fname}: bundle seems to have failed?!") if bundle.failure_count < config.config['executors_max_bundle_failures']: return self.launch(bundle) else: - logger.info(f"Bundle {uuid} is poison, giving up on it.") + logger.info(f"{uuid}/{fname}: bundle is poison, giving up on it.") return None # Send input to machine if it's not local. if hostname not in machine: cmd = f'{RSYNC} {bundle.code_file} {username}@{machine}:{bundle.code_file}' - logger.info(f"Copying work to {worker} via {cmd}") + logger.info(f"{uuid}/{fname}: Copying work to {worker} via {cmd}") run_silently(cmd) # Do it. @@ -615,7 +617,7 @@ class RemoteExecutor(BaseExecutor): f' --code_file {bundle.code_file} --result_file {bundle.result_file}"') p = cmd_in_background(cmd, silent=True) bundle.pid = pid = p.pid - logger.info(f"Running {cmd} in the background as process {pid}") + logger.info(f"{uuid}/{fname}: Start training on {worker} via {cmd} (background pid {pid})") while True: try: @@ -630,7 +632,7 @@ class RemoteExecutor(BaseExecutor): break else: logger.debug( - f"{pid}/{bundle.uuid} has finished its work normally." + f"{uuid}/{fname}: pid {pid} has finished its work normally." ) break @@ -638,10 +640,10 @@ class RemoteExecutor(BaseExecutor): return self.post_launch_work(bundle) except Exception as e: logger.exception(e) - logger.info(f"Bundle {uuid} seems to have failed?!") + logger.info(f"{uuid}: Bundle seems to have failed?!") if bundle.failure_count < config.config['executors_max_bundle_failures']: return self.launch(bundle) - logger.info(f"Bundle {uuid} is poison, giving up on it.") + logger.info(f"{uuid}: Bundle is poison, giving up on it.") return None def post_launch_work(self, bundle: BundleDetails) -> Any: @@ -652,6 +654,8 @@ class RemoteExecutor(BaseExecutor): machine = bundle.machine result_file = bundle.result_file code_file = bundle.code_file + fname = bundle.fname + uuid = bundle.uuid # Whether original or backup, if we finished first we must # fetch the results if the computation happened on a @@ -662,7 +666,7 @@ class RemoteExecutor(BaseExecutor): if bundle.hostname not in bundle.machine: cmd = f'{RSYNC} {username}@{machine}:{result_file} {result_file} 2>/dev/null' logger.info( - f"Fetching results from {username}@{machine} via {cmd}" + f"{uuid}/{fname}: Fetching results from {username}@{machine} via {cmd}" ) try: run_silently(cmd) @@ -686,7 +690,7 @@ class RemoteExecutor(BaseExecutor): # if one of the backups finished first; it still must read the # result from disk. if is_original: - logger.debug(f"Unpickling {result_file}.") + logger.debug(f"{uuid}/{fname}: Unpickling {result_file}.") try: with open(f'{result_file}', 'rb') as rb: serialized = rb.read() @@ -707,7 +711,7 @@ class RemoteExecutor(BaseExecutor): if bundle.backup_bundles is not None: for backup in bundle.backup_bundles: logger.debug( - f'Notifying backup {backup.uuid} that it is cancelled' + f'{uuid}/{fname}: Notifying backup {backup.uuid} that it\'s cancelled' ) backup.is_cancelled.set() @@ -721,7 +725,7 @@ class RemoteExecutor(BaseExecutor): # Tell the original to stop if we finished first. if not was_cancelled: logger.debug( - f'Notifying original {bundle.src_bundle.uuid} that it is cancelled' + f'{uuid}/{fname}: Notifying original {bundle.src_bundle.uuid} that it\'s cancelled' ) bundle.src_bundle.is_cancelled.set() @@ -730,7 +734,7 @@ class RemoteExecutor(BaseExecutor): self.adjust_task_count(-1) return result - def create_original_bundle(self, pickle): + def create_original_bundle(self, pickle, fname: str): from string_utils import generate_uuid uuid = generate_uuid(as_hex=True) code_file = f'/tmp/{uuid}.code.bin' @@ -743,6 +747,7 @@ class RemoteExecutor(BaseExecutor): bundle = BundleDetails( pickled_code = pickle, uuid = uuid, + fname = fname, worker = None, username = None, machine = None, @@ -761,7 +766,7 @@ class RemoteExecutor(BaseExecutor): failure_count = 0, ) self.status.record_bundle_details(bundle) - logger.debug(f'Created original bundle {uuid}') + logger.debug(f'{uuid}/{fname}: Created original bundle') return bundle def create_backup_bundle(self, src_bundle: BundleDetails): @@ -772,6 +777,7 @@ class RemoteExecutor(BaseExecutor): backup_bundle = BundleDetails( pickled_code = src_bundle.pickled_code, uuid = uuid, + fname = src_bundle.fname, worker = None, username = None, machine = None, @@ -791,7 +797,7 @@ class RemoteExecutor(BaseExecutor): ) src_bundle.backup_bundles.append(backup_bundle) self.status.record_bundle_details_already_locked(backup_bundle) - logger.debug(f'Created backup bundle {uuid}') + logger.debug(f'{uuid}/{src_bundle.fname}: Created backup bundle') return backup_bundle def schedule_backup_for_bundle(self, @@ -799,7 +805,7 @@ class RemoteExecutor(BaseExecutor): assert self.status.lock.locked() backup_bundle = self.create_backup_bundle(src_bundle) logger.debug( - f'Scheduling backup bundle {backup_bundle.uuid} for execution' + f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...' ) self._helper_executor.submit(self.launch, backup_bundle) @@ -812,7 +818,7 @@ class RemoteExecutor(BaseExecutor): *args, **kwargs) -> fut.Future: pickle = make_cloud_pickle(function, *args, **kwargs) - bundle = self.create_original_bundle(pickle) + bundle = self.create_original_bundle(pickle, function.__name__) self.total_bundles_submitted += 1 return self._helper_executor.submit(self.launch, bundle) @@ -882,7 +888,7 @@ class DefaultExecutors(object): RemoteWorkerRecord( username = 'scott', machine = 'meerkat.cabin', - weight = 6, + weight = 5, count = 2, ), ) diff --git a/id_generator.py b/id_generator.py index 4e650dc..bcd3a83 100644 --- a/id_generator.py +++ b/id_generator.py @@ -10,7 +10,7 @@ logger = logging.getLogger(__name__) generators = {} -def get(name: str) -> int: +def get(name: str, *, start=0) -> int: """ Returns a thread safe monotonically increasing id suitable for use as a globally unique identifier. @@ -20,9 +20,13 @@ def get(name: str) -> int: 0 >>> id_generator.get('student_id') 1 + >>> id_generator.get('employee_id', start=10000) + 10000 + >>> id_generator.get('employee_id', start=10000) + 10001 """ if name not in generators: - generators[name] = itertools.count() + generators[name] = itertools.count(start, 1) x = next(generators[name]) logger.debug(f"Generated next id {x}") return x diff --git a/input_utils.py b/input_utils.py index 153641b..a989b2d 100644 --- a/input_utils.py +++ b/input_utils.py @@ -2,11 +2,12 @@ """Utilities related to user input.""" -import readchar # type: ignore import signal import sys from typing import List +import readchar # type: ignore + import exceptions diff --git a/list_utils.py b/list_utils.py index 182e2bc..a8030e3 100644 --- a/list_utils.py +++ b/list_utils.py @@ -100,12 +100,35 @@ def dedup_list(lst: List[Any]) -> List[Any]: def uniq(lst: List[Any]) -> List[Any]: """ Alias for dedup_list. - """ return dedup_list(lst) def ngrams(lst: Sequence[Any], n): + """ + Return the ngrams in the sequence. + + >>> seq = 'encyclopedia' + >>> for _ in ngrams(seq, 3): + ... _ + 'enc' + 'ncy' + 'cyc' + 'ycl' + 'clo' + 'lop' + 'ope' + 'ped' + 'edi' + 'dia' + + >>> seq = ['this', 'is', 'an', 'awesome', 'test'] + >>> for _ in ngrams(seq, 3): + ... _ + ['this', 'is', 'an'] + ['is', 'an', 'awesome'] + ['an', 'awesome', 'test'] + """ for i in range(len(lst) - n + 1): yield lst[i:i + n] diff --git a/ml/model_trainer.py b/ml/model_trainer.py index ab3059f..9435351 100644 --- a/ml/model_trainer.py +++ b/ml/model_trainer.py @@ -376,7 +376,7 @@ class TrainingBlueprint(ABC): print(msg) logger.info(msg) model_info_filename = f"{self.spec.basename}_model_info.txt" - now: datetime.datetime = datetime_utils.now_pst() + now: datetime.datetime = datetime_utils.now_pacific() info = f"""Timestamp: {datetime_utils.datetime_to_string(now)} Model params: {params} Training examples: {num_examples} -- 2.46.0