if s is not None:
return s
msg = f"{ip} is an invalid IP address"
- logger.warning(msg)
+ logger.error(msg)
raise argparse.ArgumentTypeError(msg)
if s is not None:
return s
msg = f"{mac} is an invalid MAC address"
- logger.warning(msg)
+ logger.error(msg)
raise argparse.ArgumentTypeError(msg)
if 0.0 <= n <= 100.0:
return n
msg = f"{num} is an invalid percentage; expected 0 <= n <= 100.0"
- logger.warning(msg)
+ logger.error(msg)
raise argparse.ArgumentTypeError(msg)
if os.path.exists(s):
return s
msg = f"{filename} was not found and is therefore invalid."
- logger.warning(msg)
+ logger.error(msg)
raise argparse.ArgumentTypeError(msg)
if date is not None:
return date
msg = f'Cannot parse argument as a date: {txt}'
- logger.warning(msg)
+ logger.error(msg)
raise argparse.ArgumentTypeError(msg)
if dt is not None:
return dt
msg = f'Cannot parse argument as datetime: {txt}'
- logger.warning(msg)
+ logger.error(msg)
raise argparse.ArgumentTypeError(msg)
import logging
import os
from typing import Any, Optional
+import warnings
from overrides import overrides
if len(cached_state) > config.config['arper_min_entries_to_be_valid']:
return cls(cached_state)
else:
- logger.warning(
- f'{cache_file} sucks, only {len(cached_state)} entries. Deleting it.'
- )
+ msg = f'{cache_file} is invalid: only {len(cached_state)} entries. Deleting it.'
+ warnings.warn(msg)
+ logger.warning(msg)
os.remove(cache_file)
logger.debug('No usable saved state found')
return None
import logging
import re
from typing import Dict, List, Set
+import warnings
# Note: this module is fairly early loaded. Be aware of dependencies.
import argparse_utils
self.parse_raw_macs_file(raw, Location.CABIN)
except Exception as e:
logger.exception(e)
- logger.warning("Can't see the cabin right now; presence detection impared.")
+ msg = "Can't see the cabin right now; presence detection impared."
+ logger.warning(msg)
+ warnings.warn(msg)
self.dark_locations.add(Location.CABIN)
def update_from_cabin(self) -> None:
self.parse_raw_macs_file(raw, Location.HOUSE)
except Exception as e:
logger.exception(e)
- logger.warning("Can't see the house right now; presence detection impared.")
+ msg = "Can't see the house right now; presence detection impared."
+ logger.warning(msg)
+ warnings.warn(msg)
self.dark_locations.add(Location.HOUSE)
def read_persisted_macs_file(
def where_is_person_now(self, name: Person) -> Location:
self.maybe_update()
if len(self.dark_locations) > 0:
- logger.warning(
- f"Can't see {self.dark_locations} right now; answer confidence impacted"
- )
+ msg = f"Can't see {self.dark_locations} right now; answer confidence impacted"
+ logger.warning(msg)
+ warnings.warn(msg)
logger.debug(f'Looking for {name}...')
if name is Person.UNKNOWN:
import platform
import subprocess
from typing import NamedTuple, Optional
+import warnings
import cv2 # type: ignore
import numpy as np
return raw
except Exception as e:
logger.exception(e)
- logger.warning(f"Got a bad image or HTTP error from {url}")
+ msg = f"Got a bad image or HTTP error from {url}; returning None."
+ logger.warning(msg)
+ warnings.warn(msg)
return None
) -> Optional[bytes]:
"""Fetch the raw webcam image straight from the webcam's RTSP stream."""
hostname = blue_iris_camera_name_to_hostname(camera_name)
+ stream = f"rtsp://camera:IaLaIok@{hostname}:554/live"
try:
cmd = [
"/usr/bin/timeout",
"/usr/local/bin/ffmpeg",
"-y",
"-i",
- f"rtsp://camera:IaLaIok@{hostname}:554/live",
+ f"{stream}",
"-f",
"singlejpeg",
"-vframes",
return out
except Exception as e:
logger.exception(e)
- logger.warning("Failed to retrieve image from RTSP stream")
+ msg = "Failed to retrieve image via RTSP {stream}, returning None."
+ warnings.warn(msg)
+ logger.warning(msg)
return None
jpg=jpg,
hsv=hsv,
)
- logger.warning(
- "Failed to retieve image from both video server and direct RTSP stream"
- )
+ msg = "Failed to retieve image from both video server and direct RTSP stream"
+ logger.warning(msg)
+ warnings.warn(msg)
return RawJpgHsv(None, None, None)
def wrapper_deprecated(*args, **kwargs):
msg = f"Call to deprecated function {func.__qualname__}"
logger.warning(msg)
- warnings.warn(msg, category=DeprecationWarning)
+ warnings.warn(msg, category=DeprecationWarning, stacklevel=2)
print(msg, file=sys.stderr)
return func(*args, **kwargs)
return wrapper_deprecated
exc[0] = True
exc[1] = sys.exc_info() # (type, value, traceback)
msg = f"Thunkify has thrown an exception (will be raised on thunk()):\n{traceback.format_exc()}"
- print(msg)
logger.warning(msg)
+ warnings.warn(msg)
finally:
wait_event.set()
import threading
import time
from typing import Any, Callable, Dict, List, Optional, Set
+import warnings
import cloudpickle # type: ignore
from overrides import overrides
worker.count -= 1
logger.debug(f'Selected worker {worker}')
return worker
- logger.warning("Couldn't find a worker; go fish.")
+ msg = 'Unexpectedly could not find a worker, retrying...'
+ logger.warning(msg)
+ warnings.warn(msg)
return None
if x >= len(self.workers):
x = 0
if x == self.index:
- logger.warning("Couldn't find a worker; go fish.")
+ msg = 'Unexpectedly could not find a worker, retrying...'
+ logger.warning(msg)
+ warnings.warn(msg)
return None
# There's a race condition where someone else
# already finished the work and removed the source
# code file before we could copy it. No biggie.
- logger.warning(
- f'{bundle}: Failed to send instructions to the worker machine... ' +
+ msg = f'{bundle}: Failed to send instructions to the worker machine... ' +
'We\'re a backup and this may be caused by the original (or some ' +
'other backup) already finishing this work. Ignoring this.'
- )
+ logger.warning(msg)
+ warnings.warn(msg)
return None
# Kick off the work. Note that if this fails we let
logger.exception(e)
logger.error(f'{bundle}: Something unexpected just happened...')
if p is not None:
- logger.warning(
- f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
- )
+ msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
+ logger.warning(msg)
+ warnings.warn(msg)
return self.wait_for_process(p, bundle, depth + 1)
else:
self.status.record_release_worker(
logger.error(f'{bundle}: At least it\'s only a backup; better luck with the others.')
return None
else:
- logger.warning(
- f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<'
- )
+ msg = f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<'
+ logger.warning(msg)
+ warnings.warn(msg)
return self.launch(bundle, avoid_last_machine)
@overrides
# os.remove(file) you fuckwit.
+# os.path.basename too.
def create_path_if_not_exist(path, on_error=None):
import logging
from typing import NamedTuple
import sys
+import warnings
import requests
import speech_recognition as sr # type: ignore
logger.debug(f"Transcription: '{audio_transcription}'")
except sr.UnknownValueError as e:
logger.exception(e)
- logger.warning('Unable to parse Google assistant\'s response.')
+ msg = 'Unable to parse Google assistant\'s response.'
+ logger.warning(msg)
+ warnings.warn(msg)
audio_transcription = None
return GoogleResponse(
success=success,
import signal
import sys
from typing import Optional
+import warnings
import config
import datetime_utils
return True
except OSError:
pass
- logger.warning(f'Could not acquire {self.lockfile}.')
+ msg = f'Could not acquire {self.lockfile}.'
+ logger.warning(msg)
+ warnings.warn(msg)
return False
def acquire_with_retries(
return self
msg = f"Couldn't acquire {self.lockfile}; giving up."
logger.warning(msg)
+ warnings.warn(msg)
raise LockFileException(msg)
def __exit__(self, type, value, traceback):
duration = ts - self.locktime
if duration >= config.config['lockfile_held_duration_warning_threshold_sec']:
str_duration = datetime_utils.describe_duration_briefly(duration)
- logger.warning(f'Held {self.lockfile} for {str_duration}')
+ msg = f'Held {self.lockfile} for {str_duration}'
+ logger.warning(msg)
+ warnings.warn(msg, stacklevel=2)
self.release()
def __del__(self):
try:
os.kill(contents.pid, 0)
except OSError:
- logger.warning(f'Lockfile {self.lockfile}\'s pid ({contents.pid}) is stale; ' +
- 'force acquiring')
+ msg = f'Lockfile {self.lockfile}\'s pid ({contents.pid}) is stale; force acquiring'
+ logger.warning(msg)
+ warnings.warn(msg)
self.release()
# Has the lock expiration expired?
if contents.expiration_timestamp is not None:
now = datetime.datetime.now().timestamp()
if now > contents.expiration_datetime:
- logger.warning(f'Lockfile {self.lockfile} expiration time has passed; ' +
- 'force acquiring')
+ msg = f'Lockfile {self.lockfile} expiration time has passed; force acquiring'
+ logger.warning(msg)
+ warnings.warn(msg)
self.release()
except Exception:
pass
import random
import sys
from typing import Callable, Iterable, Mapping, Optional
+import warnings
from overrides import overrides
import pytz
f'Initialized global logging; default logging level is {level_name}.'
)
if config.config['logging_clear_preexisting_handlers'] and preexisting_handlers_count > 0:
- logger.warning(
- 'Logging cleared {preexisting_handlers_count} global handlers (--logging_clear_preexisting_handlers)'
- )
+ msg = 'Logging cleared {preexisting_handlers_count} global handlers (--logging_clear_preexisting_handlers)'
+ logger.warning(msg)
+ warnings.warn(msg)
logger.debug(f'Logging format specification is "{fmt}"')
if config.config['logging_debug_threads']:
logger.debug('...Logging format spec captures tid/pid (--logging_debug_threads)')
import sys
from types import SimpleNamespace
from typing import Any, List, NamedTuple, Optional, Set, Tuple
+import warnings
import numpy as np
from sklearn.model_selection import train_test_split # type:ignore
if self.spec.delete_bad_inputs:
msg = f"WARNING: {filename}: missing features or label; expected {self.spec.feature_count} but saw {len(x)}. DELETING."
- print(msg, file=sys.stderr)
logger.warning(msg)
+ warnings.warn(msg)
os.remove(filename)
else:
msg = f"WARNING: {filename}: missing features or label; expected {self.spec.feature_count} but saw {len(x)}. Skipping."
- print(msg, file=sys.stderr)
logger.warning(msg)
+ warnings.warn(msg)
return (X, y)
def make_progress_graph(self) -> None:
import logging
import os
from typing import Callable, List, NamedTuple, Optional, Set
+import warnings
import argparse_utils
import config
continue
features = in_spec.image_file_to_features_file(image)
if features is None or not os.path.exists(features):
- logger.warning(
- f'File {image} yielded file {features} which does not exist, SKIPPING.'
- )
+ msg = f'File {image} yielded file {features} which does not exist, SKIPPING.'
+ logger.warning(msg)
+ warnings.warn(msg)
continue
# Render features and image.
import functools
import logging
from typing import Any
+import warnings
import file_utils
logger.debug(f'Attempting to load {cls.__name__} from persisted state.')
self.instance = cls.load()
if not self.instance:
- logger.warning('Loading from cache failed.')
+ msg = 'Loading from cache failed.'
+ logger.warning(msg)
+ warnings.warn(msg)
logger.debug(f'Attempting to instantiate {cls.__name__} directly.')
self.instance = cls(*args, **kwargs)
else:
@background_thread
def watch_for_cancel(terminate_event: threading.Event) -> None:
- if platform.node() == 'VIDEO-COMPUTER':
- logger.warning('Background thread not allowed on retarded computers, sorry.')
- return
logger.debug('Starting up background thread...')
p = psutil.Process(os.getpid())
while True:
import subprocess
import sys
from typing import Any, Dict, List, Optional, Tuple
+import warnings
from overrides import overrides
import tinytuya as tt
result = os.system(command)
signal = result & 0xFF
if signal != 0:
- logger.warning(f'{command} died with signal {signal}')
- logging_utils.hlog("%s died with signal %d" % (command, signal))
+ msg = f'{command} died with signal {signal}'
+ logger.warning(msg)
+ warnings.warn(msg)
+ logging_utils.hlog(msg)
return False
else:
exit_value = result >> 8
if exit_value != 0:
- logger.warning(f'{command} failed, exited {exit_value}')
- logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
+ msg = f'{command} failed, exited {exit_value}'
+ logger.warning(msg)
+ warnings.warn(msg)
+ logging_utils.hlog(msg)
return False
logger.debug(f'{command} succeeded.')
return True
import subprocess
import sys
from typing import Any, Dict, List, Optional
+import warnings
from meross_iot.http_api import MerossHttpClient
from meross_iot.manager import MerossManager
result = os.system(command)
signal = result & 0xFF
if signal != 0:
- logger.warning(f'{command} died with signal {signal}')
- logging_utils.hlog("%s died with signal %d" % (command, signal))
+ msg = f'{command} died with signal {signal}'
+ logger.warning(msg)
+ warnings.warn(msg)
+ logging_utils.hlog(msg)
return False
else:
exit_value = result >> 8
if exit_value != 0:
- logger.warning(f'{command} failed, exited {exit_value}')
- logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
+ msg = f'{command} failed, exited {exit_value}'
+ logger.warning(msg)
+ warnings.warn(msg)
+ logging_utils.hlog(msg)
return False
logger.debug(f'{command} succeeded.')
return True
import logging
import re
from typing import List, Optional, Set
+import warnings
import argparse_utils
import config
try:
(mac, name, keywords) = line.split(",")
except ValueError:
- logger.warning(f'SH-CONFIG> {line} is malformed?!')
+ msg = f'SH-CONFIG> {line} is malformed?!'
+ logger.warning(msg)
+ warnings.warn(msg)
continue
mac = mac.strip()
name = name.strip()
logger.debug(' ...an unknown device (should this be here?)')
return device.Device(name, mac, kws)
except Exception as e:
- logger.warning(
- f'Got exception {e} while trying to communicate with device {name}/{mac}.'
- )
+ logger.exception(e)
return device.Device(name, mac, kws)
- logger.warning(f'{mac} is not a known smart home device, returning None')
+ msg = f'{mac} is not a known smart home device, returning None'
+ logger.warning(msg)
+ warnings.warn(msg)
return None
def query(self, query: str) -> List[device.Device]:
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple
import unicodedata
from uuid import uuid4
+import warnings
import list_utils
d.parse(in_str)
return d.get_date()
except dp.ParseException:
- logger.warning(f'Unable to parse date {in_str}.')
+ msg = f'Unable to parse date {in_str}.'
+ logger.warning(msg)
+ warnings.warn(msg)
return None
_ = d.parse(in_str)
return True
except dp.ParseException:
- logger.warning(f'Unable to parse date {in_str}.')
+ msg = f'Unable to parse date {in_str}.'
+ logger.warning(msg)
+ warnings.warn(msg)
return False
if type(dt) == datetime.datetime:
return dt
except ValueError:
- logger.warning(f'Unable to parse datetime {in_str}.')
+ msg = f'Unable to parse datetime {in_str}.'
+ logger.warning(msg)
+ warnings.warn(msg)
return None
_ = to_datetime(in_str)
if _ is not None:
return True
- logger.warning(f'Unable to parse datetime {in_str}.')
+ msg = f'Unable to parse datetime {in_str}.'
+ logger.warning(msg)
+ warnings.warn(msg)
return False
"""
if len(txt) % chunk_size != 0:
- logger.warning(
- f'String to chunk\'s length ({len(txt)} is not an even multiple of chunk_size ({chunk_size})')
+ msg = f'String to chunk\'s length ({len(txt)} is not an even multiple of chunk_size ({chunk_size})'
+ logger.warning(msg)
+ warnings.warn(msg)
for x in range(0, len(txt), chunk_size):
yield txt[x:x+chunk_size]
import tempfile
from typing import Callable
import unittest
+import warnings
import bootstrap
import config
perfdb = load_known_test_performance_characteristics()
except Exception as e:
logger.exception(e)
- logger.warning(f'Unable to load perfdb from {_db}')
+ msg = f'Unable to load perfdb from {_db}'
+ logger.warning(msg)
+ warnings.warn(msg)
perfdb = {}
# This is a unique identifier for a test: filepath!function