--- /dev/null
+#!/usr/bin/env python3
+
+import re
+from typing import List, Optional, Set
+
+import argparse_utils
+import config
+import file_utils
+import logical_search
+import smart_home.device as device
+import smart_home.lights as lights
+import smart_home.outlets as outlets
+
+parser = config.add_commandline_args(
+ f"Smart Home Config ({__file__})",
+ "Args related to the smart home config."
+)
+parser.add_argument(
+ '--smart_home_config_file_location',
+ default='/home/scott/bin/network_mac_addresses.txt',
+ metavar='FILENAME',
+ help='The location of network_mac_addresses.txt',
+ type=argparse_utils.valid_filename,
+)
+
+
+class SmartHomeConfig(object):
+ def __init__(
+ self,
+ config_file: Optional[str] = None,
+ filters: List[str] = ['smart'],
+ ) -> None:
+ if config_file is None:
+ config_file = config.config[
+ 'smart_home_config_file_location'
+ ]
+ assert file_utils.does_file_exist(config_file)
+ with open(config_file, "r") as f:
+ contents = f.readlines()
+
+ self._macs_by_name = {}
+ self._keywords_by_name = {}
+ self._keywords_by_mac = {}
+ self._names_by_mac = {}
+ self._corpus = logical_search.Corpus()
+
+ for line in contents:
+ line = line.rstrip("\n")
+ line = re.sub(r"#.*$", r"", line)
+ line = line.strip()
+ if line == "":
+ continue
+ (mac, name, keywords) = line.split(",")
+ mac = mac.strip()
+ name = name.strip()
+ keywords = keywords.strip()
+
+ if filters is not None:
+ for f in filters:
+ if not f in keywords:
+ continue
+
+ self._macs_by_name[name] = mac
+ self._keywords_by_name[name] = keywords
+ self._keywords_by_mac[mac] = keywords
+ self._names_by_mac[mac] = name
+ self.index_device(name, keywords, mac)
+
+ def index_device(self, name: str, keywords: str, mac: str) -> None:
+ properties = [("name", name)]
+ tags = set()
+ for kw in keywords.split():
+ if ":" in kw:
+ key, value = kw.split(":")
+ properties.append((key, value))
+ else:
+ tags.add(kw)
+ device = logical_search.Document(
+ docid=mac,
+ tags=tags,
+ properties=properties,
+ reference=None,
+ )
+ self._corpus.add_doc(device)
+
+ def __repr__(self) -> str:
+ s = "Known devices:\n"
+ for name, keywords in self._keywords_by_name.items():
+ mac = self._macs_by_name[name]
+ s += f" {name} ({mac}) => {keywords}\n"
+ return s
+
+ def get_keywords_by_name(self, name: str) -> Optional[device.Device]:
+ return self._keywords_by_name.get(name, None)
+
+ def get_macs_by_name(self, name: str) -> Set[str]:
+ retval = set()
+ for (mac, lname) in self._names_by_mac.items():
+ if name in lname:
+ retval.add(mac)
+ return retval
+
+ def get_macs_by_keyword(self, keyword: str) -> Set[str]:
+ retval = set()
+ for (mac, keywords) in self._keywords_by_mac.items():
+ if keyword in keywords:
+ retval.add(mac)
+ return retval
+
+ def get_device_by_name(self, name: str) -> Optional[device.Device]:
+ if name in self._macs_by_name:
+ return self.get_device_by_mac(self._macs_by_name[name])
+ return None
+
+ def get_all_devices(self) -> List[device.Device]:
+ retval = []
+ for (mac, kws) in self._keywords_by_mac.items():
+ if mac is not None:
+ device = self.get_device_by_mac(mac)
+ if device is not None:
+ retval.append(device)
+ return retval
+
+ def get_device_by_mac(self, mac: str) -> Optional[device.Device]:
+ if mac in self._keywords_by_mac:
+ name = self._names_by_mac[mac]
+ kws = self._keywords_by_mac[mac]
+ if 'light' in kws.lower():
+ if 'tplink' in kws.lower():
+ return lights.TPLinkLight(name, mac, kws)
+ elif 'tuya' in kws.lower():
+ return lights.TuyaLight(name, mac, kws)
+ elif 'goog' in kws.lower():
+ return lights.GoogleLight(name, mac, kws)
+ else:
+ raise Exception(f'Unknown light device: {name}, {mac}, {kws}')
+ elif 'outlet' in kws.lower():
+ if 'tplink' in kws.lower():
+ if 'children' in kws.lower():
+ return outlets.TPLinkOutletWithChildren(name, mac, kws)
+ else:
+ return outlets.TPLinkOutlet(name, mac, kws)
+ elif 'goog' in kws.lower():
+ return outlets.GoogleOutlet(name, mac, kws)
+ else:
+ raise Exception(f'Unknown outlet device: {name}, {mac}, {kws}')
+ else:
+ return device.Device(name, mac, kws)
+ return None
+
+ def query(self, query: str) -> List[device.Device]:
+ """Evaluates a lighting query expression formed of keywords to search
+ for, logical operators (and, or, not), and parenthesis.
+ Returns a list of matching lights.
+ """
+ retval = []
+ results = self._corpus.query(query)
+ if results is not None:
+ for mac in results:
+ if mac is not None:
+ device = self.get_device_by_mac(mac)
+ if device is not None:
+ retval.append(device)
+ return retval
--- /dev/null
+#!/usr/bin/env python3
+
+import re
+from typing import Any, List, Optional, Tuple
+
+class Device(object):
+ def __init__(
+ self,
+ name: str,
+ mac: str,
+ keywords: Optional[List[str]],
+ ):
+ self.name = name
+ self.mac = mac
+ self.keywords = keywords
+ if keywords is not None:
+ self.kws = keywords.split()
+ else:
+ self.kws = []
+
+ def get_name(self) -> str:
+ return self.name
+
+ def get_mac(self) -> str:
+ return self.mac
+
+ def get_keywords(self) -> Optional[List[str]]:
+ return self.kws
+
+ def has_keyword(self, keyword: str) -> bool:
+ for kw in self.kws:
+ if kw == keyword:
+ return True
+ return False
+
+ def get_on_limit_seconds(self) -> Optional[int]:
+ for kw in self.kws:
+ m = re.search(r"timeout:(\d+)", kw)
+ if m is not None:
+ return int(m.group(1)) * 60
+ return None
import argparse_utils
import config
import logging_utils
-import logical_search
+import smart_home.device as dev
from google_assistant import ask_google, GoogleResponse
from decorator_utils import timeout, memoized
logger = logging.getLogger(__name__)
parser = config.add_commandline_args(
- f"Light Utils ({__file__})",
- "Args related to light utilities."
+ f"Smart Lights ({__file__})",
+ "Args related to smart lights.",
)
parser.add_argument(
- '--light_utils_tplink_location',
+ '--smart_lights_tplink_location',
default='/home/scott/bin/tplink.py',
metavar='FILENAME',
help='The location of the tplink.py helper',
type=argparse_utils.valid_filename,
)
-parser.add_argument(
- '--light_utils_network_mac_addresses_location',
- default='/home/scott/bin/network_mac_addresses.txt',
- metavar='FILENAME',
- help='The location of network_mac_addresses.txt',
- type=argparse_utils.valid_filename,
-)
@timeout(
return True
-class Light(ABC):
+class BaseLight(dev.Device):
def __init__(self, name: str, mac: str, keywords: str = "") -> None:
- self.name = name.strip()
- self.mac = mac.strip()
- self.keywords = keywords.strip()
- self.kws = keywords.split()
-
- def get_name(self) -> str:
- return self.name
-
- def get_mac(self) -> str:
- return self.mac
+ super().__init__(name.strip(), mac.strip(), keywords)
@abstractmethod
def turn_on(self) -> bool:
def make_color(self, color: str) -> bool:
pass
- def get_keywords(self) -> List[str]:
- return self.kws
- def has_keyword(self, keyword: str) -> bool:
- for kw in self.kws:
- if kw == keyword:
- return True
- return False
-
-
-class GoogleLight(Light):
+class GoogleLight(BaseLight):
def __init__(self, name: str, mac: str, keywords: str = "") -> None:
super().__init__(name, mac, keywords)
)
-class TuyaLight(Light):
+class TuyaLight(BaseLight):
ids_by_mac = {
'68:C6:3A:DE:1A:94': '8844664268c63ade1a94',
'68:C6:3A:DE:27:1A': '8844664268c63ade271a',
return True
-class TPLinkLight(Light):
+class TPLinkLight(BaseLight):
def __init__(self, name: str, mac: str, keywords: str = "") -> None:
super().__init__(name, mac, keywords)
self.children: List[str] = []
def get_cmdline(self, child: str = None) -> str:
cmd = (
- f"{config.config['light_utils_tplink_location']} -m {self.mac} "
+ f"{config.config['smart_lights_tplink_location']} -m {self.mac} "
f"--no_logging_console "
)
if child is not None:
return tplink_light_command(cmd)
-class GoogleLightGroup(GoogleLight):
- def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None:
- if len(members) < 1:
- raise Exception("There must be at least one light in the group.")
- self.members = members
- mac = GoogleLightGroup.make_up_mac(members)
- super().__init__(name, mac, keywords)
-
- @staticmethod
- def make_up_mac(members: List[GoogleLight]):
- mac = members[0].get_mac()
- b = mac.split(':')
- b[5] = int(b[5], 16) + 1
- if b[5] > 255:
- b[5] = 0
- b[5] = str(b[5])
- return ":".join(b)
-
- def is_on(self) -> bool:
- r = ask_google(f"are {self.goog_name()} on?")
- if not r.success:
- return False
- return 'is on' in r.audio_transcription
-
- def get_dimmer_level(self) -> Optional[int]:
- if not self.has_keyword("dimmer"):
- return False
- r = ask_google(f'how bright are {self.goog_name()}?')
- if not r.success:
- return None
-
- # four lights are set to 100% brightness
- txt = r.audio_transcription
- m = re.search(r"(\d+)% bright", txt)
- if m is not None:
- return int(m.group(1))
- if "is off" in txt:
- return 0
- return None
-
- def set_dimmer_level(self, level: int) -> bool:
- if not self.has_keyword("dimmer"):
- return False
- if 0 <= level <= 100:
- was_on = self.is_on()
- r = ask_google(f"set {self.goog_name()} to {level} percent")
- if not r.success:
- return False
- if not was_on:
- self.turn_off()
- return True
- return False
-
- def make_color(self, color: str) -> bool:
- return GoogleLight.parse_google_response(
- ask_google(f"make {self.goog_name()} {color}")
- )
-
-
-class LightingConfig(object):
- """Representation of the smart light device config."""
-
- def __init__(
- self,
- config_file: str = None,
- ) -> None:
- if config_file is None:
- config_file = config.config[
- 'light_utils_network_mac_addresses_location'
- ]
- self.macs_by_name = {}
- self._keywords_by_name = {}
- self.keywords_by_mac = {}
- self.names_by_mac = {}
- self.corpus = logical_search.Corpus()
- with open(config_file, "r") as f:
- contents = f.readlines()
-
- diningroom_lights = []
- bookcase_lights = []
- for line in contents:
- line = line.rstrip("\n")
- line = re.sub(r"#.*$", r"", line)
- line = line.strip()
- if line == "":
- continue
- (mac, name, keywords) = line.split(",")
- mac = mac.strip()
- name = name.strip()
- keywords = keywords.strip()
- if "perm" not in keywords:
- continue
- self.macs_by_name[name] = mac
- self._keywords_by_name[name] = keywords
- self.keywords_by_mac[mac] = keywords
- self.names_by_mac[mac] = name
-
-# if "bookcase_light_" in name:
-# bookcase_lights.append(mac)
-# elif "diningroom_light_" in name:
-# diningroom_lights.append(mac)
-# else:
- self.index_light(name, keywords, mac)
-
- # name = 'bookcase_lights'
- # group = []
- # keywords = 'perm wifi light smart goog dimmer'
- # for b in bookcase_lights:
- # group.append(self.get_light_by_mac(b))
- # self.bookcase_group = GoogleLightGroup(
- # name,
- # group,
- # keywords,
- # )
- # mac = self.bookcase_group.get_mac()
- # self.macs_by_name[name] = mac
- # self._keywords_by_name[name] = keywords
- # self.keywords_by_mac[mac] = keywords
- # self.names_by_mac[mac] = name
- # self.index_light(name, keywords, mac)
-
- # name = 'dining_room_lights'
- # group = []
- # for b in diningroom_lights:
- # group.append(self.get_light_by_mac(b))
- # self.diningroom_group = GoogleLightGroup(
- # name,
- # group,
- # keywords,
- # )
- # mac = self.diningroom_group.get_mac()
- # self.macs_by_name[name] = mac
- # self._keywords_by_name[name] = keywords
- # self.keywords_by_mac[mac] = keywords
- # self.names_by_mac[mac] = name
- # self.index_light(name, keywords, mac)
-
- def index_light(self, name: str, keywords: str, mac: str) -> None:
- properties = [("name", name)]
- tags = set()
- for kw in keywords.split():
- if ":" in kw:
- key, value = kw.split(":")
- properties.append((key, value))
- else:
- tags.add(kw)
- light = logical_search.Document(
- docid=mac,
- tags=tags,
- properties=properties,
- reference=None,
- )
- self.corpus.add_doc(light)
-
- def __repr__(self) -> str:
- s = "Known devices:\n"
- for name, keywords in self._keywords_by_name.items():
- mac = self.macs_by_name[name]
- s += f" {name} ({mac}) => {keywords}\n"
- return s
-
- def get_keywords_by_name(self, name: str) -> Optional[str]:
- return self._keywords_by_name.get(name, None)
-
- def get_macs_by_name(self, name: str) -> Set[str]:
- retval = set()
- for (mac, lname) in self.names_by_mac.items():
- if name in lname:
- retval.add(mac)
- return retval
-
- def get_macs_by_keyword(self, keyword: str) -> Set[str]:
- retval = set()
- for (mac, keywords) in self.keywords_by_mac.items():
- if keyword in keywords:
- retval.add(mac)
- return retval
-
- def get_light_by_name(self, name: str) -> Optional[Light]:
- if name in self.macs_by_name:
- return self.get_light_by_mac(self.macs_by_name[name])
- return None
-
- def get_all_lights(self) -> List[Light]:
- retval = []
- for (mac, kws) in self.keywords_by_mac.items():
- if mac is not None:
- light = self.get_light_by_mac(mac)
- if light is not None:
- retval.append(light)
- return retval
-
- def get_light_by_mac(self, mac: str) -> Optional[Light]:
- if mac in self.keywords_by_mac:
- name = self.names_by_mac[mac]
- kws = self.keywords_by_mac[mac]
- if name == 'bookcase_lights':
- return self.bookcase_group
- elif name == 'dining_room_lights':
- return self.diningroom_group
- elif 'tplink' in kws.lower():
- return TPLinkLight(name, mac, kws)
- elif 'tuya' in kws.lower():
- return TuyaLight(name, mac, kws)
- else:
- return GoogleLight(name, mac, kws)
- return None
-
- def query(self, query: str) -> List[Light]:
- """Evaluates a lighting query expression formed of keywords to search
- for, logical operators (and, or, not), and parenthesis.
- Returns a list of matching lights.
- """
- retval = []
- results = self.corpus.query(query)
- if results is not None:
- for mac in results:
- if mac is not None:
- light = self.get_light_by_mac(mac)
- if light is not None:
- retval.append(light)
- return retval
+# class GoogleLightGroup(GoogleLight):
+# def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None:
+# if len(members) < 1:
+# raise Exception("There must be at least one light in the group.")
+# self.members = members
+# mac = GoogleLightGroup.make_up_mac(members)
+# super().__init__(name, mac, keywords)
+
+# @staticmethod
+# def make_up_mac(members: List[GoogleLight]):
+# mac = members[0].get_mac()
+# b = mac.split(':')
+# b[5] = int(b[5], 16) + 1
+# if b[5] > 255:
+# b[5] = 0
+# b[5] = str(b[5])
+# return ":".join(b)
+
+# def is_on(self) -> bool:
+# r = ask_google(f"are {self.goog_name()} on?")
+# if not r.success:
+# return False
+# return 'is on' in r.audio_transcription
+
+# def get_dimmer_level(self) -> Optional[int]:
+# if not self.has_keyword("dimmer"):
+# return False
+# r = ask_google(f'how bright are {self.goog_name()}?')
+# if not r.success:
+# return None
+
+# # four lights are set to 100% brightness
+# txt = r.audio_transcription
+# m = re.search(r"(\d+)% bright", txt)
+# if m is not None:
+# return int(m.group(1))
+# if "is off" in txt:
+# return 0
+# return None
+
+# def set_dimmer_level(self, level: int) -> bool:
+# if not self.has_keyword("dimmer"):
+# return False
+# if 0 <= level <= 100:
+# was_on = self.is_on()
+# r = ask_google(f"set {self.goog_name()} to {level} percent")
+# if not r.success:
+# return False
+# if not was_on:
+# self.turn_off()
+# return True
+# return False
+
+# def make_color(self, color: str) -> bool:
+# return GoogleLight.parse_google_response(
+# ask_google(f"make {self.goog_name()} {color}")
+# )
--- /dev/null
+#!/usr/bin/env python3
+
+"""Utilities for dealing with the smart outlets."""
+
+from abc import abstractmethod
+import datetime
+import json
+import logging
+import os
+import re
+import subprocess
+import sys
+from typing import Any, Dict, List, Optional, Set
+
+import argparse_utils
+import config
+import logging_utils
+import smart_home.device as dev
+from google_assistant import ask_google, GoogleResponse
+from decorator_utils import timeout, memoized
+
+logger = logging.getLogger(__name__)
+
+parser = config.add_commandline_args(
+ f"Outlet Utils ({__file__})",
+ "Args related to smart outlets.",
+)
+parser.add_argument(
+ '--smart_outlets_tplink_location',
+ default='/home/scott/bin/tplink.py',
+ metavar='FILENAME',
+ help='The location of the tplink.py helper',
+ type=argparse_utils.valid_filename,
+)
+
+
+@timeout(
+ 5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
+)
+def tplink_outlet_command(command: str) -> bool:
+ result = os.system(command)
+ signal = result & 0xFF
+ if signal != 0:
+ logger.warning(f'{command} died with signal {signal}')
+ logging_utils.hlog("%s died with signal %d" % (command, signal))
+ return False
+ else:
+ exit_value = result >> 8
+ if exit_value != 0:
+ logger.warning(f'{command} failed, exited {exit_value}')
+ logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
+ return False
+ logger.debug(f'{command} succeeded.')
+ return True
+
+
+class BaseOutlet(dev.Device):
+ def __init__(self, name: str, mac: str, keywords: str = "") -> None:
+ super().__init__(name.strip(), mac.strip(), keywords)
+ self.info = None
+
+ @abstractmethod
+ def turn_on(self) -> bool:
+ pass
+
+ @abstractmethod
+ def turn_off(self) -> bool:
+ pass
+
+ @abstractmethod
+ def is_on(self) -> bool:
+ pass
+
+ @abstractmethod
+ def is_off(self) -> bool:
+ pass
+
+
+class TPLinkOutlet(BaseOutlet):
+ def __init__(self, name: str, mac: str, keywords: str = '') -> None:
+ super().__init__(name, mac, keywords)
+ self.info: Optional[Dict] = None
+ self.info_ts: Optional[datetime.datetime] = None
+
+ @memoized
+ def get_tplink_name(self) -> Optional[str]:
+ self.info = self.get_info()
+ if self.info is not None:
+ return self.info["alias"]
+ return None
+
+ def get_cmdline(self) -> str:
+ cmd = (
+ f"{config.config['smart_outlets_tplink_location']} -m {self.mac} "
+ f"--no_logging_console "
+ )
+ return cmd
+
+ def command(self, cmd: str, extra_args: str = None) -> bool:
+ cmd = self.get_cmdline() + f"-c {cmd}"
+ if extra_args is not None:
+ cmd += f" {extra_args}"
+ return tplink_outlet_command(cmd)
+
+ def turn_on(self) -> bool:
+ return self.command('on')
+
+ def turn_off(self) -> bool:
+ return self.command('off')
+
+ def is_on(self) -> bool:
+ return self.get_on_duration_seconds() > 0
+
+ def is_off(self) -> bool:
+ return not self.is_on()
+
+ @timeout(
+ 10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
+ )
+ def get_info(self) -> Optional[Dict]:
+ cmd = self.get_cmdline() + "-c info"
+ out = subprocess.getoutput(cmd)
+ out = re.sub("Sent:.*\n", "", out)
+ out = re.sub("Received: *", "", out)
+ try:
+ self.info = json.loads(out)["system"]["get_sysinfo"]
+ self.info_ts = datetime.datetime.now()
+ return self.info
+ except Exception as e:
+ logger.exception(e)
+ print(out, file=sys.stderr)
+ self.info = None
+ self.info_ts = None
+ return None
+
+ def get_on_duration_seconds(self) -> int:
+ self.info = self.get_info()
+ if self.info is None:
+ return 0
+ return int(self.info.get("on_time", "0"))
+
+
+class TPLinkOutletWithChildren(TPLinkOutlet):
+ def __init__(self, name: str, mac: str, keywords: str = "") -> None:
+ super().__init__(name, mac, keywords)
+ self.children: List[str] = []
+ self.info: Optional[Dict] = None
+ self.info_ts: Optional[datetime.datetime] = None
+ assert "children" in self.keywords
+ self.info = self.get_info()
+ if self.info is not None:
+ for child in self.info["children"]:
+ self.children.append(child["id"])
+
+ # override
+ def get_cmdline(self, child: Optional[str] = None) -> str:
+ cmd = (
+ f"{config.config['smart_outlets_tplink_location']} -m {self.mac} "
+ f"--no_logging_console "
+ )
+ if child is not None:
+ cmd += f"-x {child} "
+ return cmd
+
+ # override
+ def command(
+ self, cmd: str, child: str = None, extra_args: str = None
+ ) -> bool:
+ cmd = self.get_cmdline(child) + f"-c {cmd}"
+ if extra_args is not None:
+ cmd += f" {extra_args}"
+ logger.debug(f'About to execute {cmd}')
+ return tplink_light_command(cmd)
+
+ def get_children(self) -> List[str]:
+ return self.children
+
+ def turn_on(self, child: str = None) -> bool:
+ return self.command("on", child)
+
+ def turn_off(self, child: str = None) -> bool:
+ return self.command("off", child)
+
+ def get_on_duration_seconds(self, child: str = None) -> int:
+ self.info = self.get_info()
+ if child is None:
+ if self.info is None:
+ return 0
+ return int(self.info.get("on_time", "0"))
+ else:
+ if self.info is None:
+ return 0
+ for chi in self.info.get("children", {}):
+ if chi["id"] == child:
+ return int(chi.get("on_time", "0"))
+ return 0
+
+
+class GoogleOutlet(BaseOutlet):
+ def __init__(self, name: str, mac: str, keywords: str = "") -> None:
+ super().__init__(name.strip(), mac.strip(), keywords)
+ self.info = None
+
+ def goog_name(self) -> str:
+ name = self.get_name()
+ return name.replace('_', ' ')
+
+ @staticmethod
+ def parse_google_response(response: GoogleResponse) -> bool:
+ return response.success
+
+ def turn_on(self) -> bool:
+ return GoogleOutlet.parse_google_response(
+ ask_google('turn {self.goog_name()} on')
+ )
+
+ def turn_off(self) -> bool:
+ return GoogleOutlet.parse_google_response(
+ ask_google('turn {self.goog_name()} off')
+ )
+
+ def is_on(self) -> bool:
+ r = ask_google(f'is {self.goog_name()} on?')
+ if not r.success:
+ return False
+ return 'is on' in r.audio_transcription
+
+ def is_off(self) -> bool:
+ return not self.is_on()