From: Scott Gasch Date: Sun, 31 Oct 2021 20:08:51 +0000 (-0700) Subject: Ugh, a bunch of things. @overrides. --lmodule. Chromecasts. etc... X-Git-Url: https://wannabe.guru.org/gitweb/?a=commitdiff_plain;h=eb9e6df32ed696158bf34dba6464277b648f5c74;p=python_utils.git Ugh, a bunch of things. @overrides. --lmodule. Chromecasts. etc... --- diff --git a/acl.py b/acl.py index f810b41..2b34767 100644 --- a/acl.py +++ b/acl.py @@ -7,6 +7,8 @@ import logging import re from typing import Any, Callable, List, Optional, Set, Sequence +from overrides import overrides + # This module is commonly used by others in here and should avoid # taking any unnecessary dependencies back on them. @@ -93,11 +95,13 @@ class SetBasedACL(SimpleACL): self.allow_set = allow_set self.deny_set = deny_set + @overrides def check_allowed(self, x: Any) -> bool: if self.allow_set is None: return False return x in self.allow_set + @overrides def check_denied(self, x: Any) -> bool: if self.deny_set is None: return False @@ -158,11 +162,13 @@ class PredicateListBasedACL(SimpleACL): self.allow_predicate_list = allow_predicate_list self.deny_predicate_list = deny_predicate_list + @overrides def check_allowed(self, x: Any) -> bool: if self.allow_predicate_list is None: return False return any(predicate(x) for predicate in self.allow_predicate_list) + @overrides def check_denied(self, x: Any) -> bool: if self.deny_predicate_list is None: return False @@ -242,11 +248,13 @@ class AnyCompoundACL(SimpleACL): ) self.subacls = subacls + @overrides def check_allowed(self, x: Any) -> bool: if self.subacls is None: return False return any(acl(x) for acl in self.subacls) + @overrides def check_denied(self, x: Any) -> bool: if self.subacls is None: return False @@ -266,11 +274,13 @@ class AllCompoundACL(SimpleACL): ) self.subacls = subacls + @overrides def check_allowed(self, x: Any) -> bool: if self.subacls is None: return False return all(acl(x) for acl in self.subacls) + @overrides def check_denied(self, x: Any) -> bool: if self.subacls is None: return False diff --git a/ansi.py b/ansi.py index 4c09db3..d30ae27 100755 --- a/ansi.py +++ b/ansi.py @@ -8,6 +8,8 @@ import re import sys from typing import Any, Callable, Dict, Iterable, Optional, Tuple +from overrides import overrides + import logging_utils logger = logging.getLogger(__name__) @@ -1864,7 +1866,7 @@ class StdoutInterceptor(io.TextIOBase): self.buf = '' @abstractmethod - def write(self, s): + def write(self, s: str): pass def __enter__(self) -> None: @@ -1883,6 +1885,7 @@ class ProgrammableColorizer(StdoutInterceptor): super().__init__() self.patterns = [_ for _ in patterns] + @overrides def write(self, s: str): for pattern in self.patterns: s = pattern[0].sub(pattern[1], s) diff --git a/argparse_utils.py b/argparse_utils.py index 530690c..e8c2f56 100644 --- a/argparse_utils.py +++ b/argparse_utils.py @@ -6,6 +6,9 @@ import logging import os from typing import Any +from overrides import overrides + + # This module is commonly used by others in here and should avoid # taking any unnecessary dependencies back on them. @@ -47,6 +50,7 @@ class ActionNoYes(argparse.Action): help=help ) + @overrides def __call__(self, parser, namespace, values, option_strings=None): if ( option_strings.startswith('--no-') or diff --git a/arper.py b/arper.py index d74b659..2fc0767 100644 --- a/arper.py +++ b/arper.py @@ -1,16 +1,22 @@ #!/usr/bin/env python3 +"""A caching layer around the kernel's network mapping between IPs and MACs""" + import datetime import logging import os from typing import Any, Optional +from overrides import overrides + import argparse_utils from collect.bidict import BiDict import config import exec_utils +import file_utils import persistent import string_utils +import site_config logger = logging.getLogger(__name__) @@ -31,6 +37,12 @@ cfg.add_argument( metavar='DURATION', help='Max acceptable age of the kernel arp table cache' ) +cfg.add_argument( + '--arper_min_entries_to_be_valid', + type=int, + default=site_config.get_config().arper_minimum_device_count, + help='Min number of arp entries to bother persisting.' +) @persistent.persistent_autoloaded_singleton() @@ -44,17 +56,20 @@ class Arper(persistent.Persistent): self.state = cached_state else: logger.debug('No usable cached state; calling /usr/sbin/arp') - output = exec_utils.cmd( - '/usr/sbin/arp -a', - timeout_seconds=5.0 - ) - for line in output.split('\n'): - ip = string_utils.extract_ip_v4(line) - mac = string_utils.extract_mac_address(line) - if ip is not None and mac is not None: - mac = mac.lower() - logger.debug(f' {mac} => {ip}') - self.state[mac] = ip + self.update() + + def update(self): + output = exec_utils.cmd( + '/usr/sbin/arp -a', + timeout_seconds=5.0 + ) + for line in output.split('\n'): + ip = string_utils.extract_ip_v4(line) + mac = string_utils.extract_mac_address(line) + if ip is not None and mac is not None: + mac = mac.lower() + logger.debug(f' {mac} => {ip}') + self.state[mac] = ip def get_ip_by_mac(self, mac: str) -> Optional[str]: mac = mac.lower() @@ -63,34 +78,50 @@ class Arper(persistent.Persistent): def get_mac_by_ip(self, ip: str) -> Optional[str]: return self.state.inverse.get(ip, None) + @overrides def save(self) -> bool: - logger.debug( - f'Persisting state to {config.config["arper_cache_location"]}' - ) - with open(config.config['arper_cache_location'], 'w') as wf: - for (mac, ip) in self.state.items(): - mac = mac.lower() - print(f'{mac}, {ip}', file=wf) + if len(self.state) > config.config['arper_min_entries_to_be_valid']: + logger.debug( + f'Persisting state to {config.config["arper_cache_location"]}' + ) + with file_utils.FileWriter(config.config['arper_cache_location']) as wf: + for (mac, ip) in self.state.items(): + mac = mac.lower() + print(f'{mac}, {ip}', file=wf) + return True + else: + logger.warning( + f'Only saw {len(self.state)} entries; needed at least {config.config["arper_min_entries_to_be_valid"]} to bother persisting.' + ) + return False @classmethod + @overrides def load(cls) -> Any: + cache_file = config.config['arper_cache_location'] if persistent.was_file_written_within_n_seconds( - config.config['arper_cache_location'], + cache_file, config.config['arper_cache_max_staleness'].total_seconds(), ): - logger.debug( - f'Loading state from {config.config["arper_cache_location"]}' - ) + logger.debug(f'Loading state from {cache_file}') cached_state = BiDict() - with open(config.config['arper_cache_location'], 'r') as rf: + with open(cache_file, 'r') as rf: contents = rf.readlines() for line in contents: - logger.debug(f'ARPER> {line}') + line = line[:-1] + logger.debug(f'ARPER:{cache_file}> {line}') (mac, ip) = line.split(',') mac = mac.strip() mac = mac.lower() ip = ip.strip() cached_state[mac] = ip - return cls(cached_state) + if len(cached_state) > config.config['arper_min_entries_to_be_valid']: + return cls(cached_state) + else: + logger.warning( + f'{cache_file} sucks, only {len(cached_state)} entries. Deleting it.' + ) + os.remove(cache_file) + logger.debug('No usable saved state found') return None diff --git a/config.py b/config.py index b6262ad..ea5f68a 100644 --- a/config.py +++ b/config.py @@ -96,11 +96,13 @@ args = argparse.ArgumentParser( # than once. config_parse_called = False + # A global configuration dictionary that will contain parsed arguments. # It is also this variable that modules use to access parsed arguments. # This is the data that is most interesting to our callers; it will hold # the configuration result. -config: Dict[str, Any] = {} +config = {} +# It would be really nice if this shit worked from interactive python def add_commandline_args(title: str, description: str = ""): diff --git a/executors.py b/executors.py index 3cb0a91..d5049a2 100644 --- a/executors.py +++ b/executors.py @@ -17,6 +17,7 @@ import time from typing import Any, Callable, Dict, List, Optional, Set import cloudpickle # type: ignore +from overrides import overrides from ansi import bg, fg, underline, reset import argparse_utils @@ -121,6 +122,7 @@ class ThreadExecutor(BaseExecutor): self.histogram.add_item(duration) return result + @overrides def submit(self, function: Callable, *args, @@ -135,6 +137,7 @@ class ThreadExecutor(BaseExecutor): *newargs, **kwargs) + @overrides def shutdown(self, wait = True) -> None: logger.debug(f'Shutting down threadpool executor {self.title}') @@ -163,6 +166,7 @@ class ProcessExecutor(BaseExecutor): self.adjust_task_count(-1) return result + @overrides def submit(self, function: Callable, *args, @@ -181,6 +185,7 @@ class ProcessExecutor(BaseExecutor): ) return result + @overrides def shutdown(self, wait=True) -> None: logger.debug(f'Shutting down processpool executor {self.title}') self._process_executor.shutdown(wait) @@ -813,6 +818,7 @@ class RemoteExecutor(BaseExecutor): # they will move the result_file to this machine and let # the original pick them up and unpickle them. + @overrides def submit(self, function: Callable, *args, @@ -822,6 +828,7 @@ class RemoteExecutor(BaseExecutor): self.total_bundles_submitted += 1 return self._helper_executor.submit(self.launch, bundle) + @overrides def shutdown(self, wait=True) -> None: self._helper_executor.shutdown(wait) logging.debug(f'Shutting down RemoteExecutor {self.title}') diff --git a/file_utils.py b/file_utils.py index 7270e30..67e6f56 100644 --- a/file_utils.py +++ b/file_utils.py @@ -19,6 +19,9 @@ from uuid import uuid4 logger = logging.getLogger(__name__) +# os.remove(file) you fuckwit. + + def create_path_if_not_exist(path, on_error=None): """ Attempts to create path if it does not exist. If on_error is diff --git a/letter_compress.py b/letter_compress.py index 9a6f32d..b5d3264 100644 --- a/letter_compress.py +++ b/letter_compress.py @@ -2,9 +2,9 @@ import bitstring -from collect.bidict import bidict +from collect.bidict import BiDict -special_characters = bidict( +special_characters = BiDict( { ' ': 27, '.': 28, diff --git a/logging_utils.py b/logging_utils.py index 183e1f0..eec0798 100644 --- a/logging_utils.py +++ b/logging_utils.py @@ -15,6 +15,8 @@ import random import sys from typing import Callable, Iterable, Mapping, Optional +from overrides import overrides + # This module is commonly used by others in here and should avoid # taking any unnecessary dependencies back on them. import argparse_utils @@ -36,7 +38,7 @@ cfg.add_argument( default='INFO', choices=['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], metavar='LEVEL', - help='The level below which to squelch log messages.', + help='The global default level below which to squelch log messages; see also --lmodule', ) cfg.add_argument( '--logging_format', @@ -115,6 +117,17 @@ cfg.add_argument( default=False, help='When calling print, also log.info automatically.' ) +cfg.add_argument( + '--lmodule', + type=str, + metavar='=[,=...]', + help=( + 'Allows per-scope logging levels which override the global level set with --logging-level.' + + 'Pass a space separated list of = where is one of: module, ' + + 'module:function, or :function and is a logging level (e.g. INFO, DEBUG...)' + ) +) + built_in_print = print @@ -181,6 +194,7 @@ class SquelchRepeatedMessagesFilter(logging.Filter): self.counters = collections.Counter() super().__init__() + @overrides def filter(self, record: logging.LogRecord) -> bool: id1 = f'{record.module}:{record.funcName}' if id1 not in squelched_logging_counts: @@ -192,6 +206,84 @@ class SquelchRepeatedMessagesFilter(logging.Filter): return count < threshold +class DynamicPerScopeLoggingLevelFilter(logging.Filter): + """Only interested in seeing logging messages from an allow list of + module names or module:function names. Block others. + + """ + @staticmethod + def level_name_to_level(name: str) -> int: + numeric_level = getattr( + logging, + name, + None + ) + if not isinstance(numeric_level, int): + raise ValueError('Invalid level: {name}') + return numeric_level + + def __init__( + self, + default_logging_level: int, + per_scope_logging_levels: str, + ) -> None: + super().__init__() + self.valid_levels = set(['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']) + self.default_logging_level = default_logging_level + self.level_by_scope = {} + if per_scope_logging_levels is not None: + for chunk in per_scope_logging_levels.split(','): + if '=' not in chunk: + print( + f'Malformed lmodule directive: "{chunk}", missing "=". Ignored.', + file=sys.stderr + ) + continue + try: + (scope, level) = chunk.split('=') + except ValueError: + print( + f'Malformed lmodule directive: "{chunk}". Ignored.', + file=sys.stderr + ) + continue + scope = scope.strip() + level = level.strip().upper() + if level not in self.valid_levels: + print( + f'Malformed lmodule directive: "{chunk}", bad level. Ignored.', + file=sys.stderr + ) + continue + self.level_by_scope[scope] = ( + DynamicPerScopeLoggingLevelFilter.level_name_to_level( + level + ) + ) + + @overrides + def filter(self, record: logging.LogRecord) -> bool: + # First try to find a logging level by scope (--lmodule) + if len(self.level_by_scope) > 0: + min_level = None + for scope in ( + record.module, + f'{record.module}:{record.funcName}', + f':{record.funcName}' + ): + level = self.level_by_scope.get(scope, None) + if level is not None: + if min_level is None or level < min_level: + min_level = level + + # If we found one, use it instead of the global default level. + if min_level is not None: + return record.levelno >= min_level + + # Otherwise, use the global logging level (--logging_level) + return record.levelno >= self.default_logging_level + + # A map from function_identifier -> probability of logging (0.0%..100.0%) probabilistic_logging_levels: Mapping[str, float] = {} @@ -222,6 +314,7 @@ class ProbabilisticFilter(logging.Filter): been tagged with the @logging_utils.probabilistic_logging decorator. """ + @overrides def filter(self, record: logging.LogRecord) -> bool: id1 = f'{record.module}:{record.funcName}' if id1 not in probabilistic_logging_levels: @@ -238,7 +331,8 @@ class OnlyInfoFilter(logging.Filter): stdout handler. """ - def filter(self, record): + @overrides + def filter(self, record: logging.LogRecord): return record.levelno == logging.INFO @@ -249,6 +343,7 @@ class MillisecondAwareFormatter(logging.Formatter): """ converter = datetime.datetime.fromtimestamp + @overrides def formatTime(self, record, datefmt=None): ct = MillisecondAwareFormatter.converter( record.created, pytz.timezone("US/Pacific") @@ -271,12 +366,14 @@ def initialize_logging(logger=None) -> logging.Logger: return logger handlers = [] - numeric_level = getattr( + + # Global default logging level (--logging_level) + default_logging_level = getattr( logging, config.config['logging_level'].upper(), None ) - if not isinstance(numeric_level, int): + if not isinstance(default_logging_level, int): raise ValueError('Invalid level: %s' % config.config['logging_level']) fmt = config.config['logging_format'] @@ -286,15 +383,12 @@ def initialize_logging(logger=None) -> logging.Logger: if config.config['logging_syslog']: if sys.platform not in ('win32', 'cygwin'): handler = SysLogHandler() -# for k, v in encoded_priorities.items(): -# handler.encodePriority(k, v) handler.setFormatter( MillisecondAwareFormatter( fmt=fmt, datefmt=config.config['logging_date_format'], ) ) - handler.setLevel(numeric_level) handlers.append(handler) if config.config['logging_filename']: @@ -303,7 +397,6 @@ def initialize_logging(logger=None) -> logging.Logger: maxBytes = config.config['logging_filename_maxsize'], backupCount = config.config['logging_filename_count'], ) - handler.setLevel(numeric_level) handler.setFormatter( MillisecondAwareFormatter( fmt=fmt, @@ -314,7 +407,6 @@ def initialize_logging(logger=None) -> logging.Logger: if config.config['logging_console']: handler = logging.StreamHandler(sys.stderr) - handler.setLevel(numeric_level) handler.setFormatter( MillisecondAwareFormatter( fmt=fmt, @@ -342,7 +434,14 @@ def initialize_logging(logger=None) -> logging.Logger: for handler in handlers: handler.addFilter(ProbabilisticFilter()) - logger.setLevel(numeric_level) + for handler in handlers: + handler.addFilter( + DynamicPerScopeLoggingLevelFilter( + default_logging_level, + config.config['lmodule'], + ) + ) + logger.setLevel(0) logger.propagate = False if config.config['logging_captures_prints']: diff --git a/math_utils.py b/math_utils.py index fa0bc0e..e0e3f6c 100644 --- a/math_utils.py +++ b/math_utils.py @@ -6,7 +6,7 @@ from typing import List from heapq import heappush, heappop -class RunningMedian: +class RunningMedian(object): """A running median computer. >>> median = RunningMedian() diff --git a/presence.py b/presence.py index e5bc64f..b310183 100755 --- a/presence.py +++ b/presence.py @@ -7,6 +7,7 @@ import logging import re from typing import Dict, List +# Note: this module is fairly early loaded. Be aware of dependencies. import argparse_utils import bootstrap import config diff --git a/site_config.py b/site_config.py index 3327312..e3b186d 100644 --- a/site_config.py +++ b/site_config.py @@ -3,8 +3,9 @@ from dataclasses import dataclass import logging import platform -from typing import Callable, Optional +from typing import Callable +# Note: this module is fairly early loaded. Be aware of dependencies. import config import presence @@ -31,6 +32,7 @@ class SiteConfig(object): network_router_ip: str presence_location: presence.Location is_anyone_present: Callable[None, bool] + arper_minimum_device_count: int def get_location(): @@ -78,6 +80,7 @@ def get_config(): network_router_ip = '10.0.0.1', presence_location = presence.Location.HOUSE, is_anyone_present = lambda x=presence.Location.HOUSE: is_anyone_present_wrapper(x), + arper_minimum_device_count = 50, ) elif location == 'CABIN': return SiteConfig( @@ -87,6 +90,7 @@ def get_config(): network_router_ip = '192.168.0.1', presence_location = presence.Location.CABIN, is_anyone_present = lambda x=presence.Location.CABIN: is_anyone_present_wrapper(x), + arper_minimum_device_count = 15, ) else: raise Exception(f'Unknown site location: {location}') diff --git a/smart_future.py b/smart_future.py index 7dbec50..f11be17 100644 --- a/smart_future.py +++ b/smart_future.py @@ -6,6 +6,8 @@ import concurrent.futures as fut import time from typing import Callable, List, TypeVar +from overrides import overrides + # This module is commonly used by others in here and should avoid # taking any unnecessary dependencies back on them. from deferred_operand import DeferredOperand @@ -66,5 +68,6 @@ class SmartFuture(DeferredOperand): # You shouldn't have to call this; instead, have a look at defining a # method on DeferredOperand base class. + @overrides def _resolve(self, *, timeout=None) -> T: return self.wrapped_future.result(timeout) diff --git a/smart_home/chromecasts.py b/smart_home/chromecasts.py new file mode 100644 index 0000000..08290e5 --- /dev/null +++ b/smart_home/chromecasts.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 + +"""Utilities for dealing with the webcams.""" + +import logging +import time + +import pychromecast + +from decorator_utils import memoized +import smart_home.device as dev + +logger = logging.getLogger(__name__) + + +class BaseChromecast(dev.Device): + def __init__(self, name: str, mac: str, keywords: str = "") -> None: + super().__init__(name.strip(), mac.strip(), keywords) + ip = self.get_ip() + self.cast = pychromecast.Chromecast(ip) + self.cast.wait() + time.sleep(0.1) + + def is_idle(self): + return self.cast.is_idle + + @memoized + def get_uuid(self): + return self.cast.uuid + + @memoized + def get_friendly_name(self): + return self.cast.name + + def get_uri(self): + return self.cast.url + + @memoized + def get_model_name(self): + return self.cast.model_name + + @memoized + def get_cast_type(self): + return self.cast.cast_type + + @memoized + def app_id(self): + return self.cast.app_id + + def get_app_display_name(self): + return self.cast.app_display_name + + def get_media_controller(self): + return self.cast.media_controller + + def status(self): + if self.is_idle(): + return 'idle' + app = self.get_app_display_name() + mc = self.get_media_controller() + status = mc.status + return f'{app} / {status.title}' + + def start_app(self, app_id, force_launch=False): + """Start an app on the Chromecast.""" + self.cast.start_app(app_id, force_launch) + + def quit_app(self): + """Tells the Chromecast to quit current app_id.""" + self.cast.quit_app() + + def volume_up(self, delta=0.1): + """Increment volume by 0.1 (or delta) unless it is already maxed. + Returns the new volume. + """ + return self.cast.volume_up(delta) + + def volume_down(self, delta=0.1): + """Decrement the volume by 0.1 (or delta) unless it is already 0. + Returns the new volume. + """ + return self.cast.volume_down(delta) + + def __repr__(self): + return ( + f"Chromecast({self.cast.socket_client.host!r}, port={self.cast.socket_client.port!r}, " + f"device={self.cast.device!r})" + ) diff --git a/smart_home/device.py b/smart_home/device.py index 04b0bfe..9675b7c 100644 --- a/smart_home/device.py +++ b/smart_home/device.py @@ -28,9 +28,20 @@ class Device(object): def get_mac(self) -> str: return self.mac - def get_ip(self) -> str: + def get_ip(self) -> Optional[str]: return self.arper.get_ip_by_mac(self.mac) + def has_static_ip(self) -> bool: + for kw in self.kws: + m = re.search(r'static:([\d\.]+)', kw) + if m is not None: + ip = m.group(1) + assert self.get_ip() == ip + return True + return False + + # Add command -> URL logic here. + def get_keywords(self) -> Optional[List[str]]: return self.kws diff --git a/smart_home/lights.py b/smart_home/lights.py index 5446722..76b1500 100644 --- a/smart_home/lights.py +++ b/smart_home/lights.py @@ -10,11 +10,14 @@ import os import re import subprocess import sys -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple +from overrides import overrides import tinytuya as tt +import ansi import argparse_utils +import arper import config import logging_utils import smart_home.device as dev @@ -23,11 +26,11 @@ from decorator_utils import timeout, memoized logger = logging.getLogger(__name__) -parser = config.add_commandline_args( +args = config.add_commandline_args( f"Smart Lights ({__file__})", "Args related to smart lights.", ) -parser.add_argument( +args.add_argument( '--smart_lights_tplink_location', default='/home/scott/bin/tplink.py', metavar='FILENAME', @@ -60,6 +63,20 @@ class BaseLight(dev.Device): def __init__(self, name: str, mac: str, keywords: str = "") -> None: super().__init__(name.strip(), mac.strip(), keywords) + @staticmethod + def parse_color_string(color: str) -> Optional[Tuple[int, int, int]]: + m = re.match( + 'r#?([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])', + color + ) + if m is not None and len(m.group) == 3: + red = int(m.group(0), 16) + green = int(m.group(1), 16) + blue = int(m.group(2), 16) + return (red, green, blue) + color = color.lower() + return ansi.COLOR_NAMES_TO_RGB.get(color, None) + @abstractmethod def turn_on(self) -> bool: pass @@ -101,25 +118,30 @@ class GoogleLight(BaseLight): def parse_google_response(response: GoogleResponse) -> bool: return response.success + @overrides def turn_on(self) -> bool: return GoogleLight.parse_google_response( ask_google(f"turn {self.goog_name()} on") ) + @overrides def turn_off(self) -> bool: return GoogleLight.parse_google_response( ask_google(f"turn {self.goog_name()} off") ) + @overrides def is_on(self) -> bool: r = ask_google(f"is {self.goog_name()} on?") if not r.success: return False return 'is on' in r.audio_transcription + @overrides def is_off(self) -> bool: return not self.is_on() + @overrides def get_dimmer_level(self) -> Optional[int]: if not self.has_keyword("dimmer"): return False @@ -136,6 +158,7 @@ class GoogleLight(BaseLight): return 0 return None + @overrides def set_dimmer_level(self, level: int) -> bool: if not self.has_keyword("dimmer"): return False @@ -149,6 +172,7 @@ class GoogleLight(BaseLight): return True return False + @overrides def make_color(self, color: str) -> bool: return GoogleLight.parse_google_response( ask_google(f"make {self.goog_name()} {color}") @@ -174,50 +198,55 @@ class TuyaLight(BaseLight): } def __init__(self, name: str, mac: str, keywords: str = "") -> None: - from subprocess import Popen, PIPE super().__init__(name, mac, keywords) mac = mac.upper() if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac: raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac') self.devid = TuyaLight.ids_by_mac[mac] self.key = TuyaLight.keys_by_mac[mac] - try: - pid = Popen(['maclookup', mac], stdout=PIPE) - ip = pid.communicate()[0] - ip = ip[:-1] - except Exception: - ip = '0.0.0.0' + self.arper = arper.Arper() + ip = self.get_ip() self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key) + def get_status(self) -> Dict[str, Any]: + return self.bulb.status() + + @overrides def turn_on(self) -> bool: self.bulb.turn_on() return True + @overrides def turn_off(self) -> bool: self.bulb.turn_off() return True - def get_status(self) -> Dict[str, Any]: - return self.bulb.status() - + @overrides def is_on(self) -> bool: s = self.get_status() return s['dps']['1'] + @overrides def is_off(self) -> bool: return not self.is_on() + @overrides def get_dimmer_level(self) -> Optional[int]: s = self.get_status() return s['dps']['3'] + @overrides def set_dimmer_level(self, level: int) -> bool: self.bulb.set_brightness(level) return True + @overrides def make_color(self, color: str) -> bool: - self.bulb.set_colour(255,0,0) - return True + rgb = BaseLight.parse_color_string(color) + if rgb is not None: + self.bulb.set_colour(rgb[0], rgb[1], rgb[2]) + return True + return False class TPLinkLight(BaseLight): @@ -260,18 +289,23 @@ class TPLinkLight(BaseLight): logger.debug(f'About to execute {cmd}') return tplink_light_command(cmd) + @overrides def turn_on(self, child: str = None) -> bool: return self.command("on", child) + @overrides def turn_off(self, child: str = None) -> bool: return self.command("off", child) + @overrides def is_on(self) -> bool: return self.get_on_duration_seconds() > 0 + @overrides def is_off(self) -> bool: return not self.is_on() + @overrides def make_color(self, color: str) -> bool: raise NotImplementedError @@ -308,13 +342,7 @@ class TPLinkLight(BaseLight): return int(chi.get("on_time", "0")) return 0 - def get_on_limit_seconds(self) -> Optional[int]: - for kw in self.kws: - m = re.search(r"timeout:(\d+)", kw) - if m is not None: - return int(m.group(1)) * 60 - return None - + @overrides def get_dimmer_level(self) -> Optional[int]: if not self.has_keyword("dimmer"): return False @@ -323,6 +351,7 @@ class TPLinkLight(BaseLight): return None return int(self.info.get("brightness", "0")) + @overrides def set_dimmer_level(self, level: int) -> bool: if not self.has_keyword("dimmer"): return False diff --git a/smart_home/config.py b/smart_home/registry.py similarity index 62% rename from smart_home/config.py rename to smart_home/registry.py index a28caa7..2d23981 100644 --- a/smart_home/config.py +++ b/smart_home/registry.py @@ -10,15 +10,16 @@ import file_utils import logical_search import smart_home.device as device import smart_home.cameras as cameras +import smart_home.chromecasts as chromecasts import smart_home.lights as lights import smart_home.outlets as outlets -parser = config.add_commandline_args( - f"Smart Home Config ({__file__})", - "Args related to the smart home config." +args = config.add_commandline_args( + f"Smart Home Registry ({__file__})", + "Args related to the smart home configuration registry." ) -parser.add_argument( - '--smart_home_config_file_location', +args.add_argument( + '--smart_home_registry_file_location', default='/home/scott/bin/network_mac_addresses.txt', metavar='FILENAME', help='The location of network_mac_addresses.txt', @@ -29,10 +30,10 @@ parser.add_argument( logger = logging.getLogger(__file__) -class SmartHomeConfig(object): +class SmartHomeRegistry(object): def __init__( self, - config_file: Optional[str] = None, + registry_file: Optional[str] = None, filters: List[str] = ['smart'], ) -> None: self._macs_by_name = {} @@ -42,13 +43,13 @@ class SmartHomeConfig(object): self._corpus = logical_search.Corpus() # Read the disk config file... - if config_file is None: - config_file = config.config[ - 'smart_home_config_file_location' + if registry_file is None: + registry_file = config.config[ + 'smart_home_registry_file_location' ] - assert file_utils.does_file_exist(config_file) - logger.debug(f'Reading {config_file}') - with open(config_file, "r") as f: + assert file_utils.does_file_exist(registry_file) + logger.debug(f'Reading {registry_file}') + with open(registry_file, "r") as f: contents = f.readlines() # Parse the contents... @@ -58,7 +59,7 @@ class SmartHomeConfig(object): line = line.strip() if line == "": continue - logger.debug(f'> {line}') + logger.debug(f'SH-CONFIG> {line}') (mac, name, keywords) = line.split(",") mac = mac.strip() name = name.strip() @@ -139,38 +140,47 @@ class SmartHomeConfig(object): name = self._names_by_mac[mac] kws = self._keywords_by_mac[mac] logger.debug(f'Found {name} -> {mac} ({kws})') - if 'light' in kws.lower(): - if 'tplink' in kws.lower(): - logger.debug(' ...a TPLinkLight') - return lights.TPLinkLight(name, mac, kws) - elif 'tuya' in kws.lower(): - logger.debug(' ...a TuyaLight') - return lights.TuyaLight(name, mac, kws) - elif 'goog' in kws.lower(): - logger.debug(' ...a GoogleLight') - return lights.GoogleLight(name, mac, kws) - else: - raise Exception(f'Unknown light device: {name}, {mac}, {kws}') - elif 'outlet' in kws.lower(): - if 'tplink' in kws.lower(): - if 'children' in kws.lower(): - logger.debug(' ...a TPLinkOutletWithChildren') - return outlets.TPLinkOutletWithChildren(name, mac, kws) + try: + if 'light' in kws.lower(): + if 'tplink' in kws.lower(): + logger.debug(' ...a TPLinkLight') + return lights.TPLinkLight(name, mac, kws) + elif 'tuya' in kws.lower(): + logger.debug(' ...a TuyaLight') + return lights.TuyaLight(name, mac, kws) + elif 'goog' in kws.lower(): + logger.debug(' ...a GoogleLight') + return lights.GoogleLight(name, mac, kws) + else: + raise Exception(f'Unknown light device: {name}, {mac}, {kws}') + elif 'outlet' in kws.lower(): + if 'tplink' in kws.lower(): + if 'children' in kws.lower(): + logger.debug(' ...a TPLinkOutletWithChildren') + return outlets.TPLinkOutletWithChildren(name, mac, kws) + else: + logger.debug(' ...a TPLinkOutlet') + return outlets.TPLinkOutlet(name, mac, kws) + elif 'goog' in kws.lower(): + logger.debug(' ...a GoogleOutlet') + return outlets.GoogleOutlet(name, mac, kws) else: - logger.debug(' ...a TPLinkOutlet') - return outlets.TPLinkOutlet(name, mac, kws) - elif 'goog' in kws.lower(): - logger.debug(' ...a GoogleOutlet') - return outlets.GoogleOutlet(name, mac, kws) + raise Exception(f'Unknown outlet device: {name}, {mac}, {kws}') + elif 'camera' in kws.lower(): + logger.debug(' ...a BaseCamera') + return cameras.BaseCamera(name, mac, kws) + elif 'ccast' in kws.lower(): + logger.debug(' ...a Chromecast') + return chromecasts.BaseChromecast(name, mac, kws) else: - raise Exception(f'Unknown outlet device: {name}, {mac}, {kws}') - elif 'camera' in kws.lower(): - logger.debug(' ...a BaseCamera') - return cameras.BaseCamera(name, mac, kws) - else: - logger.debug(' ...an unknown device (should this be here?)') + logger.debug(' ...an unknown device (should this be here?)') + return device.Device(name, mac, kws) + except Exception as e: + logger.warning( + f'Got exception {e} while trying to communicate with device {name}/{mac}.' + ) return device.Device(name, mac, kws) - logger.warning(f'{mac} is not known, returning None') + logger.warning(f'{mac} is not a known smart home device, returning None') return None def query(self, query: str) -> List[device.Device]: diff --git a/text_utils.py b/text_utils.py index 49ff9b9..36cfe2f 100644 --- a/text_utils.py +++ b/text_utils.py @@ -232,7 +232,7 @@ def wrap_string(text: str, n: int) -> str: return out -class Indenter: +class Indenter(object): """ with Indenter(pad_count = 8) as i: i.print('test')