Ugh, a bunch of things. @overrides. --lmodule. Chromecasts. etc...
authorScott Gasch <[email protected]>
Sun, 31 Oct 2021 20:08:51 +0000 (13:08 -0700)
committerScott Gasch <[email protected]>
Sun, 31 Oct 2021 20:08:51 +0000 (13:08 -0700)
18 files changed:
acl.py
ansi.py
argparse_utils.py
arper.py
config.py
executors.py
file_utils.py
letter_compress.py
logging_utils.py
math_utils.py
presence.py
site_config.py
smart_future.py
smart_home/chromecasts.py [new file with mode: 0644]
smart_home/device.py
smart_home/lights.py
smart_home/registry.py [moved from smart_home/config.py with 62% similarity]
text_utils.py

diff --git a/acl.py b/acl.py
index f810b418e635b1ca4dcbdc6df8f894764d92b8c8..2b347673af1b14dbb144ca9179d115a5eba642b0 100644 (file)
--- a/acl.py
+++ b/acl.py
@@ -7,6 +7,8 @@ import logging
 import re
 from typing import Any, Callable, List, Optional, Set, Sequence
 
+from overrides import overrides
+
 # This module is commonly used by others in here and should avoid
 # taking any unnecessary dependencies back on them.
 
@@ -93,11 +95,13 @@ class SetBasedACL(SimpleACL):
         self.allow_set = allow_set
         self.deny_set = deny_set
 
+    @overrides
     def check_allowed(self, x: Any) -> bool:
         if self.allow_set is None:
             return False
         return x in self.allow_set
 
+    @overrides
     def check_denied(self, x: Any) -> bool:
         if self.deny_set is None:
             return False
@@ -158,11 +162,13 @@ class PredicateListBasedACL(SimpleACL):
         self.allow_predicate_list = allow_predicate_list
         self.deny_predicate_list = deny_predicate_list
 
+    @overrides
     def check_allowed(self, x: Any) -> bool:
         if self.allow_predicate_list is None:
             return False
         return any(predicate(x) for predicate in self.allow_predicate_list)
 
+    @overrides
     def check_denied(self, x: Any) -> bool:
         if self.deny_predicate_list is None:
             return False
@@ -242,11 +248,13 @@ class AnyCompoundACL(SimpleACL):
         )
         self.subacls = subacls
 
+    @overrides
     def check_allowed(self, x: Any) -> bool:
         if self.subacls is None:
             return False
         return any(acl(x) for acl in self.subacls)
 
+    @overrides
     def check_denied(self, x: Any) -> bool:
         if self.subacls is None:
             return False
@@ -266,11 +274,13 @@ class AllCompoundACL(SimpleACL):
         )
         self.subacls = subacls
 
+    @overrides
     def check_allowed(self, x: Any) -> bool:
         if self.subacls is None:
             return False
         return all(acl(x) for acl in self.subacls)
 
+    @overrides
     def check_denied(self, x: Any) -> bool:
         if self.subacls is None:
             return False
diff --git a/ansi.py b/ansi.py
index 4c09db3df76c1c66561f097e6c48d48caa62156c..d30ae27e625dbe585d82d3392837b39bf436f888 100755 (executable)
--- a/ansi.py
+++ b/ansi.py
@@ -8,6 +8,8 @@ import re
 import sys
 from typing import Any, Callable, Dict, Iterable, Optional, Tuple
 
+from overrides import overrides
+
 import logging_utils
 
 logger = logging.getLogger(__name__)
@@ -1864,7 +1866,7 @@ class StdoutInterceptor(io.TextIOBase):
         self.buf = ''
 
     @abstractmethod
-    def write(self, s):
+    def write(self, s: str):
         pass
 
     def __enter__(self) -> None:
@@ -1883,6 +1885,7 @@ class ProgrammableColorizer(StdoutInterceptor):
         super().__init__()
         self.patterns = [_ for _ in patterns]
 
+    @overrides
     def write(self, s: str):
         for pattern in self.patterns:
             s = pattern[0].sub(pattern[1], s)
index 530690c0c70161bcd58d65389c724593f20003a2..e8c2f5699bea4bceb67d22127a923099d0d143e5 100644 (file)
@@ -6,6 +6,9 @@ import logging
 import os
 from typing import Any
 
+from overrides import overrides
+
+
 # This module is commonly used by others in here and should avoid
 # taking any unnecessary dependencies back on them.
 
@@ -47,6 +50,7 @@ class ActionNoYes(argparse.Action):
             help=help
         )
 
