#!/usr/bin/env python3
+"""Most basic definition of a smart device: it must have a name and a
+MAC address and may have some optional keywords. All devices have
+these whether they are lights, outlets, thermostats, etc..."""
+
import re
from typing import List, Optional
class Device(object):
+ """Most basic definition of a smart device: it must have a name and a
+ MAC address and may have some optional keywords. All devices have
+ these whether they are lights, outlets, thermostats, etc..."""
+
def __init__(
self,
name: str,
"""Utilities for dealing with the smart lights."""
import datetime
-import json
import logging
-import os
import re
-import subprocess
-import sys
from abc import abstractmethod
from typing import Any, Dict, List, Optional, Tuple
import argparse_utils
import arper
import config
-import logging_utils
import smart_home.device as dev
-from decorator_utils import memoized, timeout
+import smart_home.tplink_utils as tplink
+from decorator_utils import memoized
from google_assistant import GoogleResponse, ask_google
logger = logging.getLogger(__name__)
)
-@timeout(5.0, use_signals=False, error_message="Timed out waiting for tplink.py")
-def tplink_light_command(command: str) -> bool:
- result = os.system(command)
- signal = result & 0xFF
- if signal != 0:
- msg = f'{command} died with signal {signal}'
- logger.warning(msg)
- logging_utils.hlog(msg)
- return False
- else:
- exit_value = result >> 8
- if exit_value != 0:
- msg = f'{command} failed, exited {exit_value}'
- logger.warning(msg)
- logging_utils.hlog(msg)
- return False
- logger.debug('%s succeeded.', command)
- return True
-
-
class BaseLight(dev.Device):
"""A base class representing a smart light."""
if extra_args is not None:
cmd += f" {extra_args}"
logger.debug('About to execute: %s', cmd)
- return tplink_light_command(cmd)
+ return tplink.tplink_command(cmd)
@overrides
- def turn_on(self, child: str = None) -> bool:
- return self.command("on", child)
+ def turn_on(self) -> bool:
+ return self.command("on", None)
@overrides
- def turn_off(self, child: str = None) -> bool:
+ def turn_off(self) -> bool:
+ return self.command("off", None)
+
+ def turn_on_child(self, child: str = None) -> bool:
+ return self.command("on", child)
+
+ def turn_off_child(self, child: str = None) -> bool:
return self.command("off", child)
@overrides
def make_color(self, color: str) -> bool:
raise NotImplementedError
- @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
def get_info(self) -> Optional[Dict]:
cmd = self.get_cmdline() + "-c info"
- logger.debug('Getting status of %s via "%s"...', self.mac, cmd)
- out = subprocess.getoutput(cmd)
- logger.debug('RAW OUT> %s', out)
- out = re.sub("Sent:.*\n", "", out)
- out = re.sub("Received: *", "", out)
- try:
- self.info = json.loads(out)["system"]["get_sysinfo"]
- logger.debug("%s", json.dumps(self.info, indent=4, sort_keys=True))
+ self.info = tplink.tplink_get_info(cmd)
+ if self.info is not None:
self.info_ts = datetime.datetime.now()
- return self.info
- except Exception as e:
- logger.exception(e)
- print(out, file=sys.stderr)
- self.info = None
+ else:
self.info_ts = None
- return None
+ return self.info
@overrides
def status(self) -> str:
ret = ''
- for k, v in self.get_info().items():
- ret += f'{k} = {v}\n'
+ info = self.get_info()
+ if info is not None:
+ for k, v in info:
+ ret += f'{k} = {v}\n'
return ret
def get_on_duration_seconds(self, child: str = None) -> int:
return False
cmd = (
self.get_cmdline()
- + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\''
+ + '-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{%d} }} }} }}\''
+ % level
)
- return tplink_light_command(cmd)
+ return tplink.tplink_command(cmd)
# class GoogleLightGroup(GoogleLight):
import asyncio
import atexit
import datetime
-import json
import logging
import os
-import re
-import subprocess
-import sys
from abc import abstractmethod
from typing import Any, Dict, List, Optional
import argparse_utils
import config
import decorator_utils
-import logging_utils
import scott_secrets
import smart_home.device as dev
-from decorator_utils import memoized, timeout
+import smart_home.tplink_utils as tplink
+from decorator_utils import memoized
from google_assistant import GoogleResponse, ask_google
logger = logging.getLogger(__name__)
)
-@timeout(5.0, use_signals=False, error_message="Timed out waiting for tplink.py")
-def tplink_outlet_command(command: str) -> bool:
- result = os.system(command)
- signal = result & 0xFF
- if signal != 0:
- msg = f'{command} died with signal {signal}'
- logger.warning(msg)
- logging_utils.hlog(msg)
- return False
- else:
- exit_value = result >> 8
- if exit_value != 0:
- msg = f'{command} failed, exited {exit_value}'
- logger.warning(msg)
- logging_utils.hlog(msg)
- return False
- logger.debug('%s succeeded.', command)
- return True
-
-
class BaseOutlet(dev.Device):
"""An abstract base class for smart outlets."""
cmd = self.get_cmdline() + f"-c {cmd}"
if extra_args is not None:
cmd += f" {extra_args}"
- return tplink_outlet_command(cmd)
+ return tplink.tplink_command(cmd)
@overrides
def turn_on(self) -> bool:
def is_off(self) -> bool:
return not self.is_on()
- @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
def get_info(self) -> Optional[Dict]:
cmd = self.get_cmdline() + "-c info"
- out = subprocess.getoutput(cmd)
- out = re.sub("Sent:.*\n", "", out)
- out = re.sub("Received: *", "", out)
- try:
- self.info = json.loads(out)["system"]["get_sysinfo"]
+ self.info = tplink.tplink_get_info(cmd)
+ if self.info is not None:
self.info_ts = datetime.datetime.now()
- return self.info
- except Exception as e:
- logger.exception(e)
- print(out, file=sys.stderr)
- self.info = None
+ else:
self.info_ts = None
- return None
+ return self.info
def get_on_duration_seconds(self) -> int:
self.info = self.get_info()
for child in self.info["children"]:
self.children.append(child["id"])
- @overrides
- def get_cmdline(self, child: Optional[str] = None) -> str:
- cmd = (
- f"{config.config['smart_outlets_tplink_location']} -m {self.mac} "
- f"--no_logging_console "
- )
+ def get_cmdline_with_child(self, child: Optional[str] = None) -> str:
+ cmd = super().get_cmdline()
if child is not None:
cmd += f"-x {child} "
return cmd
@overrides
def command(self, cmd: str, extra_args: str = None, **kwargs) -> bool:
child: Optional[str] = kwargs.get('child', None)
- cmd = self.get_cmdline(child) + f"-c {cmd}"
+ cmd = self.get_cmdline_with_child(child) + f"-c {cmd}"
if extra_args is not None:
cmd += f" {extra_args}"
logger.debug('About to execute: %s', cmd)
- return tplink_outlet_command(cmd)
+ return tplink.tplink_command(cmd)
def get_children(self) -> List[str]:
return self.children
@overrides
- def turn_on(self, child: str = None) -> bool:
- return self.command("on", child)
+ def turn_on(self) -> bool:
+ return self.command("on", None)
@overrides
- def turn_off(self, child: str = None) -> bool:
+ def turn_off(self) -> bool:
+ return self.command("off", None)
+
+ def turn_on_child(self, child: str = None) -> bool:
+ return self.command("on", child)
+
+ def turn_off_child(self, child: str = None) -> bool:
return self.command("off", child)
- def get_on_duration_seconds(self, child: str = None) -> int:
- self.info = self.get_info()
+ def get_child_on_duration_seconds(self, child: str = None) -> int:
if child is None:
- if self.info is None:
- return 0
- return int(self.info.get("on_time", "0"))
+ return super().get_on_duration_seconds()
else:
+ self.info = self.get_info()
if self.info is None:
return 0
for chi in self.info.get("children", {}):
--- /dev/null
+#!/usr/bin/env python3
+
+"""Wrapper functions for calling tplink.py"""
+
+import json
+import logging
+import os
+import re
+import subprocess
+import sys
+from typing import Dict, Optional
+
+import logging_utils
+from decorator_utils import timeout
+
+logger = logging.getLogger(__name__)
+
+
+@timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
+def tplink_command(command: str) -> bool:
+ result = os.system(command)
+ signal = result & 0xFF
+ if signal != 0:
+ msg = f'{command} died with signal {signal}'
+ logger.warning(msg)
+ logging_utils.hlog(msg)
+ return False
+ else:
+ exit_value = result >> 8
+ if exit_value != 0:
+ msg = f'{command} failed, exited {exit_value}'
+ logger.warning(msg)
+ logging_utils.hlog(msg)
+ return False
+ logger.debug('%s succeeded.', command)
+ return True
+
+
+@timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
+def tplink_get_info(cmd: str) -> Optional[Dict]:
+ logger.debug('Getting tplink device status via "%s"', cmd)
+ try:
+ out = subprocess.getoutput(cmd)
+ logger.debug('RAW OUT> %s', out)
+ out = re.sub("Sent:.*\n", "", out)
+ out = re.sub("Received: *", "", out)
+ info = json.loads(out)["system"]["get_sysinfo"]
+ logger.debug("%s", json.dumps(info, indent=4, sort_keys=True))
+ return info
+ except Exception as e:
+ logger.exception(e)
+ print(out, file=sys.stderr)
+ return None