Groups of google lights (kinda hacky, needs a rework), import auditing.
authorScott Gasch <[email protected]>
Tue, 5 Oct 2021 14:48:09 +0000 (07:48 -0700)
committerScott Gasch <[email protected]>
Tue, 5 Oct 2021 14:48:09 +0000 (07:48 -0700)
bootstrap.py
cached/weather_data.py
light_utils.py
pip_install.sh

index bf5d91f9eb3a42cd2432d493b74aba436e85e640..045411ae257116db18b2bb56b61cfe95cd14f762 100644 (file)
@@ -1,8 +1,10 @@
 #!/usr/bin/env python3
 
 import functools
+import importlib
 import logging
 import os
+from inspect import stack
 import sys
 
 # This module is commonly used by others in here and should avoid
@@ -37,9 +39,21 @@ args.add_argument(
     metavar='SEED_INT',
     help='Override the global random seed with a particular number.'
 )
+args.add_argument(
+    '--dump_all_objects',
+    action=ActionNoYes,
+    default=False,
+    help='Should we dump the Python import tree before main?'
+)
+args.add_argument(
+    '--audit_import_events',
+    action=ActionNoYes,
+    default=False,
+    help='Should we audit all import events?',
+)
 
-original_hook = sys.excepthook
 
+original_hook = sys.excepthook
 
 def handle_uncaught_exception(exc_type, exc_value, exc_tb):
     """
@@ -73,6 +87,122 @@ def handle_uncaught_exception(exc_type, exc_value, exc_tb):
                 original_hook(exc_type, exc_value, exc_tb)
 
 
+class ImportInterceptor(importlib.abc.MetaPathFinder):
+    def __init__(self):
+        import collect.trie
+        self.module_by_filename_cache = {}
+        self.repopulate_modules_by_filename()
+        self.tree = collect.trie.Trie()
+        self.tree_node_by_module = {}
+
+    def repopulate_modules_by_filename(self):
+        self.module_by_filename_cache.clear()
+        for mod in sys.modules:
+            if hasattr(sys.modules[mod], '__file__'):
+                fname = getattr(sys.modules[mod], '__file__')
+            else:
+                fname = 'unknown'
+            self.module_by_filename_cache[fname] = mod
+
+    def should_ignore_filename(self, filename: str) -> bool:
+        return 'importlib' in filename or 'six.py' in filename
+
+    def find_spec(self, loaded_module, path=None, target=None):
+        s = stack()
+        for x in range(3, len(s)):
+            filename = s[x].filename
+            if self.should_ignore_filename(filename):
+                continue
+
+            loading_function = s[x].function
+            if filename in self.module_by_filename_cache:
+                loading_module = self.module_by_filename_cache[filename]
+            else:
+                self.repopulate_modules_by_filename()
+                loading_module = self.module_by_filename_cache.get(filename, 'unknown')
+
+            path = self.tree_node_by_module.get(loading_module, [])
+            path.extend([loaded_module])
+            self.tree.insert(path)
+            self.tree_node_by_module[loading_module] = path
+
+            msg = f'*** Import {loaded_module} from {filename}:{s[x].lineno} in {loading_module}::{loading_function}'
+            logger.debug(msg)
+            print(msg)
+            return
+        msg = f'*** Import {loaded_module} from ?????'
+        logger.debug(msg)
+        print(msg)
+
+    def find_importer(self, module: str):
+        if module in self.tree_node_by_module:
+            node = self.tree_node_by_module[module]
+            return node
+        return []
+
+
+# TODO: test this with python 3.8+
+def audit_import_events(event, args):
+    print(event)
+    print(args)
+
+
+# Audit import events?  Note: this runs early in the lifetime of the
+# process (assuming that import bootstrap happens early); config has
+# (probably) not yet been loaded or parsed the commandline.  Also,
+# some things have probably already been imported while we weren't
+# watching so this information may be incomplete.
+#
+# Also note: move bootstrap up in the global import list to catch
+# more import events and have a more complete record.
+import_interceptor = None
+for arg in sys.argv:
+    if arg == '--audit_import_events':
+        if not hasattr(sys, 'frozen'):
+            if (
+                    sys.version_info[0] == 3
+                    and sys.version_info[1] < 8
+            ):
+                import_interceptor = ImportInterceptor()
+                sys.meta_path = [import_interceptor] + sys.meta_path
+            else:
+                sys.addaudithook(audit_import_events)
+
+
+def dump_all_objects() -> None:
+    global import_interceptor
+    messages = {}
+    all_modules = sys.modules
+    for obj in object.__subclasses__():
+        if not hasattr(obj, '__name__'):
+            continue
+        klass = obj.__name__
+        if not hasattr(obj, '__module__'):
+            continue
+        class_mod_name = obj.__module__
+        if class_mod_name in all_modules:
+            mod = all_modules[class_mod_name]
+            if not hasattr(mod, '__name__'):
+                mod_name = class_mod_name
+            else:
+                mod_name = mod.__name__
+            if hasattr(mod, '__file__'):
+                mod_file = mod.__file__
+            else:
+                mod_file = 'unknown'
+            if import_interceptor is not None:
+                import_path = import_interceptor.find_importer(mod_name)
+            else:
+                import_path = 'unknown'
+            msg = f'{class_mod_name}::{klass} ({mod_file})'
+            if import_path != 'unknown' and len(import_path) > 0:
+                msg += f' imported by {import_path}'
+            messages[f'{class_mod_name}::{klass}'] = msg
+    for x in sorted(messages.keys()):
+        logger.debug(messages[x])
+        print(messages[x])
+
+
 def initialize(entry_point):
     """
     Remember to initialize config, initialize logging, set/log a random
