Adds shuffle/scramble to list_utils.
[python_utils.git] / smart_home / registry.py
1 #!/usr/bin/env python3
2
3 """A searchable registry of known smart home devices and a factory for
4 constructing our wrappers around them."""
5
6 import logging
7 import re
8 from typing import Dict, List, Optional, Set
9
10 import argparse_utils
11 import config
12 import file_utils
13 import logical_search
14 from smart_home import cameras, chromecasts, device, lights, outlets
15
16 args = config.add_commandline_args(
17     f"Smart Home Registry ({__file__})",
18     "Args related to the smart home configuration registry.",
19 )
20 args.add_argument(
21     '--smart_home_registry_file_location',
22     default='/home/scott/bin/network_mac_addresses.txt',
23     metavar='FILENAME',
24     help='The location of network_mac_addresses.txt',
25     type=argparse_utils.valid_filename,
26 )
27
28
29 logger = logging.getLogger(__name__)
30
31
32 class SmartHomeRegistry(object):
33     """A searchable registry of known smart home devices and a factory for
34     constructing our wrappers around them."""
35
36     def __init__(
37         self,
38         registry_file: Optional[str] = None,
39         filters: List[str] = ['smart'],
40     ) -> None:
41         self._macs_by_name: Dict[str, str] = {}
42         self._keywords_by_name: Dict[str, str] = {}
43         self._keywords_by_mac: Dict[str, str] = {}
44         self._names_by_mac: Dict[str, str] = {}
45         self._corpus: logical_search.Corpus = logical_search.Corpus()
46
47         # Read the disk config file...
48         if registry_file is None:
49             registry_file = config.config['smart_home_registry_file_location']
50         assert file_utils.does_file_exist(registry_file)
51         logger.debug('Reading %s', registry_file)
52         with open(registry_file, "r") as rf:
53             contents = rf.readlines()
54
55         # Parse the contents...
56         for line in contents:
57             line = line.rstrip("\n")
58             line = re.sub(r"#.*$", r"", line)
59             line = line.strip()
60             if line == "":
61                 continue
62             logger.debug('SH-CONFIG> %s', line)
63             try:
64                 (mac, name, keywords) = line.split(",")
65             except ValueError:
66                 logger.warning('SH-CONFIG> "%s" is malformed?!  Skipping it.', line)
67                 continue
68             mac = mac.strip()
69             name = name.strip()
70             keywords = keywords.strip()
71
72             skip = False
73             if filters is not None:
74                 for f in filters:
75                     if f not in keywords:
76                         logger.debug('Skipping this entry b/c of filter: %s', f)
77                         skip = True
78                         break
79             if not skip:
80                 self._macs_by_name[name] = mac
81                 self._keywords_by_name[name] = keywords
82                 self._keywords_by_mac[mac] = keywords
83                 self._names_by_mac[mac] = name
84                 self.index_device(name, keywords, mac)
85
86     def index_device(self, name: str, keywords: str, mac: str) -> None:
87         properties = [("name", name)]
88         tags = set()
89         for kw in keywords.split():
90             if ":" in kw:
91                 key, value = kw.split(":")
92                 properties.append((key, value))
93             else:
94                 tags.add(kw)
95         dev = logical_search.Document(
96             docid=mac,
97             tags=tags,
98             properties=properties,
99             reference=None,
100         )
101         logger.debug('Indexing document: %s', dev)
102         self._corpus.add_doc(dev)
103
104     def __repr__(self) -> str:
105         s = "Known devices:\n"
106         for name, keywords in self._keywords_by_name.items():
107             mac = self._macs_by_name[name]
108             s += f"  {name} ({mac}) => {keywords}\n"
109         return s
110
111     def get_keywords_by_name(self, name: str) -> Optional[str]:
112         return self._keywords_by_name.get(name, None)
113
114     def get_macs_by_name(self, name: str) -> Set[str]:
115         retval = set()
116         for (mac, lname) in self._names_by_mac.items():
117             if name in lname:
118                 retval.add(mac)
119         return retval
120
121     def get_macs_by_keyword(self, keyword: str) -> Set[str]:
122         retval = set()
123         for (mac, keywords) in self._keywords_by_mac.items():
124             if keyword in keywords:
125                 retval.add(mac)
126         return retval
127
128     def get_device_by_name(self, name: str) -> Optional[device.Device]:
129         if name in self._macs_by_name:
130             return self.get_device_by_mac(self._macs_by_name[name])
131         return None
132
133     def get_all_devices(self) -> List[device.Device]:
134         retval = []
135         for mac, _ in self._keywords_by_mac.items():
136             if mac is not None:
137                 dev = self.get_device_by_mac(mac)
138                 if dev is not None:
139                     retval.append(dev)
140         return retval
141
142     def get_device_by_mac(self, mac: str) -> Optional[device.Device]:
143         if mac in self._keywords_by_mac:
144             name = self._names_by_mac[mac]
145             kws = self._keywords_by_mac[mac]
146             logger.debug('Found %s -> %s (%s)', name, mac, kws)
147             try:
148                 if 'light' in kws.lower():
149                     if 'tplink' in kws.lower():
150                         logger.debug('    ...a TPLinkLight')
151                         return lights.TPLinkLight(name, mac, kws)
152                     elif 'tuya' in kws.lower():
153                         logger.debug('    ...a TuyaLight')
154                         return lights.TuyaLight(name, mac, kws)
155                     elif 'goog' in kws.lower():
156                         logger.debug('    ...a GoogleLight')
157                         return lights.GoogleLight(name, mac, kws)
158                     else:
159                         raise Exception(f'Unknown light device: {name}, {mac}, {kws}')
160                 elif 'outlet' in kws.lower():
161                     if 'tplink' in kws.lower():
162                         if 'children' in kws.lower():
163                             logger.debug('    ...a TPLinkOutletWithChildren')
164                             return outlets.TPLinkOutletWithChildren(name, mac, kws)
165                         else:
166                             logger.debug('    ...a TPLinkOutlet')
167                             return outlets.TPLinkOutlet(name, mac, kws)
168                     elif 'meross' in kws.lower():
169                         logger.debug('    ...a MerossOutlet')
170                         return outlets.MerossOutlet(name, mac, kws)
171                     elif 'goog' in kws.lower():
172                         logger.debug('    ...a GoogleOutlet')
173                         return outlets.GoogleOutlet(name, mac, kws)
174                     else:
175                         raise Exception(f'Unknown outlet device: {name}, {mac}, {kws}')
176                 elif 'camera' in kws.lower():
177                     logger.debug('    ...a BaseCamera')
178                     return cameras.BaseCamera(name, mac, kws)
179                 elif 'ccast' in kws.lower():
180                     logger.debug('    ...a Chromecast')
181                     return chromecasts.BaseChromecast(name, mac, kws)
182                 else:
183                     logger.debug('    ...an unknown device (should this be here?)')
184                     return device.Device(name, mac, kws)
185             except Exception as e:
186                 logger.exception(e)
187                 logger.debug(
188                     'Device %s at %s with %s confused me; returning a generic Device',
189                     name,
190                     mac,
191                     kws,
192                 )
193                 return device.Device(name, mac, kws)
194         logger.warning('%s is not a known smart home device, returning None', mac)
195         return None
196
197     def query(self, query: str) -> List[device.Device]:
198         """Evaluates a lighting query expression formed of keywords to search
199         for, logical operators (and, or, not), and parenthesis.
200         Returns a list of matching lights.
201         """
202         retval = []
203         logger.debug('Executing query: %s', query)
204         results = self._corpus.query(query)
205         if results is not None:
206             for mac in results:
207                 if mac is not None:
208                     dev = self.get_device_by_mac(mac)
209                     if dev is not None:
210                         retval.append(dev)
211         return retval