Add cameras, fix bugs
[python_utils.git] / smart_home / config.py
1 #!/usr/bin/env python3
2
3 import logging
4 import re
5 from typing import List, Optional, Set
6
7 import argparse_utils
8 import config
9 import file_utils
10 import logical_search
11 import smart_home.device as device
12 import smart_home.cameras as cameras
13 import smart_home.lights as lights
14 import smart_home.outlets as outlets
15
# Register the command line flag that tells this module where the
# device list lives on disk.
parser = config.add_commandline_args(
    f"Smart Home Config ({__file__})",
    "Args related to the smart home config.",
)
parser.add_argument(
    "--smart_home_config_file_location",
    type=argparse_utils.valid_filename,
    metavar="FILENAME",
    default="/home/scott/bin/network_mac_addresses.txt",
    help="The location of network_mac_addresses.txt",
)


# Module-level logger, named after this source file.
logger = logging.getLogger(__file__)
30
31
class SmartHomeConfig(object):
    """Index of the smart home devices listed in a config file.

    Every line of the file that is neither blank nor a comment is expected
    to look like ``mac, name, keywords`` where ``keywords`` is a whitespace
    separated mix of bare tags (``light``) and ``key:value`` properties.
    Devices can be looked up by name, MAC address or keyword, or selected
    with a logical query evaluated by ``logical_search``.
    """

    def __init__(
        self,
        config_file: Optional[str] = None,
        # NOTE: the default list is shared between calls but only ever read.
        filters: Optional[List[str]] = ['smart'],
    ) -> None:
        """Read and index the config file.

        Args:
            config_file: path of the file to read.  When None the path is
                taken from the smart_home_config_file_location setting.
            filters: substrings that must ALL appear in an entry's keyword
                string for the entry to be kept.  None keeps every entry.

        Raises:
            AssertionError: the config file does not exist.
            ValueError: a line does not have exactly three comma separated
                fields.
        """
        self._macs_by_name = {}
        self._keywords_by_name = {}
        self._keywords_by_mac = {}
        self._names_by_mac = {}
        self._corpus = logical_search.Corpus()

        # Read the disk config file...
        if config_file is None:
            config_file = config.config['smart_home_config_file_location']
        assert file_utils.does_file_exist(config_file)
        logger.debug(f'Reading {config_file}')
        with open(config_file, "r") as f:
            contents = f.readlines()

        # Parse the contents...
        for raw_line in contents:
            # Everything from the first '#' onwards is a comment.
            line = raw_line.partition("#")[0].strip()
            if line == "":
                continue
            logger.debug(f'> {line}')

            fields = [field.strip() for field in line.split(",")]
            if len(fields) != 3:
                raise ValueError(
                    f'Malformed line in {config_file}; expected '
                    f'"mac, name, keywords" but got: {line}'
                )
            mac, name, keywords = fields

            if filters is not None:
                # An entry is kept only when every filter occurs in its
                # keyword string; report the first one that does not.
                unmet = next(
                    (flt for flt in filters if flt not in keywords), None
                )
                if unmet is not None:
                    logger.debug(f'Skipping this entry b/c of filter {unmet}')
                    continue

            self._macs_by_name[name] = mac
            self._keywords_by_name[name] = keywords
            self._keywords_by_mac[mac] = keywords
            self._names_by_mac[mac] = name
            self.index_device(name, keywords, mac)

    @staticmethod
    def _parse_keywords(keywords: str) -> tuple:
        """Split a keyword string into ``(tags, properties)``.

        ``tags`` is the set of bare keywords and ``properties`` the list of
        ``(key, value)`` pairs from ``key:value`` keywords, in input order.
        Only the first colon separates key from value, so values may
        themselves contain colons (URLs, MAC addresses, ...).
        """
        tags = set()
        properties = []
        for kw in keywords.split():
            key, colon, value = kw.partition(":")
            if colon:
                properties.append((key, value))
            else:
                tags.add(kw)
        return tags, properties

    def index_device(self, name: str, keywords: str, mac: str) -> None:
        """Add one device to the search corpus, using its MAC as the docid."""
        tags, extra_properties = self._parse_keywords(keywords)
        doc = logical_search.Document(
            docid=mac,
            tags=tags,
            properties=[("name", name)] + extra_properties,
            reference=None,
        )
        logger.debug(f'Indexing document {doc}')
        self._corpus.add_doc(doc)

    def __repr__(self) -> str:
        """Return a multi-line listing of every known device."""
        parts = ["Known devices:\n"]
        parts.extend(
            f"  {name} ({self._macs_by_name[name]}) => {keywords}\n"
            for name, keywords in self._keywords_by_name.items()
        )
        return "".join(parts)

    def get_keywords_by_name(self, name: str) -> Optional[str]:
        """Return the keyword string of the device called name, or None."""
        return self._keywords_by_name.get(name, None)

    def get_macs_by_name(self, name: str) -> Set[str]:
        """Return the MACs of all devices whose name contains name."""
        return {
            mac
            for mac, known_name in self._names_by_mac.items()
            if name in known_name
        }

    def get_macs_by_keyword(self, keyword: str) -> Set[str]:
        """Return the MACs of all devices whose keyword string contains
        keyword (a substring match)."""
        return {
            mac
            for mac, keywords in self._keywords_by_mac.items()
            if keyword in keywords
        }

    def get_device_by_name(self, name: str) -> Optional[device.Device]:
        """Return the device with exactly this name, or None if unknown."""
        if name in self._macs_by_name:
            return self.get_device_by_mac(self._macs_by_name[name])
        return None

    def get_all_devices(self) -> List[device.Device]:
        """Return a device object for every known MAC address."""
        retval = []
        for mac in self._keywords_by_mac:
            if mac is not None:
                dev = self.get_device_by_mac(mac)
                if dev is not None:
                    retval.append(dev)
        return retval

    def get_device_by_mac(self, mac: str) -> Optional[device.Device]:
        """Build the device object for a MAC address.

        The concrete class is chosen from substrings of the lower-cased
        keyword string: 'light' is checked first, then 'outlet', then
        'camera'; anything else becomes a generic device.Device.

        Returns:
            The device, or None (after logging a warning) if mac is unknown.

        Raises:
            Exception: the device is a light or outlet of an unrecognized
                make.
        """
        if mac not in self._keywords_by_mac:
            logger.warning(f'{mac} is not known, returning None')
            return None

        name = self._names_by_mac[mac]
        kws = self._keywords_by_mac[mac]
        lowered = kws.lower()
        logger.debug(f'Found {name} -> {mac} ({kws})')

        if 'light' in lowered:
            if 'tplink' in lowered:
                logger.debug('    ...a TPLinkLight')
                return lights.TPLinkLight(name, mac, kws)
            if 'tuya' in lowered:
                logger.debug('    ...a TuyaLight')
                return lights.TuyaLight(name, mac, kws)
            if 'goog' in lowered:
                logger.debug('    ...a GoogleLight')
                return lights.GoogleLight(name, mac, kws)
            raise Exception(f'Unknown light device: {name}, {mac}, {kws}')

        if 'outlet' in lowered:
            if 'tplink' in lowered:
                if 'children' in lowered:
                    logger.debug('    ...a TPLinkOutletWithChildren')
                    return outlets.TPLinkOutletWithChildren(name, mac, kws)
                logger.debug('    ...a TPLinkOutlet')
                return outlets.TPLinkOutlet(name, mac, kws)
            if 'goog' in lowered:
                logger.debug('    ...a GoogleOutlet')
                return outlets.GoogleOutlet(name, mac, kws)
            raise Exception(f'Unknown outlet device: {name}, {mac}, {kws}')

        if 'camera' in lowered:
            logger.debug('    ...a BaseCamera')
            return cameras.BaseCamera(name, mac, kws)

        logger.debug('    ...an unknown device (should this be here?)')
        return device.Device(name, mac, kws)

    def query(self, query: str) -> List[device.Device]:
        """Evaluates a device query expression formed of keywords to search
        for, logical operators (and, or, not), and parenthesis.
        Returns a list of matching devices.
        """
        retval = []
        logger.debug(f'Executing query {query}')
        results = self._corpus.query(query)
        if results is not None:
            for mac in results:
                if mac is not None:
                    dev = self.get_device_by_mac(mac)
                    if dev is not None:
                        retval.append(dev)
        return retval