Scale back warnings.warn and add stacklevels= where appropriate.
[python_utils.git] / smart_home / registry.py
1 #!/usr/bin/env python3
2
import logging
import re
from typing import List, Optional, Sequence, Set, Tuple

import argparse_utils
import config
import file_utils
import logical_search
import smart_home.device as device
import smart_home.cameras as cameras
import smart_home.chromecasts as chromecasts
import smart_home.lights as lights
import smart_home.outlets as outlets
16
# Register this module's command line arguments through
# config.add_commandline_args; the returned object accepts add_argument().
args = config.add_commandline_args(
    f"Smart Home Registry ({__file__})",
    "Args related to the smart home configuration registry.",
)
args.add_argument(
    '--smart_home_registry_file_location',
    type=argparse_utils.valid_filename,
    metavar='FILENAME',
    default='/home/scott/bin/network_mac_addresses.txt',
    help='The location of network_mac_addresses.txt',
)


# Module-level logger; note that it is named after this file's path.
logger = logging.getLogger(__file__)
31
32
class SmartHomeRegistry(object):
    """Index of the smart home devices described by a registry file.

    The registry file is read line by line.  Everything from a ``#`` to
    the end of a line is discarded and lines that are empty afterwards
    are ignored.  Each remaining line must hold exactly three comma
    separated fields::

        mac, name, keywords

    ``keywords`` is a whitespace separated list of tokens.  A token of
    the form ``key:value`` is indexed as a property; any other token is
    indexed as a tag.  Lines that do not have exactly three fields are
    logged as malformed and skipped.
    """

    # Matches a '#' comment running to the end of the line.
    _COMMENT_RE = re.compile(r"#.*$")

    def __init__(
            self,
            registry_file: Optional[str] = None,
            filters: Optional[Sequence[str]] = ('smart',),
    ) -> None:
        """Read and index the registry file.

        Args:
            registry_file: path of the file to read.  When None, the path
                is taken from config.config under
                'smart_home_registry_file_location'.
            filters: substrings that must all occur in an entry's keyword
                string for the entry to be kept.  None disables
                filtering.  The default is an immutable tuple so that it
                can never be shared and mutated between calls.
        """
        self._macs_by_name = {}
        self._keywords_by_name = {}
        self._keywords_by_mac = {}
        self._names_by_mac = {}
        self._corpus = logical_search.Corpus()

        # Read the disk config file...
        if registry_file is None:
            registry_file = config.config[
                'smart_home_registry_file_location'
            ]
        # NOTE(review): this assert is stripped under "python -O"; the
        # open() below would then be what reports a missing file.
        assert file_utils.does_file_exist(registry_file)
        logger.debug(f'Reading {registry_file}')
        with open(registry_file, "r") as f:
            contents = f.readlines()

        # Parse the contents...
        for raw_line in contents:
            line = self._strip_comment(raw_line)
            if line == "":
                continue
            logger.debug(f'SH-CONFIG> {line}')

            fields = self._parse_line(line)
            if fields is None:
                msg = f'SH-CONFIG> "{line}" is malformed?!  Skipping it.'
                logger.warning(msg)
                continue
            mac, name, keywords = fields

            unmatched = self._first_unmatched_filter(keywords, filters)
            if unmatched is not None:
                logger.debug(f'Skipping this entry b/c of filter {unmatched}')
                continue

            self._macs_by_name[name] = mac
            self._keywords_by_name[name] = keywords
            self._keywords_by_mac[mac] = keywords
            self._names_by_mac[mac] = name
            self.index_device(name, keywords, mac)

    @classmethod
    def _strip_comment(cls, raw_line: str) -> str:
        """Return raw_line without its newline, '#' comment and padding."""
        return cls._COMMENT_RE.sub(r"", raw_line.rstrip("\n")).strip()

    @staticmethod
    def _parse_line(line: str) -> Optional[List[str]]:
        """Split line into stripped [mac, name, keywords], or return None
        when it does not contain exactly three comma separated fields."""
        fields = line.split(",")
        if len(fields) != 3:
            return None
        return [field.strip() for field in fields]

    @staticmethod
    def _first_unmatched_filter(
            keywords: str,
            filters: Optional[Sequence[str]],
    ) -> Optional[str]:
        """Return the first filter that is not a substring of keywords, or
        None when every filter matches (or filters is None)."""
        if filters is None:
            return None
        for wanted in filters:
            if wanted not in keywords:
                return wanted
        return None

    @staticmethod
    def _parse_keywords(
            name: str,
            keywords: str,
    ) -> Tuple[Set[str], List[Tuple[str, str]]]:
        """Split a keyword string into (tags, properties).

        properties always starts with ("name", name).  Each "key:value"
        token adds a (key, value) pair; every other token becomes a tag.
        """
        properties = [("name", name)]
        tags = set()
        for kw in keywords.split():
            if ":" in kw:
                # Split only on the first colon so values that themselves
                # contain colons (URLs, for example) do not raise.
                key, value = kw.split(":", 1)
                properties.append((key, value))
            else:
                tags.add(kw)
        return tags, properties

    def index_device(self, name: str, keywords: str, mac: str) -> None:
        """Add a device to the search corpus, using its mac as the docid."""
        tags, properties = self._parse_keywords(name, keywords)
        doc = logical_search.Document(
            docid=mac,
            tags=tags,
            properties=properties,
            reference=None,
        )
        logger.debug(f'Indexing document {doc}')
        self._corpus.add_doc(doc)

    def __repr__(self) -> str:
        """Return a multi-line listing of every known device."""
        lines = ["Known devices:\n"]
        for name, keywords in self._keywords_by_name.items():
            mac = self._macs_by_name[name]
            lines.append(f"  {name} ({mac}) => {keywords}\n")
        return "".join(lines)

    def get_keywords_by_name(self, name: str) -> Optional[str]:
        """Return the keyword string of the device called exactly name,
        or None when there is no such device."""
        return self._keywords_by_name.get(name, None)

    def get_macs_by_name(self, name: str) -> Set[str]:
        """Return the macs of all devices whose name contains name."""
        return {
            mac
            for (mac, known_name) in self._names_by_mac.items()
            if name in known_name
        }

    def get_macs_by_keyword(self, keyword: str) -> Set[str]:
        """Return the macs of all devices whose keyword string contains
        keyword (a substring test, not a whole-token test)."""
        return {
            mac
            for (mac, keywords) in self._keywords_by_mac.items()
            if keyword in keywords
        }

    def get_device_by_name(self, name: str) -> Optional['device.Device']:
        """Return the device called exactly name, or None if unknown."""
        if name in self._macs_by_name:
            return self.get_device_by_mac(self._macs_by_name[name])
        return None

    def get_all_devices(self) -> List['device.Device']:
        """Return a device object for every registered mac."""
        retval = []
        for mac in self._keywords_by_mac:
            dev = self.get_device_by_mac(mac)
            if dev is not None:
                retval.append(dev)
        return retval

    @staticmethod
    def _make_device(name: str, mac: str, kws: str) -> 'device.Device':
        """Instantiate the device class selected by the keyword string.

        Selection is by case-insensitive substring tests, checked in this
        order: light, outlet, camera, ccast; anything else becomes a
        plain device.Device.

        Raises:
            Exception: for a light or outlet with no recognized vendor
                keyword.
        """
        kind = kws.lower()
        if 'light' in kind:
            if 'tplink' in kind:
                logger.debug('    ...a TPLinkLight')
                return lights.TPLinkLight(name, mac, kws)
            if 'tuya' in kind:
                logger.debug('    ...a TuyaLight')
                return lights.TuyaLight(name, mac, kws)
            if 'goog' in kind:
                logger.debug('    ...a GoogleLight')
                return lights.GoogleLight(name, mac, kws)
            raise Exception(f'Unknown light device: {name}, {mac}, {kws}')
        if 'outlet' in kind:
            if 'tplink' in kind:
                if 'children' in kind:
                    logger.debug('    ...a TPLinkOutletWithChildren')
                    return outlets.TPLinkOutletWithChildren(name, mac, kws)
                logger.debug('    ...a TPLinkOutlet')
                return outlets.TPLinkOutlet(name, mac, kws)
            if 'meross' in kind:
                logger.debug('    ...a MerossOutlet')
                return outlets.MerossOutlet(name, mac, kws)
            if 'goog' in kind:
                logger.debug('    ...a GoogleOutlet')
                return outlets.GoogleOutlet(name, mac, kws)
            raise Exception(f'Unknown outlet device: {name}, {mac}, {kws}')
        if 'camera' in kind:
            logger.debug('    ...a BaseCamera')
            return cameras.BaseCamera(name, mac, kws)
        if 'ccast' in kind:
            logger.debug('    ...a Chromecast')
            return chromecasts.BaseChromecast(name, mac, kws)
        logger.debug('    ...an unknown device (should this be here?)')
        return device.Device(name, mac, kws)

    def get_device_by_mac(self, mac: str) -> Optional['device.Device']:
        """Return a device object for mac, or None when mac is unknown.

        If building the specific device type raises for any reason, the
        exception is logged and a plain device.Device is returned
        instead.
        """
        if mac not in self._keywords_by_mac:
            msg = f'{mac} is not a known smart home device, returning None'
            logger.warning(msg)
            return None

        name = self._names_by_mac[mac]
        kws = self._keywords_by_mac[mac]
        logger.debug(f'Found {name} -> {mac} ({kws})')
        try:
            return self._make_device(name, mac, kws)
        except Exception as e:
            # Best effort: an unrecognized or failing device type still
            # yields a generic device rather than an error for the caller.
            logger.exception(e)
            return device.Device(name, mac, kws)

    def query(self, query: str) -> List['device.Device']:
        """Evaluates a device query expression formed of keywords to search
        for, logical operators (and, or, not), and parenthesis.
        Returns a list of matching devices.
        """
        retval = []
        logger.debug(f'Executing query {query}')
        results = self._corpus.query(query)
        if results is not None:
            for mac in results:
                if mac is not None:
                    dev = self.get_device_by_mac(mac)
                    if dev is not None:
                        retval.append(dev)
        return retval