Typo.
[python_utils.git] / smart_home / outlets.py
1 #!/usr/bin/env python3
2
3 """Utilities for dealing with the smart outlets."""
4
5 from abc import abstractmethod
6 import datetime
7 import json
8 import logging
9 import os
10 import re
11 import subprocess
12 import sys
13 from typing import Any, Dict, List, Optional, Set
14
15 import argparse_utils
16 import config
17 import logging_utils
18 import smart_home.device as dev
19 from google_assistant import ask_google, GoogleResponse
20 from decorator_utils import timeout, memoized
21
22 logger = logging.getLogger(__name__)
23
24 parser = config.add_commandline_args(
25     f"Outlet Utils ({__file__})",
26     "Args related to smart outlets.",
27 )
28 parser.add_argument(
29     '--smart_outlets_tplink_location',
30     default='/home/scott/bin/tplink.py',
31     metavar='FILENAME',
32     help='The location of the tplink.py helper',
33     type=argparse_utils.valid_filename,
34 )
35
36
37 @timeout(
38     5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
39 )
40 def tplink_outlet_command(command: str) -> bool:
41     result = os.system(command)
42     signal = result & 0xFF
43     if signal != 0:
44         logger.warning(f'{command} died with signal {signal}')
45         logging_utils.hlog("%s died with signal %d" % (command, signal))
46         return False
47     else:
48         exit_value = result >> 8
49         if exit_value != 0:
50             logger.warning(f'{command} failed, exited {exit_value}')
51             logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
52             return False
53     logger.debug(f'{command} succeeded.')
54     return True
55
56
57 class BaseOutlet(dev.Device):
58     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
59         super().__init__(name.strip(), mac.strip(), keywords)
60         self.info = None
61
62     @abstractmethod
63     def turn_on(self) -> bool:
64         pass
65
66     @abstractmethod
67     def turn_off(self) -> bool:
68         pass
69
70     @abstractmethod
71     def is_on(self) -> bool:
72         pass
73
74     @abstractmethod
75     def is_off(self) -> bool:
76         pass
77
78
79 class TPLinkOutlet(BaseOutlet):
80     def __init__(self, name: str, mac: str, keywords: str = '') -> None:
81         super().__init__(name, mac, keywords)
82         self.info: Optional[Dict] = None
83         self.info_ts: Optional[datetime.datetime] = None
84
85     @memoized
86     def get_tplink_name(self) -> Optional[str]:
87         self.info = self.get_info()
88         if self.info is not None:
89             return self.info["alias"]
90         return None
91
92     def get_cmdline(self) -> str:
93         cmd = (
94             f"{config.config['smart_outlets_tplink_location']} -m {self.mac} "
95             f"--no_logging_console "
96         )
97         return cmd
98
99     def command(self, cmd: str, extra_args: str = None) -> bool:
100         cmd = self.get_cmdline() + f"-c {cmd}"
101         if extra_args is not None:
102             cmd += f" {extra_args}"
103         return tplink_outlet_command(cmd)
104
105     def turn_on(self) -> bool:
106         return self.command('on')
107
108     def turn_off(self) -> bool:
109         return self.command('off')
110
111     def is_on(self) -> bool:
112         return self.get_on_duration_seconds() > 0
113
114     def is_off(self) -> bool:
115         return not self.is_on()
116
117     @timeout(
118         10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
119     )
120     def get_info(self) -> Optional[Dict]:
121         cmd = self.get_cmdline() + "-c info"
122         out = subprocess.getoutput(cmd)
123         out = re.sub("Sent:.*\n", "", out)
124         out = re.sub("Received: *", "", out)
125         try:
126             self.info = json.loads(out)["system"]["get_sysinfo"]
127             self.info_ts = datetime.datetime.now()
128             return self.info
129         except Exception as e:
130             logger.exception(e)
131             print(out, file=sys.stderr)
132             self.info = None
133             self.info_ts = None
134             return None
135
136     def get_on_duration_seconds(self) -> int:
137         self.info = self.get_info()
138         if self.info is None:
139             return 0
140         return int(self.info.get("on_time", "0"))
141
142
143 class TPLinkOutletWithChildren(TPLinkOutlet):
144     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
145         super().__init__(name, mac, keywords)
146         self.children: List[str] = []
147         self.info: Optional[Dict] = None
148         self.info_ts: Optional[datetime.datetime] = None
149         assert "children" in self.keywords
150         self.info = self.get_info()
151         if self.info is not None:
152             for child in self.info["children"]:
153                 self.children.append(child["id"])
154
155     # override
156     def get_cmdline(self, child: Optional[str] = None) -> str:
157         cmd = (
158             f"{config.config['smart_outlets_tplink_location']} -m {self.mac} "
159             f"--no_logging_console "
160         )
161         if child is not None:
162             cmd += f"-x {child} "
163         return cmd
164
165     # override
166     def command(
167         self, cmd: str, child: str = None, extra_args: str = None
168     ) -> bool:
169         cmd = self.get_cmdline(child) + f"-c {cmd}"
170         if extra_args is not None:
171             cmd += f" {extra_args}"
172         logger.debug(f'About to execute {cmd}')
173         return tplink_outlet_command(cmd)
174
175     def get_children(self) -> List[str]:
176         return self.children
177
178     def turn_on(self, child: str = None) -> bool:
179         return self.command("on", child)
180
181     def turn_off(self, child: str = None) -> bool:
182         return self.command("off", child)
183
184     def get_on_duration_seconds(self, child: str = None) -> int:
185         self.info = self.get_info()
186         if child is None:
187             if self.info is None:
188                 return 0
189             return int(self.info.get("on_time", "0"))
190         else:
191             if self.info is None:
192                 return 0
193             for chi in self.info.get("children", {}):
194                 if chi["id"] == child:
195                     return int(chi.get("on_time", "0"))
196         return 0
197
198
199 class GoogleOutlet(BaseOutlet):
200     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
201         super().__init__(name.strip(), mac.strip(), keywords)
202         self.info = None
203
204     def goog_name(self) -> str:
205         name = self.get_name()
206         return name.replace('_', ' ')
207
208     @staticmethod
209     def parse_google_response(response: GoogleResponse) -> bool:
210         return response.success
211
212     def turn_on(self) -> bool:
213         return GoogleOutlet.parse_google_response(
214             ask_google('turn {self.goog_name()} on')
215         )
216
217     def turn_off(self) -> bool:
218         return GoogleOutlet.parse_google_response(
219             ask_google('turn {self.goog_name()} off')
220         )
221
222     def is_on(self) -> bool:
223         r = ask_google(f'is {self.goog_name()} on?')
224         if not r.success:
225             return False
226         return 'is on' in r.audio_transcription
227
228     def is_off(self) -> bool:
229         return not self.is_on()