Since this thing is on the innerwebs I suppose it should have a
[python_utils.git] / smart_home / registry.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """A searchable registry of known smart home devices and a factory for
6 constructing our wrappers around them."""
7
8 import logging
9 import re
10 from typing import Dict, List, Optional, Set
11
12 import argparse_utils
13 import config
14 import file_utils
15 import logical_search
16 from smart_home import cameras, chromecasts, device, lights, outlets
17
18 args = config.add_commandline_args(
19     f"Smart Home Registry ({__file__})",
20     "Args related to the smart home configuration registry.",
21 )
22 args.add_argument(
23     '--smart_home_registry_file_location',
24     default='/home/scott/bin/network_mac_addresses.txt',
25     metavar='FILENAME',
26     help='The location of network_mac_addresses.txt',
27     type=argparse_utils.valid_filename,
28 )
29
30
31 logger = logging.getLogger(__name__)
32
33
34 class SmartHomeRegistry(object):
35     """A searchable registry of known smart home devices and a factory for
36     constructing our wrappers around them."""
37
38     def __init__(
39         self,
40         registry_file: Optional[str] = None,
41         filters: List[str] = ['smart'],
42     ) -> None:
43         self._macs_by_name: Dict[str, str] = {}
44         self._keywords_by_name: Dict[str, str] = {}
45         self._keywords_by_mac: Dict[str, str] = {}
46         self._names_by_mac: Dict[str, str] = {}
47         self._corpus: logical_search.Corpus = logical_search.Corpus()
48
49         # Read the disk config file...
50         if registry_file is None:
51             registry_file = config.config['smart_home_registry_file_location']
52         assert file_utils.does_file_exist(registry_file)
53         logger.debug('Reading %s', registry_file)
54         with open(registry_file, "r") as rf:
55             contents = rf.readlines()
56
57         # Parse the contents...
58         for line in contents:
59             line = line.rstrip("\n")
60             line = re.sub(r"#.*$", r"", line)
61             line = line.strip()
62             if line == "":
63                 continue
64             logger.debug('SH-CONFIG> %s', line)
65             try:
66                 (mac, name, keywords) = line.split(",")
67             except ValueError:
68                 logger.warning('SH-CONFIG> "%s" is malformed?!  Skipping it.', line)
69                 continue
70             mac = mac.strip()
71             name = name.strip()
72             keywords = keywords.strip()
73
74             skip = False
75             if filters is not None:
76                 for f in filters:
77                     if f not in keywords:
78                         logger.debug('Skipping this entry b/c of filter: %s', f)
79                         skip = True
80                         break
81             if not skip:
82                 self._macs_by_name[name] = mac
83                 self._keywords_by_name[name] = keywords
84                 self._keywords_by_mac[mac] = keywords
85                 self._names_by_mac[mac] = name
86                 self.index_device(name, keywords, mac)
87
88     def index_device(self, name: str, keywords: str, mac: str) -> None:
89         properties = [("name", name)]
90         tags = set()
91         for kw in keywords.split():
92             if ":" in kw:
93                 key, value = kw.split(":")
94                 properties.append((key, value))
95             else:
96                 tags.add(kw)
97         dev = logical_search.Document(
98             docid=mac,
99             tags=tags,
100             properties=properties,
101             reference=None,
102         )
103         logger.debug('Indexing document: %s', dev)
104         self._corpus.add_doc(dev)
105
106     def __repr__(self) -> str:
107         s = "Known devices:\n"
108         for name, keywords in self._keywords_by_name.items():
109             mac = self._macs_by_name[name]
110             s += f"  {name} ({mac}) => {keywords}\n"
111         return s
112
113     def get_keywords_by_name(self, name: str) -> Optional[str]:
114         return self._keywords_by_name.get(name, None)
115
116     def get_macs_by_name(self, name: str) -> Set[str]:
117         retval = set()
118         for (mac, lname) in self._names_by_mac.items():
119             if name in lname:
120                 retval.add(mac)
121         return retval
122
123     def get_macs_by_keyword(self, keyword: str) -> Set[str]:
124         retval = set()
125         for (mac, keywords) in self._keywords_by_mac.items():
126             if keyword in keywords:
127                 retval.add(mac)
128         return retval
129
130     def get_device_by_name(self, name: str) -> Optional[device.Device]:
131         if name in self._macs_by_name:
132             return self.get_device_by_mac(self._macs_by_name[name])
133         return None
134
135     def get_all_devices(self) -> List[device.Device]:
136         retval = []
137         for mac, _ in self._keywords_by_mac.items():
138             if mac is not None:
139                 dev = self.get_device_by_mac(mac)
140                 if dev is not None:
141                     retval.append(dev)
142         return retval
143
144     def get_device_by_mac(self, mac: str) -> Optional[device.Device]:
145         if mac in self._keywords_by_mac:
146             name = self._names_by_mac[mac]
147             kws = self._keywords_by_mac[mac]
148             logger.debug('Found %s -> %s (%s)', name, mac, kws)
149             try:
150                 if 'light' in kws.lower():
151                     if 'tplink' in kws.lower():
152                         logger.debug('    ...a TPLinkLight')
153                         return lights.TPLinkLight(name, mac, kws)
154                     elif 'tuya' in kws.lower():
155                         logger.debug('    ...a TuyaLight')
156                         return lights.TuyaLight(name, mac, kws)
157                     elif 'goog' in kws.lower():
158                         logger.debug('    ...a GoogleLight')
159                         return lights.GoogleLight(name, mac, kws)
160                     else:
161                         raise Exception(f'Unknown light device: {name}, {mac}, {kws}')
162                 elif 'outlet' in kws.lower():
163                     if 'tplink' in kws.lower():
164                         if 'children' in kws.lower():
165                             logger.debug('    ...a TPLinkOutletWithChildren')
166                             return outlets.TPLinkOutletWithChildren(name, mac, kws)
167                         else:
168                             logger.debug('    ...a TPLinkOutlet')
169                             return outlets.TPLinkOutlet(name, mac, kws)
170                     elif 'meross' in kws.lower():
171                         logger.debug('    ...a MerossOutlet')
172                         return outlets.MerossOutlet(name, mac, kws)
173                     elif 'goog' in kws.lower():
174                         logger.debug('    ...a GoogleOutlet')
175                         return outlets.GoogleOutlet(name, mac, kws)
176                     else:
177                         raise Exception(f'Unknown outlet device: {name}, {mac}, {kws}')
178                 elif 'camera' in kws.lower():
179                     logger.debug('    ...a BaseCamera')
180                     return cameras.BaseCamera(name, mac, kws)
181                 elif 'ccast' in kws.lower():
182                     logger.debug('    ...a Chromecast')
183                     return chromecasts.BaseChromecast(name, mac, kws)
184                 else:
185                     logger.debug('    ...an unknown device (should this be here?)')
186                     return device.Device(name, mac, kws)
187             except Exception as e:
188                 logger.exception(e)
189                 logger.debug(
190                     'Device %s at %s with %s confused me; returning a generic Device',
191                     name,
192                     mac,
193                     kws,
194                 )
195                 return device.Device(name, mac, kws)
196         logger.warning('%s is not a known smart home device, returning None', mac)
197         return None
198
199     def query(self, query: str) -> List[device.Device]:
200         """Evaluates a lighting query expression formed of keywords to search
201         for, logical operators (and, or, not), and parenthesis.
202         Returns a list of matching lights.
203         """
204         retval = []
205         logger.debug('Executing query: %s', query)
206         results = self._corpus.query(query)
207         if results is not None:
208             for mac in results:
209                 if mac is not None:
210                     dev = self.get_device_by_mac(mac)
211                     if dev is not None:
212                         retval.append(dev)
213         return retval