Ugh, a bunch of things. @overrides. --lmodule. Chromecasts. etc...
[python_utils.git] / smart_home / registry.py
1 #!/usr/bin/env python3
2
3 import logging
4 import re
5 from typing import List, Optional, Set
6
7 import argparse_utils
8 import config
9 import file_utils
10 import logical_search
11 import smart_home.device as device
12 import smart_home.cameras as cameras
13 import smart_home.chromecasts as chromecasts
14 import smart_home.lights as lights
15 import smart_home.outlets as outlets
16
17 args = config.add_commandline_args(
18     f"Smart Home Registry ({__file__})",
19     "Args related to the smart home configuration registry."
20 )
21 args.add_argument(
22     '--smart_home_registry_file_location',
23     default='/home/scott/bin/network_mac_addresses.txt',
24     metavar='FILENAME',
25     help='The location of network_mac_addresses.txt',
26     type=argparse_utils.valid_filename,
27 )
28
29
30 logger = logging.getLogger(__file__)
31
32
33 class SmartHomeRegistry(object):
34     def __init__(
35             self,
36             registry_file: Optional[str] = None,
37             filters: List[str] = ['smart'],
38     ) -> None:
39         self._macs_by_name = {}
40         self._keywords_by_name = {}
41         self._keywords_by_mac = {}
42         self._names_by_mac = {}
43         self._corpus = logical_search.Corpus()
44
45         # Read the disk config file...
46         if registry_file is None:
47             registry_file = config.config[
48                 'smart_home_registry_file_location'
49             ]
50         assert file_utils.does_file_exist(registry_file)
51         logger.debug(f'Reading {registry_file}')
52         with open(registry_file, "r") as f:
53             contents = f.readlines()
54
55         # Parse the contents...
56         for line in contents:
57             line = line.rstrip("\n")
58             line = re.sub(r"#.*$", r"", line)
59             line = line.strip()
60             if line == "":
61                 continue
62             logger.debug(f'SH-CONFIG> {line}')
63             (mac, name, keywords) = line.split(",")
64             mac = mac.strip()
65             name = name.strip()
66             keywords = keywords.strip()
67
68             skip = False
69             if filters is not None:
70                 for f in filters:
71                     if f not in keywords:
72                         logger.debug(f'Skipping this entry b/c of filter {f}')
73                         skip = True
74                         break
75             if not skip:
76                 self._macs_by_name[name] = mac
77                 self._keywords_by_name[name] = keywords
78                 self._keywords_by_mac[mac] = keywords
79                 self._names_by_mac[mac] = name
80                 self.index_device(name, keywords, mac)
81
82     def index_device(self, name: str, keywords: str, mac: str) -> None:
83         properties = [("name", name)]
84         tags = set()
85         for kw in keywords.split():
86             if ":" in kw:
87                 key, value = kw.split(":")
88                 properties.append((key, value))
89             else:
90                 tags.add(kw)
91         device = logical_search.Document(
92             docid=mac,
93             tags=tags,
94             properties=properties,
95             reference=None,
96         )
97         logger.debug(f'Indexing document {device}')
98         self._corpus.add_doc(device)
99
100     def __repr__(self) -> str:
101         s = "Known devices:\n"
102         for name, keywords in self._keywords_by_name.items():
103             mac = self._macs_by_name[name]
104             s += f"  {name} ({mac}) => {keywords}\n"
105         return s
106
107     def get_keywords_by_name(self, name: str) -> Optional[device.Device]:
108         return self._keywords_by_name.get(name, None)
109
110     def get_macs_by_name(self, name: str) -> Set[str]:
111         retval = set()
112         for (mac, lname) in self._names_by_mac.items():
113             if name in lname:
114                 retval.add(mac)
115         return retval
116
117     def get_macs_by_keyword(self, keyword: str) -> Set[str]:
118         retval = set()
119         for (mac, keywords) in self._keywords_by_mac.items():
120             if keyword in keywords:
121                 retval.add(mac)
122         return retval
123
124     def get_device_by_name(self, name: str) -> Optional[device.Device]:
125         if name in self._macs_by_name:
126             return self.get_device_by_mac(self._macs_by_name[name])
127         return None
128
129     def get_all_devices(self) -> List[device.Device]:
130         retval = []
131         for (mac, kws) in self._keywords_by_mac.items():
132             if mac is not None:
133                 device = self.get_device_by_mac(mac)
134                 if device is not None:
135                     retval.append(device)
136         return retval
137
138     def get_device_by_mac(self, mac: str) -> Optional[device.Device]:
139         if mac in self._keywords_by_mac:
140             name = self._names_by_mac[mac]
141             kws = self._keywords_by_mac[mac]
142             logger.debug(f'Found {name} -> {mac} ({kws})')
143             try:
144                 if 'light' in kws.lower():
145                     if 'tplink' in kws.lower():
146                         logger.debug('    ...a TPLinkLight')
147                         return lights.TPLinkLight(name, mac, kws)
148                     elif 'tuya' in kws.lower():
149                         logger.debug('    ...a TuyaLight')
150                         return lights.TuyaLight(name, mac, kws)
151                     elif 'goog' in kws.lower():
152                         logger.debug('    ...a GoogleLight')
153                         return lights.GoogleLight(name, mac, kws)
154                     else:
155                         raise Exception(f'Unknown light device: {name}, {mac}, {kws}')
156                 elif 'outlet' in kws.lower():
157                     if 'tplink' in kws.lower():
158                         if 'children' in kws.lower():
159                             logger.debug('    ...a TPLinkOutletWithChildren')
160                             return outlets.TPLinkOutletWithChildren(name, mac, kws)
161                         else:
162                             logger.debug('    ...a TPLinkOutlet')
163                             return outlets.TPLinkOutlet(name, mac, kws)
164                     elif 'goog' in kws.lower():
165                         logger.debug('    ...a GoogleOutlet')
166                         return outlets.GoogleOutlet(name, mac, kws)
167                     else:
168                         raise Exception(f'Unknown outlet device: {name}, {mac}, {kws}')
169                 elif 'camera' in kws.lower():
170                     logger.debug('    ...a BaseCamera')
171                     return cameras.BaseCamera(name, mac, kws)
172                 elif 'ccast' in kws.lower():
173                     logger.debug('    ...a Chromecast')
174                     return chromecasts.BaseChromecast(name, mac, kws)
175                 else:
176                     logger.debug('    ...an unknown device (should this be here?)')
177                     return device.Device(name, mac, kws)
178             except Exception as e:
179                 logger.warning(
180                     f'Got exception {e} while trying to communicate with device {name}/{mac}.'
181                 )
182                 return device.Device(name, mac, kws)
183         logger.warning(f'{mac} is not a known smart home device, returning None')
184         return None
185
186     def query(self, query: str) -> List[device.Device]:
187         """Evaluates a lighting query expression formed of keywords to search
188         for, logical operators (and, or, not), and parenthesis.
189         Returns a list of matching lights.
190         """
191         retval = []
192         logger.debug(f'Executing query {query}')
193         results = self._corpus.query(query)
194         if results is not None:
195             for mac in results:
196                 if mac is not None:
197                     device = self.get_device_by_mac(mac)
198                     if device is not None:
199                         retval.append(device)
200         return retval