A bunch of changes...
[python_utils.git] / smart_home / registry.py
1 #!/usr/bin/env python3
2
import logging
import re
from typing import Iterable, List, Optional, Set, Tuple

import argparse_utils
import config
import file_utils
import logical_search
import smart_home.device as device
import smart_home.cameras as cameras
import smart_home.chromecasts as chromecasts
import smart_home.lights as lights
import smart_home.outlets as outlets
16
# Register this module's command-line flags.  config.add_commandline_args is
# defined elsewhere; it hands back an object exposing add_argument
# (presumably an argparse argument group -- confirm against config.py).
args = config.add_commandline_args(
    f"Smart Home Registry ({__file__})",
    "Args related to the smart home configuration registry.",
)

# Where the registry file lives on disk; the value is read back in
# SmartHomeRegistry.__init__ via
# config.config['smart_home_registry_file_location'].
args.add_argument(
    '--smart_home_registry_file_location',
    type=argparse_utils.valid_filename,
    metavar='FILENAME',
    default='/home/scott/bin/network_mac_addresses.txt',
    help='The location of network_mac_addresses.txt',
)


# Module-wide logger, keyed by this file's path.
logger = logging.getLogger(__file__)
31
32
class SmartHomeRegistry(object):
    """In-memory index of the devices listed in the smart home registry file.

    The registry file is plain text with one device per line, written as
    ``mac, name, keywords``.  Everything after a ``#`` is a comment and
    blank lines are ignored.  ``keywords`` is a whitespace-separated list
    of tokens: a token containing ``:`` is indexed as a ``key:value``
    property, any other token is indexed as a tag.
    """

    def __init__(
            self,
            registry_file: Optional[str] = None,
            filters: Optional[Iterable[str]] = ('smart',),
    ) -> None:
        """Read the registry file and index every entry that passes filters.

        Args:
            registry_file: path of the registry file.  When None, the
                --smart_home_registry_file_location config value is used.
            filters: strings that must ALL occur (as substrings) in an
                entry's keyword string for the entry to be kept.  Pass
                None to keep every entry.

        Raises:
            AssertionError: if file_utils reports the file does not exist.
        """
        self._macs_by_name = {}
        self._keywords_by_name = {}
        self._keywords_by_mac = {}
        self._names_by_mac = {}
        self._corpus = logical_search.Corpus()

        # Materialize once so a one-shot iterable is not exhausted after the
        # first registry line.
        if filters is not None:
            filters = tuple(filters)

        # Read the disk config file...
        if registry_file is None:
            registry_file = config.config[
                'smart_home_registry_file_location'
            ]
        assert file_utils.does_file_exist(registry_file), (
            f'{registry_file} does not exist'
        )
        logger.debug(f'Reading {registry_file}')
        with open(registry_file, "r") as f:
            contents = f.readlines()

        # Parse the contents...
        for raw_line in contents:
            # Drop the trailing newline, any '#' comment, then padding.
            line = re.sub(r"#.*$", r"", raw_line.rstrip("\n")).strip()
            if line == "":
                continue
            logger.debug(f'SH-CONFIG> {line}')
            try:
                (mac, name, keywords) = line.split(",")
            except ValueError:
                # Not exactly three comma-separated fields.
                logger.warning(f'SH-CONFIG> {line} is malformed?!')
                continue
            mac = mac.strip()
            name = name.strip()
            keywords = keywords.strip()

            unmatched = self._first_unmatched_filter(keywords, filters)
            if unmatched is not None:
                logger.debug(f'Skipping this entry b/c of filter {unmatched}')
                continue

            self._macs_by_name[name] = mac
            self._keywords_by_name[name] = keywords
            self._keywords_by_mac[mac] = keywords
            self._names_by_mac[mac] = name
            self.index_device(name, keywords, mac)

    @staticmethod
    def _first_unmatched_filter(
            keywords: str,
            filters: Optional[Iterable[str]],
    ) -> Optional[str]:
        """Return the first filter that is not a substring of keywords.

        Returns None when filters is None or when every filter matches.
        """
        if filters is None:
            return None
        return next(
            (wanted for wanted in filters if wanted not in keywords),
            None,
        )

    @staticmethod
    def _parse_keywords(
            keywords: str,
    ) -> Tuple[Set[str], List[Tuple[str, str]]]:
        """Split a keyword string into (tags, key/value properties).

        Tokens are separated by whitespace.  A token containing ':' becomes
        a (key, value) pair split at the FIRST colon only, so values that
        themselves contain colons (URLs, MAC addresses, times) stay intact.
        Every other token becomes a tag.
        """
        tags: Set[str] = set()
        properties: List[Tuple[str, str]] = []
        for kw in keywords.split():
            key, colon, value = kw.partition(":")
            if colon:
                properties.append((key, value))
            else:
                tags.add(kw)
        return tags, properties

    def index_device(self, name: str, keywords: str, mac: str) -> None:
        """Add one device to the search corpus, using its MAC as the docid.

        The document's properties are ("name", name) followed by every
        key:value keyword; its tags are the remaining keywords.
        """
        tags, keyword_properties = self._parse_keywords(keywords)
        document = logical_search.Document(
            docid=mac,
            tags=tags,
            properties=[("name", name)] + keyword_properties,
            reference=None,
        )
        logger.debug(f'Indexing document {document}')
        self._corpus.add_doc(document)

    def __repr__(self) -> str:
        """Return a multi-line listing of every known device."""
        rows = ["Known devices:"]
        rows.extend(
            f"  {name} ({self._macs_by_name[name]}) => {keywords}"
            for name, keywords in self._keywords_by_name.items()
        )
        # Every row, including the header, is newline-terminated.
        return "\n".join(rows) + "\n"

    def get_keywords_by_name(self, name: str) -> Optional[str]:
        """Return the keyword string of the device called name, or None."""
        return self._keywords_by_name.get(name, None)

    def get_macs_by_name(self, name: str) -> Set[str]:
        """Return the MACs of all devices whose name contains name."""
        return {
            mac
            for mac, known_name in self._names_by_mac.items()
            if name in known_name
        }

    def get_macs_by_keyword(self, keyword: str) -> Set[str]:
        """Return the MACs of all devices whose keyword string contains
        keyword (substring match)."""
        return {
            mac
            for mac, keywords in self._keywords_by_mac.items()
            if keyword in keywords
        }

    def get_device_by_name(self, name: str) -> Optional[device.Device]:
        """Return the device registered under exactly name, or None."""
        mac = self._macs_by_name.get(name)
        if name not in self._macs_by_name:
            return None
        return self.get_device_by_mac(mac)

    def get_all_devices(self) -> List[device.Device]:
        """Return a device object for every registered MAC."""
        retval = []
        for mac in self._keywords_by_mac:
            if mac is None:
                continue
            found = self.get_device_by_mac(mac)
            if found is not None:
                retval.append(found)
        return retval

    def get_device_by_mac(self, mac: str) -> Optional[device.Device]:
        """Build the device object for mac, or return None if mac is unknown.

        The concrete class is chosen from substrings of the lowercased
        keyword string: 'light' / 'outlet' (further split by vendor
        keyword), 'camera', 'ccast'; anything else yields a plain
        device.Device.  Any exception raised while choosing or
        constructing the class -- including the explicit "Unknown light /
        outlet device" errors below -- is logged and a plain device.Device
        is returned instead.
        """
        if mac not in self._keywords_by_mac:
            logger.warning(f'{mac} is not a known smart home device, returning None')
            return None

        name = self._names_by_mac[mac]
        kws = self._keywords_by_mac[mac]
        logger.debug(f'Found {name} -> {mac} ({kws})')
        try:
            # Lowercase once; kept inside the try so a failure here is
            # handled the same way as a constructor failure.
            lowered = kws.lower()
            if 'light' in lowered:
                if 'tplink' in lowered:
                    logger.debug('    ...a TPLinkLight')
                    return lights.TPLinkLight(name, mac, kws)
                elif 'tuya' in lowered:
                    logger.debug('    ...a TuyaLight')
                    return lights.TuyaLight(name, mac, kws)
                elif 'goog' in lowered:
                    logger.debug('    ...a GoogleLight')
                    return lights.GoogleLight(name, mac, kws)
                else:
                    raise Exception(f'Unknown light device: {name}, {mac}, {kws}')
            elif 'outlet' in lowered:
                if 'tplink' in lowered:
                    if 'children' in lowered:
                        logger.debug('    ...a TPLinkOutletWithChildren')
                        return outlets.TPLinkOutletWithChildren(name, mac, kws)
                    else:
                        logger.debug('    ...a TPLinkOutlet')
                        return outlets.TPLinkOutlet(name, mac, kws)
                elif 'meross' in lowered:
                    logger.debug('    ...a MerossOutlet')
                    return outlets.MerossOutlet(name, mac, kws)
                elif 'goog' in lowered:
                    logger.debug('    ...a GoogleOutlet')
                    return outlets.GoogleOutlet(name, mac, kws)
                else:
                    raise Exception(f'Unknown outlet device: {name}, {mac}, {kws}')
            elif 'camera' in lowered:
                logger.debug('    ...a BaseCamera')
                return cameras.BaseCamera(name, mac, kws)
            elif 'ccast' in lowered:
                logger.debug('    ...a Chromecast')
                return chromecasts.BaseChromecast(name, mac, kws)
            else:
                logger.debug('    ...an unknown device (should this be here?)')
                return device.Device(name, mac, kws)
        except Exception as e:
            # Best effort: fall back to the generic device wrapper.
            logger.warning(
                f'Got exception {e} while trying to communicate with device {name}/{mac}.'
            )
            return device.Device(name, mac, kws)

    def query(self, query: str) -> List[device.Device]:
        """Evaluates a device query expression formed of keywords to search
        for, logical operators (and, or, not), and parenthesis.  The
        expression is handed to the search corpus as-is.
        Returns a list of matching devices.
        """
        retval = []
        logger.debug(f'Executing query {query}')
        results = self._corpus.query(query)
        if results is None:
            return retval
        for mac in results:
            if mac is None:
                continue
            found = self.get_device_by_mac(mac)
            if found is not None:
                retval.append(found)
        return retval