Make subdirs type clean too.
[python_utils.git] / smart_home / registry.py
1 #!/usr/bin/env python3
2
3 import logging
4 import re
5 from typing import List, Optional, Set
6
7 import argparse_utils
8 import config
9 import file_utils
10 import logical_search
11 import smart_home.cameras as cameras
12 import smart_home.chromecasts as chromecasts
13 import smart_home.device as device
14 import smart_home.lights as lights
15 import smart_home.outlets as outlets
16
17 args = config.add_commandline_args(
18     f"Smart Home Registry ({__file__})",
19     "Args related to the smart home configuration registry.",
20 )
21 args.add_argument(
22     '--smart_home_registry_file_location',
23     default='/home/scott/bin/network_mac_addresses.txt',
24     metavar='FILENAME',
25     help='The location of network_mac_addresses.txt',
26     type=argparse_utils.valid_filename,
27 )
28
29
30 logger = logging.getLogger(__file__)
31
32
33 class SmartHomeRegistry(object):
34     def __init__(
35         self,
36         registry_file: Optional[str] = None,
37         filters: List[str] = ['smart'],
38     ) -> None:
39         self._macs_by_name = {}
40         self._keywords_by_name = {}
41         self._keywords_by_mac = {}
42         self._names_by_mac = {}
43         self._corpus = logical_search.Corpus()
44
45         # Read the disk config file...
46         if registry_file is None:
47             registry_file = config.config['smart_home_registry_file_location']
48         assert file_utils.does_file_exist(registry_file)
49         logger.debug(f'Reading {registry_file}')
50         with open(registry_file, "r") as rf:
51             contents = rf.readlines()
52
53         # Parse the contents...
54         for line in contents:
55             line = line.rstrip("\n")
56             line = re.sub(r"#.*$", r"", line)
57             line = line.strip()
58             if line == "":
59                 continue
60             logger.debug(f'SH-CONFIG> {line}')
61             try:
62                 (mac, name, keywords) = line.split(",")
63             except ValueError:
64                 msg = f'SH-CONFIG> "{line}" is malformed?!  Skipping it.'
65                 logger.warning(msg)
66                 continue
67             mac = mac.strip()
68             name = name.strip()
69             keywords = keywords.strip()
70
71             skip = False
72             if filters is not None:
73                 for f in filters:
74                     if f not in keywords:
75                         logger.debug(f'Skipping this entry b/c of filter {f}')
76                         skip = True
77                         break
78             if not skip:
79                 self._macs_by_name[name] = mac
80                 self._keywords_by_name[name] = keywords
81                 self._keywords_by_mac[mac] = keywords
82                 self._names_by_mac[mac] = name
83                 self.index_device(name, keywords, mac)
84
85     def index_device(self, name: str, keywords: str, mac: str) -> None:
86         properties = [("name", name)]
87         tags = set()
88         for kw in keywords.split():
89             if ":" in kw:
90                 key, value = kw.split(":")
91                 properties.append((key, value))
92             else:
93                 tags.add(kw)
94         device = logical_search.Document(
95             docid=mac,
96             tags=tags,
97             properties=properties,
98             reference=None,
99         )
100         logger.debug(f'Indexing document {device}')
101         self._corpus.add_doc(device)
102
103     def __repr__(self) -> str:
104         s = "Known devices:\n"
105         for name, keywords in self._keywords_by_name.items():
106             mac = self._macs_by_name[name]
107             s += f"  {name} ({mac}) => {keywords}\n"
108         return s
109
110     def get_keywords_by_name(self, name: str) -> Optional[device.Device]:
111         return self._keywords_by_name.get(name, None)
112
113     def get_macs_by_name(self, name: str) -> Set[str]:
114         retval = set()
115         for (mac, lname) in self._names_by_mac.items():
116             if name in lname:
117                 retval.add(mac)
118         return retval
119
120     def get_macs_by_keyword(self, keyword: str) -> Set[str]:
121         retval = set()
122         for (mac, keywords) in self._keywords_by_mac.items():
123             if keyword in keywords:
124                 retval.add(mac)
125         return retval
126
127     def get_device_by_name(self, name: str) -> Optional[device.Device]:
128         if name in self._macs_by_name:
129             return self.get_device_by_mac(self._macs_by_name[name])
130         return None
131
132     def get_all_devices(self) -> List[device.Device]:
133         retval = []
134         for (mac, kws) in self._keywords_by_mac.items():
135             if mac is not None:
136                 device = self.get_device_by_mac(mac)
137                 if device is not None:
138                     retval.append(device)
139         return retval
140
141     def get_device_by_mac(self, mac: str) -> Optional[device.Device]:
142         if mac in self._keywords_by_mac:
143             name = self._names_by_mac[mac]
144             kws = self._keywords_by_mac[mac]
145             logger.debug(f'Found {name} -> {mac} ({kws})')
146             try:
147                 if 'light' in kws.lower():
148                     if 'tplink' in kws.lower():
149                         logger.debug('    ...a TPLinkLight')
150                         return lights.TPLinkLight(name, mac, kws)
151                     elif 'tuya' in kws.lower():
152                         logger.debug('    ...a TuyaLight')
153                         return lights.TuyaLight(name, mac, kws)
154                     elif 'goog' in kws.lower():
155                         logger.debug('    ...a GoogleLight')
156                         return lights.GoogleLight(name, mac, kws)
157                     else:
158                         raise Exception(f'Unknown light device: {name}, {mac}, {kws}')
159                 elif 'outlet' in kws.lower():
160                     if 'tplink' in kws.lower():
161                         if 'children' in kws.lower():
162                             logger.debug('    ...a TPLinkOutletWithChildren')
163                             return outlets.TPLinkOutletWithChildren(name, mac, kws)
164                         else:
165                             logger.debug('    ...a TPLinkOutlet')
166                             return outlets.TPLinkOutlet(name, mac, kws)
167                     elif 'meross' in kws.lower():
168                         logger.debug('    ...a MerossOutlet')
169                         return outlets.MerossOutlet(name, mac, kws)
170                     elif 'goog' in kws.lower():
171                         logger.debug('    ...a GoogleOutlet')
172                         return outlets.GoogleOutlet(name, mac, kws)
173                     else:
174                         raise Exception(f'Unknown outlet device: {name}, {mac}, {kws}')
175                 elif 'camera' in kws.lower():
176                     logger.debug('    ...a BaseCamera')
177                     return cameras.BaseCamera(name, mac, kws)
178                 elif 'ccast' in kws.lower():
179                     logger.debug('    ...a Chromecast')
180                     return chromecasts.BaseChromecast(name, mac, kws)
181                 else:
182                     logger.debug('    ...an unknown device (should this be here?)')
183                     return device.Device(name, mac, kws)
184             except Exception as e:
185                 logger.exception(e)
186                 logger.debug(
187                     f'Device {name} at {mac} with {kws} confused me, returning a generic Device'
188                 )
189                 return device.Device(name, mac, kws)
190         msg = f'{mac} is not a known smart home device, returning None'
191         logger.warning(msg)
192         return None
193
194     def query(self, query: str) -> List[device.Device]:
195         """Evaluates a lighting query expression formed of keywords to search
196         for, logical operators (and, or, not), and parenthesis.
197         Returns a list of matching lights.
198         """
199         retval = []
200         logger.debug(f'Executing query {query}')
201         results = self._corpus.query(query)
202         if results is not None:
203             for mac in results:
204                 if mac is not None:
205                     device = self.get_device_by_mac(mac)
206                     if device is not None:
207                         retval.append(device)
208         return retval