+    @overrides
     def __call__(self, parser, namespace, values, option_strings=None):
         if (
                 option_strings.startswith('--no-') or
index d74b659b857beb74c12057a30e07ea57955a2b7a..2fc07674eedd0b8206586323ccb03bdea240fb33 100644 (file)
--- a/arper.py
+++ b/arper.py
@@ -1,16 +1,22 @@
 #!/usr/bin/env python3
 
+"""A caching layer around the kernel's network mapping between IPs and MACs"""
+
 import datetime
 import logging
 import os
 from typing import Any, Optional
 
+from overrides import overrides
+
 import argparse_utils
 from collect.bidict import BiDict
 import config
 import exec_utils
+import file_utils
 import persistent
 import string_utils
+import site_config
 
 logger = logging.getLogger(__name__)
 
@@ -31,6 +37,12 @@ cfg.add_argument(
     metavar='DURATION',
     help='Max acceptable age of the kernel arp table cache'
 )
+cfg.add_argument(
+    '--arper_min_entries_to_be_valid',
+    type=int,
+    default=site_config.get_config().arper_minimum_device_count,
+    help='Min number of arp entries to bother persisting.'
+)
 
 
 @persistent.persistent_autoloaded_singleton()
@@ -44,17 +56,20 @@ class Arper(persistent.Persistent):
             self.state = cached_state
         else:
             logger.debug('No usable cached state; calling /usr/sbin/arp')
-            output = exec_utils.cmd(
-                '/usr/sbin/arp -a',
-                timeout_seconds=5.0
-            )
-            for line in output.split('\n'):
-                ip = string_utils.extract_ip_v4(line)
-                mac = string_utils.extract_mac_address(line)
-                if ip is not None and mac is not None:
-                    mac = mac.lower()
-                    logger.debug(f'    {mac} => {ip}')
-                    self.state[mac] = ip
+            self.update()
+
+    def update(self):
+        output = exec_utils.cmd(
+            '/usr/sbin/arp -a',
+            timeout_seconds=5.0
+        )
+        for line in output.split('\n'):
+            ip = string_utils.extract_ip_v4(line)
+            mac = string_utils.extract_mac_address(line)
+            if ip is not None and mac is not None:
+                mac = mac.lower()
+                logger.debug(f'    {mac} => {ip}')
+                self.state[mac] = ip
 
     def get_ip_by_mac(self, mac: str) -> Optional[str]:
         mac = mac.lower()
@@ -63,34 +78,50 @@ class Arper(persistent.Persistent):
     def get_mac_by_ip(self, ip: str) -> Optional[str]:
         return self.state.inverse.get(ip, None)
 
+    @overrides
     def save(self) -> bool:
-        logger.debug(
-            f'Persisting state to {config.config["arper_cache_location"]}'
-        )
-        with open(config.config['arper_cache_location'], 'w') as wf:
-            for (mac, ip) in self.state.items():
-                mac = mac.lower()
-                print(f'{mac}, {ip}', file=wf)
+        if len(self.state) > config.config['arper_min_entries_to_be_valid']:
+            logger.debug(
+                f'Persisting state to {config.config["arper_cache_location"]}'
+            )
+            with file_utils.FileWriter(config.config['arper_cache_location']) as wf:
+                for (mac, ip) in self.state.items():
+                    mac = mac.lower()
+                    print(f'{mac}, {ip}', file=wf)
+            return True
+        else:
+            logger.warning(
+                f'Only saw {len(self.state)} entries; needed at least {config.config["arper_min_entries_to_be_valid"]} to bother persisting.'
+            )
+            return False
 
     @classmethod
+    @overrides
     def load(cls) -> Any:
+        cache_file = config.config['arper_cache_location']
         if persistent.was_file_written_within_n_seconds(
-                config.config['arper_cache_location'],
+                cache_file,
                 config.config['arper_cache_max_staleness'].total_seconds(),
         ):
-            logger.debug(
-                f'Loading state from {config.config["arper_cache_location"]}'
-            )
+            logger.debug(f'Loading state from {cache_file}')
             cached_state = BiDict()
-            with open(config.config['arper_cache_location'], 'r') as rf:
+            with open(cache_file, 'r') as rf:
                 contents = rf.readlines()
                 for line in contents:
-                    logger.debug(f'ARPER> {line}')
+                    line = line[:-1]
+                    logger.debug(f'ARPER:{cache_file}> {line}')
                     (mac, ip) = line.split(',')
                     mac = mac.strip()
                     mac = mac.lower()
                     ip = ip.strip()
                     cached_state[mac] = ip
-            return cls(cached_state)
+            if len(cached_state) > config.config['arper_min_entries_to_be_valid']:
+                return cls(cached_state)
+            else:
+                logger.warning(
+                    f'{cache_file} sucks, only {len(cached_state)} entries.  Deleting it.'
+                )
+                os.remove(cache_file)
+
         logger.debug('No usable saved state found')
         return None
index b6262adf29b4f64816a793cfe7a692cd85d45d14..ea5f68a296b66ea8946a61e6f85fe1891a0b33a8 100644 (file)
--- a/config.py
+++ b/config.py
@@ -96,11 +96,13 @@ args = argparse.ArgumentParser(
 # than once.
 config_parse_called = False
 
+
 # A global configuration dictionary that will contain parsed arguments.
 # It is also this variable that modules use to access parsed arguments.
 # This is the data that is most interesting to our callers; it will hold
 # the configuration result.
-config: Dict[str, Any] = {}
+config = {}
+# It would be really nice if this shit worked from interactive python
 
 
 def add_commandline_args(title: str, description: str = ""):
index 3cb0a916c080128e63a23600db76c07e93956ec9..d5049a264317c2f764d2068e7108a65d858f7cb2 100644 (file)
@@ -17,6 +17,7 @@ import time
 from typing import Any, Callable, Dict, List, Optional, Set
 
 import cloudpickle  # type: ignore
+from overrides import overrides
 
 from ansi import bg, fg, underline, reset
 import argparse_utils
@@ -121,6 +122,7 @@ class ThreadExecutor(BaseExecutor):
         self.histogram.add_item(duration)
         return result
 
+    @overrides
     def submit(self,
                function: Callable,
                *args,
@@ -135,6 +137,7 @@ class ThreadExecutor(BaseExecutor):
             *newargs,
             **kwargs)
 
+    @overrides
     def shutdown(self,
                  wait = True) -> None:
         logger.debug(f'Shutting down threadpool executor {self.title}')
@@ -163,6 +166,7 @@ class ProcessExecutor(BaseExecutor):
         self.adjust_task_count(-1)
         return result
 
+    @overrides
     def submit(self,
                function: Callable,
                *args,
@@ -181,6 +185,7 @@ class ProcessExecutor(BaseExecutor):
         )
         return result
 
+    @overrides
     def shutdown(self, wait=True) -> None:
         logger.debug(f'Shutting down processpool executor {self.title}')
         self._process_executor.shutdown(wait)
@@ -813,6 +818,7 @@ class RemoteExecutor(BaseExecutor):
         # they will move the result_file to this machine and let
         # the original pick them up and unpickle them.
 
+    @overrides
     def submit(self,
                function: Callable,
                *args,
@@ -822,6 +828,7 @@ class RemoteExecutor(BaseExecutor):
         self.total_bundles_submitted += 1
         return self._helper_executor.submit(self.launch, bundle)
 
+    @overrides
     def shutdown(self, wait=True) -> None:
         self._helper_executor.shutdown(wait)
         logging.debug(f'Shutting down RemoteExecutor {self.title}')
index 7270e30b1fe1513746aaf986e87269968787e842..67e6f561f394e1a99c4ace7bb2f6ddbe733ea011 100644 (file)
@@ -19,6 +19,9 @@ from uuid import uuid4
 logger = logging.getLogger(__name__)
 
 
+# os.remove(file) you fuckwit.
+
+
 def create_path_if_not_exist(path, on_error=None):
     """
     Attempts to create path if it does not exist. If on_error is
index 9a6f32db43fb09762cca71f79cb268a8862e2452..b5d326471e8c9412133e94e07737a7d91496dc41 100644 (file)
@@ -2,9 +2,9 @@
 
 import bitstring
 
-from collect.bidict import bidict
+from collect.bidict import BiDict
 
-special_characters = bidict(
+special_characters = BiDict(
     {
         ' ': 27,
         '.': 28,
index 183e1f0109eb8b8dc243b23e9cceb431b01073e0..eec07984b282b9a0ef1e4adea7ab6951a0abc71e 100644 (file)
@@ -15,6 +15,8 @@ import random
 import sys
 from typing import Callable, Iterable, Mapping, Optional
 
+from overrides import overrides
+
 # This module is commonly used by others in here and should avoid
 # taking any unnecessary dependencies back on them.
 import argparse_utils
@@ -36,7 +38,7 @@ cfg.add_argument(
     default='INFO',
     choices=['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
     metavar='LEVEL',
-    help='The level below which to squelch log messages.',
+    help='The global default level below which to squelch log messages; see also --lmodule',
 )
 cfg.add_argument(
     '--logging_format',
@@ -115,6 +117,17 @@ cfg.add_argument(
     default=False,
     help='When calling print, also log.info automatically.'
 )
+cfg.add_argument(
+    '--lmodule',
+    type=str,
+    metavar='<SCOPE>=<LEVEL>[,<SCOPE>=<LEVEL>...]',
+    help=(
+        'Allows per-scope logging levels which override the global level set with --logging-level.' +
+        'Pass a space separated list of <scope>=<level> where <scope> is one of: module, ' +
+        'module:function, or :function and <level> is a logging level (e.g. INFO, DEBUG...)'
+    )
+)
+
 
 built_in_print = print
 
@@ -181,6 +194,7 @@ class SquelchRepeatedMessagesFilter(logging.Filter):
         self.counters = collections.Counter()
         super().__init__()
 
+    @overrides
     def filter(self, record: logging.LogRecord) -> bool:
         id1 = f'{record.module}:{record.funcName}'
         if id1 not in squelched_logging_counts:
@@ -192,6 +206,84 @@ class SquelchRepeatedMessagesFilter(logging.Filter):
         return count < threshold
 
 
+class DynamicPerScopeLoggingLevelFilter(logging.Filter):
+    """Only interested in seeing logging messages from an allow list of
+    module names or module:function names.  Block others.
+
+    """
+    @staticmethod
+    def level_name_to_level(name: str) -> int:
+        numeric_level = getattr(
+            logging,
+            name,
+            None
+        )
+        if not isinstance(numeric_level, int):
+            raise ValueError('Invalid level: {name}')
+        return numeric_level
+
+    def __init__(
+            self,
+            default_logging_level: int,
+            per_scope_logging_levels: str,
+    ) -> None:
+        super().__init__()
+        self.valid_levels = set(['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'])
+        self.default_logging_level = default_logging_level
+        self.level_by_scope = {}
+        if per_scope_logging_levels is not None:
+            for chunk in per_scope_logging_levels.split(','):
+                if '=' not in chunk:
+                    print(
+                        f'Malformed lmodule directive: "{chunk}", missing "=".  Ignored.',
+                        file=sys.stderr
+                    )
+                    continue
+                try:
+                    (scope, level) = chunk.split('=')
+                except ValueError:
+                    print(
+                        f'Malformed lmodule directive: "{chunk}".  Ignored.',
+                        file=sys.stderr
+                    )
+                    continue
+                scope = scope.strip()
+                level = level.strip().upper()
+                if level not in self.valid_levels:
+                    print(
+                        f'Malformed lmodule directive: "{chunk}", bad level.  Ignored.',
+                        file=sys.stderr
+                    )
+                    continue
+                self.level_by_scope[scope] = (
+                    DynamicPerScopeLoggingLevelFilter.level_name_to_level(
+                        level
+                    )
+                )
+
+    @overrides
+    def filter(self, record: logging.LogRecord) -> bool:
+        # First try to find a logging level by scope (--lmodule)
+        if len(self.level_by_scope) > 0:
+            min_level = None
+            for scope in (
+                    record.module,
+                    f'{record.module}:{record.funcName}',
+                    f':{record.funcName}'
+            ):
+                level = self.level_by_scope.get(scope, None)
+                if level is not None:
+                    if min_level is None or level < min_level:
+                        min_level = level
+
+            # If we found one, use it instead of the global default level.
+            if min_level is not None:
+                return record.levelno >= min_level
+
+        # Otherwise, use the global logging level (--logging_level)
+        return record.levelno >= self.default_logging_level
+
+
 # A map from function_identifier -> probability of logging (0.0%..100.0%)
 probabilistic_logging_levels: Mapping[str, float] = {}
 
@@ -222,6 +314,7 @@ class ProbabilisticFilter(logging.Filter):
     been tagged with the @logging_utils.probabilistic_logging decorator.
 
     """
+    @overrides
     def filter(self, record: logging.LogRecord) -> bool:
         id1 = f'{record.module}:{record.funcName}'
         if id1 not in probabilistic_logging_levels:
@@ -238,7 +331,8 @@ class OnlyInfoFilter(logging.Filter):
     stdout handler.
 
     """
-    def filter(self, record):
+    @overrides
+    def filter(self, record: logging.LogRecord):
         return record.levelno == logging.INFO
 
 
@@ -249,6 +343,7 @@ class MillisecondAwareFormatter(logging.Formatter):
     """
     converter = datetime.datetime.fromtimestamp
 
+    @overrides
     def formatTime(self, record, datefmt=None):
         ct = MillisecondAwareFormatter.converter(
             record.created, pytz.timezone("US/Pacific")
@@ -271,12 +366,14 @@ def initialize_logging(logger=None) -> logging.Logger:
         return logger
 
     handlers = []
-    numeric_level = getattr(
+
+    # Global default logging level (--logging_level)
+    default_logging_level = getattr(
         logging,
         config.config['logging_level'].upper(),
         None
     )
-    if not isinstance(numeric_level, int):
+    if not isinstance(default_logging_level, int):
         raise ValueError('Invalid level: %s' % config.config['logging_level'])
 
     fmt = config.config['logging_format']
@@ -286,15 +383,12 @@ def initialize_logging(logger=None) -> logging.Logger:
     if config.config['logging_syslog']:
         if sys.platform not in ('win32', 'cygwin'):
             handler = SysLogHandler()
-#            for k, v in encoded_priorities.items():
-#                handler.encodePriority(k, v)
             handler.setFormatter(
                 MillisecondAwareFormatter(
                     fmt=fmt,
                     datefmt=config.config['logging_date_format'],
                 )
             )
-            handler.setLevel(numeric_level)
             handlers.append(handler)
 
     if config.config['logging_filename']:
@@ -303,7 +397,6 @@ def initialize_logging(logger=None) -> logging.Logger:
             maxBytes = config.config['logging_filename_maxsize'],
             backupCount = config.config['logging_filename_count'],
         )
-        handler.setLevel(numeric_level)
         handler.setFormatter(
             MillisecondAwareFormatter(
                 fmt=fmt,
@@ -314,7 +407,6 @@ def initialize_logging(logger=None) -> logging.Logger:
 
     if config.config['logging_console']:
         handler = logging.StreamHandler(sys.stderr)
-        handler.setLevel(numeric_level)
         handler.setFormatter(
             MillisecondAwareFormatter(
                 fmt=fmt,
@@ -342,7 +434,14 @@ def initialize_logging(logger=None) -> logging.Logger:
         for handler in handlers:
             handler.addFilter(ProbabilisticFilter())
 
-    logger.setLevel(numeric_level)
+    for handler in handlers:
+        handler.addFilter(
+            DynamicPerScopeLoggingLevelFilter(
+                default_logging_level,
+                config.config['lmodule'],
+            )
+        )
+    logger.setLevel(0)
     logger.propagate = False
 
     if config.config['logging_captures_prints']:
index fa0bc0e7bdbbb7789ed56420f17d64b66375cd92..e0e3f6c10732b9a3ab20a251a225a0e963c362e9 100644 (file)
@@ -6,7 +6,7 @@ from typing import List
 from heapq import heappush, heappop
 
 
-class RunningMedian:
+class RunningMedian(object):
     """A running median computer.
 
     >>> median = RunningMedian()
index e5bc64f0ba7b9666947200cdfa0f578d289d8b33..b310183b4da6c1fdf002bee0c559abd9a9fd0ca1 100755 (executable)
@@ -7,6 +7,7 @@ import logging
 import re
 from typing import Dict, List
 
+# Note: this module is fairly early loaded.  Be aware of dependencies.
 import argparse_utils
 import bootstrap
 import config
index 332731277dde2ef16b983da04e749ba19975c496..e3b186daa2839c79063ca399997db1f9508fd021 100644 (file)
@@ -3,8 +3,9 @@
 from dataclasses import dataclass
 import logging
 import platform
-from typing import Callable, Optional
+from typing import Callable
 
+# Note: this module is fairly early loaded.  Be aware of dependencies.
 import config
 import presence
 
@@ -31,6 +32,7 @@ class SiteConfig(object):
     network_router_ip: str
     presence_location: presence.Location
     is_anyone_present: Callable[None, bool]
+    arper_minimum_device_count: int
 
 
 def get_location():
@@ -78,6 +80,7 @@ def get_config():
             network_router_ip = '10.0.0.1',
             presence_location = presence.Location.HOUSE,
             is_anyone_present = lambda x=presence.Location.HOUSE: is_anyone_present_wrapper(x),
+            arper_minimum_device_count = 50,
         )
     elif location == 'CABIN':
         return SiteConfig(
@@ -87,6 +90,7 @@ def get_config():
             network_router_ip = '192.168.0.1',
             presence_location = presence.Location.CABIN,
             is_anyone_present = lambda x=presence.Location.CABIN: is_anyone_present_wrapper(x),
+            arper_minimum_device_count = 15,
         )
     else:
         raise Exception(f'Unknown site location: {location}')
index 7dbec5004b4ba927331e71fb812fd482af678c3c..f11be17bc0fabfeb4a111e4b3356bbcb1ed1633a 100644 (file)
@@ -6,6 +6,8 @@ import concurrent.futures as fut
 import time
 from typing import Callable, List, TypeVar
 
+from overrides import overrides
+
 # This module is commonly used by others in here and should avoid
 # taking any unnecessary dependencies back on them.
 from deferred_operand import DeferredOperand
@@ -66,5 +68,6 @@ class SmartFuture(DeferredOperand):
 
     # You shouldn't have to call this; instead, have a look at defining a
     # method on DeferredOperand base class.
+    @overrides
     def _resolve(self, *, timeout=None) -> T:
         return self.wrapped_future.result(timeout)
diff --git a/smart_home/chromecasts.py b/smart_home/chromecasts.py
new file mode 100644 (file)
index 0000000..08290e5
--- /dev/null
@@ -0,0 +1,88 @@
+#!/usr/bin/env python3
+
+"""Utilities for dealing with the webcams."""
+
+import logging
+import time
+
+import pychromecast
+
+from decorator_utils import memoized
+import smart_home.device as dev
+
+logger = logging.getLogger(__name__)
+
+
+class BaseChromecast(dev.Device):
+    def __init__(self, name: str, mac: str, keywords: str = "") -> None:
+        super().__init__(name.strip(), mac.strip(), keywords)
+        ip = self.get_ip()
+        self.cast = pychromecast.Chromecast(ip)
+        self.cast.wait()
+        time.sleep(0.1)
+
+    def is_idle(self):
+        return self.cast.is_idle
+
+    @memoized
+    def get_uuid(self):
+        return self.cast.uuid
+
+    @memoized
+    def get_friendly_name(self):
+        return self.cast.name
+
+    def get_uri(self):
+        return self.cast.url
+
+    @memoized
+    def get_model_name(self):
+        return self.cast.model_name
+
+    @memoized
+    def get_cast_type(self):
+        return self.cast.cast_type
+
+    @memoized
+    def app_id(self):
+        return self.cast.app_id
+
+    def get_app_display_name(self):
+        return self.cast.app_display_name
+
+    def get_media_controller(self):
+        return self.cast.media_controller
+
+    def status(self):
+        if self.is_idle():
+            return 'idle'
+        app = self.get_app_display_name()
+        mc = self.get_media_controller()
+        status = mc.status
+        return f'{app} / {status.title}'
+
+    def start_app(self, app_id, force_launch=False):
+        """Start an app on the Chromecast."""
+        self.cast.start_app(app_id, force_launch)
+
+    def quit_app(self):
+        """Tells the Chromecast to quit current app_id."""
+        self.cast.quit_app()
+
+    def volume_up(self, delta=0.1):
+        """Increment volume by 0.1 (or delta) unless it is already maxed.
+        Returns the new volume.
+        """
+        return self.cast.volume_up(delta)
+
+    def volume_down(self, delta=0.1):
+        """Decrement the volume by 0.1 (or delta) unless it is already 0.
+        Returns the new volume.
+        """
+        return self.cast.volume_down(delta)
+
+    def __repr__(self):
+        return (
+            f"Chromecast({self.cast.socket_client.host!r}, port={self.cast.socket_client.port!r}, "
+            f"device={self.cast.device!r})"
+        )
index 04b0bfee8abee690ab857692b15e928abf3bae03..9675b7c66ed26527d6c389854a8cf488a012f530 100644 (file)
@@ -28,9 +28,20 @@ class Device(object):
     def get_mac(self) -> str:
         return self.mac
 
-    def get_ip(self) -> str:
+    def get_ip(self) -> Optional[str]:
         return self.arper.get_ip_by_mac(self.mac)
 
+    def has_static_ip(self) -> bool:
+        for kw in self.kws:
+            m = re.search(r'static:([\d\.]+)', kw)
+            if m is not None:
+                ip = m.group(1)
+                assert self.get_ip() == ip
+                return True
+        return False
+
+    # Add command -> URL logic here.
+
     def get_keywords(self) -> Optional[List[str]]:
         return self.kws
 
index 54467223b9b1cc36c5edabe9d81df187cc1a409c..76b1500d5490693c939a025a49ecf886f2e38dab 100644 (file)
@@ -10,11 +10,14 @@ import os
 import re
 import subprocess
 import sys
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Tuple
 
+from overrides import overrides
 import tinytuya as tt
 
+import ansi
 import argparse_utils
+import arper
 import config
 import logging_utils
 import smart_home.device as dev
@@ -23,11 +26,11 @@ from decorator_utils import timeout, memoized
 
 logger = logging.getLogger(__name__)
 
-parser = config.add_commandline_args(
+args = config.add_commandline_args(
     f"Smart Lights ({__file__})",
     "Args related to smart lights.",
 )
-parser.add_argument(
+args.add_argument(
     '--smart_lights_tplink_location',
     default='/home/scott/bin/tplink.py',
     metavar='FILENAME',
@@ -60,6 +63,20 @@ class BaseLight(dev.Device):
     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
         super().__init__(name.strip(), mac.strip(), keywords)
 
+    @staticmethod
+    def parse_color_string(color: str) -> Optional[Tuple[int, int, int]]:
+        m = re.match(
+            'r#?([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])',
+            color
+        )
+        if m is not None and len(m.group) == 3:
+            red = int(m.group(0), 16)
+            green = int(m.group(1), 16)
+            blue = int(m.group(2), 16)
+            return (red, green, blue)
+        color = color.lower()
+        return ansi.COLOR_NAMES_TO_RGB.get(color, None)
+
     @abstractmethod
     def turn_on(self) -> bool:
         pass
@@ -101,25 +118,30 @@ class GoogleLight(BaseLight):
     def parse_google_response(response: GoogleResponse) -> bool:
         return response.success
 
+    @overrides
     def turn_on(self) -> bool:
         return GoogleLight.parse_google_response(
             ask_google(f"turn {self.goog_name()} on")
         )
 
+    @overrides
     def turn_off(self) -> bool:
         return GoogleLight.parse_google_response(
             ask_google(f"turn {self.goog_name()} off")
         )
 
+    @overrides
     def is_on(self) -> bool:
         r = ask_google(f"is {self.goog_name()} on?")
         if not r.success:
             return False
         return 'is on' in r.audio_transcription
 
+    @overrides
     def is_off(self) -> bool:
         return not self.is_on()
 
+    @overrides
     def get_dimmer_level(self) -> Optional[int]:
         if not self.has_keyword("dimmer"):
             return False
@@ -136,6 +158,7 @@ class GoogleLight(BaseLight):
             return 0
         return None
 
+    @overrides
     def set_dimmer_level(self, level: int) -> bool:
         if not self.has_keyword("dimmer"):
             return False
@@ -149,6 +172,7 @@ class GoogleLight(BaseLight):
             return True
         return False
 
+    @overrides
     def make_color(self, color: str) -> bool:
         return GoogleLight.parse_google_response(
             ask_google(f"make {self.goog_name()} {color}")
@@ -174,50 +198,55 @@ class TuyaLight(BaseLight):
     }
 
     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
-        from subprocess import Popen, PIPE
         super().__init__(name, mac, keywords)
         mac = mac.upper()
         if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac:
             raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac')
         self.devid = TuyaLight.ids_by_mac[mac]
         self.key = TuyaLight.keys_by_mac[mac]
-        try:
-            pid = Popen(['maclookup', mac], stdout=PIPE)
-            ip = pid.communicate()[0]
-            ip = ip[:-1]
-        except Exception:
-            ip = '0.0.0.0'
+        self.arper = arper.Arper()
+        ip = self.get_ip()
         self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key)
 
+    def get_status(self) -> Dict[str, Any]:
+        return self.bulb.status()
+
+    @overrides
     def turn_on(self) -> bool:
         self.bulb.turn_on()
         return True
 
+    @overrides
     def turn_off(self) -> bool:
         self.bulb.turn_off()
         return True
 
-    def get_status(self) -> Dict[str, Any]:
-        return self.bulb.status()
-
+    @overrides
     def is_on(self) -> bool:
         s = self.get_status()
         return s['dps']['1']
 
+    @overrides
     def is_off(self) -> bool:
         return not self.is_on()
 
+    @overrides
     def get_dimmer_level(self) -> Optional[int]:
         s = self.get_status()
         return s['dps']['3']
 
+    @overrides
     def set_dimmer_level(self, level: int) -> bool:
         self.bulb.set_brightness(level)
         return True
 
+    @overrides
     def make_color(self, color: str) -> bool:
-        self.bulb.set_colour(255,0,0)
-        return True
+        rgb = BaseLight.parse_color_string(color)
+        if rgb is not None:
+            self.bulb.set_colour(rgb[0], rgb[1], rgb[2])
+            return True
+        return False
 
 
 class TPLinkLight(BaseLight):
@@ -260,18 +289,23 @@ class TPLinkLight(BaseLight):
         logger.debug(f'About to execute {cmd}')
         return tplink_light_command(cmd)
 
+    @overrides
     def turn_on(self, child: str = None) -> bool:
         return self.command("on", child)
 
+    @overrides
     def turn_off(self, child: str = None) -> bool:
         return self.command("off", child)
 
+    @overrides
     def is_on(self) -> bool:
         return self.get_on_duration_seconds() > 0
 
+    @overrides
     def is_off(self) -> bool:
         return not self.is_on()
 
+    @overrides
     def make_color(self, color: str) -> bool:
         raise NotImplementedError
 
@@ -308,13 +342,7 @@ class TPLinkLight(BaseLight):
                     return int(chi.get("on_time", "0"))
         return 0
 
-    def get_on_limit_seconds(self) -> Optional[int]:
-        for kw in self.kws:
-            m = re.search(r"timeout:(\d+)", kw)
-            if m is not None:
-                return int(m.group(1)) * 60
-        return None
-
+    @overrides
     def get_dimmer_level(self) -> Optional[int]:
         if not self.has_keyword("dimmer"):
             return False
@@ -323,6 +351,7 @@ class TPLinkLight(BaseLight):
             return None
         return int(self.info.get("brightness", "0"))
 
+    @overrides
     def set_dimmer_level(self, level: int) -> bool:
         if not self.has_keyword("dimmer"):
             return False
similarity index 62%
rename from smart_home/config.py
rename to smart_home/registry.py
index a28caa7b110db25043ae63b6524850cdfc38125b..2d23981d00ad9aaafef04a45c0761bd4cdefd5af 100644 (file)
@@ -10,15 +10,16 @@ import file_utils
 import logical_search
 import smart_home.device as device
 import smart_home.cameras as cameras
+import smart_home.chromecasts as chromecasts
 import smart_home.lights as lights
 import smart_home.outlets as outlets
 
-parser = config.add_commandline_args(
-    f"Smart Home Config ({__file__})",
-    "Args related to the smart home config."
+args = config.add_commandline_args(
+    f"Smart Home Registry ({__file__})",
+    "Args related to the smart home configuration registry."
 )
-parser.add_argument(
-    '--smart_home_config_file_location',
+args.add_argument(
+    '--smart_home_registry_file_location',
     default='/home/scott/bin/network_mac_addresses.txt',
     metavar='FILENAME',
     help='The location of network_mac_addresses.txt',
@@ -29,10 +30,10 @@ parser.add_argument(
 logger = logging.getLogger(__file__)
 
 
-class SmartHomeConfig(object):
+class SmartHomeRegistry(object):
     def __init__(
             self,
-            config_file: Optional[str] = None,
+            registry_file: Optional[str] = None,
             filters: List[str] = ['smart'],
     ) -> None:
         self._macs_by_name = {}
@@ -42,13 +43,13 @@ class SmartHomeConfig(object):
         self._corpus = logical_search.Corpus()
 
         # Read the disk config file...
-        if config_file is None:
-            config_file = config.config[
-                'smart_home_config_file_location'
+        if registry_file is None:
+            registry_file = config.config[
+                'smart_home_registry_file_location'
             ]
-        assert file_utils.does_file_exist(config_file)
-        logger.debug(f'Reading {config_file}')
-        with open(config_file, "r") as f:
+        assert file_utils.does_file_exist(registry_file)
+        logger.debug(f'Reading {registry_file}')
+        with open(registry_file, "r") as f:
             contents = f.readlines()
 
         # Parse the contents...
@@ -58,7 +59,7 @@ class SmartHomeConfig(object):
             line = line.strip()
             if line == "":
                 continue
-            logger.debug(f'> {line}')
+            logger.debug(f'SH-CONFIG> {line}')
             (mac, name, keywords) = line.split(",")
             mac = mac.strip()
             name = name.strip()
@@ -139,38 +140,47 @@ class SmartHomeConfig(object):
             name = self._names_by_mac[mac]
             kws = self._keywords_by_mac[mac]
             logger.debug(f'Found {name} -> {mac} ({kws})')
-            if 'light' in kws.lower():
-                if 'tplink' in kws.lower():
-                    logger.debug('    ...a TPLinkLight')
-                    return lights.TPLinkLight(name, mac, kws)
-                elif 'tuya' in kws.lower():
-                    logger.debug('    ...a TuyaLight')
-                    return lights.TuyaLight(name, mac, kws)
-                elif 'goog' in kws.lower():
-                    logger.debug('    ...a GoogleLight')
-                    return lights.GoogleLight(name, mac, kws)
-                else:
-                    raise Exception(f'Unknown light device: {name}, {mac}, {kws}')
-            elif 'outlet' in kws.lower():
-                if 'tplink' in kws.lower():
-                    if 'children' in kws.lower():
-                        logger.debug('    ...a TPLinkOutletWithChildren')
-                        return outlets.TPLinkOutletWithChildren(name, mac, kws)
+            try:
+                if 'light' in kws.lower():
+                    if 'tplink' in kws.lower():
+                        logger.debug('    ...a TPLinkLight')
+                        return lights.TPLinkLight(name, mac, kws)
+                    elif 'tuya' in kws.lower():
+                        logger.debug('    ...a TuyaLight')
+                        return lights.TuyaLight(name, mac, kws)
+                    elif 'goog' in kws.lower():
+                        logger.debug('    ...a GoogleLight')
+                        return lights.GoogleLight(name, mac, kws)
+                    else:
+                        raise Exception(f'Unknown light device: {name}, {mac}, {kws}')
+                elif 'outlet' in kws.lower():
+                    if 'tplink' in kws.lower():
+                        if 'children' in kws.lower():
+                            logger.debug('    ...a TPLinkOutletWithChildren')
+                            return outlets.TPLinkOutletWithChildren(name, mac, kws)
+                        else:
+                            logger.debug('    ...a TPLinkOutlet')
+                            return outlets.TPLinkOutlet(name, mac, kws)
+                    elif 'goog' in kws.lower():
+                        logger.debug('    ...a GoogleOutlet')
+                        return outlets.GoogleOutlet(name, mac, kws)
                     else:
-                        logger.debug('    ...a TPLinkOutlet')
-                        return outlets.TPLinkOutlet(name, mac, kws)
-                elif 'goog' in kws.lower():
-                    logger.debug('    ...a GoogleOutlet')
-                    return outlets.GoogleOutlet(name, mac, kws)
+                        raise Exception(f'Unknown outlet device: {name}, {mac}, {kws}')
+                elif 'camera' in kws.lower():
+                    logger.debug('    ...a BaseCamera')
+                    return cameras.BaseCamera(name, mac, kws)
+                elif 'ccast' in kws.lower():
+                    logger.debug('    ...a Chromecast')
+                    return chromecasts.BaseChromecast(name, mac, kws)
                 else:
-                    raise Exception(f'Unknown outlet device: {name}, {mac}, {kws}')
-            elif 'camera' in kws.lower():
-                logger.debug('    ...a BaseCamera')
-                return cameras.BaseCamera(name, mac, kws)
-            else:
-                logger.debug('    ...an unknown device (should this be here?)')
+                    logger.debug('    ...an unknown device (should this be here?)')
+                    return device.Device(name, mac, kws)
+            except Exception as e:
+                logger.warning(
+                    f'Got exception {e} while trying to communicate with device {name}/{mac}.'
+                )
                 return device.Device(name, mac, kws)
-        logger.warning(f'{mac} is not known, returning None')
+        logger.warning(f'{mac} is not a known smart home device, returning None')
         return None
 
     def query(self, query: str) -> List[device.Device]:
index 49ff9b979e2db0e26adfd6dc7b6f0e4663e62289..36cfe2fd720d9e0cb479992495a70d66161ae132 100644 (file)
@@ -232,7 +232,7 @@ def wrap_string(text: str, n: int) -> str:
     return out
 
 
-class Indenter:
+class Indenter(object):
     """
     with Indenter(pad_count = 8) as i:
         i.print('test')