@@ -81,7 +211,6 @@ def initialize(entry_point):
     """
     @functools.wraps(entry_point)
     def initialize_wrapper(*args, **kwargs):
-
         # Hook top level unhandled exceptions, maybe invoke debugger.
         if sys.excepthook == sys.__excepthook__:
             sys.excepthook = handle_uncaught_exception
@@ -113,8 +242,8 @@ def initialize(entry_point):
 
         if config.config['show_random_seed']:
             msg = f'Global random seed is: {random_seed}'
-            print(msg)
             logger.debug(msg)
+            print(msg)
         random.seed(random_seed)
 
         # Do it, invoke the user's code.  Pay attention to how long it takes.
@@ -127,6 +256,14 @@ def initialize(entry_point):
             f'{entry_point.__name__} (program entry point) returned {ret}.'
         )
 
+        if config.config['dump_all_objects']:
+            dump_all_objects()
+
+        if config.config['audit_import_events']:
+            global import_interceptor
+            if import_interceptor is not None:
+                print(import_interceptor.tree)
+
         walltime = t()
         (utime, stime, cutime, cstime, elapsed_time) = os.times()
         logger.debug('\n'
index 94a01313375f5a6e6ecc5b485c873e9280c54db5..8d49736bae0cf264fc7447c150011f616ccec388 100644 (file)
@@ -41,7 +41,7 @@ class WeatherData:
     date: datetime.date              # The date
     high: float                      # The predicted high in F
     low: float                       # The predicted low in F
-    precipitation_inchs: float       # Number of inches of precipitation / day
+    precipitation_inches: float      # Number of inches of precipitation / day
     conditions: List[str]            # Conditions per ~3h window
     most_common_condition: str       # The most common condition
     icon: str                        # An icon to represent it
@@ -108,7 +108,7 @@ class CachedWeatherData(persistent.Persistent):
             date = dt,
             high = float(parsed_json["main"]["temp_max"]),
             low = float(parsed_json["main"]["temp_min"]),
-            precipitation_inchs = p / 25.4,
+            precipitation_inches = p / 25.4,
             conditions = [condition],
             most_common_condition = condition,
             icon = icon,
@@ -172,7 +172,7 @@ class CachedWeatherData(persistent.Persistent):
                 date = dt,
                 high = highs[dt],
                 low = lows[dt],
-                precipitation_inchs = precip[dt] / 25.4,
+                precipitation_inches = precip[dt] / 25.4,
                 conditions = conditions[dt],
                 most_common_condition = most_common_condition,
                 icon = icon
index 63379af9b83059dfec9b0200b6a554c6f4e9e3f8..bd73ee3dfa9fa3d352116e7dd88171284610f65c 100644 (file)
@@ -15,6 +15,7 @@ from typing import Dict, List, Optional, Set
 import argparse_utils
 import config
 import logging_utils
+import logical_search
 from google_assistant import ask_google, GoogleResponse
 from decorator_utils import timeout, memoized
 
@@ -286,6 +287,105 @@ class TPLinkLight(Light):
         return tplink_light_command(cmd)
 
 
+class GoogleLightGroup(GoogleLight):
+    def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None:
+        if len(members) < 1:
+            raise Exception("There must be at least one light in the group.")
+        self.members = members
+        mac = GoogleLightGroup.make_up_mac(members)
+        super().__init__(name, mac, keywords)
+
+    @staticmethod
+    def make_up_mac(members: List[GoogleLight]):
+        mac = members[0].get_mac()
+        b = mac.split(':')
+        b[5] = int(b[5], 16) + 1
+        if b[5] > 255:
+            b[5] = 0
+        b[5] = str(b[5])
+        return ":".join(b)
+
+    def is_on(self) -> bool:
+        r = ask_google(f"are {self.goog_name()} on?")
+        if not r.success:
+            return False
+        return 'is on' in r.audio_transcription
+
+    def get_dimmer_level(self) -> Optional[int]:
+        if not self.has_keyword("dimmer"):
+            return False
+        r = ask_google(f'how bright are {self.goog_name()}?')
+        if not r.success:
+            return None
+
+        # four lights are set to 100% brightness
+        txt = r.audio_transcription
+        m = re.search(r"(\d+)% bright", txt)
+        if m is not None:
+            return int(m.group(1))
+        if "is off" in txt:
+            return 0
+        return None
+
+    def set_dimmer_level(self, level: int) -> bool:
+        if not self.has_keyword("dimmer"):
+            return False
+        if 0 <= level <= 100:
+            was_on = self.is_on()
+            r = ask_google(f"set {self.goog_name()} to {level} percent")
+            if not r.success:
+                return False
+            if not was_on:
+                self.turn_off()
+            return True
+        return False
+
+    def make_color(self, color: str) -> bool:
+        return GoogleLight.parse_google_response(
+            ask_google(f"make {self.goog_name()} {color}")
+        )
+
+
+def group_google_lights(lights: List[Light]) -> List[Light]:
+    bookcase_group = []
+    diningroom_group = []
+    for light in lights:
+        name = light.get_name()
+        if "bookcase_light_" in name:
+            bookcase_group.append(light)
+        elif "diningroom_light_" in name:
+            diningroom_group.append(light)
+
+    did_bookcase = False
+    did_diningroom = False
+    ret = []
+    for light in lights:
+        name = light.get_name()
+        if "bookcase_light_" in name:
+            if len(bookcase_group) == 4 and not did_bookcase:
+                ret.append(
+                    GoogleLightGroup(
+                        "bookcase_lights",
+                        bookcase_group,
+                        "perm wifi light smart goog dimmer"
+                    )
+                )
+                did_bookcase = True
+        elif "diningroom_light_" in name:
+            if len(diningroom_group) == 2 and not did_diningroom:
+                ret.append(
+                    GoogleLightGroup(
+                        "dining_room_lights",
+                        diningroom_group,
+                        "intermittent wifi light smart goog dimmer"
+                    )
+                )
+                did_diningroom = True
+        else:
+            ret.append(light)
+    return ret
+
+
 class LightingConfig(object):
     """Representation of the smart light device config."""
 
@@ -293,7 +393,6 @@ class LightingConfig(object):
             self,
             config_file: str = None,
     ) -> None:
-        import logical_search
         if config_file is None:
             config_file = config.config[
                 'light_utils_network_mac_addresses_location'
@@ -305,6 +404,9 @@ class LightingConfig(object):
         self.corpus = logical_search.Corpus()
         with open(config_file, "r") as f:
             contents = f.readlines()
+
+        diningroom_lights = []
+        bookcase_lights = []
         for line in contents:
             line = line.rstrip("\n")
             line = re.sub(r"#.*$", r"", line)
@@ -317,27 +419,68 @@ class LightingConfig(object):
             keywords = keywords.strip()
             if "perm" not in keywords:
                 continue
-            properties = [("name", name)]
-            tags = set()
-            for kw in keywords.split():
-                if ":" in kw:
-                    key, value = kw.split(":")
-                    properties.append((key, value))
-                else:
-                    tags.add(kw)
-            properties.append(("name", name))
             self.macs_by_name[name] = mac
             self._keywords_by_name[name] = keywords
             self.keywords_by_mac[mac] = keywords
             self.names_by_mac[mac] = name
-            self.corpus.add_doc(
-                logical_search.Document(
-                    docid=mac,
-                    tags=tags,
-                    properties=properties,
-                    reference=None,
-                )
+
+            if "bookcase_light_" in name:
+                bookcase_lights.append(mac)
+            elif "diningroom_light_" in name:
+                diningroom_lights.append(mac)
+            else:
+                self.index_light(name, keywords, mac)
+
+        name = 'bookcase_lights'
+        group = []
+        keywords = 'perm wifi light smart goog dimmer'
+        for b in bookcase_lights:
+            group.append(self.get_light_by_mac(b))
+        self.bookcase_group = GoogleLightGroup(
+            name,
+            group,
+            keywords,
+        )
+        mac = self.bookcase_group.get_mac()
+        self.macs_by_name[name] = mac
+        self._keywords_by_name[name] = keywords
+        self.keywords_by_mac[mac] = keywords
+        self.names_by_mac[mac] = name
+        self.index_light(name, keywords, mac)
+
+        name = 'dining_room_lights'
+        group = []
+        for b in diningroom_lights:
+            group.append(self.get_light_by_mac(b))
+        self.diningroom_group = GoogleLightGroup(
+            name,
+            group,
+            keywords,
+        )
+        mac = self.diningroom_group.get_mac()
+        self.macs_by_name[name] = mac
+        self._keywords_by_name[name] = keywords
+        self.keywords_by_mac[mac] = keywords
+        self.names_by_mac[mac] = name
+        self.index_light(name, keywords, mac)
+
+    def index_light(self, name: str, keywords: str, mac: str) -> None:
+        properties = [("name", name)]
+        tags = set()
+        for kw in keywords.split():
+            if ":" in kw:
+                key, value = kw.split(":")
+                properties.append((key, value))
+            else:
+                tags.add(kw)
+        self.corpus.add_doc(
+            logical_search.Document(
+                docid=mac,
+                tags=tags,
+                properties=properties,
+                reference=None,
             )
+        )
 
     def __repr__(self) -> str:
         s = "Known devices:\n"
@@ -375,13 +518,17 @@ class LightingConfig(object):
                 light = self.get_light_by_mac(mac)
                 if light is not None:
                     retval.append(light)
-        return retval
+        return group_google_lights(retval)
 
     def get_light_by_mac(self, mac: str) -> Optional[Light]:
         if mac in self.keywords_by_mac:
             name = self.names_by_mac[mac]
             kws = self.keywords_by_mac[mac]
-            if "tplink" in kws.lower():
+            if name == 'bookcase_lights':
+                return self.bookcase_group
+            elif name == 'dining_room_lights':
+                return self.diningroom_group
+            elif "tplink" in kws.lower():
                 return TPLinkLight(name, mac, kws)
             else:
                 return GoogleLight(name, mac, kws)
@@ -400,4 +547,4 @@ class LightingConfig(object):
                     light = self.get_light_by_mac(mac)
                     if light is not None:
                         retval.append(light)
-        return retval
+        return group_google_lights(retval)
index 9d40902b1736ce1af64c4a70fe547709024490c9..832ae9c2b12b533f7e82aa4a2503c40dcb335a11 100755 (executable)
@@ -3,8 +3,8 @@
 set -e
 
 python3 -m ensurepip --upgrade
-for x in pip wheel aiohttp antlr4-python3-runtime astral bitstring python-dateutil \
-             grpcio holidays cloudpickle dill numpy protobuf psutil pyserial pytype \
+for x in pip wheel aiohttp antlr4-python3-runtime astral bankroll bitstring python-dateutil \
+             grpcio holidays cloudpickle dill numpy pandas protobuf psutil pyserial pytype \
              pychromecast requests SpeechRecognition sklearn scikit-learn nltk; do
     echo "--- Installing ${x} ---"
     pip install -U ${x}