Easier and more self documenting patterns for loading/saving Persistent
[python_utils.git] / smart_home / registry.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """A searchable registry of known smart home devices and a factory for
6 constructing our wrappers around them."""
7
8 import logging
9 import re
10 from typing import Dict, List, Optional, Set
11
12 import argparse_utils
13 import config
14 import file_utils
15 import logical_search
16 from smart_home import cameras, chromecasts, device, lights, outlets
17
18 args = config.add_commandline_args(
19     f"Smart Home Registry ({__file__})",
20     "Args related to the smart home configuration registry.",
21 )
22 args.add_argument(
23     '--smart_home_registry_file_location',
24     default='/home/scott/bin/network_mac_addresses.txt',
25     metavar='FILENAME',
26     help='The location of network_mac_addresses.txt',
27     type=argparse_utils.valid_filename,
28 )
29
30
31 logger = logging.getLogger(__name__)
32
33
34 class SmartHomeRegistry(object):
35     """A searchable registry of known smart home devices and a factory for
36     constructing our wrappers around them."""
37
38     def __init__(
39         self,
40         registry_file: Optional[str] = None,
41         filters: List[str] = ['smart'],
42     ) -> None:
43         self._macs_by_name: Dict[str, str] = {}
44         self._keywords_by_name: Dict[str, str] = {}
45         self._keywords_by_mac: Dict[str, str] = {}
46         self._names_by_mac: Dict[str, str] = {}
47         self._corpus: logical_search.Corpus = logical_search.Corpus()
48
49         # Read the disk config file...
50         if registry_file is None:
51             registry_file = config.config['smart_home_registry_file_location']
52         assert file_utils.does_file_exist(registry_file)
53         logger.debug('Reading %s', registry_file)
54         with open(registry_file, "r") as rf:
55             contents = rf.readlines()
56
57         # Parse the contents...
58         for line in contents:
59             line = line.rstrip("\n")
60             line = re.sub(r"#.*$", r"", line)
61             line = line.strip()
62             if line == "":
63                 continue
64             logger.debug('SH-CONFIG> %s', line)
65             try:
66                 (mac, name, keywords) = line.split(",")
67             except ValueError:
68                 logger.warning('SH-CONFIG> "%s" is malformed?!  Skipping it.', line)
69                 continue
70             mac = mac.strip()
71             name = name.strip()
72             keywords = keywords.strip()
73
74             skip = False
75             if filters is not None:
76                 for f in filters:
77                     if f not in keywords:
78                         logger.debug('Skipping this entry b/c of filter: %s', f)
79                         skip = True
80                         break
81             if not skip:
82                 self._macs_by_name[name] = mac
83                 self._keywords_by_name[name] = keywords
84                 self._keywords_by_mac[mac] = keywords
85                 self._names_by_mac[mac] = name
86                 self._index_device(name, keywords, mac)
87
88     def _index_device(self, name: str, keywords: str, mac: str) -> None:
89         properties = [("name", name)]
90         tags = set()
91         for kw in keywords.split():
92             if ":" in kw:
93                 key, value = kw.split(":")
94                 properties.append((key, value))
95             else:
96                 tags.add(kw)
97         dev = logical_search.Document(
98             docid=mac,
99             tags=tags,
100             properties=properties,
101             reference=None,
102         )
103         logger.debug('Indexing document: %s', dev)
104         self._corpus.add_doc(dev)
105
106     def __repr__(self) -> str:
107         s = "Known devices:\n"
108         for name, keywords in self._keywords_by_name.items():
109             mac = self._macs_by_name[name]
110             s += f"  {name} ({mac}) => {keywords}\n"
111         return s
112
113     def get_keywords_by_name(self, name: str) -> Optional[str]:
114         """Given the name of a device, get its keywords.
115
116         >>> reg = SmartHomeRegistry('/home/scott/bin/network_mac_addresses.txt')
117         >>> reg.get_keywords_by_name('near_kitchen_lamp')
118         'wifi smart light goog meross test'
119
120         >>> reg.get_keywords_by_name('unknown') is None
121         True
122
123         """
124         return self._keywords_by_name.get(name, None)
125
126     def get_macs_by_name(self, name: str) -> Set[str]:
127         """Given the name of a device, get its MAC address(es)
128
129         >>> reg = SmartHomeRegistry('/home/scott/bin/network_mac_addresses.txt')
130         >>> reg.get_macs_by_name('near_kitchen_lamp')
131         {'34:29:8F:12:34:8E'}
132
133         >>> reg.get_macs_by_name('unknown')
134         set()
135
136         """
137
138         retval = set()
139         for (mac, lname) in self._names_by_mac.items():
140             if name in lname:
141                 retval.add(mac)
142         return retval
143
144     def get_macs_by_keyword(self, keyword: str) -> Set[str]:
145         """Given a keyword, return the set of MAC address(es) that have
146         that keyword.
147
148         >>> reg = SmartHomeRegistry('/home/scott/bin/network_mac_addresses.txt')
149         >>> r = reg.get_macs_by_keyword('test')
150         >>> e = set(['34:29:8F:12:26:74' , '34:29:8F:12:34:8E'])
151         >>> r == e
152         True
153
154         >>> reg.get_macs_by_keyword('unknown')
155         set()
156
157         """
158         retval = set()
159         for (mac, keywords) in self._keywords_by_mac.items():
160             if keyword in keywords:
161                 retval.add(mac)
162         return retval
163
164     def get_device_by_name(self, name: str) -> Optional[device.Device]:
165         """Given a name, return its Device object."""
166
167         if name in self._macs_by_name:
168             return self.get_device_by_mac(self._macs_by_name[name])
169         return None
170
171     def get_all_devices(self) -> List[device.Device]:
172         """Return a list of all known devices."""
173
174         retval = []
175         for mac, _ in self._keywords_by_mac.items():
176             if mac is not None:
177                 dev = self.get_device_by_mac(mac)
178                 if dev is not None:
179                     retval.append(dev)
180         return retval
181
182     def get_device_by_mac(self, mac: str) -> Optional[device.Device]:
183         """Given a MAC address, return its Device object."""
184
185         if mac in self._keywords_by_mac:
186             name = self._names_by_mac[mac]
187             kws = self._keywords_by_mac[mac]
188             logger.debug('Found %s -> %s (%s)', name, mac, kws)
189             try:
190                 if 'light' in kws.lower():
191                     if 'tplink' in kws.lower():
192                         logger.debug('    ...a TPLinkLight')
193                         return lights.TPLinkLight(name, mac, kws)
194                     elif 'tuya' in kws.lower():
195                         logger.debug('    ...a TuyaLight')
196                         return lights.TuyaLight(name, mac, kws)
197                     elif 'goog' in kws.lower():
198                         logger.debug('    ...a GoogleLight')
199                         return lights.GoogleLight(name, mac, kws)
200                     else:
201                         raise Exception(f'Unknown light device: {name}, {mac}, {kws}')
202                 elif 'outlet' in kws.lower():
203                     if 'tplink' in kws.lower():
204                         if 'children' in kws.lower():
205                             logger.debug('    ...a TPLinkOutletWithChildren')
206                             return outlets.TPLinkOutletWithChildren(name, mac, kws)
207                         else:
208                             logger.debug('    ...a TPLinkOutlet')
209                             return outlets.TPLinkOutlet(name, mac, kws)
210                     elif 'meross' in kws.lower():
211                         logger.debug('    ...a MerossOutlet')
212                         return outlets.MerossOutlet(name, mac, kws)
213                     elif 'goog' in kws.lower():
214                         logger.debug('    ...a GoogleOutlet')
215                         return outlets.GoogleOutlet(name, mac, kws)
216                     else:
217                         raise Exception(f'Unknown outlet device: {name}, {mac}, {kws}')
218                 elif 'camera' in kws.lower():
219                     logger.debug('    ...a BaseCamera')
220                     return cameras.BaseCamera(name, mac, kws)
221                 elif 'ccast' in kws.lower():
222                     logger.debug('    ...a Chromecast')
223                     return chromecasts.BaseChromecast(name, mac, kws)
224                 else:
225                     logger.debug('    ...an unknown device (should this be here?)')
226                     return device.Device(name, mac, kws)
227             except Exception as e:
228                 logger.exception(e)
229                 logger.debug(
230                     'Device %s at %s with %s confused me; returning a generic Device',
231                     name,
232                     mac,
233                     kws,
234                 )
235                 return device.Device(name, mac, kws)
236         logger.warning('%s is not a known smart home device, returning None', mac)
237         return None
238
239     def query(self, query: str) -> List[device.Device]:
240         """Evaluates a device query expression formed of keywords to search
241         for, logical operators (and, or, not), and parenthesis.
242         Returns a list of matching lights.
243         """
244         retval = []
245         logger.debug('Executing query: %s', query)
246         results = self._corpus.query(query)
247         if results is not None:
248             for mac in results:
249                 if mac is not None:
250                     dev = self.get_device_by_mac(mac)
251                     if dev is not None:
252                         retval.append(dev)
253         return retval
254
255
256 if __name__ == '__main__':
257     import doctest
258
259     doctest.testmod()