Start using warnings from stdlib.
[python_utils.git] / smart_home / registry.py
1 #!/usr/bin/env python3
2
3 import logging
4 import re
5 from typing import List, Optional, Set
6 import warnings
7
8 import argparse_utils
9 import config
10 import file_utils
11 import logical_search
12 import smart_home.device as device
13 import smart_home.cameras as cameras
14 import smart_home.chromecasts as chromecasts
15 import smart_home.lights as lights
16 import smart_home.outlets as outlets
17
18 args = config.add_commandline_args(
19     f"Smart Home Registry ({__file__})",
20     "Args related to the smart home configuration registry."
21 )
22 args.add_argument(
23     '--smart_home_registry_file_location',
24     default='/home/scott/bin/network_mac_addresses.txt',
25     metavar='FILENAME',
26     help='The location of network_mac_addresses.txt',
27     type=argparse_utils.valid_filename,
28 )
29
30
31 logger = logging.getLogger(__file__)
32
33
34 class SmartHomeRegistry(object):
35     def __init__(
36             self,
37             registry_file: Optional[str] = None,
38             filters: List[str] = ['smart'],
39     ) -> None:
40         self._macs_by_name = {}
41         self._keywords_by_name = {}
42         self._keywords_by_mac = {}
43         self._names_by_mac = {}
44         self._corpus = logical_search.Corpus()
45
46         # Read the disk config file...
47         if registry_file is None:
48             registry_file = config.config[
49                 'smart_home_registry_file_location'
50             ]
51         assert file_utils.does_file_exist(registry_file)
52         logger.debug(f'Reading {registry_file}')
53         with open(registry_file, "r") as f:
54             contents = f.readlines()
55
56         # Parse the contents...
57         for line in contents:
58             line = line.rstrip("\n")
59             line = re.sub(r"#.*$", r"", line)
60             line = line.strip()
61             if line == "":
62                 continue
63             logger.debug(f'SH-CONFIG> {line}')
64             try:
65                 (mac, name, keywords) = line.split(",")
66             except ValueError:
67                 msg = f'SH-CONFIG> {line} is malformed?!'
68                 logger.warning(msg)
69                 warnings.warn(msg)
70                 continue
71             mac = mac.strip()
72             name = name.strip()
73             keywords = keywords.strip()
74
75             skip = False
76             if filters is not None:
77                 for f in filters:
78                     if f not in keywords:
79                         logger.debug(f'Skipping this entry b/c of filter {f}')
80                         skip = True
81                         break
82             if not skip:
83                 self._macs_by_name[name] = mac
84                 self._keywords_by_name[name] = keywords
85                 self._keywords_by_mac[mac] = keywords
86                 self._names_by_mac[mac] = name
87                 self.index_device(name, keywords, mac)
88
89     def index_device(self, name: str, keywords: str, mac: str) -> None:
90         properties = [("name", name)]
91         tags = set()
92         for kw in keywords.split():
93             if ":" in kw:
94                 key, value = kw.split(":")
95                 properties.append((key, value))
96             else:
97                 tags.add(kw)
98         device = logical_search.Document(
99             docid=mac,
100             tags=tags,
101             properties=properties,
102             reference=None,
103         )
104         logger.debug(f'Indexing document {device}')
105         self._corpus.add_doc(device)
106
107     def __repr__(self) -> str:
108         s = "Known devices:\n"
109         for name, keywords in self._keywords_by_name.items():
110             mac = self._macs_by_name[name]
111             s += f"  {name} ({mac}) => {keywords}\n"
112         return s
113
114     def get_keywords_by_name(self, name: str) -> Optional[device.Device]:
115         return self._keywords_by_name.get(name, None)
116
117     def get_macs_by_name(self, name: str) -> Set[str]:
118         retval = set()
119         for (mac, lname) in self._names_by_mac.items():
120             if name in lname:
121                 retval.add(mac)
122         return retval
123
124     def get_macs_by_keyword(self, keyword: str) -> Set[str]:
125         retval = set()
126         for (mac, keywords) in self._keywords_by_mac.items():
127             if keyword in keywords:
128                 retval.add(mac)
129         return retval
130
131     def get_device_by_name(self, name: str) -> Optional[device.Device]:
132         if name in self._macs_by_name:
133             return self.get_device_by_mac(self._macs_by_name[name])
134         return None
135
136     def get_all_devices(self) -> List[device.Device]:
137         retval = []
138         for (mac, kws) in self._keywords_by_mac.items():
139             if mac is not None:
140                 device = self.get_device_by_mac(mac)
141                 if device is not None:
142                     retval.append(device)
143         return retval
144
145     def get_device_by_mac(self, mac: str) -> Optional[device.Device]:
146         if mac in self._keywords_by_mac:
147             name = self._names_by_mac[mac]
148             kws = self._keywords_by_mac[mac]
149             logger.debug(f'Found {name} -> {mac} ({kws})')
150             try:
151                 if 'light' in kws.lower():
152                     if 'tplink' in kws.lower():
153                         logger.debug('    ...a TPLinkLight')
154                         return lights.TPLinkLight(name, mac, kws)
155                     elif 'tuya' in kws.lower():
156                         logger.debug('    ...a TuyaLight')
157                         return lights.TuyaLight(name, mac, kws)
158                     elif 'goog' in kws.lower():
159                         logger.debug('    ...a GoogleLight')
160                         return lights.GoogleLight(name, mac, kws)
161                     else:
162                         raise Exception(f'Unknown light device: {name}, {mac}, {kws}')
163                 elif 'outlet' in kws.lower():
164                     if 'tplink' in kws.lower():
165                         if 'children' in kws.lower():
166                             logger.debug('    ...a TPLinkOutletWithChildren')
167                             return outlets.TPLinkOutletWithChildren(name, mac, kws)
168                         else:
169                             logger.debug('    ...a TPLinkOutlet')
170                             return outlets.TPLinkOutlet(name, mac, kws)
171                     elif 'meross' in kws.lower():
172                         logger.debug('    ...a MerossOutlet')
173                         return outlets.MerossOutlet(name, mac, kws)
174                     elif 'goog' in kws.lower():
175                         logger.debug('    ...a GoogleOutlet')
176                         return outlets.GoogleOutlet(name, mac, kws)
177                     else:
178                         raise Exception(f'Unknown outlet device: {name}, {mac}, {kws}')
179                 elif 'camera' in kws.lower():
180                     logger.debug('    ...a BaseCamera')
181                     return cameras.BaseCamera(name, mac, kws)
182                 elif 'ccast' in kws.lower():
183                     logger.debug('    ...a Chromecast')
184                     return chromecasts.BaseChromecast(name, mac, kws)
185                 else:
186                     logger.debug('    ...an unknown device (should this be here?)')
187                     return device.Device(name, mac, kws)
188             except Exception as e:
189                 logger.exception(e)
190                 return device.Device(name, mac, kws)
191         msg = f'{mac} is not a known smart home device, returning None'
192         logger.warning(msg)
193         warnings.warn(msg)
194         return None
195
196     def query(self, query: str) -> List[device.Device]:
197         """Evaluates a lighting query expression formed of keywords to search
198         for, logical operators (and, or, not), and parenthesis.
199         Returns a list of matching lights.
200         """
201         retval = []
202         logger.debug(f'Executing query {query}')
203         results = self._corpus.query(query)
204         if results is not None:
205             for mac in results:
206                 if mac is not None:
207                     device = self.get_device_by_mac(mac)
208                     if device is not None:
209                         retval.append(device)
210         return retval