#!/usr/bin/env python3
import functools
+import importlib
import logging
import os
+from inspect import stack
import sys
# This module is commonly used by others in here and should avoid
metavar='SEED_INT',
help='Override the global random seed with a particular number.'
)
+args.add_argument(
+ '--dump_all_objects',
+ action=ActionNoYes,
+ default=False,
+ help='Should we dump the Python import tree before main?'
+)
+args.add_argument(
+ '--audit_import_events',
+ action=ActionNoYes,
+ default=False,
+ help='Should we audit all import events?',
+)
-original_hook = sys.excepthook
+original_hook = sys.excepthook
def handle_uncaught_exception(exc_type, exc_value, exc_tb):
"""
original_hook(exc_type, exc_value, exc_tb)
+class ImportInterceptor(importlib.abc.MetaPathFinder):
+ def __init__(self):
+ import collect.trie
+ self.module_by_filename_cache = {}
+ self.repopulate_modules_by_filename()
+ self.tree = collect.trie.Trie()
+ self.tree_node_by_module = {}
+
+ def repopulate_modules_by_filename(self):
+ self.module_by_filename_cache.clear()
+ for mod in sys.modules:
+ if hasattr(sys.modules[mod], '__file__'):
+ fname = getattr(sys.modules[mod], '__file__')
+ else:
+ fname = 'unknown'
+ self.module_by_filename_cache[fname] = mod
+
+ def should_ignore_filename(self, filename: str) -> bool:
+ return 'importlib' in filename or 'six.py' in filename
+
+ def find_spec(self, loaded_module, path=None, target=None):
+ s = stack()
+ for x in range(3, len(s)):
+ filename = s[x].filename
+ if self.should_ignore_filename(filename):
+ continue
+
+ loading_function = s[x].function
+ if filename in self.module_by_filename_cache:
+ loading_module = self.module_by_filename_cache[filename]
+ else:
+ self.repopulate_modules_by_filename()
+ loading_module = self.module_by_filename_cache.get(filename, 'unknown')
+
+ path = self.tree_node_by_module.get(loading_module, [])
+ path.extend([loaded_module])
+ self.tree.insert(path)
+ self.tree_node_by_module[loading_module] = path
+
+ msg = f'*** Import {loaded_module} from {filename}:{s[x].lineno} in {loading_module}::{loading_function}'
+ logger.debug(msg)
+ print(msg)
+ return
+ msg = f'*** Import {loaded_module} from ?????'
+ logger.debug(msg)
+ print(msg)
+
+ def find_importer(self, module: str):
+ if module in self.tree_node_by_module:
+ node = self.tree_node_by_module[module]
+ return node
+ return []
+
+
+# TODO: test this with python 3.8+
+def audit_import_events(event, args):
+ print(event)
+ print(args)
+
+
+# Audit import events? Note: this runs early in the lifetime of the
+# process (assuming that import bootstrap happens early); config has
+# (probably) not yet been loaded or parsed the commandline. Also,
+# some things have probably already been imported while we weren't
+# watching so this information may be incomplete.
+#
+# Also note: move bootstrap up in the global import list to catch
+# more import events and have a more complete record.
+import_interceptor = None
+for arg in sys.argv:
+ if arg == '--audit_import_events':
+ if not hasattr(sys, 'frozen'):
+ if (
+ sys.version_info[0] == 3
+ and sys.version_info[1] < 8
+ ):
+ import_interceptor = ImportInterceptor()
+ sys.meta_path = [import_interceptor] + sys.meta_path
+ else:
+ sys.addaudithook(audit_import_events)
+
+
+def dump_all_objects() -> None:
+ global import_interceptor
+ messages = {}
+ all_modules = sys.modules
+ for obj in object.__subclasses__():
+ if not hasattr(obj, '__name__'):
+ continue
+ klass = obj.__name__
+ if not hasattr(obj, '__module__'):
+ continue
+ class_mod_name = obj.__module__
+ if class_mod_name in all_modules:
+ mod = all_modules[class_mod_name]
+ if not hasattr(mod, '__name__'):
+ mod_name = class_mod_name
+ else:
+ mod_name = mod.__name__
+ if hasattr(mod, '__file__'):
+ mod_file = mod.__file__
+ else:
+ mod_file = 'unknown'
+ if import_interceptor is not None:
+ import_path = import_interceptor.find_importer(mod_name)
+ else:
+ import_path = 'unknown'
+ msg = f'{class_mod_name}::{klass} ({mod_file})'
+ if import_path != 'unknown' and len(import_path) > 0:
+ msg += f' imported by {import_path}'
+ messages[f'{class_mod_name}::{klass}'] = msg
+ for x in sorted(messages.keys()):
+ logger.debug(messages[x])
+ print(messages[x])
+
+
def initialize(entry_point):
"""
Remember to initialize config, initialize logging, set/log a random
"""
@functools.wraps(entry_point)
def initialize_wrapper(*args, **kwargs):
-
# Hook top level unhandled exceptions, maybe invoke debugger.
if sys.excepthook == sys.__excepthook__:
sys.excepthook = handle_uncaught_exception
if config.config['show_random_seed']:
msg = f'Global random seed is: {random_seed}'
- print(msg)
logger.debug(msg)
+ print(msg)
random.seed(random_seed)
# Do it, invoke the user's code. Pay attention to how long it takes.
f'{entry_point.__name__} (program entry point) returned {ret}.'
)
+ if config.config['dump_all_objects']:
+ dump_all_objects()
+
+ if config.config['audit_import_events']:
+ global import_interceptor
+ if import_interceptor is not None:
+ print(import_interceptor.tree)
+
walltime = t()
(utime, stime, cutime, cstime, elapsed_time) = os.times()
logger.debug('\n'
date: datetime.date # The date
high: float # The predicted high in F
low: float # The predicted low in F
- precipitation_inchs: float # Number of inches of precipitation / day
+ precipitation_inches: float # Number of inches of precipitation / day
conditions: List[str] # Conditions per ~3h window
most_common_condition: str # The most common condition
icon: str # An icon to represent it
date = dt,
high = float(parsed_json["main"]["temp_max"]),
low = float(parsed_json["main"]["temp_min"]),
- precipitation_inchs = p / 25.4,
+ precipitation_inches = p / 25.4,
conditions = [condition],
most_common_condition = condition,
icon = icon,
date = dt,
high = highs[dt],
low = lows[dt],
- precipitation_inchs = precip[dt] / 25.4,
+ precipitation_inches = precip[dt] / 25.4,
conditions = conditions[dt],
most_common_condition = most_common_condition,
icon = icon
import argparse_utils
import config
import logging_utils
+import logical_search
from google_assistant import ask_google, GoogleResponse
from decorator_utils import timeout, memoized
return tplink_light_command(cmd)
+class GoogleLightGroup(GoogleLight):
+ def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None:
+ if len(members) < 1:
+ raise Exception("There must be at least one light in the group.")
+ self.members = members
+ mac = GoogleLightGroup.make_up_mac(members)
+ super().__init__(name, mac, keywords)
+
+ @staticmethod
+ def make_up_mac(members: List[GoogleLight]):
+ mac = members[0].get_mac()
+ b = mac.split(':')
+ b[5] = int(b[5], 16) + 1
+ if b[5] > 255:
+ b[5] = 0
+ b[5] = str(b[5])
+ return ":".join(b)
+
+ def is_on(self) -> bool:
+ r = ask_google(f"are {self.goog_name()} on?")
+ if not r.success:
+ return False
+ return 'is on' in r.audio_transcription
+
+ def get_dimmer_level(self) -> Optional[int]:
+ if not self.has_keyword("dimmer"):
+ return False
+ r = ask_google(f'how bright are {self.goog_name()}?')
+ if not r.success:
+ return None
+
+ # four lights are set to 100% brightness
+ txt = r.audio_transcription
+ m = re.search(r"(\d+)% bright", txt)
+ if m is not None:
+ return int(m.group(1))
+ if "is off" in txt:
+ return 0
+ return None
+
+ def set_dimmer_level(self, level: int) -> bool:
+ if not self.has_keyword("dimmer"):
+ return False
+ if 0 <= level <= 100:
+ was_on = self.is_on()
+ r = ask_google(f"set {self.goog_name()} to {level} percent")
+ if not r.success:
+ return False
+ if not was_on:
+ self.turn_off()
+ return True
+ return False
+
+ def make_color(self, color: str) -> bool:
+ return GoogleLight.parse_google_response(
+ ask_google(f"make {self.goog_name()} {color}")
+ )
+
+
+def group_google_lights(lights: List[Light]) -> List[Light]:
+ bookcase_group = []
+ diningroom_group = []
+ for light in lights:
+ name = light.get_name()
+ if "bookcase_light_" in name:
+ bookcase_group.append(light)
+ elif "diningroom_light_" in name:
+ diningroom_group.append(light)
+
+ did_bookcase = False
+ did_diningroom = False
+ ret = []
+ for light in lights:
+ name = light.get_name()
+ if "bookcase_light_" in name:
+ if len(bookcase_group) == 4 and not did_bookcase:
+ ret.append(
+ GoogleLightGroup(
+ "bookcase_lights",
+ bookcase_group,
+ "perm wifi light smart goog dimmer"
+ )
+ )
+ did_bookcase = True
+ elif "diningroom_light_" in name:
+ if len(diningroom_group) == 2 and not did_diningroom:
+ ret.append(
+ GoogleLightGroup(
+ "dining_room_lights",
+ diningroom_group,
+ "intermittent wifi light smart goog dimmer"
+ )
+ )
+ did_diningroom = True
+ else:
+ ret.append(light)
+ return ret
+
+
class LightingConfig(object):
"""Representation of the smart light device config."""
self,
config_file: str = None,
) -> None:
- import logical_search
if config_file is None:
config_file = config.config[
'light_utils_network_mac_addresses_location'
self.corpus = logical_search.Corpus()
with open(config_file, "r") as f:
contents = f.readlines()
+
+ diningroom_lights = []
+ bookcase_lights = []
for line in contents:
line = line.rstrip("\n")
line = re.sub(r"#.*$", r"", line)
keywords = keywords.strip()
if "perm" not in keywords:
continue
- properties = [("name", name)]
- tags = set()
- for kw in keywords.split():
- if ":" in kw:
- key, value = kw.split(":")
- properties.append((key, value))
- else:
- tags.add(kw)
- properties.append(("name", name))
self.macs_by_name[name] = mac
self._keywords_by_name[name] = keywords
self.keywords_by_mac[mac] = keywords
self.names_by_mac[mac] = name
- self.corpus.add_doc(
- logical_search.Document(
- docid=mac,
- tags=tags,
- properties=properties,
- reference=None,
- )
+
+ if "bookcase_light_" in name:
+ bookcase_lights.append(mac)
+ elif "diningroom_light_" in name:
+ diningroom_lights.append(mac)
+ else:
+ self.index_light(name, keywords, mac)
+
+ name = 'bookcase_lights'
+ group = []
+ keywords = 'perm wifi light smart goog dimmer'
+ for b in bookcase_lights:
+ group.append(self.get_light_by_mac(b))
+ self.bookcase_group = GoogleLightGroup(
+ name,
+ group,
+ keywords,
+ )
+ mac = self.bookcase_group.get_mac()
+ self.macs_by_name[name] = mac
+ self._keywords_by_name[name] = keywords
+ self.keywords_by_mac[mac] = keywords
+ self.names_by_mac[mac] = name
+ self.index_light(name, keywords, mac)
+
+ name = 'dining_room_lights'
+ group = []
+ for b in diningroom_lights:
+ group.append(self.get_light_by_mac(b))
+ self.diningroom_group = GoogleLightGroup(
+ name,
+ group,
+ keywords,
+ )
+ mac = self.diningroom_group.get_mac()
+ self.macs_by_name[name] = mac
+ self._keywords_by_name[name] = keywords
+ self.keywords_by_mac[mac] = keywords
+ self.names_by_mac[mac] = name
+ self.index_light(name, keywords, mac)
+
+ def index_light(self, name: str, keywords: str, mac: str) -> None:
+ properties = [("name", name)]
+ tags = set()
+ for kw in keywords.split():
+ if ":" in kw:
+ key, value = kw.split(":")
+ properties.append((key, value))
+ else:
+ tags.add(kw)
+ self.corpus.add_doc(
+ logical_search.Document(
+ docid=mac,
+ tags=tags,
+ properties=properties,
+ reference=None,
)
+ )
def __repr__(self) -> str:
s = "Known devices:\n"
light = self.get_light_by_mac(mac)
if light is not None:
retval.append(light)
- return retval
+ return group_google_lights(retval)
def get_light_by_mac(self, mac: str) -> Optional[Light]:
if mac in self.keywords_by_mac:
name = self.names_by_mac[mac]
kws = self.keywords_by_mac[mac]
- if "tplink" in kws.lower():
+ if name == 'bookcase_lights':
+ return self.bookcase_group
+ elif name == 'dining_room_lights':
+ return self.diningroom_group
+ elif "tplink" in kws.lower():
return TPLinkLight(name, mac, kws)
else:
return GoogleLight(name, mac, kws)
light = self.get_light_by_mac(mac)
if light is not None:
retval.append(light)
- return retval
+ return group_google_lights(retval)
set -e
python3 -m ensurepip --upgrade
-for x in pip wheel aiohttp antlr4-python3-runtime astral bitstring python-dateutil \
- grpcio holidays cloudpickle dill numpy protobuf psutil pyserial pytype \
+for x in pip wheel aiohttp antlr4-python3-runtime astral bankroll bitstring python-dateutil \
+ grpcio holidays cloudpickle dill numpy pandas protobuf psutil pyserial pytype \
pychromecast requests SpeechRecognition sklearn scikit-learn nltk; do
echo "--- Installing ${x} ---"
pip install -U ${x}