import re
from typing import Any, Callable, List, Optional, Set, Sequence
+from overrides import overrides
+
# This module is commonly used by others in here and should avoid
# taking any unnecessary dependencies back on them.
self.allow_set = allow_set
self.deny_set = deny_set
+ @overrides
def check_allowed(self, x: Any) -> bool:
if self.allow_set is None:
return False
return x in self.allow_set
+ @overrides
def check_denied(self, x: Any) -> bool:
if self.deny_set is None:
return False
self.allow_predicate_list = allow_predicate_list
self.deny_predicate_list = deny_predicate_list
+ @overrides
def check_allowed(self, x: Any) -> bool:
if self.allow_predicate_list is None:
return False
return any(predicate(x) for predicate in self.allow_predicate_list)
+ @overrides
def check_denied(self, x: Any) -> bool:
if self.deny_predicate_list is None:
return False
)
self.subacls = subacls
+ @overrides
def check_allowed(self, x: Any) -> bool:
if self.subacls is None:
return False
return any(acl(x) for acl in self.subacls)
+ @overrides
def check_denied(self, x: Any) -> bool:
if self.subacls is None:
return False
)
self.subacls = subacls
+ @overrides
def check_allowed(self, x: Any) -> bool:
if self.subacls is None:
return False
return all(acl(x) for acl in self.subacls)
+ @overrides
def check_denied(self, x: Any) -> bool:
if self.subacls is None:
return False
import sys
from typing import Any, Callable, Dict, Iterable, Optional, Tuple
+from overrides import overrides
+
import logging_utils
logger = logging.getLogger(__name__)
self.buf = ''
@abstractmethod
- def write(self, s):
+ def write(self, s: str):
pass
def __enter__(self) -> None:
super().__init__()
self.patterns = [_ for _ in patterns]
+ @overrides
def write(self, s: str):
for pattern in self.patterns:
s = pattern[0].sub(pattern[1], s)
import os
from typing import Any
+from overrides import overrides
+
+
# This module is commonly used by others in here and should avoid
# taking any unnecessary dependencies back on them.
help=help
)
+ @overrides
def __call__(self, parser, namespace, values, option_strings=None):
if (
option_strings.startswith('--no-') or
#!/usr/bin/env python3
+"""A caching layer around the kernel's network mapping between IPs and MACs"""
+
import datetime
import logging
import os
from typing import Any, Optional
+from overrides import overrides
+
import argparse_utils
from collect.bidict import BiDict
import config
import exec_utils
+import file_utils
import persistent
import string_utils
+import site_config
logger = logging.getLogger(__name__)
metavar='DURATION',
help='Max acceptable age of the kernel arp table cache'
)
+cfg.add_argument(
+ '--arper_min_entries_to_be_valid',
+ type=int,
+ default=site_config.get_config().arper_minimum_device_count,
+ help='Min number of arp entries to bother persisting.'
+)
@persistent.persistent_autoloaded_singleton()
self.state = cached_state
else:
logger.debug('No usable cached state; calling /usr/sbin/arp')
- output = exec_utils.cmd(
- '/usr/sbin/arp -a',
- timeout_seconds=5.0
- )
- for line in output.split('\n'):
- ip = string_utils.extract_ip_v4(line)
- mac = string_utils.extract_mac_address(line)
- if ip is not None and mac is not None:
- mac = mac.lower()
- logger.debug(f' {mac} => {ip}')
- self.state[mac] = ip
+ self.update()
+
+ def update(self):
+ output = exec_utils.cmd(
+ '/usr/sbin/arp -a',
+ timeout_seconds=5.0
+ )
+ for line in output.split('\n'):
+ ip = string_utils.extract_ip_v4(line)
+ mac = string_utils.extract_mac_address(line)
+ if ip is not None and mac is not None:
+ mac = mac.lower()
+ logger.debug(f' {mac} => {ip}')
+ self.state[mac] = ip
def get_ip_by_mac(self, mac: str) -> Optional[str]:
mac = mac.lower()
def get_mac_by_ip(self, ip: str) -> Optional[str]:
return self.state.inverse.get(ip, None)
+ @overrides
def save(self) -> bool:
- logger.debug(
- f'Persisting state to {config.config["arper_cache_location"]}'
- )
- with open(config.config['arper_cache_location'], 'w') as wf:
- for (mac, ip) in self.state.items():
- mac = mac.lower()
- print(f'{mac}, {ip}', file=wf)
+ if len(self.state) > config.config['arper_min_entries_to_be_valid']:
+ logger.debug(
+ f'Persisting state to {config.config["arper_cache_location"]}'
+ )
+ with file_utils.FileWriter(config.config['arper_cache_location']) as wf:
+ for (mac, ip) in self.state.items():
+ mac = mac.lower()
+ print(f'{mac}, {ip}', file=wf)
+ return True
+ else:
+ logger.warning(
+ f'Only saw {len(self.state)} entries; needed at least {config.config["arper_min_entries_to_be_valid"]} to bother persisting.'
+ )
+ return False
@classmethod
+ @overrides
def load(cls) -> Any:
+ cache_file = config.config['arper_cache_location']
if persistent.was_file_written_within_n_seconds(
- config.config['arper_cache_location'],
+ cache_file,
config.config['arper_cache_max_staleness'].total_seconds(),
):
- logger.debug(
- f'Loading state from {config.config["arper_cache_location"]}'
- )
+ logger.debug(f'Loading state from {cache_file}')
cached_state = BiDict()
- with open(config.config['arper_cache_location'], 'r') as rf:
+ with open(cache_file, 'r') as rf:
contents = rf.readlines()
for line in contents:
- logger.debug(f'ARPER> {line}')
+ line = line[:-1]
+ logger.debug(f'ARPER:{cache_file}> {line}')
(mac, ip) = line.split(',')
mac = mac.strip()
mac = mac.lower()
ip = ip.strip()
cached_state[mac] = ip
- return cls(cached_state)
+ if len(cached_state) > config.config['arper_min_entries_to_be_valid']:
+ return cls(cached_state)
+ else:
+ logger.warning(
+ f'{cache_file} sucks, only {len(cached_state)} entries. Deleting it.'
+ )
+ os.remove(cache_file)
+
logger.debug('No usable saved state found')
return None
# than once.
config_parse_called = False
+
# A global configuration dictionary that will contain parsed arguments.
# It is also this variable that modules use to access parsed arguments.
# This is the data that is most interesting to our callers; it will hold
# the configuration result.
-config: Dict[str, Any] = {}
+config = {}
+# It would be really nice if this shit worked from interactive python
def add_commandline_args(title: str, description: str = ""):
from typing import Any, Callable, Dict, List, Optional, Set
import cloudpickle # type: ignore
+from overrides import overrides
from ansi import bg, fg, underline, reset
import argparse_utils
self.histogram.add_item(duration)
return result
+ @overrides
def submit(self,
function: Callable,
*args,
*newargs,
**kwargs)
+ @overrides
def shutdown(self,
wait = True) -> None:
logger.debug(f'Shutting down threadpool executor {self.title}')
self.adjust_task_count(-1)
return result
+ @overrides
def submit(self,
function: Callable,
*args,
)
return result
+ @overrides
def shutdown(self, wait=True) -> None:
logger.debug(f'Shutting down processpool executor {self.title}')
self._process_executor.shutdown(wait)
# they will move the result_file to this machine and let
# the original pick them up and unpickle them.
+ @overrides
def submit(self,
function: Callable,
*args,
self.total_bundles_submitted += 1
return self._helper_executor.submit(self.launch, bundle)
+ @overrides
def shutdown(self, wait=True) -> None:
self._helper_executor.shutdown(wait)
logging.debug(f'Shutting down RemoteExecutor {self.title}')
logger = logging.getLogger(__name__)
+# os.remove(file) you fuckwit.
+
+
def create_path_if_not_exist(path, on_error=None):
"""
Attempts to create path if it does not exist. If on_error is
import bitstring
-from collect.bidict import bidict
+from collect.bidict import BiDict
-special_characters = bidict(
+special_characters = BiDict(
{
' ': 27,
'.': 28,
import sys
from typing import Callable, Iterable, Mapping, Optional
+from overrides import overrides
+
# This module is commonly used by others in here and should avoid
# taking any unnecessary dependencies back on them.
import argparse_utils
default='INFO',
choices=['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
metavar='LEVEL',
- help='The level below which to squelch log messages.',
+ help='The global default level below which to squelch log messages; see also --lmodule',
)
cfg.add_argument(
'--logging_format',
default=False,
help='When calling print, also log.info automatically.'
)
+cfg.add_argument(
+ '--lmodule',
+ type=str,
+ metavar='<SCOPE>=<LEVEL>[,<SCOPE>=<LEVEL>...]',
+ help=(
+ 'Allows per-scope logging levels which override the global level set with --logging-level.' +
+ 'Pass a space separated list of <scope>=<level> where <scope> is one of: module, ' +
+ 'module:function, or :function and <level> is a logging level (e.g. INFO, DEBUG...)'
+ )
+)
+
built_in_print = print
self.counters = collections.Counter()
super().__init__()
+ @overrides
def filter(self, record: logging.LogRecord) -> bool:
id1 = f'{record.module}:{record.funcName}'
if id1 not in squelched_logging_counts:
return count < threshold
+class DynamicPerScopeLoggingLevelFilter(logging.Filter):
+ """Only interested in seeing logging messages from an allow list of
+ module names or module:function names. Block others.
+
+ """
+ @staticmethod
+ def level_name_to_level(name: str) -> int:
+ numeric_level = getattr(
+ logging,
+ name,
+ None
+ )
+ if not isinstance(numeric_level, int):
+ raise ValueError('Invalid level: {name}')
+ return numeric_level
+
+ def __init__(
+ self,
+ default_logging_level: int,
+ per_scope_logging_levels: str,
+ ) -> None:
+ super().__init__()
+ self.valid_levels = set(['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'])
+ self.default_logging_level = default_logging_level
+ self.level_by_scope = {}
+ if per_scope_logging_levels is not None:
+ for chunk in per_scope_logging_levels.split(','):
+ if '=' not in chunk:
+ print(
+ f'Malformed lmodule directive: "{chunk}", missing "=". Ignored.',
+ file=sys.stderr
+ )
+ continue
+ try:
+ (scope, level) = chunk.split('=')
+ except ValueError:
+ print(
+ f'Malformed lmodule directive: "{chunk}". Ignored.',
+ file=sys.stderr
+ )
+ continue
+ scope = scope.strip()
+ level = level.strip().upper()
+ if level not in self.valid_levels:
+ print(
+ f'Malformed lmodule directive: "{chunk}", bad level. Ignored.',
+ file=sys.stderr
+ )
+ continue
+ self.level_by_scope[scope] = (
+ DynamicPerScopeLoggingLevelFilter.level_name_to_level(
+ level
+ )
+ )
+
+ @overrides
+ def filter(self, record: logging.LogRecord) -> bool:
+ # First try to find a logging level by scope (--lmodule)
+ if len(self.level_by_scope) > 0:
+ min_level = None
+ for scope in (
+ record.module,
+ f'{record.module}:{record.funcName}',
+ f':{record.funcName}'
+ ):
+ level = self.level_by_scope.get(scope, None)
+ if level is not None:
+ if min_level is None or level < min_level:
+ min_level = level
+
+ # If we found one, use it instead of the global default level.
+ if min_level is not None:
+ return record.levelno >= min_level
+
+ # Otherwise, use the global logging level (--logging_level)
+ return record.levelno >= self.default_logging_level
+
+
# A map from function_identifier -> probability of logging (0.0%..100.0%)
probabilistic_logging_levels: Mapping[str, float] = {}
been tagged with the @logging_utils.probabilistic_logging decorator.
"""
+ @overrides
def filter(self, record: logging.LogRecord) -> bool:
id1 = f'{record.module}:{record.funcName}'
if id1 not in probabilistic_logging_levels:
stdout handler.
"""
- def filter(self, record):
+ @overrides
+ def filter(self, record: logging.LogRecord):
return record.levelno == logging.INFO
"""
converter = datetime.datetime.fromtimestamp
+ @overrides
def formatTime(self, record, datefmt=None):
ct = MillisecondAwareFormatter.converter(
record.created, pytz.timezone("US/Pacific")
return logger
handlers = []
- numeric_level = getattr(
+
+ # Global default logging level (--logging_level)
+ default_logging_level = getattr(
logging,
config.config['logging_level'].upper(),
None
)
- if not isinstance(numeric_level, int):
+ if not isinstance(default_logging_level, int):
raise ValueError('Invalid level: %s' % config.config['logging_level'])
fmt = config.config['logging_format']
if config.config['logging_syslog']:
if sys.platform not in ('win32', 'cygwin'):
handler = SysLogHandler()
-# for k, v in encoded_priorities.items():
-# handler.encodePriority(k, v)
handler.setFormatter(
MillisecondAwareFormatter(
fmt=fmt,
datefmt=config.config['logging_date_format'],
)
)
- handler.setLevel(numeric_level)
handlers.append(handler)
if config.config['logging_filename']:
maxBytes = config.config['logging_filename_maxsize'],
backupCount = config.config['logging_filename_count'],
)
- handler.setLevel(numeric_level)
handler.setFormatter(
MillisecondAwareFormatter(
fmt=fmt,
if config.config['logging_console']:
handler = logging.StreamHandler(sys.stderr)
- handler.setLevel(numeric_level)
handler.setFormatter(
MillisecondAwareFormatter(
fmt=fmt,
for handler in handlers:
handler.addFilter(ProbabilisticFilter())
- logger.setLevel(numeric_level)
+ for handler in handlers:
+ handler.addFilter(
+ DynamicPerScopeLoggingLevelFilter(
+ default_logging_level,
+ config.config['lmodule'],
+ )
+ )
+ logger.setLevel(0)
logger.propagate = False
if config.config['logging_captures_prints']:
from heapq import heappush, heappop
-class RunningMedian:
+class RunningMedian(object):
"""A running median computer.
>>> median = RunningMedian()
import re
from typing import Dict, List
+# Note: this module is fairly early loaded. Be aware of dependencies.
import argparse_utils
import bootstrap
import config
from dataclasses import dataclass
import logging
import platform
-from typing import Callable, Optional
+from typing import Callable
+# Note: this module is fairly early loaded. Be aware of dependencies.
import config
import presence
network_router_ip: str
presence_location: presence.Location
is_anyone_present: Callable[None, bool]
+ arper_minimum_device_count: int
def get_location():
network_router_ip = '10.0.0.1',
presence_location = presence.Location.HOUSE,
is_anyone_present = lambda x=presence.Location.HOUSE: is_anyone_present_wrapper(x),
+ arper_minimum_device_count = 50,
)
elif location == 'CABIN':
return SiteConfig(
network_router_ip = '192.168.0.1',
presence_location = presence.Location.CABIN,
is_anyone_present = lambda x=presence.Location.CABIN: is_anyone_present_wrapper(x),
+ arper_minimum_device_count = 15,
)
else:
raise Exception(f'Unknown site location: {location}')
import time
from typing import Callable, List, TypeVar
+from overrides import overrides
+
# This module is commonly used by others in here and should avoid
# taking any unnecessary dependencies back on them.
from deferred_operand import DeferredOperand
# You shouldn't have to call this; instead, have a look at defining a
# method on DeferredOperand base class.
+ @overrides
def _resolve(self, *, timeout=None) -> T:
return self.wrapped_future.result(timeout)
--- /dev/null
+#!/usr/bin/env python3
+
+"""Utilities for dealing with the webcams."""
+
+import logging
+import time
+
+import pychromecast
+
+from decorator_utils import memoized
+import smart_home.device as dev
+
+logger = logging.getLogger(__name__)
+
+
+class BaseChromecast(dev.Device):
+ def __init__(self, name: str, mac: str, keywords: str = "") -> None:
+ super().__init__(name.strip(), mac.strip(), keywords)
+ ip = self.get_ip()
+ self.cast = pychromecast.Chromecast(ip)
+ self.cast.wait()
+ time.sleep(0.1)
+
+ def is_idle(self):
+ return self.cast.is_idle
+
+ @memoized
+ def get_uuid(self):
+ return self.cast.uuid
+
+ @memoized
+ def get_friendly_name(self):
+ return self.cast.name
+
+ def get_uri(self):
+ return self.cast.url
+
+ @memoized
+ def get_model_name(self):
+ return self.cast.model_name
+
+ @memoized
+ def get_cast_type(self):
+ return self.cast.cast_type
+
+ @memoized
+ def app_id(self):
+ return self.cast.app_id
+
+ def get_app_display_name(self):
+ return self.cast.app_display_name
+
+ def get_media_controller(self):
+ return self.cast.media_controller
+
+ def status(self):
+ if self.is_idle():
+ return 'idle'
+ app = self.get_app_display_name()
+ mc = self.get_media_controller()
+ status = mc.status
+ return f'{app} / {status.title}'
+
+ def start_app(self, app_id, force_launch=False):
+ """Start an app on the Chromecast."""
+ self.cast.start_app(app_id, force_launch)
+
+ def quit_app(self):
+ """Tells the Chromecast to quit current app_id."""
+ self.cast.quit_app()
+
+ def volume_up(self, delta=0.1):
+ """Increment volume by 0.1 (or delta) unless it is already maxed.
+ Returns the new volume.
+ """
+ return self.cast.volume_up(delta)
+
+ def volume_down(self, delta=0.1):
+ """Decrement the volume by 0.1 (or delta) unless it is already 0.
+ Returns the new volume.
+ """
+ return self.cast.volume_down(delta)
+
+ def __repr__(self):
+ return (
+ f"Chromecast({self.cast.socket_client.host!r}, port={self.cast.socket_client.port!r}, "
+ f"device={self.cast.device!r})"
+ )
def get_mac(self) -> str:
return self.mac
- def get_ip(self) -> str:
+ def get_ip(self) -> Optional[str]:
return self.arper.get_ip_by_mac(self.mac)
+ def has_static_ip(self) -> bool:
+ for kw in self.kws:
+ m = re.search(r'static:([\d\.]+)', kw)
+ if m is not None:
+ ip = m.group(1)
+ assert self.get_ip() == ip
+ return True
+ return False
+
+ # Add command -> URL logic here.
+
def get_keywords(self) -> Optional[List[str]]:
return self.kws
import re
import subprocess
import sys
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Tuple
+from overrides import overrides
import tinytuya as tt
+import ansi
import argparse_utils
+import arper
import config
import logging_utils
import smart_home.device as dev
logger = logging.getLogger(__name__)
-parser = config.add_commandline_args(
+args = config.add_commandline_args(
f"Smart Lights ({__file__})",
"Args related to smart lights.",
)
-parser.add_argument(
+args.add_argument(
'--smart_lights_tplink_location',
default='/home/scott/bin/tplink.py',
metavar='FILENAME',
def __init__(self, name: str, mac: str, keywords: str = "") -> None:
super().__init__(name.strip(), mac.strip(), keywords)
+ @staticmethod
+ def parse_color_string(color: str) -> Optional[Tuple[int, int, int]]:
+ m = re.match(
+ 'r#?([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])',
+ color
+ )
+ if m is not None and len(m.group) == 3:
+ red = int(m.group(0), 16)
+ green = int(m.group(1), 16)
+ blue = int(m.group(2), 16)
+ return (red, green, blue)
+ color = color.lower()
+ return ansi.COLOR_NAMES_TO_RGB.get(color, None)
+
@abstractmethod
def turn_on(self) -> bool:
pass
def parse_google_response(response: GoogleResponse) -> bool:
return response.success
+ @overrides
def turn_on(self) -> bool:
return GoogleLight.parse_google_response(
ask_google(f"turn {self.goog_name()} on")
)
+ @overrides
def turn_off(self) -> bool:
return GoogleLight.parse_google_response(
ask_google(f"turn {self.goog_name()} off")
)
+ @overrides
def is_on(self) -> bool:
r = ask_google(f"is {self.goog_name()} on?")
if not r.success:
return False
return 'is on' in r.audio_transcription
+ @overrides
def is_off(self) -> bool:
return not self.is_on()
+ @overrides
def get_dimmer_level(self) -> Optional[int]:
if not self.has_keyword("dimmer"):
return False
return 0
return None
+ @overrides
def set_dimmer_level(self, level: int) -> bool:
if not self.has_keyword("dimmer"):
return False
return True
return False
+ @overrides
def make_color(self, color: str) -> bool:
return GoogleLight.parse_google_response(
ask_google(f"make {self.goog_name()} {color}")
}
def __init__(self, name: str, mac: str, keywords: str = "") -> None:
- from subprocess import Popen, PIPE
super().__init__(name, mac, keywords)
mac = mac.upper()
if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac:
raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac')
self.devid = TuyaLight.ids_by_mac[mac]
self.key = TuyaLight.keys_by_mac[mac]
- try:
- pid = Popen(['maclookup', mac], stdout=PIPE)
- ip = pid.communicate()[0]
- ip = ip[:-1]
- except Exception:
- ip = '0.0.0.0'
+ self.arper = arper.Arper()
+ ip = self.get_ip()
self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key)
+ def get_status(self) -> Dict[str, Any]:
+ return self.bulb.status()
+
+ @overrides
def turn_on(self) -> bool:
self.bulb.turn_on()
return True
+ @overrides
def turn_off(self) -> bool:
self.bulb.turn_off()
return True
- def get_status(self) -> Dict[str, Any]:
- return self.bulb.status()
-
+ @overrides
def is_on(self) -> bool:
s = self.get_status()
return s['dps']['1']
+ @overrides
def is_off(self) -> bool:
return not self.is_on()
+ @overrides
def get_dimmer_level(self) -> Optional[int]:
s = self.get_status()
return s['dps']['3']
+ @overrides
def set_dimmer_level(self, level: int) -> bool:
self.bulb.set_brightness(level)
return True
+ @overrides
def make_color(self, color: str) -> bool:
- self.bulb.set_colour(255,0,0)
- return True
+ rgb = BaseLight.parse_color_string(color)
+ if rgb is not None:
+ self.bulb.set_colour(rgb[0], rgb[1], rgb[2])
+ return True
+ return False
class TPLinkLight(BaseLight):
logger.debug(f'About to execute {cmd}')
return tplink_light_command(cmd)
+ @overrides
def turn_on(self, child: str = None) -> bool:
return self.command("on", child)
+ @overrides
def turn_off(self, child: str = None) -> bool:
return self.command("off", child)
+ @overrides
def is_on(self) -> bool:
return self.get_on_duration_seconds() > 0
+ @overrides
def is_off(self) -> bool:
return not self.is_on()
+ @overrides
def make_color(self, color: str) -> bool:
raise NotImplementedError
return int(chi.get("on_time", "0"))
return 0
- def get_on_limit_seconds(self) -> Optional[int]:
- for kw in self.kws:
- m = re.search(r"timeout:(\d+)", kw)
- if m is not None:
- return int(m.group(1)) * 60
- return None
-
+ @overrides
def get_dimmer_level(self) -> Optional[int]:
if not self.has_keyword("dimmer"):
return False
return None
return int(self.info.get("brightness", "0"))
+ @overrides
def set_dimmer_level(self, level: int) -> bool:
if not self.has_keyword("dimmer"):
return False
import logical_search
import smart_home.device as device
import smart_home.cameras as cameras
+import smart_home.chromecasts as chromecasts
import smart_home.lights as lights
import smart_home.outlets as outlets
-parser = config.add_commandline_args(
- f"Smart Home Config ({__file__})",
- "Args related to the smart home config."
+args = config.add_commandline_args(
+ f"Smart Home Registry ({__file__})",
+ "Args related to the smart home configuration registry."
)
-parser.add_argument(
- '--smart_home_config_file_location',
+args.add_argument(
+ '--smart_home_registry_file_location',
default='/home/scott/bin/network_mac_addresses.txt',
metavar='FILENAME',
help='The location of network_mac_addresses.txt',
logger = logging.getLogger(__file__)
-class SmartHomeConfig(object):
+class SmartHomeRegistry(object):
def __init__(
self,
- config_file: Optional[str] = None,
+ registry_file: Optional[str] = None,
filters: List[str] = ['smart'],
) -> None:
self._macs_by_name = {}
self._corpus = logical_search.Corpus()
# Read the disk config file...
- if config_file is None:
- config_file = config.config[
- 'smart_home_config_file_location'
+ if registry_file is None:
+ registry_file = config.config[
+ 'smart_home_registry_file_location'
]
- assert file_utils.does_file_exist(config_file)
- logger.debug(f'Reading {config_file}')
- with open(config_file, "r") as f:
+ assert file_utils.does_file_exist(registry_file)
+ logger.debug(f'Reading {registry_file}')
+ with open(registry_file, "r") as f:
contents = f.readlines()
# Parse the contents...
line = line.strip()
if line == "":
continue
- logger.debug(f'> {line}')
+ logger.debug(f'SH-CONFIG> {line}')
(mac, name, keywords) = line.split(",")
mac = mac.strip()
name = name.strip()
name = self._names_by_mac[mac]
kws = self._keywords_by_mac[mac]
logger.debug(f'Found {name} -> {mac} ({kws})')
- if 'light' in kws.lower():
- if 'tplink' in kws.lower():
- logger.debug(' ...a TPLinkLight')
- return lights.TPLinkLight(name, mac, kws)
- elif 'tuya' in kws.lower():
- logger.debug(' ...a TuyaLight')
- return lights.TuyaLight(name, mac, kws)
- elif 'goog' in kws.lower():
- logger.debug(' ...a GoogleLight')
- return lights.GoogleLight(name, mac, kws)
- else:
- raise Exception(f'Unknown light device: {name}, {mac}, {kws}')
- elif 'outlet' in kws.lower():
- if 'tplink' in kws.lower():
- if 'children' in kws.lower():
- logger.debug(' ...a TPLinkOutletWithChildren')
- return outlets.TPLinkOutletWithChildren(name, mac, kws)
+ try:
+ if 'light' in kws.lower():
+ if 'tplink' in kws.lower():
+ logger.debug(' ...a TPLinkLight')
+ return lights.TPLinkLight(name, mac, kws)
+ elif 'tuya' in kws.lower():
+ logger.debug(' ...a TuyaLight')
+ return lights.TuyaLight(name, mac, kws)
+ elif 'goog' in kws.lower():
+ logger.debug(' ...a GoogleLight')
+ return lights.GoogleLight(name, mac, kws)
+ else:
+ raise Exception(f'Unknown light device: {name}, {mac}, {kws}')
+ elif 'outlet' in kws.lower():
+ if 'tplink' in kws.lower():
+ if 'children' in kws.lower():
+ logger.debug(' ...a TPLinkOutletWithChildren')
+ return outlets.TPLinkOutletWithChildren(name, mac, kws)
+ else:
+ logger.debug(' ...a TPLinkOutlet')
+ return outlets.TPLinkOutlet(name, mac, kws)
+ elif 'goog' in kws.lower():
+ logger.debug(' ...a GoogleOutlet')
+ return outlets.GoogleOutlet(name, mac, kws)
else:
- logger.debug(' ...a TPLinkOutlet')
- return outlets.TPLinkOutlet(name, mac, kws)
- elif 'goog' in kws.lower():
- logger.debug(' ...a GoogleOutlet')
- return outlets.GoogleOutlet(name, mac, kws)
+ raise Exception(f'Unknown outlet device: {name}, {mac}, {kws}')
+ elif 'camera' in kws.lower():
+ logger.debug(' ...a BaseCamera')
+ return cameras.BaseCamera(name, mac, kws)
+ elif 'ccast' in kws.lower():
+ logger.debug(' ...a Chromecast')
+ return chromecasts.BaseChromecast(name, mac, kws)
else:
- raise Exception(f'Unknown outlet device: {name}, {mac}, {kws}')
- elif 'camera' in kws.lower():
- logger.debug(' ...a BaseCamera')
- return cameras.BaseCamera(name, mac, kws)
- else:
- logger.debug(' ...an unknown device (should this be here?)')
+ logger.debug(' ...an unknown device (should this be here?)')
+ return device.Device(name, mac, kws)
+ except Exception as e:
+ logger.warning(
+ f'Got exception {e} while trying to communicate with device {name}/{mac}.'
+ )
return device.Device(name, mac, kws)
- logger.warning(f'{mac} is not known, returning None')
+ logger.warning(f'{mac} is not a known smart home device, returning None')
return None
def query(self, query: str) -> List[device.Device]:
return out
-class Indenter:
+class Indenter(object):
"""
with Indenter(pad_count = 8) as i:
i.print('test')